Dynamic Task Mapping in Apache Airflow

Dynamic Task Mapping is a new feature of Apache Airflow 2.3 that takes your DAGs to a new level. Now, you can create tasks dynamically without knowing in advance how many tasks you need. This feature is for you if you want to process various files, evaluate multiple machine learning models, or process a variable amount of data based on a SQL request. Excited? Let's dive into it! By the way, if you are new to Airflow, check my courses here; you will get a special discount. Ready? Let's goooooo!

Use Case

Why Dynamic Task Mapping? Let me give you an example. Suppose you have the following DAG in charge of fetching files from an S3 bucket.

DAG example for Dynamic Task Mapping (Files)

How do you create tasks that process each file retrieved by the "fetching files" task when you don't know in advance what these files will be? Hard to solve, right?

Another example: suppose you have the following DAG with a task that executes a SQL request, and you want to create a task for each output record.

DAG example for Dynamic Task Mapping (SQL)

Again, how do you know the number of tasks you need to create based on an output you don’t know in advance?

Simple: before Airflow 2.3, those use cases were not possible. You had to know the output of your tasks in advance so that you could define the corresponding tasks before running your DAG. Since Airflow 2.3, you can solve those use cases thanks to the Dynamic Task Mapping feature! 😎

Dynamic Task Mapping: Expand

To create tasks dynamically, call the expand() method.

This new method is available on any operator. It creates one copy of the task (a mapped task instance) for each value it receives.

It expects a list, a dict, or one of those types stored in XCom as the result of an upstream task.

It works with both the task decorators and the classic operators.
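To build some intuition, here is a plain-Python sketch (no Airflow involved) of what expand does conceptually with a list: it produces one task instance per element, each identified by a map index. The helper simulate_expand is hypothetical; it only models the fan-out and is not Airflow's implementation.

```python
from typing import Any, Callable, List, Tuple


def simulate_expand(func: Callable[..., Any], **mapped: List[Any]) -> List[Tuple[int, Any]]:
    """Hypothetical model of expand() over a single mapped argument.

    Returns one (map_index, result) pair per element of the input list,
    mimicking one task instance per element.
    """
    if len(mapped) != 1:
        raise ValueError("this sketch only models a single mapped argument")
    (name, values), = mapped.items()
    return [(index, func(**{name: value})) for index, value in enumerate(values)]


def download_file(file: str) -> str:
    # Stand-in for a real task body
    return f"downloaded {file}"


instances = simulate_expand(download_file, file=["file_a", "file_b", "file_c"])
for map_index, result in instances:
    print(map_index, result)
```

Three elements in, three instances out: the number of mapped instances is simply the length of the list.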

Let's say you have a DAG responsible for processing a variable number of files. The first step is to download these files, but you don't know how many you will get each day. On Monday you could have 5, on Tuesday 3, and so on.

Let's start with the task that downloads the files. Prior to Airflow 2.3, you would have done something like this:

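from airflow import DAG
from airflow.decorators import task
from datetime import datetime
def download(file: str):
    # Placeholder for your own download logic (S3, HTTP, ...)
    print(f"Downloading {file}")
with DAG('my_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    @task
    def download_files():
        files = ["file_a", "file_b", "file_c"]
        # A single task downloads every file: one failure fails the whole task
        for file in files:
            download(file)
    download_files()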

The problem with this approach is that if one download fails, the entire task fails, even if the previous downloads succeeded.

That means wasted time and resources, since retrying the task downloads every file again. Instead, you want one task per file so that if the download of one file fails, you only retry the corresponding task and not the others.
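To make the cost concrete, here is a plain-Python simulation (no Airflow involved). It assumes the download of file_b fails once and succeeds on retry, then counts download calls with a single task versus one task per file. count_downloads and flaky_download are hypothetical helpers for illustration only.

```python
def count_downloads(files, failing_file, mapped):
    """Count download calls when `failing_file` fails once, then succeeds on retry."""
    calls = 0
    failed_once = set()

    def flaky_download(file):
        nonlocal calls
        calls += 1
        if file == failing_file and file not in failed_once:
            failed_once.add(file)
            raise RuntimeError(f"download of {file} failed")

    if mapped:
        # One task per file: only the failed task instance is retried
        for file in files:
            try:
                flaky_download(file)
            except RuntimeError:
                flaky_download(file)  # retry this file only
    else:
        # One task for all files: a retry restarts the whole loop
        try:
            for file in files:
                flaky_download(file)
        except RuntimeError:
            for file in files:
                flaky_download(file)
    return calls


files = ["file_a", "file_b", "file_c"]
print(count_downloads(files, "file_b", mapped=False))  # 5
print(count_downloads(files, "file_b", mapped=True))   # 4
```

With a single task, file_a is downloaded twice because the retry starts over; with one task per file, only file_b is downloaded again.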

That is where expand comes in.

Since Airflow 2.3, you can do something like this:

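from airflow import DAG
from airflow.decorators import task
from datetime import datetime
def download(file: str):
    # Placeholder for your own download logic (S3, HTTP, ...)
    print(f"Downloading {file}")
with DAG('my_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    @task
    def download_file(file: str):
        # Downloads a single file; expand() creates one task instance per file
        download(file)
    files = download_file.expand(file=["file_a", "file_b", "file_c"])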

This gives the following DAG in the Airflow UI:

expand on the Airflow UI

A task with a pair of square brackets next to its name is a mapped task: it stands for the multiple task instances generated by expand.

How many, you ask? Just run your DAG:

expand after running DAG

The number 3 here is the number of tasks generated by expand, one per element of the files list.

OK, great, but the files are still static, aren't they? This list doesn't change across DAG Runs, so what about a varying number of files?

Well, let’s see 👇

Dynamic Task Mapping: XComs

Here, we create a new task that generates a list of random filenames to download and pushes that list to XCom:

from airflow import DAG
from airflow.decorators import task
import random
from datetime import datetime
with DAG('my_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    @task
    def generate_files():
        return [f"file_{nb}" for nb in range(random.randint(2, 5))]
    @task
    def download_file(file: str):
        print(file)
    files = download_file.expand(file=generate_files())

In the example above, the function generate_files returns a list containing a random number of filenames (between 2 and 5, both included), such as ["file_0", "file_1", "file_2", "file_3"] or ["file_0", "file_1"].

If you copy the code and run it multiple times, you can see the number of download_file tasks change from one DAG Run to the next.

The list of filenames passed to the file parameter is stored in XCom, and you can find it on the XCom page, as shown below:

Dynamic task mapping with XComs

This list gets broken down to dynamically create one task per filename. That’s the power of Dynamic Task Mapping. It is not necessary to know in advance how many tasks you want to generate. 😎
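To check the claim about the list size, here is the generate_files logic run on its own in plain Python, without Airflow. random.randint(2, 5) includes both bounds, so each call yields between 2 and 5 filenames numbered from file_0; that length is the number of download_file instances Airflow would create for the run. The fixed seed is only there to make the sketch reproducible.

```python
import random


def generate_files():
    # Same logic as the generate_files task, outside of Airflow
    return [f"file_{nb}" for nb in range(random.randint(2, 5))]


random.seed(42)  # fixed seed so the sketch is reproducible
for _ in range(5):
    files = generate_files()
    # len(files) is the number of mapped download_file instances for this run
    print(len(files), files)
```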

Dynamic Task Mapping Over Multiple Parameters

So far, we've seen expand() with only one parameter, but you can have many. A very simple example of that is shown below:

from airflow import DAG
from airflow.decorators import task
import random
from datetime import datetime
with DAG('my_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    @task
    def generate_files():
        return [f"file_{nb}" for nb in range(random.randint(2, 5))]
    @task
    def download_file(path: str, file: str):
        print(f"{path}/{file}")
    files = download_file.expand(path=["/usr/partner_a", "/usr/partner_b"], file=generate_files())

Let's say you process files from two different partners, A and B. As you can see, you pass the list of paths as a new parameter of expand, and that's it.

By doing this, each file produces two task instances, one per path. The result is a "cross product": one mapped task instance for each combination of parameters.

Pretty powerful, isn't it?
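The cross product can be reproduced in plain Python with itertools.product. This sketch only mimics which (path, file) combinations Airflow would map, not the order of the map indexes; it assumes a run where generate_files returned four files, which yields 2 × 4 = 8 task instances.

```python
from itertools import product

paths = ["/usr/partner_a", "/usr/partner_b"]
files = ["file_0", "file_1", "file_2", "file_3"]  # assumed output of generate_files

# expand(path=..., file=...) maps over every combination of the two lists
combinations = [f"{path}/{file}" for path, file in product(paths, files)]

print(len(combinations))  # 8
for filepath in combinations:
    print(filepath)
```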

But that’s not all!

Constant Parameters With Dynamic Task Mapping

What if you have a value that does not change when your task gets expanded at runtime, like a path?

You may say: "easy! I put one single value for the path parameter, and that's it"

Well, not really.

  • For mapped parameters (used to generate X number of tasks) use expand()
  • For unmapped parameters (not used to generate X number of tasks) use partial()
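If it helps, the split is similar to Python's functools.partial: constant arguments are bound once, and only the mapped argument varies from one instance to the next. This is a plain-Python analogy, not Airflow's implementation of partial().

```python
from functools import partial


def download_file(path: str, file: str) -> str:
    return f"{path}/{file}"


# Constant (unmapped) argument: bound once, like .partial(path=...)
download_from_a = partial(download_file, path="/usr/partner_a")

# Mapped argument: one call per element, like .expand(file=[...])
results = [download_from_a(file=f) for f in ["file_0", "file_1", "file_2"]]
print(results)
```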

Use partial() to define constant arguments, including required ones such as the task_id, depending on the operator you use.

So far, we have used the task decorator, which uses the function name as the default task_id. With a classic operator, you must pass the task_id yourself, like this:

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
import random
from datetime import datetime
with DAG('my_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    @task
    def generate_files():
        return [f"file_{nb}" for nb in range(random.randint(2, 5))]
    @task
    def download_file(path: str, file: str):
        # Returns the bash command that the BashOperator will execute
        return f"echo '{path}/{file}'"
    # Constant arguments go in partial(), mapped arguments go in expand()
    print_file = BashOperator.partial(task_id="bash", do_xcom_push=False).expand(
        bash_command=download_file.partial(path="/usr/partner_a").expand(file=generate_files())
    )

Take a pause here because this piece of code is pretty amazing.

First, we use partial() to define constant parameters of the BashOperator such as the task_id and do_xcom_push. If you try to define those parameters in expand, that won’t work.

Next, we use partial() for download_file as well since the path parameter is not mapped. There is only one value here.

Then, we pass the output of download_file, which is itself mapped and returns one bash command per file, to the bash_command parameter of the BashOperator, so the BashOperator is mapped as well.

For example, if four download_file task instances are generated, four print_file instances (task_id bash) are generated as well.

Dynamic Task Mapping with classic operators

Notice that download_file returns the actual bash command that will be executed (echo 'path/file').
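Here is a plain-Python sketch of that chain (no Airflow involved): the first mapped step turns each file into a bash command string, and the second step stands for one BashOperator instance per command, so both stages end up with the same number of instances. The four filenames are an assumed output of generate_files.

```python
files = ["file_0", "file_1", "file_2", "file_3"]  # assumed generate_files output


def download_file(path: str, file: str) -> str:
    # Same body as the mapped TaskFlow task: returns a bash command
    return f"echo '{path}/{file}'"


# Stage 1: download_file.partial(path=...).expand(file=files)
commands = [download_file(path="/usr/partner_a", file=f) for f in files]

# Stage 2: BashOperator.partial(...).expand(bash_command=commands)
# would create one task instance per command string
for map_index, command in enumerate(commands):
    print(map_index, command)
```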

ZIP-like Dynamic Task Mapping

We've seen that expanding over multiple arguments generates tasks in a Cartesian-product fashion.

What if, instead, you want to pair the values in a zipped fashion (cf. Python's zip)?

from airflow import DAG
from airflow.decorators import task
from datetime import datetime
with DAG('my_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    @task
    def generate_paths():
        return ['/usr/partner_a', '/usr/partner_b']
    @task
    def generate_files():
        return ['file_a', 'file_b']
    @task
    def aggregate(path: list, file: list):
        # Pairs each path with the file at the same position
        return list(zip(path, file))
    @task
    def download_file(filepath: list):
        print(filepath)
    aggregated = aggregate(path=generate_paths(), file=generate_files())
    download_file.expand(filepath=aggregated)

Here, there is a little "trick" with aggregate(): it manually performs the zip operation, pairing each path with the file at the same position, and returns the list of pairs that download_file is then expanded over.

It’s not ideal for the reasons described here, but it’s the only way for now. Notice that it doesn’t work for classic operators.

If you run this code, you end up with the following DAG:

zip fashion dynamic task mapping

The output is:

['/usr/partner_a', 'file_a']
['/usr/partner_b', 'file_b']
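To see how this differs from the cross product, here is a plain-Python comparison (no Airflow involved). It also suggests why the pairs are printed as lists rather than tuples: assuming the default JSON-based XCom serialization, the tuples returned by aggregate() are stored as JSON arrays and read back as lists.

```python
import json
from itertools import product

paths = ["/usr/partner_a", "/usr/partner_b"]
files = ["file_a", "file_b"]

# Cross product, like expand(path=paths, file=files): 2 x 2 = 4 combinations
crossed = list(product(paths, files))

# Zip, like the aggregate() task: elements are paired by position, 2 pairs
zipped = list(zip(paths, files))

# Round trip through JSON, as a JSON-based XCom backend would do: tuples become lists
from_xcom = json.loads(json.dumps(zipped))

print(len(crossed), len(zipped))  # 4 2
print(from_xcom)  # [['/usr/partner_a', 'file_a'], ['/usr/partner_b', 'file_b']]
```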

Conclusion

Dynamic Task Mapping is a powerful feature introduced in Airflow 2.3.0 that many people had been waiting for. Now you can generate tasks at runtime based on values you don't know upfront. There are many use cases where you might need this. It is also worth mentioning that this functionality will improve over time. Stay tuned!

I hope you enjoyed this article; if you want to learn more about Airflow, take a look at my course here.

Have a great day! 😉
