ShortCircuitOperator in Apache Airflow: The guide

The ShortCircuitOperator in Apache Airflow is simple but powerful. It allows skipping tasks based on the result of a condition. There are many reasons why you may want to stop running tasks. Let’s see how to use the ShortCircuitOperator and what you should be aware of. By the way, if you are new to Airflow, check my courses here; you will get a special discount. Ready? Let’s goooooo!

ShortCircuitOperator example

Let’s say you have a DAG with different branches and want to execute a branch according to the current day.

You can implement that use case with the ShortCircuitOperator as shown below:

DAG with the ShortCircuitOperator

Notice that the BranchPythonOperator also works in this case. From the DAG above, you can clearly see which path runs according to the current day. If we are on Monday, the path with Monday tasks runs, and we skip the other paths.

Another example would be to use the ShortCircuitOperator as a sanity check. In the following DAG, you have a task that lists files in an SFTP folder and a task is_empty that skips the downstream tasks if no files exist for the current day.

Sanity checks with the ShortCircuitOperator

Overall, this operator is straightforward to use and lets you avoid running tasks that don’t need to run. That saves you time, money, and resources.

How does it work?

The ShortCircuitOperator inherits from the PythonOperator and executes a python_callable. That python_callable is a Python function (or any callable) whose return value is evaluated as a condition. A truthy value such as True means the operator lets the downstream tasks run and creates an XCOM with the returned value.

Successful DAG

A falsy value such as False means the Operator skips ALL (not only direct) downstream tasks and marks them with the state “skipped.”

Skipped tasks
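The rule can be summed up in plain Python: the operator only looks at the truthiness of the value returned by the callable. The helper downstream_runs below is not part of Airflow; it is a stand-in written for illustration.

```python
# Stand-in for the operator's decision (not an Airflow function):
# downstream tasks run only when the callable's result is truthy.
def downstream_runs(result):
    return bool(result)


for value in (True, 42, ['file_a'], False, 0, [], None):
    outcome = 'run' if downstream_runs(value) else 'skipped'
    print(f'{value!r}: downstream tasks {outcome}')
```

In particular, a callable that forgets its return statement returns None, so everything downstream gets skipped.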

To use the Operator, you need to:

  1. Import the Operator
  2. Define the python function/script that returns False or a truthy value. That corresponds to the condition you want to evaluate to skip downstream tasks or not.
  3. Implement the ShortCircuitOperator

Here is an example:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import ShortCircuitOperator


def _is_ok():
    # Any truthy value lets the downstream tasks run; 42 is always truthy
    if 42:
        return True
    return False


with DAG(dag_id='basic', start_date=datetime(2022, 1, 1), catchup=False) as dag:

    # Named is_ok so that the task does not shadow the _is_ok function above
    is_ok = ShortCircuitOperator(
        task_id='_is_ok',
        python_callable=_is_ok,
    )

If you run this code, the ShortCircuitOperator succeeds and creates an XCOM with the value returned by _is_ok():

ShortCircuitOperator with XCOM from _is_ok()
XCOM from _is_ok()

Notice that if the function returns any other truthy value, you would also see it in the XCOM. That can be useful for downstream tasks if you want to build more complex conditional branches based on the result of the ShortCircuitOperator.

That’s it, it is as simple as that.

ShortCircuitOperator with XCOMs

Let’s move a little bit further. What if you want to skip downstream tasks based on the XCOM pushed by a previous task?

Simple! You just need to push your XCOM from a task and get it back as a parameter of the Operator. For that, op_kwargs is your friend.

Look at the code below:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import ShortCircuitOperator


def _is_empty(files, **kwargs):
    print(files)
    # Returning False skips the downstream tasks when there is nothing to process
    if len(files) == 0:
        return False
    return True


with DAG(
    dag_id='short_sftp',
    start_date=datetime(2022, 1, 1),
    catchup=False,
    # Render templates as native Python objects so that files is a real list
    # and not the string "['file_a', 'file_b', 'file_c']"
    render_template_as_native_obj=True,
) as dag:

    @task
    def list_sftp_files():
        return ['file_a', 'file_b', 'file_c']

    is_empty = ShortCircuitOperator(
        task_id='is_empty',
        python_callable=_is_empty,
        op_kwargs={
            'files': '{{ti.xcom_pull(task_ids="list_sftp_files")}}'
        }
    )

    @task
    def process():
        print('process')

    list_sftp_files() >> is_empty >> process()

This code corresponds to the second DAG example with the SFTP files.

First, the task list_sftp_files returns a list of files. In the real world, that could be a task that uses the SFTPHook to scan a folder with the describe_directory method. The task creates an XCOM with the list of files. Next, the ShortCircuitOperator executes the function _is_empty with the parameters defined in op_kwargs. Thanks to templating, we pull the XCOM from the task list_sftp_files and pass it as a parameter of the function _is_empty. That parameter named files contains the list of files returned by list_sftp_files. Keep in mind that a Jinja template renders to a string by default, so the DAG needs render_template_as_native_obj=True for files to arrive as a real list. And just like that, we can use an XCOM as a condition to skip downstream tasks or not.

ShortCircuitOperator with Trigger Rules

Let me start with a DAG example:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import ShortCircuitOperator


def _is_empty(files, **kwargs):
    print(files)
    if len(files) == 0:
        return True
    # The list is not empty in this example, so the condition is False
    # and the downstream tasks get skipped
    return False


with DAG(
    dag_id='short_sftp',
    start_date=datetime(2022, 1, 1),
    catchup=False,
    # Render templates as native Python objects so that files is a real list
    render_template_as_native_obj=True,
) as dag:

    @task
    def list_sftp_files():
        return ['file_a', 'file_b', 'file_c']

    is_empty = ShortCircuitOperator(
        task_id='is_empty',
        python_callable=_is_empty,
        op_kwargs={
            'files': '{{ti.xcom_pull(task_ids="list_sftp_files")}}'
        },
    )

    @task
    def process():
        print('process')

    # all_done: run once all upstream tasks are finished, whatever their state
    @task(trigger_rule='all_done')
    def clean():
        print('clean')

    list_sftp_files() >> is_empty >> process() >> clean()

That gives the following task states if you run it:

ShortCircuitOperator in action

And that is an issue for us. Why? Look at the task clean.

Let’s say we want to execute clean regardless of the state of the process task. How can we do that?

The clean task has the trigger rule all_done; therefore, it should always run, but that’s not the case. Instead, it gets skipped like the others.

Indeed, by default, the ShortCircuitOperator skips ALL downstream tasks regardless of the trigger rule defined for them.

The solution? Use the ignore_downstream_trigger_rules parameter. When set to False, the direct downstream tasks are still skipped, but the trigger rules for other subsequent downstream tasks are respected.

Let’s fix the DAG:

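    is_empty = ShortCircuitOperator(
        task_id='is_empty',
        python_callable=_is_empty,
        op_kwargs={
            'files': '{{ti.xcom_pull(task_ids="list_sftp_files")}}'
        },
        # False: only the direct downstream tasks are skipped;
        # the trigger rules of the tasks after them are respected
        ignore_downstream_trigger_rules=False
    )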

And now, if you run the DAG:

ShortCircuitOperator with ignore_downstream_trigger_rules

it works 😎

Conclusion

The ShortCircuitOperator is a simple yet powerful operator. It prevents tasks from running when a condition is not met. You just put it between tasks, and it short-circuits your DAG based on your condition. On top of that, trigger rules can now be respected, which lets you handle the more complex use cases that you might have.

I hope you enjoyed this article; if you want to learn more about Airflow, take a look at my course here.

See you! 😉
