Wiring top-level DAGs together

Taking hints from @Viraj Parekh‘s answer, I was able to make TriggerDagRunOperator work in the intended fashion. I’m hereby posting my (partial) answer; will update as and when things become clear.


How to overcome limitation of parent_id prefix in dag_id of SubDags?

As told @Viraj, there’s no straight way of achieving this. Extending SubDagOperator to remove this check might work but I decided to steer clear of it


How to force TriggerDagRunOperators to await completion of DAG?

  • Looking at the implementation, it becomes clear that the job of TriggerDagRunOperator is just to trigger external DAG; and that’s about it. By default, it is not supposed to wait for completion of DAG. Therefore the behaviour I’m observing is understandable.

  • ExternalTaskSensor is the obvious way out. However while learning basics of Airflow I was relying on manual triggering of DAGs (schedule_interval=None). In such case, ExternalTaskSensor makes it difficult to accurately specify execution_date for the external task (who’s completion is being awaited), failing which the sensor gets stuck.

  • So taking hint from implementation, I made minor adjustment to behaviour of ExternalTaskSensor by awaiting completion of all task_instances of concerned task having

    execution_date[external_task] >= execution_date[TriggerDagRunOperator] + execution_delta

    This achieves the desired result: external DAGs run one-after-other in sequence.


Is there a workaround for my approach of creating separate files
(for DAGs that differ only in input) for each top-level DAG?

Again going by @Viraj this can be done by assigning DAGs to global scope using globals()[dag_id] = DAG(..)


EDIT-1

Maybe I was referring to incorrect resource (the link above is already dead), but ExternalTaskSensor already includes the params execution_delta & execution_date_fn to easily restrict execution_date(s) for the task being sensed.

Leave a Comment