By default, your tasks get executed once all of their parent tasks succeed. This behaviour is what you expect in general. But what if you want something more complex? What if you would like to execute a task as soon as one of its parents succeeds? Or maybe you would like to execute a different set of tasks if a task fails? Or act differently depending on whether a task succeeds, fails or even gets skipped? To address all of these use cases in your data pipelines, you need to master one concept: Airflow trigger rules. Those rules define the conditions under which your tasks get triggered. There are many different trigger rules; some are easy to use, others are needed for solving specific issues. So, ready? Let’s dive into the incredible world of trigger rules!
As usual, let’s begin with some concrete use cases so you get a better picture of why this feature might be useful for you.
Reacting when a task fails
What can you do if a task fails? Airflow offers different mechanisms, but the most common way to react to a failure is with callbacks. It’s pretty simple: you pass a function to the operator’s on_failure_callback argument and, as soon as your task fails, the function gets called. It’s great, but there are some limitations.
- Callbacks are not managed by the scheduler, so if they fail, you can neither retry them nor be warned.
- Callbacks are meant for simple, lightweight work like cleaning data. If you need a more complex workflow with multiple tasks to run, you need something else.
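To make the callback mechanism concrete, here is a minimal sketch. The function name notify_failure and the message format are hypothetical; the only thing taken from Airflow is that the callback receives a context dictionary whose "task_instance" entry exposes task_id and dag_id. The context is faked here so the snippet runs without Airflow installed.

```python
from types import SimpleNamespace


def notify_failure(context):
    """Hypothetical callback: build a short alert from the task context."""
    ti = context["task_instance"]
    message = f"Task {ti.task_id} in DAG {ti.dag_id} failed"
    print(message)  # in practice you might send this to a chat or by email
    return message


# In a DAG file you would pass the function to an operator, for example:
#   PythonOperator(task_id="process", python_callable=process,
#                  on_failure_callback=notify_failure)

# Simulate the call with a stand-in for the real context.
fake_context = {
    "task_instance": SimpleNamespace(task_id="process", dag_id="my_dag")
}
alert = notify_failure(fake_context)
```

Notice that nothing retries notify_failure if it raises, which is exactly the first limitation listed above.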
Another approach that addresses those downsides is to use Airflow trigger rules!
Solving the BranchPythonOperator pitfall
Let’s take a look at the data pipeline below:
What do you think happens to the task “storing” if “Is inaccurate” gets skipped?
Well, storing gets skipped as well! If you don’t know why, take a look at the following post I made about the BranchPythonOperator. The problem is that it’s not what you want. Since the task “Is accurate” succeeded, you want “storing” to be triggered. How do you solve this? By using Airflow trigger rules!
Airflow Trigger Rules: What are they?
I think trigger rules are one of the easiest concepts to understand in Airflow.
Basically, a trigger rule defines the condition under which a task gets triggered. By default, all tasks have the same trigger rule, all_success, which means that a task gets triggered once all of its parents succeed. Only one trigger rule at a time can be specified for a given task. Here is an example:
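from airflow.operators.python import PythonOperator

my_task = PythonOperator(
    task_id='my_task',
    python_callable=lambda: print('hello'),  # placeholder callable, required by PythonOperator
    trigger_rule='all_success',  # the default, set explicitly here
)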
There are many trigger rules. Let’s see each one of them.
all_success
This one is pretty straightforward, and you’ve already seen it: your task gets triggered when all upstream tasks (parents) have succeeded.
One caveat though: if one of the parents gets skipped, then the task gets skipped as well. For example, if Task B is a parent of Task C and Task B gets skipped, Task C gets skipped too.
To change this behaviour, you need to set a different trigger rule on Task C, one of those you are going to discover below.
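Here is a minimal sketch of how all_success reacts to the states of the parents. It is a simplified model, not Airflow’s scheduler code: the function name and the "run" / "wait" return values are made up for illustration, while the state strings mirror Airflow’s task state names.

```python
def all_success(upstream_states):
    """Simplified model of the all_success rule."""
    if any(s in ("failed", "upstream_failed") for s in upstream_states):
        return "upstream_failed"  # a failed parent blocks the task
    if "skipped" in upstream_states:
        return "skipped"          # a skipped parent propagates the skip
    if all(s == "success" for s in upstream_states):
        return "run"
    return "wait"                 # some parents are still running


print(all_success(["success", "success"]))  # run
print(all_success(["success", "skipped"]))  # skipped
```

The second call is the caveat described above: one skipped parent is enough for the task to be skipped too.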
all_failed
Pretty clear: your task gets triggered if all of its parent tasks have failed.
This can be really useful if you want to do some cleaning or something more complex that you can’t put within a callback. You can literally define a path of tasks to execute if some tasks fail.
Like with all_success, if Task B gets skipped, Task C gets skipped as well.
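The same kind of sketch works for all_failed. Again, this is a simplified model under the assumption that a parent which succeeded or was skipped causes the task to be skipped; the function name and return values are hypothetical.

```python
def all_failed(upstream_states):
    """Simplified model of the all_failed rule."""
    if any(s in ("success", "skipped") for s in upstream_states):
        return "skipped"  # not every parent failed, so the task is skipped
    if all(s in ("failed", "upstream_failed") for s in upstream_states):
        return "run"
    return "wait"         # some parents are still running


print(all_failed(["failed", "failed"]))   # run
print(all_failed(["failed", "skipped"]))  # skipped
```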
all_done
With this rule, your task gets triggered once all upstream tasks (parents) are done with their execution, whatever their state.
This trigger rule might be useful if there is a task that you always want to execute regardless of the upstream tasks’ states.
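As a rough illustration, all_done only cares about whether every parent has finished, not how. The FINISHED set below is an assumption that covers the states discussed in this article; the function name is hypothetical.

```python
# States considered "done" in this simplified model.
FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def all_done(upstream_states):
    """Simplified model of the all_done rule."""
    if all(s in FINISHED for s in upstream_states):
        return "run"
    return "wait"  # at least one parent has not finished yet


print(all_done(["success", "failed", "skipped"]))  # run
print(all_done(["success", "running"]))            # wait
```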
one_failed
As soon as one of the upstream tasks fails, your task gets triggered, without waiting for the other parents to finish.
This can be useful if you have some long-running tasks and want to do something as soon as one fails.
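The important part of one_failed is the “as soon as”. A minimal sketch, with a hypothetical function name and the assumption that the task is skipped when every parent finishes without any failure:

```python
def one_failed(upstream_states):
    """Simplified model of the one_failed rule."""
    if any(s in ("failed", "upstream_failed") for s in upstream_states):
        return "run"      # fires immediately, other parents may still be running
    if all(s in ("success", "skipped") for s in upstream_states):
        return "skipped"  # everything finished and nothing failed
    return "wait"


print(one_failed(["failed", "running"]))   # run
print(one_failed(["success", "running"]))  # wait
print(one_failed(["success", "success"]))  # skipped
```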
one_success
Like one_failed, but the opposite: as soon as one of the upstream tasks succeeds, your task gets triggered.
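To visualise the timing of one_success, the sketch below walks through successive snapshots of the upstream states and reports the first one in which the rule would fire. The function name and the timeline are made up for illustration; this is not how Airflow stores or evaluates states internally.

```python
def one_success_fires_at(snapshots):
    """Return the index of the first snapshot in which one_success fires.

    Each snapshot is the list of upstream states at one point in time.
    """
    for step, states in enumerate(snapshots):
        if "success" in states:
            return step
    return None  # no parent ever succeeded


timeline = [
    ["running", "running", "running"],
    ["running", "success", "running"],  # first success: the task can start
    ["failed", "success", "success"],
]
print(one_success_fires_at(timeline))  # 1
```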
none_failed
Your task gets triggered if all upstream tasks have succeeded or been skipped.
This is only useful if you want to handle the skipped status.
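A minimal sketch of none_failed, assuming that a failed parent marks the task as upstream_failed. Note that in this model the task still runs when every parent was skipped. The function name and the "run" / "wait" values are hypothetical.

```python
def none_failed(upstream_states):
    """Simplified model of the none_failed rule."""
    if any(s in ("failed", "upstream_failed") for s in upstream_states):
        return "upstream_failed"
    if all(s in ("success", "skipped") for s in upstream_states):
        return "run"  # skipped parents do not block the task
    return "wait"


print(none_failed(["success", "skipped"]))  # run
print(none_failed(["skipped", "skipped"]))  # run
print(none_failed(["success", "failed"]))   # upstream_failed
```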
none_failed_min_one_success
Previously known as “none_failed_or_skipped” (before Airflow 2.2). With this trigger rule, your task gets triggered if no upstream task has failed and at least one has succeeded.
This is the rule you must set to handle the BranchPythonOperator pitfall 😎
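Here is a small sketch of why the none_failed_min_one_success rule fixes the branching case. It is a simplified model, not Airflow’s scheduler code; the task ids is_accurate and is_inaccurate are assumed spellings of the task names used in this article.

```python
def none_failed_min_one_success(upstream_states):
    """Simplified model of the none_failed_min_one_success rule."""
    if any(s in ("failed", "upstream_failed") for s in upstream_states):
        return "upstream_failed"
    if upstream_states and all(s == "skipped" for s in upstream_states):
        return "skipped"  # nothing succeeded
    if all(s in ("success", "skipped") for s in upstream_states):
        return "run"      # no failure and at least one success
    return "wait"


# The branch chose "is_accurate", so "is_inaccurate" was skipped.
parents_of_storing = {"is_accurate": "success", "is_inaccurate": "skipped"}
states = list(parents_of_storing.values())

default_rule_skips = "skipped" in states  # what all_success would do
decision = none_failed_min_one_success(states)
print(default_rule_skips, decision)  # True run
```

In a real DAG, this corresponds to passing trigger_rule='none_failed_min_one_success' to the task that joins the two branches.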
none_skipped
With this simple trigger rule, your task gets triggered if no upstream task is skipped, that is, if they all ended in success or failed.
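And a last sketch for none_skipped, under the assumption that a single skipped parent causes the task to be skipped. As with the other rules, the function name and the "run" / "wait" values are only illustrative.

```python
def none_skipped(upstream_states):
    """Simplified model of the none_skipped rule."""
    if "skipped" in upstream_states:
        return "skipped"
    if all(s in ("success", "failed", "upstream_failed") for s in upstream_states):
        return "run"  # every parent finished, and none was skipped
    return "wait"


print(none_skipped(["success", "failed"]))   # run
print(none_skipped(["success", "skipped"]))  # skipped
```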
Airflow trigger rules are simple to use, yet extremely powerful. They allow you to build more complex data pipelines and address real use cases. Don’t hesitate to use them to handle errors in a better, more reliable way than with just a callback.
Hope you enjoyed this new article!
See you ❤️