Airflow – How to pass xcom variable into Python function

Upvoted both the question and the answer, but I think that this can be made a little more clear for those users who just want to pass small data objects between PythonOperator tasks in their DAGs. Referencing this question and this XCom example got me to the following solution. Super simple:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

DAG = DAG(
  dag_id='example_dag',
  start_date=datetime.now(),
  schedule_interval="@once"
)

def push_function(**kwargs):
    ls = ['a', 'b', 'c']
    return ls

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    ls = ti.xcom_pull(task_ids="push_task")
    print(ls)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task

I’m not sure why this works, but it does. A few questions for the community:

  • What’s happening with ti here? How is that built in to **kwargs?
  • Is provide_context=True necessary for both functions?

Any edits to make this answer clearer are very welcome!

Leave a Comment