You can leverage SQLAlchemy to retrieve the execution_dates of the last 'n' successful runs:
from typing import List, Optional

from pendulum import Pendulum

from airflow.models.taskinstance import TaskInstance
from airflow.settings import Session
from airflow.utils.session import provide_session  # airflow.utils.db in Airflow 1.10
from airflow.utils.state import State


def last_execution_date(
    dag_id: str, task_id: str, n: int, session: Optional[Session] = None
) -> List[Pendulum]:
    """Query the Airflow meta-database for the execution dates of the
    most recent n successful runs of a task.

    Args:
        dag_id: DAG name
        task_id: task name
        n: number of runs to return
        session: SQLAlchemy session connected to the Airflow meta-database

    Returns:
        Execution dates of the most recent n successful runs, newest first.
    """
    query_val = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == State.SUCCESS,
        )
        .order_by(TaskInstance.execution_date.desc())
        .limit(n)
    )
    return [ti.execution_date for ti in query_val]
The above function can be used as a utility and wrapped with the provide_session decorator, which supplies the session argument automatically:

last_success_date_fn = provide_session(last_execution_date)
This snippet has been tested end-to-end and can be used in production.
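If you want to try the filter / order-by / limit pattern without a running Airflow instance, here is a minimal self-contained sketch of the same query against an in-memory SQLite database. The table and column names mirror Airflow's task_instance table, but this is an illustration of the pattern, not the real Airflow model:

```python
# Illustration of the same SQLAlchemy query pattern, using a toy
# task_instance table in an in-memory SQLite database (NOT Airflow's model).
import datetime

from sqlalchemy import Column, DateTime, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class TI(Base):
    __tablename__ = "task_instance"
    dag_id = Column(String, primary_key=True)
    task_id = Column(String, primary_key=True)
    execution_date = Column(DateTime, primary_key=True)
    state = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# Seed three successful runs and one failed run.
for day, state in [(1, "success"), (2, "success"), (3, "failed"), (4, "success")]:
    session.add(
        TI(
            dag_id="my_dag",
            task_id="my_task",
            execution_date=datetime.datetime(2021, 1, day),
            state=state,
        )
    )
session.commit()


def last_n_success_dates(session, dag_id, task_id, n):
    # Same shape as the Airflow version: filter, order descending, limit.
    query = (
        session.query(TI)
        .filter(TI.dag_id == dag_id, TI.task_id == task_id, TI.state == "success")
        .order_by(TI.execution_date.desc())
        .limit(n)
    )
    return [ti.execution_date for ti in query]


print(last_n_success_dates(session, "my_dag", "my_task", 2))
# → [datetime.datetime(2021, 1, 4, 0, 0), datetime.datetime(2021, 1, 2, 0, 0)]
```

The failed run is excluded by the state filter, and the descending order means the newest execution dates come first.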
I referred to the tree() method of views.py while writing this script.
Alternatively, you can run this SQL query directly against Airflow's meta-database to retrieve the last n execution dates with successful runs (replace n with the number you want):
SELECT execution_date
FROM task_instance
WHERE dag_id = 'my_dag_id'
  AND task_id = 'my_task_id'
  AND state = 'success'
ORDER BY execution_date DESC
LIMIT n;
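Note that string literals take single quotes ('success'); in Postgres, which commonly backs the Airflow meta-database, double quotes denote identifiers and would cause an error. To show the shape of the result, here is the same query run against a throwaway SQLite copy of the table (an illustration only; dates and rows are made up):

```python
# Run the equivalent SQL against a toy task_instance table in SQLite.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)"
)
rows = [
    ("my_dag_id", "my_task_id", "2021-01-01", "success"),
    ("my_dag_id", "my_task_id", "2021-01-02", "failed"),
    ("my_dag_id", "my_task_id", "2021-01-03", "success"),
]
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?)", rows)

result = conn.execute(
    """
    SELECT execution_date
    FROM task_instance
    WHERE dag_id = 'my_dag_id'
      AND task_id = 'my_task_id'
      AND state = 'success'
    ORDER BY execution_date DESC
    LIMIT 2
    """
).fetchall()
print(result)
# → [('2021-01-03',), ('2021-01-01',)]
```

The failed run is filtered out and the remaining execution dates come back newest first.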