Airflow TaskGroups: All you need to know!

Airflow TaskGroups were introduced to make your DAGs visually cleaner and easier to read. They are meant to replace SubDAGs, which were the historic way of grouping your tasks. The problem with SubDAGs is that they are much more than a grouping feature. They bring a lot of complexity, as you need to create a DAG within a DAG, import the SubDagOperator (which is in fact a sensor), define the parameters properly, and so on. Therefore, SubDAGs are going to be deprecated and it’s time for you to make the BIG CHANGE! Ultimately, TaskGroups help you better organize your tasks and maintain your DAGs without the hassle of SubDAGs. By the way, if you are new to Airflow, check my course here, you will get it with a special discount. Ready? Let’s goooooo!

Use Case

To better illustrate the concept, let’s start with the following use case:

DAG Example

In the DAG above, you have the same two steps for a and b: process and export. When you end up with a DAG like this, you have four solutions:

  • You keep the DAG as it is but if you have a ton of different steps for a ton of different sources (a, b, c, d, etc) you may regret your choice.
  • Or you group the tasks per step (process and export)
  • Or you group the tasks per source (a and b)
  • You group the tasks per path (process_a -> export_a)

How you choose to group your tasks is more a matter of preference than a technical issue. Personally, I tend to prefer grouping by step over the other ways. Ultimately, it’s a design decision that you need to make based on your use case and how your DAG may evolve in the future.
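To make the grouping options concrete, here is a minimal plain-Python sketch (no Airflow required) that buckets the four tasks of this use case per step and per source. The `group_by` helper and the hard-coded `TASKS` list are assumptions made purely for illustration.

```python
from collections import defaultdict

# The four tasks of the use case, described as (step, source) pairs.
TASKS = [("process", "a"), ("process", "b"), ("export", "a"), ("export", "b")]


def group_by(tasks, key_index):
    """Bucket task names by step (key_index=0) or by source (key_index=1)."""
    groups = defaultdict(list)
    for task in tasks:
        step, source = task
        groups[task[key_index]].append(f"{step}_{source}")
    return dict(groups)


per_step = group_by(TASKS, 0)
per_source = group_by(TASKS, 1)
print(per_step)
print(per_source)
```

Each key of the resulting dictionaries corresponds to one candidate group, and each list holds the tasks that would live inside it.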

For this example, your goal is to end up with a DAG like this:

DAG Example with TaskGroups

where each blue box is a group of tasks as shown below

DAG Example with expanded TaskGroups

Well, guess what, that’s exactly the goal of Airflow TaskGroups! Let’s find out what they are and what you can do with them!

Airflow TaskGroups

A TaskGroup is a collection of closely related tasks on the same DAG that should be grouped together when the DAG is displayed graphically.

Unlike SubDAGs, where you had to create a DAG, a TaskGroup is only a visual-grouping feature in the UI. Keep this in mind: currently, a TaskGroup is a visual-grouping feature, nothing more, nothing less. This implies two things: TaskGroups are very lightweight and easy to use, but you don’t have as much control over the tasks in the group as you had with SubDAGs. You will see the limitations later in the article.

That being said, creating a TaskGroup is extremely simple.

Let’s say you have the following DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG('my_dag',
         schedule_interval='@daily',
         start_date=datetime(2022, 1, 1),
         catchup=False) as dag:
    start = BashOperator(task_id='start', bash_command='exit 0')
    task_process_a = BashOperator(task_id='task_process_a', bash_command='exit 0')
    task_process_b = BashOperator(task_id='task_process_b', bash_command='exit 0')
    end = BashOperator(task_id='end', bash_command='exit 0')
    start >> task_process_a >> task_process_b >> end
my_dag with no groups

and your goal is to group the tasks task_process_a and task_process_b together. With Airflow TaskGroups you just need to:

Make an import

from airflow.utils.task_group import TaskGroup

Define the TaskGroup and put your tasks under it.

with TaskGroup(group_id="processes") as processes:
    task_process_a = BashOperator(task_id='task_process_a', bash_command='exit 0')
    task_process_b = BashOperator(task_id='task_process_b', bash_command='exit 0')

Add the task group in your dependencies

start >> processes >> end

You end up with the following DAG:

from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG('my_dag',
         schedule_interval='@daily',
         start_date=datetime(2022, 1, 1),
         catchup=False) as dag:
    start = BashOperator(task_id='start', bash_command='exit 0')
    with TaskGroup(group_id="processes") as processes:
        task_process_a = BashOperator(task_id='task_process_a', bash_command='exit 0')
        task_process_b = BashOperator(task_id='task_process_b', bash_command='exit 0')
    end = BashOperator(task_id='end', bash_command='exit 0')
    start >> processes >> end
my_dag with a taskgroup

That’s it. At this point, you know what an Airflow TaskGroup is and how to group your tasks with it. Well done 😎

Ok, that’s not all. Now it’s time to dive into the details! Let’s go!

The TaskGroup Basics

With Airflow TaskGroups, there are some basic but important parameters to take care of. The first one is the group_id.

group_id

The group id is a unique, meaningful id for the TaskGroup. Why meaningful? Because this is what you will see displayed on the UI, but that’s not all… (teasing 😉)

The following code:

with TaskGroup(group_id='processes') as group_processes:
    task_process_a = BashOperator(task_id='task_process_a', bash_command='exit 0')

displays like this on the UI:

TaskGroup on the UI with group_id=processes

Notice that group_processes, the variable holding the TaskGroup instance, doesn’t impact the name of the group on the UI. It is only used in the code, for example to define the dependencies.

In addition, make sure that the group_id is unique. It must not conflict with the group_id of another TaskGroup or the task_id of another task. Otherwise, you will end up with an error.

Finally, you MUST define the group_id. It’s a required positional argument that expects a string made exclusively of alphanumeric characters, dashes, and underscores, and no longer than 200 characters. It can be set to None only if the TaskGroup is a root TaskGroup.
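As a quick way to check a candidate group_id against the naming rule just described, here is a minimal plain-Python sketch. It only mirrors the rule as stated in this article; the `is_valid_group_id` helper and its constants are hypothetical and are not Airflow's own validation code.

```python
import re

# Rule as described above: letters, digits, dashes and underscores only,
# at most 200 characters. This is an illustration, not Airflow's code.
_GROUP_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]+")
MAX_GROUP_ID_LENGTH = 200


def is_valid_group_id(group_id):
    """Return True if group_id satisfies the naming rule described above."""
    if not isinstance(group_id, str):
        return False
    if len(group_id) > MAX_GROUP_ID_LENGTH:
        return False
    # fullmatch requires the whole string to match, so "" is rejected too.
    return _GROUP_ID_PATTERN.fullmatch(group_id) is not None


print(is_valid_group_id("processes"))     # True
print(is_valid_group_id("my processes"))  # False: contains a space
```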

What is a root TaskGroup? It’s a task group with no group_id and no parent. By default, every DAG has a root Task Group. This really matters when you start nesting multiple task groups as you will see later in the article.

prefix_group_id

Remember when I said that the group_id is not only used on the UI? Well, this is it. In addition to the group_id, another parameter that you can use is prefix_group_id. Set to True by default, it adds the group_id as a prefix of all tasks within the group.

Concretely, those tasks:

with TaskGroup(group_id='processes') as group_processes:
    task_process_a = BashOperator(task_id='task_process_a', bash_command='exit 0')
    task_process_b = BashOperator(task_id='task_process_b', bash_command='exit 0')

have the task ids processes.task_process_a and processes.task_process_b.

Notice that on the Graph, you don’t see that:

Graph UI task group

but if you take a look at the task instances on the UI or if you list the tasks with the command airflow tasks list <dag_id>, you will see the prefixed ids.

You should be particularly careful about this with XComs and branching. In the first case, you specify the task id of the task to pull the data from. In the second case, you specify the task id of the next task to execute. In both cases, if you forget the prefix automatically added to the tasks in the task group, you will end up with an error or with missing data.

As a best practice, I recommend keeping this parameter set to True as it greatly reduces the risk of conflicts between your task ids, especially with nested task groups. That’s why it’s even more important to define a meaningful group id.
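To see which id you need to reference from an XCom pull or a branch, here is a minimal plain-Python sketch of how the prefixed task id is composed. The `full_task_id` helper is hypothetical (it is not part of Airflow) and it assumes prefix_group_id=True, the default, on every enclosing group.

```python
def full_task_id(task_id, group_ids=()):
    """Join the enclosing group ids (outermost first) and the task id with dots.

    Assumes prefix_group_id=True (the default) on every enclosing group.
    """
    return ".".join([*group_ids, task_id])


# Task directly inside the group "processes":
print(full_task_id("task_process_a", ["processes"]))
# Task inside a group "path_a" that is itself nested in a group "paths":
print(full_task_id("task_store_a", ["paths", "path_a"]))
# Task outside of any group keeps its plain id:
print(full_task_id("start"))
```

The resulting string is what you would pass as task_ids to xcom_pull, or return from your branching callable.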

TaskGroup Default Arguments

Wouldn’t it be nice to apply default arguments to all tasks that are within a TaskGroup?

Indeed, let’s imagine that you have some pretty resource-consuming tasks grouped in the same TaskGroup. For that reason, you would like to execute one task at a time but only for this group.

Can you do that? Of course, you can!

With Airflow TaskGroups you can define a dictionary of default parameters to be used for every task. This dictionary overrides the default dictionary defined at the DAG level.

Let me give you a concrete example:

DEFAULT_ARGS = {
    'pool': 'general'
}
with DAG('my_dag',
         schedule_interval='@daily',
         start_date=datetime(2022, 1, 1),
         catchup=False,
         default_args=DEFAULT_ARGS) as dag:
    start = BashOperator(task_id='start', bash_command='exit 0')
    with TaskGroup(group_id='processes', default_args={'pool': 'sequential'}) as group_processes:
        task_process_a = BashOperator(task_id='task_process_a', bash_command='exit 0')
        task_process_b = BashOperator(task_id='task_process_b', bash_command='exit 0')

In the above example, you define the default arguments at the DAG level with pool = general. That means every task of your DAG is executed in the pool “general”. Now, as you defined another set of default arguments at the TaskGroup level (look at the parameter default_args of the TaskGroup), the tasks of this TaskGroup, and only those, will be executed in the pool “sequential”.

By the way, this example shows you how to control the concurrency of a set of tasks in a TaskGroup. If the pool “sequential” has only one slot, then the tasks in that TaskGroup will be executed sequentially, one after the other.
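The override order can be pictured as a simple dictionary merge. The sketch below is a simplified plain-Python illustration of that precedence, not Airflow's implementation; the `resolve_task_args` helper and the `retries` key are assumptions added for the example.

```python
def resolve_task_args(dag_defaults, group_defaults, task_kwargs):
    """Merge arguments from least to most specific; later sources win."""
    resolved = dict(dag_defaults)    # DAG-level default_args
    resolved.update(group_defaults)  # TaskGroup-level default_args
    resolved.update(task_kwargs)     # arguments passed to the task itself
    return resolved


dag_defaults = {"pool": "general", "retries": 1}
group_defaults = {"pool": "sequential"}

inside_group = resolve_task_args(dag_defaults, group_defaults, {})
outside_group = resolve_task_args(dag_defaults, {}, {})
print(inside_group)
print(outside_group)
```

A task inside the group picks up the pool “sequential” while still inheriting the other DAG-level defaults, and a task outside the group keeps the pool “general”.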

Nested TaskGroups

What if you want to create a task group in a task group?

What if you want to create multiple TaskGroups in a TaskGroup?

Can you? Of course you can!

Let’s say you have two “paths” in which you have the same steps to process and store data from two different sources. You may want to have a DAG like this:

Nested TaskGroups

In the DAG above, you have two TaskGroups, path_a and path_b, each having two tasks, task_process_ and task_store_ (suffixed with a or b). Those TaskGroups are themselves in one TaskGroup called paths. If you collapse all groups, you end up with the following DAG:

Nested TaskGroups (collapsed)

Nice and clean, isn’t it? How do you do this?

Simple, here is the code:

start = BashOperator(task_id='start', bash_command='exit 0')
with TaskGroup(group_id='paths') as paths:
    with TaskGroup(group_id='path_a') as path_a:
        task_process_a = BashOperator(task_id='task_process_a', bash_command='exit 0')
        task_store_a = BashOperator(task_id='task_store_a', bash_command='exit 0')
    with TaskGroup(group_id='path_b') as path_b:
        task_process_b = BashOperator(task_id='task_process_b', bash_command='exit 0')
        task_store_b = BashOperator(task_id='task_store_b', bash_command='exit 0')
end = BashOperator(task_id='end', bash_command='exit 0')
start >> paths >> end

As you can see, the way you create and indent your TaskGroups defines the way they get nested. It is as simple as that.

As far as I know, there is no hard limit in terms of nesting your task groups, but as a best practice, I wouldn’t go beyond 3 levels.

Now, what if you want to execute task_process then task_store within each path, and execute path_a first, then path_b?

Nested TaskGroups with Dependencies

Simple! Define the dependencies as usual

start = BashOperator(task_id='start', bash_command='exit 0')
with TaskGroup(group_id='paths') as paths:
    with TaskGroup(group_id='path_a') as path_a:
        task_process_a = BashOperator(task_id='task_process_a', bash_command='exit 0')
        task_store_a = BashOperator(task_id='task_store_a', bash_command='exit 0')
        task_process_a >> task_store_a
    with TaskGroup(group_id='path_b') as path_b:
        task_process_b = BashOperator(task_id='task_process_b', bash_command='exit 0')
        task_store_b = BashOperator(task_id='task_store_b', bash_command='exit 0')
        task_process_b >> task_store_b
    path_a >> path_b
end = BashOperator(task_id='end', bash_command='exit 0')
start >> paths >> end

I tend to advise defining your dependencies at the end of your DAG, TaskGroups are the only exception 😉

parent_group

You don’t have to explicitly put a task group under another task group to nest them. Another way is by using the parameter parent_group.

If task group B has the parameter parent_group=A then A nests B or task group A is the parent of task group B.

The code below gives you the exact same DAG as before but with the parameter parent_group.

start = BashOperator(task_id='start', bash_command='exit 0')
with TaskGroup(group_id='paths') as paths:
    pass  # empty on purpose: the children attach themselves with parent_group
with TaskGroup(group_id='path_a', parent_group=paths) as path_a:
    task_process_a = BashOperator(task_id='task_process_a', bash_command='exit 0')
    task_store_a = BashOperator(task_id='task_store_a', bash_command='exit 0')
    task_process_a >> task_store_a
with TaskGroup(group_id='path_b', parent_group=paths) as path_b:
    task_process_b = BashOperator(task_id='task_process_b', bash_command='exit 0')
    task_store_b = BashOperator(task_id='task_store_b', bash_command='exit 0')
    task_process_b >> task_store_b
path_a >> path_b
end = BashOperator(task_id='end', bash_command='exit 0')
start >> paths >> end

Task Groups Without The Context Manager

So far you’ve created Task Groups with the context manager “with”. I tend to prefer this way, but it is possible to create task groups without the context manager.

How?

Here is an example:

start = BashOperator(task_id='start', bash_command='exit 0')
paths = TaskGroup(group_id='paths')
path_a = TaskGroup(group_id='path_a', parent_group=paths)
task_process_a  = BashOperator(task_id='task_process_a', bash_command='exit 0')
task_store_a    = BashOperator(task_id='task_store_a', bash_command='exit 0')
task_process_a >> task_store_a       
path_b = TaskGroup(group_id='path_b', parent_group=paths)
task_process_b  = BashOperator(task_id='task_process_b', bash_command='exit 0')
task_store_b    = BashOperator(task_id='task_store_b', bash_command='exit 0')
task_process_b >> task_store_b
path_a >> path_b
end = BashOperator(task_id='end', bash_command='exit 0')
start >> paths >> end

That’s it. You just have to declare a variable and instantiate your task group. Cool, but is there something wrong here?

Indeed, if you try that code you will end up with this DAG:

Badly designed Task Groups

I’m not sure this is what you wanted 😅

The problem here is that Airflow doesn’t know to which Task Group the different tasks belong. As you don’t use a context manager, it’s not about indentation anymore. You need to explicitly indicate that a task belongs to a task group.

How? By using a parameter that every operator has, task_group.

Here is the fixed code:

start = BashOperator(task_id='start', bash_command='exit 0')
paths = TaskGroup(group_id='paths')
path_a = TaskGroup(group_id='path_a', parent_group=paths)
task_process_a  = BashOperator(task_id='task_process_a', task_group=path_a, bash_command='exit 0')
task_store_a    = BashOperator(task_id='task_store_a', task_group=path_a, bash_command='exit 0')
task_process_a >> task_store_a       
path_b = TaskGroup(group_id='path_b', parent_group=paths)
task_process_b  = BashOperator(task_id='task_process_b', task_group=path_b, bash_command='exit 0')
task_store_b    = BashOperator(task_id='task_store_b', task_group=path_b, bash_command='exit 0')
task_process_b >> task_store_b
path_a >> path_b
end = BashOperator(task_id='end', bash_command='exit 0')
start >> paths >> end

Notice the task_group parameter for each task indicating that task_process_a and task_store_a belong to the task group path_a, and task_process_b and task_store_b belong to the task group path_b.

With that code you get back your beautiful DAG:

Nested TaskGroups with Dependencies

Dynamically Generating Task Groups

Creating the same tasks manually over and over is not fun. To avoid this, you can dynamically generate tasks in your DAGs.

Wouldn’t it be nice to do the same for the Airflow TaskGroups?

Well, you can. Let’s say you would like to generate three task groups with two tasks for each one. One thing you could do is the following:

start = BashOperator(task_id='start', bash_command='exit 0')
with TaskGroup(group_id='paths') as paths:
    for gid in ['a', 'b', 'c']:
        with TaskGroup(group_id=f'path_{gid}') as path:
            task_process = BashOperator(task_id=f'task_process_{gid}', bash_command='exit 0')
            task_store = BashOperator(task_id=f'task_store_{gid}', bash_command='exit 0')
            task_process >> task_store
end = BashOperator(task_id='end', bash_command='exit 0')
start >> paths >> end

which gives you:

Dynamically Generated Task Groups

Great! Wait a second, what if I want to execute the task groups in the following order: path_a, path_b, and finally path_c?

Here we go:

start = BashOperator(task_id='start', bash_command='exit 0')
with TaskGroup(group_id='paths') as paths:
    previous = None
    # A list keeps the order a, b, c (a set has no guaranteed order)
    for gid in ['a', 'b', 'c']:
        with TaskGroup(group_id=f'path_{gid}') as path:
            task_process = BashOperator(task_id=f'task_process_{gid}', bash_command='exit 0')
            task_store = BashOperator(task_id=f'task_store_{gid}', bash_command='exit 0')
            task_process >> task_store
        if previous:
            # The previous group runs before the current one
            previous >> path
        previous = path
end = BashOperator(task_id='end', bash_command='exit 0')
start >> paths >> end

which produces:

Dynamically Generated Nested Task Groups with Dependencies
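If the "previous" bookkeeping feels abstract, here is a minimal plain-Python sketch of the same chaining pattern, producing (upstream, downstream) pairs instead of Airflow dependencies. The `sequential_edges` helper is hypothetical and only illustrates the loop logic. Note that it expects an ordered sequence such as a list: a set has no guaranteed iteration order, so it cannot express "a, then b, then c".

```python
def sequential_edges(group_ids):
    """Return (upstream, downstream) pairs that run the groups in list order."""
    edges = []
    previous = None
    for gid in group_ids:
        current = f"path_{gid}"
        if previous is not None:
            # Plays the role of `previous >> current` between two Airflow objects.
            edges.append((previous, current))
        previous = current
    return edges


print(sequential_edges(["a", "b", "c"]))
```

Every group except the first one gets exactly one upstream group, which is what makes the groups run one after the other.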

Task Group Factory

To add reusability and modularity you may want to create a Task Group Factory. Like with the SubDAGs, you could create a factory function that is in charge of generating a task group (or multiple task groups) with multiple tasks.

That is especially useful when you have the same pattern across different DAGs and you don’t want to copy the same code everywhere.

So, how could you create a factory function that returns a task group to use in all of your DAGs?

Simple!

First, create a new file, factory_task_group.py for example. Put this file in another folder like “include” that is not in your “dags” folder, like this:

Files and folders

Make sure the PYTHONPATH is aware of this folder so you can import your factory function in your DAG. (If you don’t know what the PYTHONPATH is, take a look here)

Copy and paste the following code:

from typing import List
from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.bash import BashOperator
def make_taskgroup(dag: DAG, sources: List[str]) -> TaskGroup:
    with TaskGroup(group_id='paths', dag=dag) as paths:
        previous = None
        for gid in sources:
            with TaskGroup(group_id=f'path_{gid}') as path:
                task_process = BashOperator(task_id=f'task_process_{gid}', bash_command='exit 0')
                task_store = BashOperator(task_id=f'task_store_{gid}', bash_command='exit 0')
                task_process >> task_store
            if previous:
                previous >> path
            previous = path
    return paths

This code is more or less what you’ve done in the previous section. The only important pieces to notice here are:

  • You create a new Python function that returns a TaskGroup.
  • That Python function expects a parameter dag, which is the DAG the generated task group belongs to.
  • You can add as many parameters as you want to your function. Here, sources is a list of strings like ["a", "b", "c"].

In your DAG, you import the factory function and you call it.

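from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from include.factory_task_group import make_taskgroup
with DAG('my_dag',
         schedule_interval='@daily',
         start_date=datetime(2022, 1, 1),
         catchup=False) as dag:
    start = BashOperator(task_id='start', bash_command='exit 0')
    # A list (not a set) so the groups are chained in a predictable order
    paths = make_taskgroup(dag, ['e', 'f', 'g'])
    end = BashOperator(task_id='end', bash_command='exit 0')
    start >> paths >> end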

and that’s it. If you go on the Airflow UI, you will end up with the following DAG:

Dynamically Generated Nested Task Groups with a Factory Function

Pretty impressive, isn’t it? Now you can call this function wherever you want, with different parameters, and it will generate the Task Groups for you.

Just keep in mind that a Task Group has a parameter dag so you can specify to which DAG that task group belongs.

The Decorator

In addition to the two classic ways of creating a task group, either with a context manager or not, there is a third way which is by using the decorator @task_group.

I like this one as it makes your code even cleaner/clearer than the classic ways.

This decorator is part of the broader new way of creating your DAGs in Airflow with the TaskFlow API.

Creating Airflow TaskGroups with the decorator is even easier than with the other ways. Make the import, call the decorator, define your group under it and that’s it.

Here is an example:

from airflow.decorators import task_group
from airflow.operators.bash import BashOperator
@task_group(group_id='path_a')
def path():
    task_process = BashOperator(task_id='task_process_a', bash_command='exit 0')
    task_store = BashOperator(task_id='task_store_a', bash_command='exit 0')
    task_process >> task_store
# Call the decorated function inside your DAG to create the group
path()

This code creates a task group called path_a with the two tasks task_process_a and task_store_a.

TaskGroups in Action!

Conclusion

As you can see, Airflow TaskGroups are extremely powerful. They are a very lightweight and flexible way of grouping the tasks in your DAGs, and much easier to use than SubDAGs. Now, it’s true that you have less control over your grouped tasks than with SubDAGs. You cannot retry an entire TaskGroup in one click nor clear its tasks at once, but those are really minor downsides compared to the complexity that SubDAGs bring. I strongly encourage you to play with TaskGroups, you’re gonna fall in love with them. See you in another article!

Take care! ❤️

P.S: If you want to learn more about Airflow, go check my course The Complete Hands-On Introduction to Apache Airflow right here. Or if you already know Airflow and want to go way much further, enroll in my 12 hours course here
