Airflow TaskGroups

An Airflow TaskGroup helps make a complex DAG easier to organize and read. TaskGroups are meant to replace SubDAGs, the historical way of grouping your tasks. Indeed, SubDAGs are overly complicated for something as simple as grouping tasks: you must create a DAG within a DAG, import the SubDagOperator (which is actually a sensor), define the parameters correctly, and so on. As of today, SubDAGs are deprecated, and you should stop using them. If you are new to Airflow, check out my course here; you will get it at a special discount. Ready? Let's go!

Use Case

To better illustrate the concept, let's begin with a use case:

giant dag for airflow taskgroup

In this tutorial, we will use taskgroups to organize the giant DAG above. Amazing, isn’t it?

I’m kidding. 🥹

But you get the point. With taskgroups, you can:

  • Apply default arguments to a set of tasks.
  • Turn task groups into modules to reuse them in different DAGs.
  • Visually group tasks to organize complex data pipelines.

Let’s take another example:

In the DAG above, you have two paths, a and b, that run the same steps: process and store. When you end up in this situation, you have three options:

  • Keeping the DAG as it is, but it can quickly become more and more complex.
  • Grouping the tasks per step (process and store).
  • Grouping the tasks per path (a and b).

How you group your tasks is a matter of preference rather than a technical question. As a best practice, you should group tasks that share a common attribute (same source, same actions, etc.).

For this example, your goal is to end up with a DAG like this:

airflow dag with taskgroup

and if you expand the taskgroups:

expanded taskgroups

With the example in mind, let’s find out how to do it!

What is an Airflow Taskgroup?

A taskgroup is a collection of closely related tasks in a DAG visually grouped on the user interface.

Unlike SubDAGs, where you had to create a full DAG within a DAG, a TaskGroup is only a visual grouping feature in the UI. Therefore, TaskGroups are lightweight and easy to use.

There are two ways to define a taskgroup:

  • The TaskGroup class
  • The @task_group decorator (Taskflow API)

Which one to use depends on your personal preferences and use cases.

With the class:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

with DAG(...):

  with TaskGroup('process') as process:
    task_process_a = PythonOperator(
      task_id='process_a',
      python_callable=lambda: print('process_a'),
    )

    task_process_b = PythonOperator(
      task_id='process_b',
      python_callable=lambda: print('process_b'),
    )

With the taskgroup decorator:

from airflow import DAG
from airflow.decorators import task_group
from airflow.operators.python import PythonOperator

with DAG(...):

  @task_group(group_id='my_task_group')
  def tg():
    task_process_a = PythonOperator(
      task_id='process_a',
      python_callable=lambda: print('process_a'),
    )

    task_process_b = PythonOperator(
      task_id='process_b',
      python_callable=lambda: print('process_b'),
    )

  tg()

To generate taskgroups based on unknown inputs with dynamic task mapping, you must use the @task_group decorator instead of the class.
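For instance, here is a minimal sketch of a mapped task group, assuming Airflow 2.5+ (the version that introduced task group mapping); the names get_sources and process_source are purely illustrative:

from airflow import DAG
from airflow.decorators import task, task_group
from datetime import datetime

with DAG('mapped_task_groups', start_date=datetime(2023, 1, 1), schedule='@daily', catchup=False):

    @task
    def get_sources():
        # Hypothetical source list; in a real DAG this could come from an API or a database
        return ['a', 'b', 'c']

    @task_group
    def process_source(source: str):
        @task
        def process(s: str):
            return f'processed {s}'

        @task
        def store(s: str):
            print(s)

        # process runs before store within each mapped group instance
        store(process(source))

    # One group instance is created at runtime for each element returned by get_sources()
    process_source.expand(source=get_sources())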

When to use an Airflow Taskgroup?

Here are a few common Airflow TaskGroup use cases:

  • Visually organizing complex DAGs
  • Grouping tasks per machine learning model
  • Creating reusable sets of tasks for different DAGs
  • Applying default arguments to sets of tasks
  • Colocating dbt models and dbt tests with Astronomer Cosmos
  • Generating groups based on unknown inputs with dynamic task mapping

Airflow taskgroup parameters

There are a few parameters to know when working with task groups.

group_id

The group_id parameter is the unique identifier of a Taskgroup. It has to be meaningful as you will see it on the user interface but not only… (teasing 😉)

Look at the following code:

with TaskGroup(group_id='processes') as group_processes:
  task_process_a  = BashOperator(task_id='task_process_a', bash_command='exit 0')

It shows up like this on the user interface:

simple airflow taskgroup example

Notice that group_processes, the name of the TaskGroup's instance object, doesn't affect the group's name on the UI. It is only used in the code, for example to define dependencies.

In addition, make sure that the group_id is unique. It must not conflict with the group_id of another Taskgroup or the task_id of another task. Otherwise, you will get an error.
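For instance, in the hypothetical DAG below, the id process is used both for a task and for a task group, so the DAG fails to parse (typically with a DuplicateTaskIdFound error):

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG('duplicate_ids', start_date=datetime(2023, 1, 1), schedule='@daily', catchup=False):

    # This task already uses the id 'process'...
    process = BashOperator(task_id='process', bash_command='exit 0')

    # ...so this task group cannot reuse it: Airflow raises an error at parsing time
    with TaskGroup(group_id='process') as group_process:
        task_a = BashOperator(task_id='task_a', bash_command='exit 0')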

Finally, you must define the group_id. It's a required positional argument that expects a string made exclusively of alphanumeric characters, dashes, and underscores, no longer than 200 characters. It can be set to None only if the TaskGroup is a root TaskGroup.

What is a root Taskgroup? It’s a task group with no group_id and no parents. By default, every DAG has a root task group. This matters when you nest multiple task groups as you will see later in the tutorial.

prefix_group_id

Remember when I said that the group_id is not only used on the UI? Well, this is it. In addition to the group_id, another parameter that you can use is prefix_group_id. This parameter adds the group_id as a prefix for all tasks within the airflow taskgroup.

For example, the tasks that belong to the taskgroup below have the task ids process.process_a and process.process_b:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

with DAG(...):

  with TaskGroup('process') as process:
    task_process_a = PythonOperator(
      task_id='process_a',
      python_callable=lambda: print('process_a'),
    )

    task_process_b = PythonOperator(
      task_id='process_b',
      python_callable=lambda: print('process_b'),
    )

You don't see the prefix on the user interface, but if you list the tasks of your DAG with airflow tasks list <dag_id>, you will get the task ids with the prefix.

Pay attention when you mix task groups with XComs or the BranchPythonOperator: in both cases, you will get an error if you forget to add the group prefix when referencing the ids of tasks that belong to a task group.
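For example, here is a minimal sketch (the task and group names are illustrative) of how to reference a task that lives inside the process group when pulling an XCom or returning a task id from a branch:

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG('prefixed_ids', start_date=datetime(2023, 1, 1), schedule='@daily', catchup=False):

    # A branch callable must return the prefixed task id, not just 'process_a'
    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=lambda: 'process.process_a',
    )

    with TaskGroup('process') as process:
        task_process_a = PythonOperator(
            task_id='process_a',
            python_callable=lambda: 'some value',  # the return value is pushed to XCom
        )

    def _read_result(ti):
        # xcom_pull must also use the prefixed task id
        return ti.xcom_pull(task_ids='process.process_a')

    read = PythonOperator(task_id='read', python_callable=_read_result)

    branch >> process >> read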

As a best practice, I recommend keeping this parameter set to True (the default), as it reduces the risk of conflicts between your task ids, especially with nested task groups. That's why it's even more critical to define a meaningful group_id.

Airflow Taskgroup default arguments

Wouldn't it be nice to apply default arguments to all tasks within a TaskGroup? Imagine that you have some pretty resource-consuming tasks grouped in the same TaskGroup, and you would like to execute only one of them at a time, but only for this group.

Can you do that? Of course, you can!

With Airflow Taskgroups you can define a dictionary of default parameters that will be applied for every task within the group. This dictionary overrides the default dictionary (if any) defined at the DAG level.

Let me give you an example:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

DEFAULT_ARGS = {
  'pool': 'general'
}

with DAG('my_dag',
  schedule_interval='@daily',
  start_date=datetime(2023, 1, 1),
  catchup=False,
  default_args=DEFAULT_ARGS):

  start = BashOperator(task_id='start', bash_command='exit 0')

  with TaskGroup(group_id='processes', default_args={'pool': 'sequential'}) as group:
    task_process_a = BashOperator(task_id='task_process_a', bash_command='exit 0')
    task_process_b = BashOperator(task_id='task_process_b', bash_command='exit 0')

In the example above, you define the default arguments at the DAG level with pool=general, which means Airflow runs the tasks in the pool "general". Since another set of default arguments exists at the TaskGroup level (look at the default_args parameter of the TaskGroup), the tasks within the group use the pool "sequential" instead of "general". This example uses pools, but you can override any other argument such as on_failure_callback, retries, etc.

Nested Airflow Taskgroups

What if you want to create a taskgroup in a taskgroup? Or create multiple taskgroups in a taskgroup?

Suppose you have two “paths” with the same steps to process and store data from two sources. You might want a DAG like this:

airflow nested taskgroups

You can see in the DAG above that the two task groups A and B are in the taskgroup sources. When you close all taskgroups, you get the following DAG:

closed taskgroups

Nice and clean, isn't it?

Here is the corresponding code:

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('taskgroup', 
  start_date=datetime(2023, 1, 1), 
  schedule='@daily', 
  catchup=False):

    start = PythonOperator(
        task_id='start',
        python_callable=lambda: print('start'),
    )

    with TaskGroup('sources') as sources:

        with TaskGroup('A') as a:
            task_process_a = PythonOperator(
                task_id='process_a',
                python_callable=lambda: print('process_a'),
            )
            task_store_a = PythonOperator(
                task_id='store_a',
                python_callable=lambda: print('store_a'),
            )

        with TaskGroup('B') as b:
            task_process_b = PythonOperator(
                task_id='process_b',
                python_callable=lambda: print('process_b'),
            )

            task_store_b = PythonOperator(
                task_id='store_b',
                python_callable=lambda: print('store_b'),
            )

    end = PythonOperator(
        task_id='end',
        python_callable=lambda: print('end'),
    )

    chain(start, sources, end)

As far as I know, there is no hard limit regarding how deep you can go with nested taskgroups. However, I wouldn’t go beyond three levels.

Airflow Taskgroup dependencies

Taking the previous DAG example, how can you define dependencies in the task groups so that process tasks run before the store tasks?

nested taskgroup with dependencies

Simple: define the dependencies inside the task groups:

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('taskgroup', 
  start_date=datetime(2023, 1, 1), 
  schedule='@daily', 
  catchup=False):

    start = PythonOperator(
        task_id='start',
        python_callable=lambda: print('start'),
    )

    with TaskGroup('sources') as sources:

        with TaskGroup('A') as a:
            task_process_a = PythonOperator(
                task_id='process_a',
                python_callable=lambda: print('process_a'),
            )
            task_store_a = PythonOperator(
                task_id='store_a',
                python_callable=lambda: print('store_a'),
            )

            task_process_a >> task_store_a

        with TaskGroup('B') as b:
            task_process_b = PythonOperator(
                task_id='process_b',
                python_callable=lambda: print('process_b'),
            )

            task_store_b = PythonOperator(
                task_id='store_b',
                python_callable=lambda: print('store_b'),
            )

            task_process_b >> task_store_b

    end = PythonOperator(
        task_id='end',
        python_callable=lambda: print('end'),
    )

    chain(start, sources, end)

parent_group

Little secret: you don't have to physically nest one task group inside another in the code. Another way is to use the parent_group parameter.

  with TaskGroup('sources') as sources:
    pass

  with TaskGroup('A', parent_group=sources) as a:
    task_process_a = PythonOperator(
      task_id='process_a',
      python_callable=lambda: print('process_a'),
    )
    task_store_a = PythonOperator(
      task_id='store_a',
      python_callable=lambda: print('store_a'),
    )

    task_process_a >> task_store_a

  with TaskGroup('B', parent_group=sources) as b:
    task_process_b = PythonOperator(
      task_id='process_b',
      python_callable=lambda: print('process_b'),
    )
    task_store_b = PythonOperator(
      task_id='store_b',
      python_callable=lambda: print('store_b'),
    )

    task_process_b >> task_store_b

Thanks to the parent_group parameter, this code gives the same DAG as the previous one.

TaskGroups without the context manager

Note that you can also create task groups without the "with" context manager.

For example:

with DAG(...):

  start = PythonOperator(
      task_id='start',
      python_callable=lambda: print('start'),
  )

  sources = TaskGroup('sources')

  a = TaskGroup('A', parent_group=sources)
    
  task_process_a = PythonOperator(
      task_id='process_a',
      python_callable=lambda: print('process_a'),
  )
  task_store_a = PythonOperator(
      task_id='store_a',
      python_callable=lambda: print('store_a'),
  )

  task_process_a >> task_store_a

  b = TaskGroup('B', parent_group=sources)
        
  task_process_b = PythonOperator(
      task_id='process_b',
      python_callable=lambda: print('process_b'),
  )

  task_store_b = PythonOperator(
      task_id='store_b',
      python_callable=lambda: print('store_b'),
  )

  task_process_b >> task_store_b

  end = PythonOperator(
      task_id='end',
      python_callable=lambda: print('end'),
  )

  chain(start, sources, end)

If you try this DAG you will see that:

messy task groups

The problem is that Airflow doesn't know which task group the different tasks belong to. Since you don't use a context manager, indentation doesn't matter anymore. You need to explicitly indicate that a task belongs to a task group with the task_group parameter.

Here is the code fixed:

with DAG(...):

  start = PythonOperator(
      task_id='start',
      python_callable=lambda: print('start'),
  )

  sources = TaskGroup('sources')

  a = TaskGroup('A', parent_group=sources)
    
  task_process_a = PythonOperator(
      task_id='process_a',
      python_callable=lambda: print('process_a'),
      task_group=a,
  )
  task_store_a = PythonOperator(
      task_id='store_a',
      python_callable=lambda: print('store_a'),
      task_group=a,
  )

  task_process_a >> task_store_a

  b = TaskGroup('B', parent_group=sources)
        
  task_process_b = PythonOperator(
      task_id='process_b',
      python_callable=lambda: print('process_b'),
      task_group=b,
  )

  task_store_b = PythonOperator(
      task_id='store_b',
      python_callable=lambda: print('store_b'),
      task_group=b,
  )

  task_process_b >> task_store_b

  end = PythonOperator(
      task_id='end',
      python_callable=lambda: print('end'),
  )

  chain(start, sources, end)

How to generate Airflow taskgroups dynamically?

As you might know, in Airflow you can generate tasks using loops, so you don't have to write the tasks repeatedly. A simple use case is when you have different sources but the same steps: you generate the tasks in a loop while iterating over the list of sources.

Let’s say you want to generate three task groups with two tasks for each one. Here is how to do it:

start = BashOperator(task_id='start', bash_command='exit 0')
with TaskGroup(group_id='paths') as paths:
  for gid in ['a', 'b', 'c']:
    with TaskGroup(group_id=f'path_{gid}') as path:
      task_process = BashOperator(task_id=f'task_process_{gid}', bash_command='exit 0')
      task_store = BashOperator(task_id=f'task_store_{gid}', bash_command='exit 0')
      task_process >> task_store
end = BashOperator(task_id='end', bash_command='exit 0')
start >> paths >> end

This code gives:

dynamic taskgroups

What if you want to execute path_a, path_b, and path_c in this order? Iterate over a list (which, unlike a set, preserves order) and chain each group to the previous one:

start = BashOperator(task_id='start', bash_command='exit 0')
with TaskGroup(group_id='paths') as paths:
  previous = None
  for gid in ['a', 'b', 'c']:
    with TaskGroup(group_id=f'path_{gid}') as path:
      task_process = BashOperator(task_id=f'task_process_{gid}', bash_command='exit 0')
      task_store = BashOperator(task_id=f'task_store_{gid}', bash_command='exit 0')
      task_process >> task_store
    if previous:
      previous >> path
    previous = path
end = BashOperator(task_id='end', bash_command='exit 0')
start >> paths >> end

The Taskgroup factory

To add reusability and modularity, you can create a taskgroup factory. This function returns a task group (or multiple task groups) based on inputs.

This is useful when you have the same pattern across different DAGs and you don’t want to copy and paste the same code everywhere.

First, create a new file factory_task_group.py. Put this file in a folder such as "include" that is outside your "dags" folder, like this:

airflow folders

Copy and paste the following code:

from typing import List
from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.bash import BashOperator

def make_taskgroup(dag: DAG, sources: List[str]) -> TaskGroup:
  with TaskGroup(group_id='paths', dag=dag) as paths:
    previous = None
    for gid in sources:
      with TaskGroup(group_id=f'path_{gid}') as path:
        task_process = BashOperator(task_id=f'task_process_{gid}', bash_command='exit 0')
        task_store = BashOperator(task_id=f'task_store_{gid}', bash_command='exit 0')
        task_process >> task_store
      if previous:
        previous >> path
      previous = path
  return paths

This code is more or less what you've done in the previous section:

  • You create a new Python function that returns a task group.
  • That function expects a parameter dag, which is the DAG the generated task group belongs to.
  • You can add as many parameters as you want to your function. Here we have sources, which is a list of strings like ["a", "b", "c"].

In your DAG, you import the factory function and you call it this way:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

from include.factory_task_group import make_taskgroup

with DAG('my_dag',
  schedule_interval='@daily',
  start_date=datetime(2022, 1, 1),
  catchup=False) as dag:

  start = BashOperator(task_id='start', bash_command='exit 0')
  paths = make_taskgroup(dag, ['e', 'f', 'g'])
  end = BashOperator(task_id='end', bash_command='exit 0')
  start >> paths >> end


Conclusion

As you can see, Airflow TaskGroups are extremely powerful. They are a lightweight and flexible way of grouping tasks in your DAGs, much easier to use than SubDAGs. Play with task groups and you will fall in love with them!

Take care! ❤️

P.S.: If you want to learn more about Airflow, check out my course The Complete Hands-On Introduction to Apache Airflow here. Or, if you already know Airflow and want to go much further, enroll in my 12-hour course here.
