The ShortCircuitOperator in Apache Airflow is simple but powerful. It allows skipping tasks based on the result of a condition. There are many reasons why you might want to stop running tasks. Let’s see how to use the ShortCircuitOperator and what you should know about it. If you are new to Airflow, check my courses here; you will get a special discount. Ready? Let’s goooooo!
A ShortCircuitOperator example
Imagine you have a data pipeline with different paths of tasks that must run according to the current day. Something like this:
In the example above, there is one ShortCircuitOperator task per day. If it's Monday, that day's ShortCircuitOperator succeeds and the Monday tasks run, while the other ShortCircuitOperators succeed but skip their downstream tasks. It is worth mentioning that you could use a BranchPythonOperator in place of task_a to choose between the different paths.
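For illustration, here is a minimal sketch of what one of those per-day checks could look like (the function name and task_id are illustrative):

from datetime import datetime

from airflow.operators.python import ShortCircuitOperator

def _is_monday():
    # Truthy only on Mondays, so only the Monday branch keeps running that day
    return datetime.now().weekday() == 0

is_monday = ShortCircuitOperator(
    task_id='is_monday',
    python_callable=_is_monday,
)

In a real DAG, you would rather check the run's logical date from the task context than call datetime.now(), so that backfills behave consistently.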
The ShortCircuitOperator is often used to run data quality/sanity checks. In the DAG below, a task lists the files in an SFTP folder. If the folder is empty, the ShortCircuitOperator skips the downstream tasks to stop the execution flow.
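As a sketch of what that check could look like, assuming the SFTP provider is installed and a connection called sftp_default points to the server (the folder path and task_id are illustrative):

from airflow.operators.python import ShortCircuitOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook

def _folder_has_files():
    # Truthy only if the folder contains at least one entry;
    # otherwise the downstream tasks are skipped.
    hook = SFTPHook(ssh_conn_id='sftp_default')
    return len(hook.list_directory('/incoming')) > 0

check_sftp_folder = ShortCircuitOperator(
    task_id='check_sftp_folder',
    python_callable=_folder_has_files,
)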
Overall, this operator is straightforward and allows you to avoid running tasks that don’t need to run. It helps to save time, money, and compute resources.
How does it work?
The ShortCircuitOperator derives from the PythonOperator and executes a python_callable. This python_callable can be a function or a script that returns True or False.
True means the Operator succeeds and continues to run the downstream tasks:
With the following XCom:
False means the Operator skips ALL (not only the direct) downstream tasks and marks them with the state "skipped."
In this case, the ShortCircuitOperator creates two XComs: the first one tells the downstream tasks to skip.
How to use the ShortCircuitOperator?
To use the Operator, you must:
1️⃣ Import the Operator from the Python module
2️⃣ Define the Python function/script that checks a condition and returns a boolean
3️⃣ Implement the ShortCircuitOperator that calls the Python function/script
Here is an example:
from airflow.decorators import task, dag
from airflow.operators.python import ShortCircuitOperator
from airflow.models.baseoperator import chain
from datetime import datetime

def my_evaluation():
    return True

@dag(
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False,
)
def test_shortcircuit():

    @task
    def start():
        print('hi')

    short_circuit = ShortCircuitOperator(
        task_id='short_circuit',
        python_callable=my_evaluation,
    )

    @task
    def end():
        print('bye')

    chain(start(), short_circuit, end())

test_shortcircuit()
Notice that my_evaluation doesn't have to return an explicit True value. It can return a string, an int, or anything that isn't falsy, which is useful for passing additional data to downstream tasks.
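For example, here is a minimal sketch (the names and the hard-coded count are purely illustrative) where the callable returns a dict instead of a plain boolean:

from airflow.operators.python import ShortCircuitOperator

def _count_new_rows():
    row_count = 42  # imagine this comes from a real query
    # A non-empty dict is truthy, so the downstream tasks run,
    # and the dict is pushed as this task's XCom.
    return {'row_count': row_count}

check_rows = ShortCircuitOperator(
    task_id='check_rows',
    python_callable=_count_new_rows,
)

A downstream task can then pull that dict with xcom_pull(task_ids='check_rows') and reuse the count.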
The ShortCircuitOperator with XComs
Let's move a little bit further. What if you want to skip downstream tasks based on the XCom pushed by a previous task?
Simple!
from airflow.decorators import task, dag
from airflow.operators.python import ShortCircuitOperator
from airflow.models.baseoperator import chain
from datetime import datetime

def my_evaluation(value):
    return value

@dag(
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False,
)
def test_shortcircuit():

    @task
    def start():
        return True

    short_circuit = ShortCircuitOperator(
        task_id='short_circuit',
        python_callable=my_evaluation,
        op_args=[start()],
    )

    @task
    def end():
        print('bye')

    chain(short_circuit, end())

test_shortcircuit()
In the example above, start returns True, which creates an XCom with this value. Next, we pass start() in the op_args parameter of the ShortCircuitOperator to create a dependency between start and short_circuit and to get the XCom that start returns. If you're unfamiliar with this syntax, look at TaskFlow. Finally, my_evaluation takes that XCom as the value to return to the ShortCircuitOperator.
ShortCircuitOperator with TaskFlow
TaskFlow is a new way of authoring DAGs in Airflow. It makes DAGs easier to write and read by providing a set of decorators that are equivalent to the classic operators. In the previous example, the task decorator is the equivalent of the PythonOperator.
That said, what about the ShortCircuitOperator?
Well, you can use @task.short_circuit. Here is how:
from airflow.decorators import task, dag
from airflow.models.baseoperator import chain
from datetime import datetime

@dag(
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False,
)
def test_shortcircuit():

    @task
    def start():
        return True

    @task.short_circuit
    def my_evaluation(value):
        return value

    @task
    def end():
        print('bye')

    chain(my_evaluation(start()), end())

test_shortcircuit()
Pretty clean, isn't it? I encourage you to look at TaskFlow; you will love this new way of authoring DAGs!
The issue with Trigger Rules
Let me show you an example:
from airflow.decorators import task, dag
from airflow.models.baseoperator import chain
from datetime import datetime

@dag(
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False,
)
def test_shortcircuit():

    @task
    def start():
        return False

    @task.short_circuit
    def my_evaluation(value):
        return value

    @task
    def extract():
        print('extract')

    @task(trigger_rule='all_done')
    def clean():
        print('clean')

    chain(my_evaluation(start()), extract(), clean())

test_shortcircuit()
If you run this DAG, you get:
However, there is an issue here. Can you spot it?
clean should have been executed, not skipped, since we defined the trigger rule all_done on it.
Why is that?
By default, the ShortCircuitOperator skips ALL downstream tasks regardless of the trigger rule defined for them. To fix that, you can use the ignore_downstream_trigger_rules parameter. When set to False, the direct downstream tasks are still skipped, but the trigger rules of the other downstream tasks are respected.
Let’s do it again:
...
    @task.short_circuit(ignore_downstream_trigger_rules=False)
    def my_evaluation(value):
        return value
...
And now it works:
ShortCircuitOperator vs BranchPythonOperator
While both Operators look similar, here is a summary of each one with key differences:
BranchPythonOperator
- Functionality: The BranchPythonOperator is used to dynamically decide between multiple DAG paths. It determines which path or paths should be taken based on the execution of a Python function.
- Use Case: Ideal for scenarios where a DAG has multiple potential branches and the decision of which branch to follow depends on a condition evaluated at runtime.
- Behavior: This operator allows multiple branches to be followed. The Python callable returns the task_id or a list of task_ids to follow. Tasks not returned by the callable are skipped.
ShortCircuitOperator
- Functionality: The ShortCircuitOperator is a specialized form of the PythonOperator. It evaluates a condition and decides whether to continue executing the downstream tasks in the DAG.
- Use Case: Best suited for scenarios where there is a need to stop the execution of downstream tasks based on a specific condition, essentially acting as a ‘circuit breaker’.
- Behavior: Unlike the BranchPythonOperator, the ShortCircuitOperator does not choose between paths. Instead, its callable returns a truthy or falsy value. If falsy, all downstream tasks are skipped; if truthy, the workflow continues as normal.
Key Differences?
- Branching vs. Halting: The BranchPythonOperator is for branching into different paths, while the ShortCircuitOperator is for halting the execution of downstream tasks based on a condition.
- Output: The BranchPythonOperator outputs the task_id(s) of the next task(s) to execute, whereas the ShortCircuitOperator outputs a boolean value.
- Flexibility: The BranchPythonOperator offers more flexibility in complex workflows where multiple branches might need to be followed. In contrast, the ShortCircuitOperator is more binary – either continue or stop.
Therefore, while both operators serve to control the flow based on conditions, the BranchPythonOperator is more about selecting paths, and the ShortCircuitOperator is about stopping the flow altogether under certain conditions. The choice between them depends on your specific needs.
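To make the contrast concrete, here is a minimal side-by-side sketch (the task ids and conditions are purely illustrative):

from airflow.operators.python import BranchPythonOperator, ShortCircuitOperator

# BranchPythonOperator: the callable returns the task_id(s) to follow.
def _choose_path(**context):
    return 'path_a' if context['logical_date'].weekday() == 0 else 'path_b'

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=_choose_path,
)

# ShortCircuitOperator: the callable returns a truthy or falsy value.
def _should_continue():
    return False  # falsy, so ALL downstream tasks are skipped

short_circuit = ShortCircuitOperator(
    task_id='short_circuit',
    python_callable=_should_continue,
)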
Conclusion
The ShortCircuitOperator is a simple yet powerful operator. It is helpful to prevent tasks from running for various reasons: you just put it between tasks, and it halts your DAG's execution flow based on your condition. On top of that, it can now respect trigger rules, allowing you to build more complex use cases than before.
I hope you enjoyed this article; if you want to learn more about Airflow, take a look at my course here.
See you! 😉