execution_date in airflow: need to access as a variable

The BashOperator's bash_command argument is a template. You can access execution_date in any template as a datetime object using the execution_date variable. In the template, you can use any Jinja2 methods to manipulate it. Use the following as your BashOperator bash_command string:

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

… Read more
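As a minimal sketch of what that template expression evaluates to, here is the same `replace(day=1)` call on a plain `datetime` (no Airflow required; the sample date is made up):

```python
from datetime import datetime

# execution_date is exposed to templates as a datetime-like object;
# replace(day=1) rewinds it to the first of its month, keeping the time.
execution_date = datetime(2024, 3, 17, 6, 0)
first_of_month = execution_date.replace(day=1)
print(first_of_month.strftime("%Y-%m-%d"))  # 2024-03-01
```

So the rendered command receives the first of the month in place of the raw execution date.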

How to get last two successful execution dates of Airflow job?

You can leverage SQLAlchemy magic to retrieve the execution_dates of the last 'n' successful runs:

from pendulum import Pendulum
from typing import List, Dict, Any, Optional

from airflow.utils.state import State
from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance

def last_execution_date(
    dag_id: str, task_id: str, n: int, session: Optional[Session] = None
) -> List[Pendulum]:
    """
    This function is … Read more
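To make the shape of that query concrete without an Airflow installation, here is a stdlib-only sketch against a hypothetical miniature of the task_instance table (the real table has more columns; column and value names here are illustrative):

```python
import sqlite3

# Toy stand-in for Airflow's task_instance table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(dag_id TEXT, task_id TEXT, state TEXT, execution_date TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("my_dag", "my_task", "success", "2024-01-01"),
        ("my_dag", "my_task", "failed",  "2024-01-02"),
        ("my_dag", "my_task", "success", "2024-01-03"),
        ("my_dag", "my_task", "success", "2024-01-04"),
    ],
)

def last_execution_dates(dag_id, task_id, n):
    # Filter to successful runs, newest first, limited to n —
    # the same filter/order_by/limit chain the SQLAlchemy version builds.
    rows = conn.execute(
        "SELECT execution_date FROM task_instance "
        "WHERE dag_id = ? AND task_id = ? AND state = 'success' "
        "ORDER BY execution_date DESC LIMIT ?",
        (dag_id, task_id, n),
    ).fetchall()
    return [r[0] for r in rows]

print(last_execution_dates("my_dag", "my_task", 2))  # ['2024-01-04', '2024-01-03']
```

The failed run on 2024-01-02 is skipped; only successful execution dates come back, newest first.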

Apache airflow macro to get last dag run execution time

Yes, you can define your own custom macro for this as follows:

# custom macro function
def get_last_dag_run(dag):
    last_dag_run = dag.get_last_dagrun()
    if last_dag_run is None:
        return "no prev run"
    else:
        return last_dag_run.execution_date.strftime("%Y-%m-%d")

# add macro in user_defined_macros in dag definition
dag = DAG(
    dag_id="my_test_dag",
    schedule_interval="@daily",
    user_defined_macros={
        "last_dag_run_execution_date": get_last_dag_run,
    },
)

# example of using it in … Read more

Make custom Airflow macros expand other macros

Here are some solutions:

1. Override BashOperator to add some values to the context

class NextExecutionDateAwareBashOperator(BashOperator):
    def render_template(self, attr, content, context):
        dag = context["dag"]
        execution_date = context["execution_date"]
        context["next_execution_date"] = dag.following_schedule(execution_date)
        return super().render_template(attr, content, context)
        # or in Python 2:
        # return super(NextExecutionDateAwareBashOperator, self).render_template(attr, content, context)

The good part with this approach: you can capture some … Read more
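The underlying problem (a user macro whose value itself contains template syntax needs a second rendering pass) can be illustrated without Airflow. This sketch uses `string.Template` instead of Jinja2, and the macro/variable names are invented:

```python
from string import Template

# Hypothetical macro whose value is itself templated: one pass is not enough.
macros = {"my_macro": "prefix-${ds}"}
context = {"ds": "2024-01-01", **macros}

once = Template("path/${my_macro}").substitute(context)
print(once)    # path/prefix-${ds}   <- inner template survives the first pass

twice = Template(once).substitute(context)
print(twice)   # path/prefix-2024-01-01
```

Overriding render_template as above is one way to make sure the values a macro depends on are already in the context before the (single) rendering pass runs.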

Why is it recommended against using a dynamic start_date in Airflow?

The first run occurs at start_date + schedule_interval; Airflow never runs a DAG at start_date itself, only at start_date + schedule_interval. As the documentation notes, if you make start_date dynamic, e.g. datetime.now(), with a schedule_interval of, say, one hour, that run will never execute: now() moves forward along with the clock, so the moment datetime.now() + 1 hour is never reached.
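The moving-target effect can be shown with plain datetime arithmetic (a toy model of the scheduler's "is a run due?" check, not Airflow's actual scheduling code):

```python
from datetime import datetime, timedelta

interval = timedelta(hours=1)

# Static start_date: once the clock passes start_date + interval, a run is due.
static_start = datetime(2024, 1, 1)
now = datetime(2024, 1, 1, 2, 30)
print(now >= static_start + interval)   # True: run fires

# Dynamic start_date = datetime.now(): re-evaluated each time, so the
# threshold start_date + interval is always one interval ahead of "now".
dynamic_start = now
print(now >= dynamic_start + interval)  # False: run is never due
```

With a static start_date the threshold is fixed and eventually passed; with a dynamic one it recedes forever.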

For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI?

You can pass parameters from the CLI using --conf '{"key": "value"}' and then use them in the DAG file as "{{ dag_run.conf['key'] }}" in a templated field.

CLI:

airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '{"message": "value"}'

DAG file:

args = {
    'start_date': datetime.utcnow(),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='example_dag_conf',
    default_args=args,
    schedule_interval=None,
)

def run_this_func(ds, **kwargs):
    print("Remotely received … Read more
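The --conf argument is just a JSON string that ends up as a plain dict on the DagRun. A small sketch of that round trip, with a hypothetical stand-in for kwargs['dag_run'].conf (no Airflow required):

```python
import json

# What was passed on the CLI via --conf:
conf_arg = '{"message": "value"}'

# Airflow parses it; callables see it as an ordinary dict.
dag_run_conf = json.loads(conf_arg)

def run_this_func(ds, **kwargs):
    # stand-in for reading kwargs['dag_run'].conf inside the callable
    return dag_run_conf["message"]

print(run_this_func("2024-01-01"))  # value
```

The same dict is what "{{ dag_run.conf['key'] }}" reads in templated fields.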

Airflow: how to delete a DAG?

Edit 8/27/18 – Airflow 1.10 is now released on PyPI! https://pypi.org/project/apache-airflow/1.10.0/ How to delete a DAG completely: we have this feature now in Airflow ≥ 1.10! PR #2199 (Jira: AIRFLOW-1002), which adds DAG removal to Airflow, has now been merged, allowing a DAG's entries to be fully deleted from all of the related tables. The core … Read more
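For reference, this landed as a CLI subcommand. A sketch of the invocation, assuming the Airflow 1.10 CLI (the DAG id is a placeholder; in Airflow 2.x the command moved to `airflow dags delete`):

```shell
# remove all metadata rows for the given DAG id (1.10-era CLI)
airflow delete_dag my_dag_id
```

This deletes the DAG's database records; the DAG file itself must still be removed from the dags folder, or the scheduler will re-create the entries.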