Airflow Taskflow API: The Guide

Airflow Taskflow is a new way of writing DAGs with ease. As you will see, you need fewer lines than before to obtain the same DAG, which makes DAGs easier to build, read, and maintain. The Taskflow API has three main aspects: XComArgs, Decorators, and XCom backends. In this tutorial, you will learn what the Taskflow API is, why it matters for you, and how to use it to create your DAGs. Ready? Let’s go!

By the way, if you are new to Airflow, check my courses here; you will get a special discount.

Before Airflow 2.0

The Airflow Taskflow API is not available before Apache Airflow 2.0. What does that mean? Well, I guess your DAGs usually look like this:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def _extract():
    return 42

def _transform(ti):
    data = ti.xcom_pull(task_ids='extract')
    return data + 42

def _load(ti):
    print(ti.xcom_pull(task_ids='transform'))

with DAG('old_dag',
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=_extract
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=_transform
    )

    load = PythonOperator(
        task_id='load',
        python_callable=_load
    )

    extract >> transform >> load

Your DAGs would be way more complex, but you get the idea. You have a DAG with three tasks that share data using XComs. Those three tasks call the Python functions _extract, _transform, and _load.

This way of writing DAGs was the only one for a long time, and I’m happy to tell you that this time is over. What if I told you that you could get the same DAG with half the code, making it more readable and faster to build?

Here is another use case of why the Taskflow API is incredible!

Look at the following DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def _a():
    return 42

def _b():
    return 100

def _c(ti):
    print(ti.xcom_pull(task_ids=['a', 'b']))

with DAG('old_dag',
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    a = PythonOperator(
        task_id='a',
        python_callable=_a
    )

    b = PythonOperator(
        task_id='b',
        python_callable=_b
    )

    c = PythonOperator(
        task_id='c',
        python_callable=_c
    )

    a >> b >> c

Nothing fancy here. Task c prints on the standard output the data returned by tasks a and b. However, if you look at the dependencies, you see that:

DAG dependencies: a >> b >> c

Don’t you see a problem here? Indeed, task c needs data from a and b, but it’s impossible to know that by looking at the dependencies. It’s an implicit dependency: a task needs data from another task, but the dependencies don’t show it. This example is simple, but imagine how hard it can be to debug a DAG with thousands of tasks if it has implicit dependencies.

Therefore, if you want to make implicit dependencies explicit and write DAGs with ease while making them more readable…

Say welcome to the Airflow Taskflow API 👋

What is the Airflow Taskflow API?

The Airflow Taskflow API makes it easier to author DAGs without extra boilerplate code. You get a sort of natural flow to define tasks and dependencies. I’m not saying it becomes a pleasure to write DAGs, but almost.

In addition, the Taskflow API makes it easy to share data between tasks as you don’t need to use xcom_pull and xcom_push methods anymore.

The Airflow Taskflow API has three main components:

  • XComArgs that allow task dependencies to be abstracted and inferred as a result of the Python function invocation (no worries, you are going to understand that in a minute)
  • Decorators that automatically create tasks for some Operators such as the PythonOperator, BranchPythonOperator, etc.
  • Custom XCom Backends to store your XComs (data) somewhere other than the Airflow metadatabase

In this tutorial, we will see XComArgs and Decorators. Custom XCom Backends will have a dedicated tutorial.

How to use XComArgs?

An XComArg represents an XCom push from a previous operator. It is as simple as that. Let me show you an example:

from airflow.models.xcom_arg import XComArg
from airflow.operators.python import PythonOperator

def _a():
    return 42

a = PythonOperator(
    task_id='a',
    python_callable=_a
)

a
>>> <Task(PythonOperator): a>

a.output
>>> XComArg(<Task(PythonOperator): a>)

print(a.output)
>>> {{ task_instance.xcom_pull(task_ids='a', dag_id='adhoc_airflow', key='return_value') }}

print(a.output['my_key'])
>>> {{ task_instance.xcom_pull(task_ids='a', dag_id='adhoc_airflow', key='my_key') }}

Remember that when a function/operator returns a value, it pushes that value as an XCom. Task a with the PythonOperator executes the function _a and returns the value 42. Here, 42 becomes the value of an XCom with the key return_value. Nothing new here.

The interesting part is the .output property. You can access the XCom pushed by any operator with the .output property. That property is an XComArg that translates into

{{ task_instance.xcom_pull(task_ids='a', dag_id='adhoc_airflow', key='return_value') }}

If you don’t understand that notation, look at the articles I made here and here.

You see, output does exactly what you would do to get the XCom pushed by the task a with xcom_pull.

In addition, the curly brackets show that output expects to be used in a templated field.

# Works: sql is a templated field, so the Jinja expression
# produced by a.output is rendered at runtime
task = PostgresOperator(
    task_id='task',
    sql=f'SELECT COUNT(1) FROM table WHERE data={a.output}'
)

# Doesn't work: parameters is not a templated field, so the
# Jinja expression is passed as a literal string
task = PostgresOperator(
    task_id='task',
    sql='SELECT COUNT(1) FROM table WHERE data=%(my_param)s',
    parameters={
        'my_param': f'{a.output}'
    }
)

You can use the output of a task as input to another task as long as the parameter receiving it is a templated field. Otherwise, it doesn’t work.
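For instance, op_args and op_kwargs are templated fields of the PythonOperator, so you can pass an XComArg to them directly. Here is a minimal sketch under that assumption (the _print_value callable and the task ids are mine); as a bonus, Airflow infers the extract >> print_value dependency from the XComArg, so you don’t need an explicit >>:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def _extract():
    return 42

def _print_value(value):
    print(value)

with DAG('xcomarg_example',
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=_extract
    )

    # op_args is a templated field: the XComArg is rendered at runtime,
    # and Airflow sets extract as upstream of print_value automatically
    print_value = PythonOperator(
        task_id='print_value',
        python_callable=_print_value,
        op_args=[extract.output]
    )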

To specify the key of the XCom you want to pull, use the bracket notation on the XComArg:

print(a.output['my_key'])
>>> {{ task_instance.xcom_pull(task_ids='a', dag_id='adhoc_airflow', key='my_key') }}

That’s it. Now you know precisely what XComArgs are and how to use them. Think of them as an easier way to manipulate XComs.

Now it’s time to move to the second part of the Airflow Taskflow API, the Decorators.

Airflow Taskflow API and Decorators

Sometimes you have to produce a lot of code just to write simple tasks. Take the PythonOperator: the only thing it does is run a Python callable. Still, you have to write all of this:

from airflow import DAG
from airflow.operators.python import PythonOperator

def _a():
    return 42

with DAG(...) as dag:

    a = PythonOperator(
        task_id='a',
        python_callable=_a
    )

That’s a lot of code.

Before giving you any explanation, here is the same code with the Decorators:

from airflow import DAG
from airflow.decorators import task

with DAG(...) as dag:

    @task
    def a():
        return 42

    a()  # calling the decorated function is what creates the task

BOOM! Amazing, isn’t it? Less code, clearer, for the same task.

The purpose of decorators is to simplify DAG authoring by removing the boilerplate code required by classic operators. Whether to use decorators or not is up to you. Bear in mind that they are not available for all operators. A few have their decorator equivalents, such as the BranchPythonOperator, PythonOperator, DockerOperator, KubernetesPodOperator, etc.

To access decorators, make the following import:

from airflow.decorators import task

For the complete list, take a look here: https://github.com/apache/airflow/blob/main/airflow/decorators/__init__.py
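To give you a taste, here is a minimal sketch of the branching decorator. I’m assuming a fairly recent Airflow 2 version here (@task.branch arrived with Airflow 2.3), and the DAG id and task names are mine:

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG('branch_example',
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    # decorator equivalent of the BranchPythonOperator:
    # the callable returns the task_id(s) to follow downstream
    @task.branch
    def choose_path():
        return 'path_a'

    @task
    def path_a():
        print('took path a')

    @task
    def path_b():
        print('took path b')

    choose_path() >> [path_a(), path_b()]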

XCOMs with Decorators

Ok, we know about XComArgs and Decorators. Now it’s time to mix those two concepts together.

Here is how to share data between two tasks, without the decorators and XComArgs:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def _extract():
    return 42

def _transform(ti):
    data = ti.xcom_pull(task_ids='extract')
    print(data)

with DAG(...) as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=_extract
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=_transform
    )

    extract >> transform

Nothing new here. You must get the ti object (the task instance) to call xcom_pull and pull the XCom from extract. There is a lot of boilerplate code here, and thanks to XComArgs and Decorators, we can remove all of that.

Here is the new version with the Airflow Taskflow API:

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG(...) as dag:

    @task
    def extract():
        return 42

    @task
    def transform(value_from_extract):
        print(value_from_extract)

    transform(extract())

Beautiful! Do you see? No need to get ti anymore, no more useless boilerplate code. You focus only on what your tasks do, and you define the dependencies in a way that makes it explicit that transform needs data from extract.

That’s it. That’s how to share data between tasks with the Taskflow API.
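One more trick before moving on: if a decorated task returns a dictionary, you can push one XCom per key with multiple_outputs=True. Here is a minimal sketch (the DAG id and the dictionary keys are made up):

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG('multiple_outputs_example',
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    # multiple_outputs=True pushes one XCom per dictionary key
    # instead of a single return_value XCom
    @task(multiple_outputs=True)
    def extract():
        return {'order_id': 42, 'customer': 'alice'}

    @task
    def transform(order_id):
        print(order_id)

    data = extract()
    transform(data['order_id'])  # pulls only the order_id XCom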

What if you want to share extract’s data with two tasks instead of one?

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG('old_dag', schedule_interval='@daily', start_date=datetime(2022, 1, 1), catchup=False) as dag:

    @task
    def extract():
        return 42

    @task
    def transform_a(value_from_extract):
        print(value_from_extract)

    @task
    def transform_b(value_from_extract):
        print(value_from_extract)

    data = extract()
    transform_a(data)
    transform_b(data)

You store extract’s data in a variable, here data. Then you use that variable to pass the data to transform_a and transform_b.

Can you guess the value of the data if you print it?

print(data)
>>> {{ task_instance.xcom_pull(task_ids='extract', dag_id='old_dag', key='return_value') }}

That’s right, it’s an XComArg that will be used to pull data from extract into transform_a and transform_b.

You end up with the following dependencies:

XComArg dependencies: extract >> [transform_a, transform_b]

Can you guess what the dependencies are if you do that:

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG('old_dag', schedule_interval='@daily', start_date=datetime(2022, 1, 1), catchup=False) as dag:

    @task
    def extract():
        return 42

    @task
    def transform_a(value_from_extract):
        print(value_from_extract)

    @task
    def transform_b(value_from_extract):
        print(value_from_extract)

    transform_a(extract())
    transform_b(extract())

You actually create two distinct extract tasks: extract and extract_1. Notice that Airflow automatically adds a suffix to generated tasks: _1, _2, _3, etc.

XComArg dependencies: extract >> transform_a and extract_1 >> transform_b

When you mix Decorators and XComArgs, pay attention that you end up with the dependencies you expect.
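If you do want to call the same decorated task several times, recent Airflow 2 versions let you give each call an explicit task_id with .override, which reads better than the automatic _1 suffix. A short sketch under that assumption:

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG('override_example',
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    @task
    def extract():
        return 42

    @task
    def transform(value_from_extract):
        print(value_from_extract)

    # calling extract() once and reusing the XComArg keeps a single task;
    # .override gives each transform call a readable, explicit task_id
    data = extract()
    transform.override(task_id='transform_a')(data)
    transform.override(task_id='transform_b')(data)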

Conclusion

That’s it about the Airflow Taskflow API. There are many other things to know that you will discover in a dedicated tutorial about Airflow Decorators.

I hope you enjoyed this article; if you want to learn more about Airflow, take a look at my course here.

And stay up to date with Airflow by giving your email below 👇 (no spam)

See you! 😉
