Make custom Airflow macros expand other macros

Here are some solutions:

1. Override BashOperator to add some values to the context

class NextExecutionDateAwareBashOperator(BashOperator):
    def render_template(self, attr, content, context):
        dag = context['dag']
        execution_date = context['execution_date']
        context['next_execution_date'] = dag.following_schedule(execution_date)

        return super().render_templates(attr, content, context)
        # or in python 2:
        # return super(NextExecutionDateAwareBashOperator, self).render_templates(attr, content, context)

The good part with this approach: you can capture some repeated code in your custom operator.

The bad part: you have to write a custom operator to add values to the context, before templated fields are rendered.

2. Do your computation in a user defined macro

Macros are not necessarily values. They can be functions.

In your dag :

def compute_next_execution_date(dag, execution_date):
    return dag.following_schedule(execution_date)

dag = DAG(
    'simple',
    schedule_interval="0 21 * * *",
    user_defined_macros={
        'next_execution_date': compute_next_execution_date,
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
    dag=dag,
)

The good part: you can define reusable functions to process values available at runtime (XCom values, job instance properties, task instance properties, etc…), and make your function result available to render a template.

The bad part (but not that annoying): you have to import such a function as a user defined macro in every dag where needed.

3. Call your statement directly in your template

This solution is the simplest (as mentioned by Ardan’s answer), and probably the good one in your case.

BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
    dag=dag,
)

Ideal for simple calls like this one. And they are some other objects directly available as macros (like task, task_instance, etc…); even some standard modules are available (like macros.time, …).

Leave a Comment