DAG Dependencies in Apache Airflow: The Ultimate Guide

DAG dependencies in Apache Airflow might be one of the most popular topics. I have received countless questions about them: Is it possible? How? What are the best practices? And the list goes on. It’s funny because wondering how to do that comes very naturally, even when we are absolute beginners. Do we like to complicate things by nature? Maybe, but that’s another question 😉

By the end of this article, you will be able to spot when you need DAG dependencies, which method to use, and which best practices keep you out of the classic traps. By the way, if you are new to Airflow, check my courses here; you will get a special discount. Ready? Let’s goooooo!

DAG Dependencies Examples

Why DAG dependencies? As usual, let me give you a very concrete example:

[Figure: DAG Dependencies (wait)]

You have three DAGs on the left and one DAG on the right in the example above. The DAGs on the left are doing the exact same steps (extract, transform and store) but for three different data sources. Now, once those DAGs are completed, you may want to consolidate this data into one table or derive statistics from it. This is done by the DAG on the right. The only caveat here is that you have to wait for the three DAGs on the left before moving to the “merge” task, and that’s the role of the “check” task.

That, my friend, is DAG dependencies.

The role of the “check” task is to wait for other DAGs to complete before moving forward.

Want another example? Sure!

[Figure: DAG Dependencies (trigger)]

The example above looks very similar to the previous one. But if you look carefully at the red arrows, there is a major change. The three DAGs on the left are still doing the same work, which produces metadata (XComs, task instances, etc). The DAG on the right is in charge of cleaning this metadata as soon as one DAG on the left completes. How? With the “trigger” tasks! Notice that each DAG on the left has the “trigger” task at the end.

The role of the “trigger” task is to trigger another DAG when a condition is met.

That’s why the arrows point the opposite way compared to the previous example.

All right, now that you have the use cases in mind, let’s see how to implement them!

TriggerDagRunOperator

The TriggerDagRunOperator is the easiest way to implement DAG dependencies in Apache Airflow. It allows you to have a task in a DAG that triggers another DAG in the same Airflow instance.

How does it work? Fairly easy. Let’s take a look at the parameters you can define and what they bring.

trigger_dag_id

The trigger_dag_id parameter defines the DAG ID of the DAG to trigger. For example, if trigger_dag_id="target_dag", the DAG with the DAG id “target_dag” will be triggered.

    trigger = TriggerDagRunOperator(
        task_id='trigger',
        trigger_dag_id='target_dag',  # the DAG to trigger
    )

Notice that the DAG “target_dag” and the DAG where the TriggerDagRunOperator is implemented must be in the same Airflow environment: if DAG A triggers DAG B, both must live in the same Airflow instance.

trigger_dag_id is also a templated parameter. That means you can inject data at run time that comes from Variables, Connections, etc. The example below can be useful if you version your target DAG and don’t want to redeploy the DAG containing the TriggerDagRunOperator just to change the version. In this case, you would have a variable target_dag_version with the values 1_0, 2_0, etc., so that the rendered DAG id matches a DAG such as target_dag_1_0.

    trigger = TriggerDagRunOperator(
        task_id='trigger',
        trigger_dag_id='target_dag_{{ var.value.target_dag_version }}',
    )

conf

The conf parameter is very useful as it allows you to pass information/data to the triggered DAG. This parameter expects a JSON-serializable dictionary and is templated. As with the trigger_dag_id parameter, you can inject data at runtime. The example below shows you how to pass an XCom created in the DAG where the TriggerDagRunOperator is to the target DAG.

    trigger = TriggerDagRunOperator(
        task_id='trigger',
        trigger_dag_id='target_dag_{{ var.value.target_dag_version }}',
        conf={
            "data": "{{ ti.xcom_pull(task_ids='start') }}"
        },
    )
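
Because conf has to be JSON-serializable, a quick check outside Airflow can catch mistakes before the DAG is deployed. Here is a minimal sketch using only the standard library; the helper name is_json_serializable is hypothetical and not part of Airflow.

```python
import json
from datetime import datetime


def is_json_serializable(conf):
    """Return True if `conf` can be dumped to JSON (hypothetical helper, not an Airflow API)."""
    try:
        json.dumps(conf)
        return True
    except (TypeError, ValueError):
        return False


# A templated string is fine: it is plain text until Airflow renders it.
good_conf = {"data": "{{ ti.xcom_pull(task_ids='start') }}"}

# A raw datetime object is not JSON-serializable; an ISO string is.
bad_conf = {"when": datetime(2022, 1, 1)}
fixed_conf = {"when": datetime(2022, 1, 1).isoformat()}

print(is_json_serializable(good_conf))
print(is_json_serializable(bad_conf))
print(is_json_serializable(fixed_conf))
```

Converting dates, sets or custom objects to strings or lists before putting them in conf is usually enough to keep the dictionary serializable.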

execution_date

This one is particularly important. It allows you to define the execution date (the logical_date, which corresponds to data_interval_start for scheduled runs) of the triggered DAG Run. Usually, you want to keep the same execution date as the DAG where the TriggerDagRunOperator is. Why? Because you want to process data on the same data interval.

For example, if the execution_date of your DAG is 2022-01-01 00:00, the target DAG will have the same execution date, so you process the same chunk of data in both DAGs. However, you can set another execution date if you want. Like trigger_dag_id and conf, this parameter is templated. Here is how to pass the current execution date of your DAG:

    trigger = TriggerDagRunOperator(
        task_id='trigger',
        trigger_dag_id='target_dag_{{ var.value.target_dag_version }}',
        conf={
            "data": "{{ ti.xcom_pull(task_ids='start') }}"
        },
        execution_date='{{ ds }}',
    )
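
One detail worth checking is what the template actually carries. {{ ds }} renders only the date (YYYY-MM-DD), whereas {{ ts }} renders the full ISO timestamp. The sketch below reproduces both renderings by hand with the standard library, assuming a DAG Run whose logical date is 10:00 UTC; it does not call Airflow itself.

```python
from datetime import datetime, timezone

# Assumed logical date of the triggering DAG Run (10:00 UTC on 2022-01-01).
logical_date = datetime(2022, 1, 1, 10, 0, tzinfo=timezone.utc)

# What the templates would render to, reproduced by hand for illustration.
ds = logical_date.strftime("%Y-%m-%d")  # like {{ ds }}
ts = logical_date.isoformat()           # like {{ ts }}

# Parsing the rendered strings back shows what each one preserves.
from_ds = datetime.fromisoformat(ds)
from_ts = datetime.fromisoformat(ts)

print(ds, "->", from_ds)  # the time of day is gone
print(ts, "->", from_ts)  # the full timestamp survives
```

With a midnight schedule such as @daily, both templates point to the same instant. For any other schedule, {{ ts }} is the safer choice if the target DAG must get exactly the same execution date.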

reset_dag_run

reset_dag_run is a boolean parameter that defines whether or not you want to clear already triggered target DAG Runs. By default, this parameter is False. That means if you trigger your target DAG with the TriggerDagRunOperator on the execution date 2022-01-01 00:00 and, for whatever reason, you want to retry or rerun it on the same execution date, you can’t. You will receive a DagRunAlreadyExists exception, because you cannot run the same DAG twice on the same execution_date unless the existing DAG Run is cleared first.

That’s exactly what reset_dag_run is for. When set to True, the TriggerDagRunOperator automatically clears the already triggered DAG Run of the target DAG. If no such DAG Run exists, no exception is raised. By the way, this is absolutely needed if you want to backfill your DAG (rerun past, already triggered DAG Runs). My recommendation: always set it to True.

    trigger = TriggerDagRunOperator(
        task_id='trigger',
        trigger_dag_id='target_dag_{{ var.value.target_dag_version }}',
        conf={
            "data": "{{ ti.xcom_pull(task_ids='start') }}"
        },
        execution_date='{{ ds }}',
        reset_dag_run=True,
    )
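
To make the behaviour concrete, here is a toy model in plain Python. It is not Airflow code: the trigger function, the runs dictionary and the RunAlreadyExists exception are stand-ins invented for this illustration, and they only mimic the rule that a DAG can have one run per execution date.

```python
class RunAlreadyExists(Exception):
    """Stand-in for Airflow's DagRunAlreadyExists in this toy model."""


def trigger(runs, dag_id, logical_date, reset_dag_run=False):
    """Toy model of triggering a run keyed by (dag_id, logical_date)."""
    key = (dag_id, logical_date)
    if key in runs:
        if not reset_dag_run:
            raise RunAlreadyExists(f"{dag_id} already has a run for {logical_date}")
        # With reset_dag_run=True the existing run is cleared and runs again.
        runs[key] = "queued (cleared)"
    else:
        runs[key] = "queued"
    return runs[key]


runs = {}
trigger(runs, "target_dag", "2022-01-01")  # first trigger succeeds

try:
    trigger(runs, "target_dag", "2022-01-01")  # same date again, no reset
    second_attempt = "ok"
except RunAlreadyExists:
    second_attempt = "rejected"

third_attempt = trigger(runs, "target_dag", "2022-01-01", reset_dag_run=True)
print(second_attempt, "/", third_attempt)
```

The second call is rejected because a run already exists for that date, while the third one goes through because the existing run is cleared first.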

wait_for_completion

Wow, this one, I LOVE IT. It is simple but useful: it allows you to wait for the triggered DAG to complete before moving to the next task in the DAG where the TriggerDagRunOperator is. Extremely useful if the trigger is not the last task to execute, as in: Task A -> TriggerDagRunOperator -> Task B

I usually recommend setting it to True.

    trigger = TriggerDagRunOperator(
        task_id='trigger',
        trigger_dag_id='target_dag_{{ var.value.target_dag_version }}',
        conf={
            "data": "{{ ti.xcom_pull(task_ids='start') }}"
        },
        execution_date='{{ ds }}',
        reset_dag_run=True,
        wait_for_completion=True,
        poke_interval=60,
    )

In addition to this parameter, don’t hesitate to set the poke_interval parameter, which defines how often (in seconds) the operator checks whether the triggered DAG has completed. By default, it checks every 60 seconds. Be careful: with wait_for_completion=True, your TriggerDagRunOperator behaves like a Sensor, which means a worker slot is taken for as long as the target DAG is not completed. If you don’t know what I’m talking about, take a look at the article I made here.

failed_states

The last parameter to know is failed_states. It expects a list of states that tell the TriggerDagRunOperator the triggered DAG has failed; without it, the operator would wait forever when wait_for_completion is True. By default, if you don’t set any value, it is defined as [State.FAILED], which is what you usually want.
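
The way wait_for_completion, poke_interval and failed_states work together can be pictured as a simple polling loop. The sketch below is a plain-Python illustration, not Airflow's actual implementation; wait_for_run, get_state and the injectable sleep argument are hypothetical names used only here.

```python
import time


def wait_for_run(get_state, allowed_states=("success",), failed_states=("failed",),
                 poke_interval=60, sleep=time.sleep):
    """Poll `get_state()` until it returns an allowed or a failed state."""
    while True:
        sleep(poke_interval)
        state = get_state()
        if state in failed_states:
            raise RuntimeError(f"triggered run ended in state {state!r}")
        if state in allowed_states:
            return state
        # Any other state (queued, running, ...) means: keep waiting.


# Simulated sequence of states, with a fake sleep so the demo runs instantly.
states = iter(["queued", "running", "running", "success"])
pokes = []
result = wait_for_run(lambda: next(states), sleep=pokes.append)
print(result, "after", len(pokes), "pokes")
```

If failed_states were empty, a run that ends in "failed" would never match either list and the loop would keep polling, which is why the default matters.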

Implementation of the TriggerDagRunOperator for DAG Dependencies

Here is a full example of the implementation of TriggerDagRunOperator for DAG dependencies.

The DAG below uses the TriggerDagRunOperator to trigger the DAG target_dag_1_0, as defined in the variable target_dag_version (that you have to create).

# Trigger DAG
from airflow.models import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.decorators import task
from datetime import datetime
with DAG('trigger_dag', schedule_interval='@daily',
         start_date=datetime(2022, 1, 1), catchup=False) as dag:
    @task
    def start():
        print("nothing special here")
        return 42  # pushed as an XCom and forwarded through conf
    trigger = TriggerDagRunOperator(
        task_id='trigger',
        trigger_dag_id='target_dag_{{ var.value.target_dag_version }}',
        conf={
            "data": "{{ ti.xcom_pull(task_ids='start') }}"
        },
        execution_date='{{ ds }}',
        reset_dag_run=True,
        wait_for_completion=True,
        poke_interval=60,
    )
    @task
    def end():
        print("same here, just a placeholder")
    start() >> trigger >> end()

Then the target DAG:

# Target DAG
from airflow.models import DAG
from airflow.decorators import task
from datetime import datetime
# schedule_interval=None: this DAG only runs when it is triggered
with DAG('target_dag_1_0', schedule_interval=None,
         start_date=datetime(2022, 1, 1), catchup=False) as dag:
    @task()
    def start(dag_run=None):
        # Data received from the TriggerDagRunOperator
        print(dag_run.conf)
    start()
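
One thing to keep in mind when reading conf in the target DAG: a templated value such as "{{ ti.xcom_pull(task_ids='start') }}" is rendered as text, so the 42 returned by the start task typically arrives as the string "42" (unless native object rendering is enabled on the triggering DAG). Here is a small sketch of a defensive reader; read_int_conf is a hypothetical helper, not an Airflow function.

```python
def read_int_conf(conf, key, default=None):
    """Read `key` from a dag_run.conf-like dict and cast it to int.

    Hypothetical helper, not an Airflow API. Returns `default` when the
    DAG was triggered without conf or when the key is missing.
    """
    value = (conf or {}).get(key)
    if value is None:
        return default
    return int(value)


# The rendered template delivers the XCom as text.
print(read_int_conf({"data": "42"}, "data"))
# A manual trigger without conf falls back to the default.
print(read_int_conf(None, "data", default=0))
```

Inside the target DAG, the task would call it as read_int_conf(dag_run.conf, "data").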

The TriggerDagRunOperator is perfect if you want to trigger another DAG between two tasks like with SubDAGs (don’t use them 😉). I tend to use it especially for cleaning metadata generated by DAG Runs over time.

The ExternalTaskSensor for DAG Dependencies

Ideal when a DAG depends on multiple upstream DAGs, the ExternalTaskSensor is the other way to create DAG dependencies in Apache Airflow. It is harder to use than the TriggerDagRunOperator, but still very useful to know. Instead of explicitly triggering another DAG, the ExternalTaskSensor allows you to wait for a DAG to complete before moving to the next task.

Let’s discover how it works.

external_dag_id

Very straightforward: this parameter expects the DAG id of the DAG that contains the task you are waiting for. This parameter is required.

    waiting_for_dag = ExternalTaskSensor(
        task_id='waiting_for_dag',
        external_dag_id='target_dag',
    )

external_task_id and external_task_ids

Well, that looks confusing, doesn’t it? The external_task_id parameter expects the task id of the task you are waiting for, whereas external_task_ids expects a list of task ids. With the former you can wait for one task, whereas with the latter you can wait for multiple tasks in the same DAG. You can define one of the two, but not both at the same time. Notice that behind the scenes, the task id defined for external_task_id is in fact passed to external_task_ids. In my opinion, stick with external_task_ids.

Another important thing to remember is that you can wait for an entire DAG Run to complete, and not only tasks, by leaving both parameters set to None.

    waiting_for_dag = ExternalTaskSensor(
        task_id='waiting_for_dag',
        external_dag_id='target_dag',
        external_task_ids=['end'],
    )

allowed_states and failed_states

The allowed_states parameter expects a list of states that mark the ExternalTaskSensor as success. By default, it is set to [State.SUCCESS], which is usually what you want. You could add State.SKIPPED as well. However, failed_states has no default value. What does it mean? It means that if the task you are waiting for fails, your sensor keeps poking until it times out (seven days by default). Therefore, always, always define the failed_states parameter with the value State.FAILED, as shown below:

    waiting_for_dag = ExternalTaskSensor(
        task_id='waiting_for_dag',
        external_dag_id='target_dag',
        external_task_ids=['end'],
        failed_states=[State.FAILED],
    )
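
For the first use case of this article (one DAG waiting for three upstream DAGs), you end up with one sensor per upstream DAG. The sketch below builds the keyword arguments in plain Python so it runs without Airflow; the DAG ids, the two-hour timeout and the build_sensor_kwargs helper are assumptions made for the example. timeout and mode are standard sensor parameters, added here to limit how long a sensor waits and to free the worker slot between checks.

```python
from datetime import timedelta


def build_sensor_kwargs(upstream_dag_ids, external_task_id="end"):
    """Build one ExternalTaskSensor keyword dict per upstream DAG."""
    return [
        {
            "task_id": f"wait_for_{dag_id}",
            "external_dag_id": dag_id,
            "external_task_ids": [external_task_id],
            "allowed_states": ["success"],
            "failed_states": ["failed"],  # fail fast instead of waiting
            "timeout": int(timedelta(hours=2).total_seconds()),  # assumed limit
            "mode": "reschedule",  # release the worker slot between pokes
        }
        for dag_id in upstream_dag_ids
    ]


# Hypothetical DAG ids for the three extract/transform/store DAGs.
sensor_kwargs = build_sensor_kwargs(["source_a_dag", "source_b_dag", "source_c_dag"])
for kwargs in sensor_kwargs:
    print(kwargs["task_id"], "->", kwargs["external_dag_id"])

# Inside a DAG definition, each dict could then be unpacked, for example:
#   checks = [ExternalTaskSensor(**kwargs) for kwargs in sensor_kwargs]
#   checks >> merge
```

Generating the sensors from a list keeps the three checks identical, so adding a fourth data source only means adding one DAG id.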

execution_delta and execution_date_fn

Those parameters are very important. There is something that you absolutely need to take care of with the ExternalTaskSensor… the execution date! The execution date / logical date of the DAG where the ExternalTaskSensor is and that of the DAG containing the task you are waiting for MUST MATCH. If they don’t, you have to define the difference with either execution_delta or execution_date_fn (not both) so that they match. IT IS REQUIRED, otherwise the ExternalTaskSensor will wait until it times out.

For example, if the DAG with the ExternalTaskSensor is triggered with the logical date 2022-01-01 00:00, the DAG Run containing the task you are waiting for must have the same logical date, 2022-01-01 00:00.

So, how do you set the delta if the two DAGs don’t run on the same schedule interval?

The DAG below has the task end that you will monitor with the ExternalTaskSensor. This DAG is triggered every day at 10AM.

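# target_dag with the task end
# DAG triggered every day at 10AM
from airflow.models import DAG
from airflow.decorators import task
from datetime import datetime
with DAG('target_dag', schedule_interval='0 10 * * *',
         start_date=datetime(2022, 1, 1), catchup=False) as dag:
    @task()
    def end():
        print("does nothing")
    end()  # call the function so the task is registered in the DAG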

The DAG below has the ExternalTaskSensor and waits for the task end in target_dag to complete. Since this DAG is triggered every day at 10:05AM, there is a delta of 5 minutes that we must define. That’s what you can see in the execution_delta parameter. Notice that a positive timedelta means you go backward, whereas a negative timedelta means you go forward. Behind the scenes, Airflow computes logical date - timedelta(minutes=5), which gives 10:00, the logical date of target_dag.

from airflow.models import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.state import State
from datetime import datetime, timedelta
with DAG('waiting_dag', schedule_interval='05 10 * * *',
         start_date=datetime(2022, 1, 1), catchup=False) as dag:
    waiting_for_dag = ExternalTaskSensor(
        task_id='waiting_for_dag',
        external_dag_id='target_dag',
        external_task_ids=['end'],
        failed_states=[State.FAILED],
        execution_delta=timedelta(minutes=5),
    )
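
The arithmetic behind execution_delta is easy to verify by hand. The sketch below uses only the standard library and the schedules from the example (10:05 for the waiting DAG, 10:00 for target_dag); the variable names are illustrative.

```python
from datetime import datetime, timedelta

# Logical date of the waiting DAG (scheduled at 10:05).
waiting_logical_date = datetime(2022, 1, 1, 10, 5)
execution_delta = timedelta(minutes=5)

# The sensor looks for the target_dag run with this logical date.
target_logical_date = waiting_logical_date - execution_delta
print(target_logical_date)  # 2022-01-01 10:00:00

# A negative delta looks forward in time instead.
forward = waiting_logical_date - timedelta(minutes=-5)
print(forward)  # 2022-01-01 10:10:00
```

If the subtraction does not land exactly on a logical date of the target DAG, there is no run to match and the sensor keeps waiting.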

Unlike execution_delta, execution_date_fn expects a function: it receives the current logical date and returns the logical date (or dates) of the DAG Run to wait for. That helps when a fixed timedelta is not enough.
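
As a sketch, assuming execution_date_fn is called with the sensor's logical date and must return the logical date to look for, such a function can be written and tested without Airflow. The name same_day_at_ten is hypothetical.

```python
from datetime import datetime


def same_day_at_ten(logical_date):
    """Map the sensor's logical date to the target DAG's 10:00 run that day."""
    return logical_date.replace(hour=10, minute=0, second=0, microsecond=0)


# The waiting DAG runs at 10:05; the function resolves to the 10:00 run.
print(same_day_at_ten(datetime(2022, 1, 1, 10, 5)))

# It would be passed to the sensor as: execution_date_fn=same_day_at_ten
```

Compared with a fixed execution_delta, a function like this still resolves to the right run if the waiting DAG's schedule moves from 10:05 to, say, 10:30.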

check_existence

When set to True, the ExternalTaskSensor checks if the task or the DAG you are waiting for exists. If not, it fails immediately. As a best practice, always set it to True.

Implementation of the ExternalTaskSensor for DAG dependencies

Now that you know exactly what every parameter does and why you need it, let’s see a concrete example of the ExternalTaskSensor.

# DAG with the ExternalTaskSensor
from airflow.models import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.state import State
from datetime import datetime, timedelta
with DAG('waiting_dag', schedule_interval='05 10 * * *',
         start_date=datetime(2022, 1, 1), catchup=False) as dag:
    waiting_for_dag = ExternalTaskSensor(
        task_id='waiting_for_dag',
        external_dag_id='target_dag',
        external_task_ids=['end'],
        failed_states=[State.FAILED],
        execution_delta=timedelta(minutes=5),
    )
# Target DAG
from airflow.models import DAG
from airflow.decorators import task
from datetime import datetime
with DAG('target_dag', schedule_interval='0 10 * * *',
         start_date=datetime(2022, 1, 1), catchup=False) as dag:
    @task()
    def end():
        print("last task")
    end()

Like with the TriggerDagRunOperator, make sure both DAGs are unpaused. Otherwise, it doesn’t work.

The DAG Dependencies view

DAG dependencies can quickly become hard to manage. That’s why I strongly recommend using them carefully and keeping their number to a minimum. That being said, since Airflow 2.1 a new view has been introduced: the DAG Dependencies view. This view shows all dependencies between DAGs in your Airflow instance. Notice that it only shows the dependencies created with either the ExternalTaskSensor or the TriggerDagRunOperator.

To access the DAG dependencies view, go to Browse -> DAG Dependencies

Conclusion

DAG dependencies in Apache Airflow are powerful. They allow you to avoid duplicating your code (think of a DAG in charge of cleaning metadata executed after each DAG Run) and make complex workflows possible. However, always ask yourself if you truly need this dependency. The more DAG dependencies you have, the harder it is to debug when something goes wrong. It’s easy to get lost, especially if you use the ExternalTaskSensor with different logical dates.

I hope you enjoyed this article. If you want to learn more about Airflow, take a look at my course here.

Have a great day! 😉
