How to get last two successful execution dates of Airflow job?

You can leverage SQLAlchemy to retrieve the execution_dates of the last 'n' successful runs:

from typing import List, Optional

from pendulum import Pendulum  # Airflow 1.x ships pendulum 1.x; pendulum 2.x renames this to DateTime

from airflow.models.taskinstance import TaskInstance
from airflow.settings import Session
from airflow.utils.db import provide_session  # in Airflow 2.x: from airflow.utils.session import provide_session
from airflow.utils.state import State

def last_execution_date(
    dag_id: str, task_id: str, n: int, session: Optional[Session] = None
) -> List[Pendulum]:
    """
    This function is to queries against airflow table and
    return the most recent execution date
    Args:
        dag_id: dag name
        task_id : task name
        n : number of runs
        session: Session to connect airflow postgres db
    Returns:
        list of execution date of most of recent n runs
    """
    query_val = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == State.SUCCESS,
        )
        .order_by(TaskInstance.execution_date.desc())
        .limit(n)
    )
    execution_dates: List[Pendulum] = [ti.execution_date for ti in query_val]
    return execution_dates

# The above function can be used as a utility and wrapped with the
# provide_session decorator, which injects the session argument automatically:

last_success_date_fn = provide_session(last_execution_date)

This snippet has been tested end to end and can be used in production.

I referred to the tree() method in Airflow's views.py while putting this script together.
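To see the query pattern in action without a live Airflow metadata DB, here is a self-contained sketch using plain SQLAlchemy against an in-memory SQLite database. The `TaskInstance` model below is a simplified stand-in for Airflow's `task_instance` table (an assumption for illustration, not Airflow's actual model), but the filter/order/limit logic is the same as in the function above:

```python
from datetime import datetime
from sqlalchemy import Column, DateTime, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class TaskInstance(Base):
    # Simplified stand-in for Airflow's task_instance table
    __tablename__ = "task_instance"
    dag_id = Column(String, primary_key=True)
    task_id = Column(String, primary_key=True)
    execution_date = Column(DateTime, primary_key=True)
    state = Column(String)

def last_execution_dates(session, dag_id, task_id, n):
    # Same shape as the Airflow query: filter by dag/task/state,
    # newest first, capped at n rows
    query = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == "success",
        )
        .order_by(TaskInstance.execution_date.desc())
        .limit(n)
    )
    return [ti.execution_date for ti in query]

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# Seed a mix of successful and failed runs
for day, state in [(1, "success"), (2, "failed"), (3, "success"), (4, "success")]:
    session.add(TaskInstance(dag_id="my_dag", task_id="my_task",
                             execution_date=datetime(2023, 1, day), state=state))
session.commit()

print(last_execution_dates(session, "my_dag", "my_task", 2))
# → [datetime(2023, 1, 4), datetime(2023, 1, 3)]
```

Note that the failed run on Jan 2 is skipped: only successful runs are returned, newest first.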


Alternatively, you can run this SQL query directly against Airflow's metadata DB to retrieve the last n execution dates of successful runs:

SELECT execution_date
FROM task_instance
WHERE dag_id = 'my_dag_id'
  AND task_id = 'my_task_id'
  AND state = 'success'
ORDER BY execution_date DESC
LIMIT n
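If you go the raw-SQL route, parameterize the query rather than string-interpolating values into it. Here is a minimal sketch using Python's built-in sqlite3 as a stand-in for the metadata DB (Airflow typically runs on Postgres or MySQL, so you'd swap the driver and the placeholder style accordingly; the table layout here is a simplified assumption):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for Airflow's task_instance table
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("my_dag_id", "my_task_id", "2023-01-01", "success"),
        ("my_dag_id", "my_task_id", "2023-01-02", "failed"),
        ("my_dag_id", "my_task_id", "2023-01-03", "success"),
    ],
)

# Same query as above, with bound parameters instead of inline literals
cursor = conn.execute(
    """
    SELECT execution_date
    FROM task_instance
    WHERE dag_id = ?
      AND task_id = ?
      AND state = 'success'
    ORDER BY execution_date DESC
    LIMIT ?
    """,
    ("my_dag_id", "my_task_id", 2),
)
print([row[0] for row in cursor])
# → ['2023-01-03', '2023-01-01']
```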
