shortcircuitoperator_blog

The ShortCircuitOperator in Apache Airflow is simple but powerful. It allows skipping tasks based on the result of a condition. There are many reasons why you might want to stop running tasks. Let’s see how to use the ShortCircuitOperator and what you should know about it. If you are new to Airflow, check my courses here; you will get a special discount. Ready? Let’s goooooo!

A ShortCircuitOperator example

Imagine you have a data pipeline with different paths of tasks that must run according to the current day. Something like this:

In the example above, there is one ShortCircuitOperator task per day. If it’s Monday, the Monday ShortCircuitOperator returns True and the Monday tasks run. The other ShortCircuitOperators return False: they still succeed, but they skip their downstream tasks. It is worth mentioning that you can have a BranchPythonOperator in place of task_a that chooses between different paths.
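Each per-day check can be a very small condition function. The plain-Python sketch below is only an illustration: the name is_day is hypothetical, and in a real DAG the date would come from the task context (for example, the run’s logical date) rather than being hard-coded.

```python
from datetime import datetime

MONDAY, TUESDAY, SUNDAY = 0, 1, 6  # datetime.weekday() numbering


def is_day(logical_date: datetime, weekday: int) -> bool:
    """Hypothetical condition: True only when logical_date falls on weekday."""
    return logical_date.weekday() == weekday


# 2 January 2023 was a Monday: only the Monday check passes,
# so only the Monday path would keep running.
run_date = datetime(2023, 1, 2)
print(is_day(run_date, MONDAY))   # True
print(is_day(run_date, TUESDAY))  # False
```

Used as the python_callable of one ShortCircuitOperator per weekday, exactly one check returns True on any given run.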

The ShortCircuitOperator is often used to run data quality or sanity checks. In the following DAG, a task lists the files in an SFTP folder. If the folder is empty, the ShortCircuitOperator skips the downstream tasks to stop the execution flow.
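The check itself can stay tiny. The sketch below swaps the SFTP listing for a local directory (list_incoming_files is a hypothetical name; a real SFTP listing would go through the SFTP provider instead). Returning the list instead of a boolean works because an empty list is falsy, so the ShortCircuitOperator would skip the downstream tasks when the folder is empty.

```python
import os
import tempfile


def list_incoming_files(folder: str) -> list[str]:
    """Return the sorted file names in folder; an empty list is falsy."""
    # Local-directory stand-in for the SFTP listing described in the text.
    return sorted(
        name for name in os.listdir(folder)
        if os.path.isfile(os.path.join(folder, name))
    )


with tempfile.TemporaryDirectory() as folder:
    print(bool(list_incoming_files(folder)))  # False -> downstream tasks would be skipped
    open(os.path.join(folder, "orders.csv"), "w").close()
    print(list_incoming_files(folder))        # ['orders.csv'] -> truthy, downstream tasks run
```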

Overall, this operator is straightforward and allows you to avoid running tasks that don’t need to run. It helps to save time, money, and compute resources.

How does it work?

The ShortCircuitOperator derives from the PythonOperator and executes a python_callable, a Python function that returns a value. A truthy value (such as True) means the Operator succeeds and the downstream tasks keep running:

[Figure: successful DAG run]

The ShortCircuitOperator also pushes the returned value as an XCom:

[Figure: ShortCircuitOperator XCom]

A falsy value (such as False) means the Operator, by default, skips ALL (not only direct) downstream tasks and marks them with the state “skipped.”

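In this case, the ShortCircuitOperator creates two XComs:

The first one (key skipmixin_key) lists the downstream tasks to skip.
The second one (key return_value) stores the falsy value returned by the python_callable.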
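Here is a toy model of that behavior in plain Python, not Airflow’s implementation. It assumes recent Airflow 2.x, where the skip list is stored under the key skipmixin_key as {"skipped": [...]} and the callable’s result under return_value; check the XCom view of your own Airflow version. The function names are hypothetical.

```python
from collections import deque


def flat_downstream(graph: dict[str, list[str]], task_id: str) -> set[str]:
    """Every task reachable downstream of task_id, not only its direct children."""
    seen: set[str] = set()
    queue = deque(graph.get(task_id, []))
    while queue:
        current = queue.popleft()
        if current not in seen:
            seen.add(current)
            queue.extend(graph.get(current, []))
    return seen


def simulate_short_circuit(graph: dict[str, list[str]], task_id: str, condition):
    """Toy model: (tasks skipped, XComs pushed) for a given callable result."""
    if condition:
        # Truthy result: nothing is skipped, the value is kept as return_value.
        return set(), {"return_value": condition}
    skipped = flat_downstream(graph, task_id)
    if not skipped:
        # Falsy result but no downstream tasks: nothing to skip.
        return set(), {"return_value": condition}
    xcoms = {
        "skipmixin_key": {"skipped": sorted(skipped)},  # assumed key layout
        "return_value": condition,
    }
    return skipped, xcoms


pipeline = {"short_circuit": ["extract"], "extract": ["transform"], "transform": ["load"]}
skipped, xcoms = simulate_short_circuit(pipeline, "short_circuit", False)
print(sorted(skipped))  # ['extract', 'load', 'transform']
print(xcoms)
```

The traversal collects every descendant, which is why load is skipped even though it is two tasks away from short_circuit.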

How to use the ShortCircuitOperator?

To use the Operator, you must:

1️⃣ Import the Operator from the Python module
2️⃣ Define the Python function that checks a condition and returns a boolean (or any truthy/falsy value)
3️⃣ Implement the ShortCircuitOperator that calls the Python function/script

Here is an example:

from airflow.decorators import task, dag
from airflow.operators.python import ShortCircuitOperator
from airflow.models.baseoperator import chain
from datetime import datetime

def my_evaluation():
    # Truthy result: downstream tasks run. Falsy result: they are skipped.
    return True

@dag(
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False,
)
def test_shortcircuit():
    
    @task
    def start():
        print('hi')
        
    short_circuit = ShortCircuitOperator(
        task_id='short_circuit',
        python_callable=my_evaluation,
    )
    
    @task
    def end():
        print('bye')
        
    chain(start(), short_circuit, end())
    
test_shortcircuit()

Notice that my_evaluation doesn’t have to return an explicit True value. It can be a string, an int, anything that isn’t falsy. Since the returned value is pushed as an XCom, this is useful to pass additional data to downstream tasks.
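To see which return values keep the flow going, the sketch below applies the same truthiness test in plain Python. The sample values and the outcome helper are made up for illustration: non-empty strings, non-zero numbers, and non-empty collections let downstream tasks run, while False, "", 0, empty collections, and None trigger a skip.

```python
def outcome(value) -> str:
    """Apply the same truthiness test the operator applies to the callable's result."""
    return "continue" if value else "skip downstream"


# Made-up return values: the first five are truthy, the rest are falsy.
samples = [True, "2023-01-02", 42, ["file.csv"], {"rows": 10},
           False, "", 0, [], None]

for value in samples:
    print(f"{value!r} -> {outcome(value)}")
```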

The ShortCircuitOperator with XComs

Let’s go a little further. What if you want to skip downstream tasks based on the XCom pushed by a previous task?

Simple!

from airflow.decorators import task, dag
from airflow.operators.python import ShortCircuitOperator
from airflow.models.baseoperator import chain
from datetime import datetime

def my_evaluation(value):
    return value

@dag(
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False,
)
def test_shortcircuit():
    
    @task
    def start():
        return True
        
    short_circuit = ShortCircuitOperator(
        task_id='short_circuit',
        python_callable=my_evaluation,
        op_args=[start()],
    )
    
    @task
    def end():
        print('bye')
        
    chain(short_circuit, end())
    
test_shortcircuit()

In the example above, start returns True, which creates an XCom with this value.
Next, we call start() in the op_args parameter of the ShortCircuitOperator. This creates a dependency between start and short_circuit and passes the XCom returned by start to my_evaluation. If you’re unfamiliar with this syntax, look at TaskFlow. Finally, my_evaluation returns that value to the ShortCircuitOperator, which uses it to decide whether end runs.

ShortCircuitOperator with TaskFlow

TaskFlow is a new way of authoring DAGs in Airflow. It makes DAGs easier to write and read by providing a set of decorators that are equivalent to the classic operators. In the previous example, the task decorator is the equivalent of the PythonOperator.

That said, what about the ShortCircuitOperator?

Well, you can use @task.short_circuit. Here is how:

from airflow.decorators import task, dag
from airflow.models.baseoperator import chain
from datetime import datetime

@dag(
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False,
)
def test_shortcircuit():
    
    @task
    def start():
        return True
    
    @task.short_circuit
    def my_evaluation(value):
        return value
    
    @task
    def end():
        print('bye')
        
    chain(my_evaluation(start()), end())
    
test_shortcircuit()

Pretty clean, isn’t it? I encourage you to look at TaskFlow; you will love this new implementation!

The issue with Trigger Rules

Let me show you an example:

from airflow.decorators import task, dag
from airflow.models.baseoperator import chain
from datetime import datetime

@dag(
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False,
)
def test_shortcircuit():
    
    @task
    def start():
        return False
    
    @task.short_circuit
    def my_evaluation(value):
        return value
    
    @task
    def extract():
        print('extract')
        
    @task(trigger_rule='all_done')
    def clean():
        print('clean')
        
    chain(my_evaluation(start()), extract(), clean())
    
test_shortcircuit()

If you run this DAG, you get:

[Figure: ShortCircuitOperator and trigger rule, clean is skipped]

However, there is an issue here. Can you spot it?

clean should have run, not been skipped, since we set its trigger rule to all_done.

Why is that?

By default, the ShortCircuitOperator skips ALL downstream tasks regardless of the trigger rules defined for them. To change that, set the ignore_downstream_trigger_rules parameter (True by default) to False. The direct downstream tasks are then still skipped, but the trigger rules of the tasks further downstream are respected.
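To make the difference concrete, here is a toy simulation in plain Python of the example DAG above (my_evaluation -> extract -> clean). It is not Airflow’s scheduler: it assumes my_evaluation returned a falsy value and models only two trigger rules, all_success (Airflow’s default) and all_done. The simulate function and the constants are hypothetical.

```python
from collections import deque

# Minimal model of the example DAG: my_evaluation -> extract -> clean
DOWNSTREAM = {"my_evaluation": ["extract"], "extract": ["clean"], "clean": []}
UPSTREAM = {"my_evaluation": [], "extract": ["my_evaluation"], "clean": ["extract"]}
TRIGGER_RULES = {"extract": "all_success", "clean": "all_done"}
DONE = {"success", "failed", "skipped", "upstream_failed"}


def simulate(ignore_downstream_trigger_rules: bool) -> dict[str, str]:
    """Final task states after my_evaluation returns a falsy value."""
    states = {"my_evaluation": "success"}
    if ignore_downstream_trigger_rules:
        # Skip every task reachable downstream, whatever its trigger rule.
        queue = deque(DOWNSTREAM["my_evaluation"])
        while queue:
            task = queue.popleft()
            states[task] = "skipped"
            queue.extend(DOWNSTREAM[task])
    else:
        # Skip only the direct downstream tasks.
        for task in DOWNSTREAM["my_evaluation"]:
            states[task] = "skipped"
    # Evaluate the remaining tasks in topological order using their trigger rules.
    for task in ["extract", "clean"]:
        if task in states:
            continue
        upstream_states = [states[u] for u in UPSTREAM[task]]
        rule = TRIGGER_RULES[task]
        if rule == "all_done" and all(s in DONE for s in upstream_states):
            states[task] = "success"
        elif rule == "all_success" and all(s == "success" for s in upstream_states):
            states[task] = "success"
        else:
            # With only skipped upstream tasks, an unmet rule ends in "skipped".
            states[task] = "skipped"
    return states


print(simulate(True))   # {'my_evaluation': 'success', 'extract': 'skipped', 'clean': 'skipped'}
print(simulate(False))  # {'my_evaluation': 'success', 'extract': 'skipped', 'clean': 'success'}
```

With the parameter set to False, clean sees a skipped upstream task, which counts as “done”, so all_done lets it run.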

Let’s do it again:

...

    @task.short_circuit(ignore_downstream_trigger_rules=False)
    def my_evaluation(value):
        return value

...

And now it works:

[Figure: DAG run with ignore_downstream_trigger_rules=False, clean runs]

ShortCircuitOperator vs BranchPythonOperator

While both Operators look similar, here is a summary of each one with key differences:

BranchPythonOperator

  • Functionality: The BranchPythonOperator is used to dynamically decide between multiple DAG paths. It determines which path or paths should be taken based on the execution of a Python function.
  • Use Case: Ideal for scenarios where a DAG has multiple potential branches and the decision of which branch to follow depends on a condition evaluated at runtime.
  • Behavior: This operator allows multiple branches to be followed. The Python callable returns the task_id or a list of task_ids to follow. Tasks not returned by the callable are skipped.
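The branching rule above can be sketched as a toy model in plain Python. It is not Airflow’s implementation: branch_skips and the task names are hypothetical, and it only covers the direct downstream tasks of the branch, leaving aside how skips propagate further down the DAG.

```python
from typing import Union


def branch_skips(direct_downstream: list[str], chosen: Union[str, list[str]]) -> set[str]:
    """Toy model: direct downstream tasks a branch decision would skip."""
    # The branch callable returns one task_id or a list of task_ids to follow.
    followed = {chosen} if isinstance(chosen, str) else set(chosen)
    return set(direct_downstream) - followed


paths = ["process_daily", "process_weekly", "process_monthly"]
print(sorted(branch_skips(paths, "process_weekly")))                    # ['process_daily', 'process_monthly']
print(sorted(branch_skips(paths, ["process_daily", "process_weekly"])))  # ['process_monthly']
```

A falsy ShortCircuitOperator, by contrast, has no task_id to follow: everything downstream is skipped by default.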

ShortCircuitOperator

  • Functionality: The ShortCircuitOperator is a specialized form of the PythonOperator. It evaluates a condition and decides whether to continue executing the downstream tasks in the DAG.
  • Use Case: Best suited for scenarios where there is a need to stop the execution of downstream tasks based on a specific condition, essentially acting as a ‘circuit breaker’.
  • Behavior: Unlike the BranchPythonOperator, the ShortCircuitOperator does not choose between paths. Instead, its callable returns a truthy or falsy value (typically True or False). If falsy, all downstream tasks are skipped by default; if truthy, the workflow continues as normal.

Key Differences

  • Branching vs. Halting: The BranchPythonOperator is for branching into different paths, while the ShortCircuitOperator is for halting the execution of downstream tasks based on a condition.
  • Output: The BranchPythonOperator outputs the task_id(s) of the next task(s) to execute, whereas the ShortCircuitOperator outputs a truthy or falsy value, typically a boolean.
  • Flexibility: The BranchPythonOperator offers more flexibility in complex workflows where multiple branches might need to be followed. In contrast, the ShortCircuitOperator is more binary – either continue or stop.

Therefore, while both operators serve to control the flow based on conditions, the BranchPythonOperator is more about selecting paths, and the ShortCircuitOperator is about stopping the flow altogether under certain conditions. The choice between them depends on your specific needs.

Conclusion

The ShortCircuitOperator is a simple yet powerful operator. It helps you avoid running tasks that don’t need to run. You just put it between tasks, and it halts your DAG flow based on your condition. On top of that, with ignore_downstream_trigger_rules set to False, it respects the trigger rules of tasks further downstream, allowing you to build more complex use cases than before.

I hope you enjoyed this article; if you want to learn more about Airflow, take a look at my course here.

See you! 😉
