Dynamic DAGs in Apache Airflow: The Ultimate Guide

Airflow dynamic DAGs can save you a ton of time. As you know, Apache Airflow is written in Python, and DAGs are created via Python scripts. That makes it very flexible and powerful (and sometimes complex). By leveraging Python, you can create DAGs dynamically based on variables, connections, a typical pattern, and so on. This very nice way of generating DAGs comes at the price of higher complexity and a few subtle pitfalls that you must know about. By the way, if you are new to Airflow, check out my course here; you will get it at a special discount. Ready? Let’s goooooo!

Use Case

Why might you need dynamic DAGs? It’s a good question. Let’s find out through an example.

Let’s imagine that you have a DAG that extracts, processes, and stores statistics derived from your data.


Very simple DAG. Now, let’s say this DAG has different configuration settings. For example:

  • source (could be a different FTP server, API route, etc.)
  • statistics (could be mean, median, standard deviation, all of them, or only one of them)
  • destination table (could be a different table for each API route, folder, etc.)

Also, you could have different settings for each of your environments: dev, staging, and prod.

The bottom line is that you don’t want to create the same DAG and the same tasks over and over again with just slight modifications.

Why?

  • you waste your time (and your time is precious).
  • you duplicate code.
  • it’s harder to maintain; each time something changes, you need to update all of your DAGs one by one.
  • you increase the chances of introducing bugs.
  • if you move from a legacy system to Apache Airflow, porting your DAGs may be a nightmare without dynamic DAGs.

Guess what? That’s what dynamic DAGs solve. 🤩

The confusion with Airflow Dynamic DAGs

The beauty of Airflow is that everything is in Python, which brings the power and flexibility of this language. Thanks to that, it’s pretty easy to generate DAGs dynamically.

Before I show you how to do it, it’s important to clarify one thing.

Dynamic DAGs are NOT dynamic tasks. It’s a common confusion. The former is when you create DAGs based on static, predefined, already-known values (configuration files, environments, etc.). The latter is when you create tasks based on the output of previous tasks. Today, it’s not possible (yet) to do that: Apache Airflow needs to know what your DAG (and so its tasks) will look like before it can render it. Notice that an AIP for Dynamic Task Mapping is coming soon. Stay tuned 😎

Ok, now let me show you the easiest way to generate your DAGs dynamically.

Python globals with Apache Airflow

You must know that Airflow loads any DAG object it can import from a DAG file. That means the DAG must appear in globals().

Huh, what? 🤔 Let me show you.

Let’s say you want to get the price of specific stock market symbols such as APPL (Apple), FB (Meta), and GOOGL (Google). Create a Python file in your dags/ folder and paste the code below:

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

def create_dag(symbol):
    with DAG(f'get_price_{symbol}', start_date=datetime(2022, 1, 1),
             schedule_interval='@daily', catchup=False) as dag:

        @task
        def extract(symbol):
            return symbol

        @task
        def process(symbol):
            return symbol

        @task
        def send_email(symbol):
            print(symbol)
            return symbol

        send_email(process(extract(symbol)))
    return dag

for symbol in ("APPL", "FB", "GOOGL"):
    dag = create_dag(symbol)

If you take a look at the Airflow UI, you obtain this:

Wrong way of generating DAGs

As you can see, it doesn’t work. You iterate over the symbols to generate a DAG for each, but you end up with only one DAG instead of three.

So, why is that?

Well, that’s because Airflow looks for your DAG references in globals(). In Python, globals() is a built-in function that returns a dictionary of global variables. You can set or get variables as shown below (here, the variable my_dag):

>>> print(globals())
{'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <class '_frozen_importlib.BuiltinImporter'>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>}
>>> globals()['my_dag'] = "mydag"
>>> print(my_dag)
mydag
>>> print(globals())
{'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <class '_frozen_importlib.BuiltinImporter'>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>, 'my_dag': 'mydag'}

Python stores a variable in globals() when you create it outside of a function, in the global scope.

Back to the DAG example: the dag variable is rebound at each iteration of the loop, so when Airflow reads globals(), only the last DAG (the one for GOOGL) is left.
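
Here is a minimal, Airflow-free sketch of that rebinding behavior (create() is just a stand-in for create_dag()):

def create(symbol):
    return f"DAG object for {symbol}"

for symbol in ("APPL", "FB", "GOOGL"):
    dag = create(symbol)  # rebinds the same global name at every iteration

print(dag)  # DAG object for GOOGL, the first two references are gone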

The bottom line: for dynamic DAGs, you need a different variable name for each one.

You can learn more about globals() here.

So, what’s the correct way of having dynamic DAGs? The answer is just below 👇

The single-file method

The single-file method is the easiest way to generate DAGs dynamically. With this method, you have:

  • a single Python file
  • a function that returns a DAG
  • input parameters
  • a loop that generates the DAGs

Without further ado, here is an example:

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

def create_dag(symbol):
    with DAG(f'get_price_{symbol}', start_date=datetime(2022, 1, 1),
             schedule_interval='@daily', catchup=False) as dag:

        @task
        def extract(symbol):
            return symbol

        @task
        def process(symbol):
            return symbol

        @task
        def send_email(symbol):
            print(symbol)
            return symbol

        send_email(process(extract(symbol)))
    return dag

for symbol in ("APPL", "FB", "GOOGL"):
    globals()[f"dag_{symbol}"] = create_dag(symbol)

If you go on the Airflow UI:

Dynamic DAGs with the Single-File method

As you can see, you get the three DAGs get_price_APPL, get_price_FB, and get_price_GOOGL. And yes, it’s the exact same example as before; if you look carefully at the loop, you can see that we only fixed the globals() issue. Simple, isn’t it?

What to know about the single-file method

Now, run the DAG get_price_GOOGL once. When it has completed, remove the “GOOGL” symbol from the loop and refresh the page. The DAG get_price_GOOGL disappears.

Dynamic DAGs without the GOOGL DAG

First thing to know: before Apache Airflow 2.2, dynamically generated DAGs that were later removed didn’t disappear from the UI automatically. You had to remove them manually by clicking on the red trash icon. That caused a lot of confusion, as the UI showed DAGs that didn’t exist anymore. This has been fixed.

Second thing to know: removing an already-triggered dynamic DAG does NOT remove its metadata. In fact, if you add the “GOOGL” symbol back, you get the get_price_GOOGL DAG again, along with its previously triggered DAG run, as shown below:

Dynamic DAG metadata

In addition to those details, there are two major drawbacks with this method:

  • You have no visibility into the code of the generated DAGs. The example we use here is quite simple, but imagine that you have a lot of tasks with many different inputs. Without being able to look at the generated code, debugging your DAGs may become really hard.
  • DAGs in the folder dags/ are parsed every min_file_process_interval seconds (see the configuration snippet after this list). By default, the value is set to 30. That means your DAGs are regenerated every 30 seconds. If you have a lot of DAGs to create, that may lead to serious performance issues. Conversely, increasing the value means that changes to your DAGs take more time to be reflected.
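
For reference, here is a minimal snippet of where that knob lives in airflow.cfg (you can also set it through the matching AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL environment variable):

[scheduler]
# number of seconds to wait between two consecutive parses of a DAG file
min_file_process_interval = 30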

It’s worth mentioning that you should never generate your DAGs from inputs that come from a database or API requests, as those sources would be hit at every parsing interval.

Ultimately, I would recommend this method only if you have a few simple DAGs to generate. Otherwise, there is another method that I love.

The multiple-file method

My favourite way (and the one I recommend) is the multiple-file method. With this method, you have:

  • a template file (the DAG skeleton)
  • an input file (where your inputs are)
  • a script file, in charge of generating your DAGs by merging the inputs with the template

It’s a little bit more work, but:

  • it is less prone to errors, as the generation script is typically part of a CI/CD pipeline (see the sketch after this list).
  • it is scalable: your DAGs are generated once, not every 30 seconds.
  • you have full access to the generated code, which makes debugging easier.
  • ultimately, it is a more reliable method.
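
As an illustration, here is a hypothetical GitLab CI job sketch; the image, the requirements file, and the deployment step are placeholders to adapt to your own pipeline, not part of any standard setup:

# .gitlab-ci.yml (hypothetical sketch)
generate-dags:
  image: python:3.9
  script:
    - pip install -r requirements.txt             # generator dependencies, if any (hypothetical file)
    - python include/dynamic_dag/generate_dag.py  # the generator script shown later in this article
    - cp dags/*.py /path/to/your/deployed/dags/   # placeholder deployment step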

If you run Airflow in production, I would definitely advise you to use this method.

Airflow Dynamic DAGs with JSON files

Maybe one of the most common ways of using this method is with JSON input files. Let’s see how.

The first step is to create the template file: the DAG skeleton from which you will derive all the others by injecting the inputs.

Notice that you should put this file outside of the folder dags/:

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG("get_price_DAG_ID_HOLDER", start_date=datetime(2022, 1, 1),
         schedule_interval="SCHEDULE_INTERVAL_HOLDER", catchup=False) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def send_email(symbol):
        print(symbol)
        return symbol

    send_email(process(extract(INPUT_HOLDER)))

As you can see, it’s a pretty simple DAG with placeholders such as DAG_ID_HOLDER, INPUT_HOLDER or SCHEDULE_INTERVAL_HOLDER. Those placeholders will be replaced by the corresponding values in the JSON files.

The second step is to create the JSON files.

config_dag_appl.json

{
    "dag_id": "APPL",
    "schedule_interval": "@daily",
    "input": "126"
}

config_dag_fb.json

{
    "dag_id": "FB",
    "schedule_interval": "@daily",
    "input": "198"
}

config_dag_googl.json

{
    "dag_id": "GOOGL",
    "schedule_interval": "@daily",
    "input": "3243"
}

Basically, for each DAG you want to generate, there is an associated JSON file.

The third and last step is to create the script that will replace the placeholders in the template with the values from the config files and generate the DAGs:

import json
import os
import shutil
import fileinput

TEMPLATE_FILE = 'include/dynamic_dag/template.py'

for filename in os.listdir('include/dynamic_dag/'):
    if filename.endswith('.json'):
        config = json.load(open(f"include/dynamic_dag/{filename}"))
        new_dagfile = f"dags/get_price_{config['dag_id']}.py"
        shutil.copyfile(TEMPLATE_FILE, new_dagfile)
        for line in fileinput.input(new_dagfile, inplace=True):
            line = line.replace("DAG_ID_HOLDER", config['dag_id'])
            line = line.replace("SCHEDULE_INTERVAL_HOLDER", config['schedule_interval'])
            line = line.replace("INPUT_HOLDER", config['input'])
            print(line, end="")

At the end, you should have the following files and folders:

Files and folders

All right. Everything is ready, time to test!

Run the script with the following command:

python include/dynamic_dag/generate_dag.py

and you should obtain three new DAG files as shown below:

Files and folders

get_price_APPL, get_price_FB and get_price_GOOGL!

Awesome, isn’t it? 🤩 I really recommend this way of generating your DAGs. It’s reliable, sustainable, scalable, and easier to debug. Yes, there is a little bit of work at first, but the reward far outweighs the simplicity you give up from the first method.

Dynamic DAGs with YAML and Jinja

Maybe you don’t know it, but Apache Airflow uses Jinja to build its web pages as well as to render values in DAG files at run time.

So, what is Jinja?

Jinja is a template engine that takes a template file with special placeholders and replaces them with data coming from a source.
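
Here is a minimal sketch with the jinja2 Python package, outside of any Airflow context:

from jinja2 import Template

# Jinja replaces the {{ name }} placeholder with the value passed to render()
template = Template("Hello {{ name }}!")
print(template.render(name="Airflow"))  # prints: Hello Airflow!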

In Airflow, for example, the DAG below leverages Jinja to render the execution date in a Bash command at run time:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "Airflow",
    "start_date": datetime(2019, 7, 29),
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "youremail@host.com",
    "retries": 1
}

with DAG(dag_id="macro_and_template", schedule_interval="*/10 * * * *", default_args=default_args) as dag:
    t1 = BashOperator(
        task_id="display",
        bash_command="echo {{ execution_date }}",
    )

I wrote an article about macros, variables, and templating that I do recommend you read here.

That being said, how can you leverage Jinja to generate DAGs dynamically? Ready? Go! 😎

The first step is to create the template file, which is NOT a Python file this time, but a Jinja2 file such as template_dag.jinja2. Again, it should be outside of the folder dags/.

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG("get_price_{{ dag_id }}", start_date=datetime(2022, 1, 1),
         schedule_interval="{{ schedule_interval }}",
         catchup={{ catchup or False }}) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def send_email(symbol):
        print(symbol)
        return symbol

    send_email(process(extract({{ input }})))

If you carefully take a look at the template above, you can see placeholders with an unusual notation: two pairs of curly brackets. This notation is used by Jinja to identify where a value must be injected. Basically, {{ dag_id }} will be replaced by the corresponding value coming from your configuration file.

Notice the addition of {{ catchup or False }} for the catchup parameter. That’s the beauty of Jinja: it’s much more than just a way to replace placeholders at run time. It’s a powerful templating language that supports conditions, for loops, filters, and much more. I cannot emphasize enough how important it is to take a look at its documentation here. For this example, you say that if the catchup value doesn’t exist in your configuration file, then False will be used.
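
You can verify that default behavior in isolation with a quick sketch:

from jinja2 import Template

template = Template("catchup={{ catchup or False }}")
print(template.render(catchup=True))  # catchup=True
print(template.render())              # catchup=False, the default kicks in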

Great! Now that you have your template, the second step is to create the configuration files:

config_dag_appl.yaml

dag_id: 'APPL'
schedule_interval: '@daily'
catchup: False
input: 126

config_dag_fb.yaml

dag_id: 'FB'
schedule_interval: '@daily'
catchup: False
input: 198

config_dag_googl.yaml

dag_id: 'GOOGL'
schedule_interval: '@daily'
catchup: False
input: 3243

This time, the config files are in YAML rather than JSON. You could perfectly well stick with JSON, but I wanted to show how to do it with YAML, as I find it an easier language to read.

The Generator

Final step, the generator script for the dynamic DAGs!

from jinja2 import Environment, FileSystemLoader
import yaml
import os

file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('template_dag.jinja2')

for filename in os.listdir(file_dir):
    if filename.endswith('.yaml'):
        with open(f"{file_dir}/{filename}", "r") as configfile:
            config = yaml.safe_load(configfile)
        with open(f"dags/get_price_{config['dag_id']}.py", "w") as f:
            f.write(template.render(config))

The code above is slightly different from the one before, but the logic is identical. You load the template template_dag.jinja2 and loop over the folder where the config files are. Then the Jinja template engine renders the template with the values of each config file.
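
Run the generator as before (assuming you saved the script as include/dynamic_dag/generate_dag_jinja.py; the file name is up to you) and check your dags/ folder:

python include/dynamic_dag/generate_dag_jinja.py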

If you run this script, you will obtain the exact same three DAGs as before. However, this time you benefit from the power of the Jinja template engine and the readability of the YAML language. Personally, I love this method!

Conclusion

Wow! That was a lot! Well done if you made it this far. In this article, you learned how to create dynamic DAGs in three different ways: the single-file method, the multiple-file method, and the “Jinja method”. My advice is to stick with one of the two multiple-file methods if you run Airflow in production; they are really the most reliable and scalable ways. I lean a little more toward the Jinja method, as Jinja gives you a lot of flexibility in the code you can generate, and the YAML language is really easy to read (you can even add a validator to check the syntax of your config files). All right, that’s it for now!

Take care! ❤️

P.S.: If you want to learn more about Airflow, go check out my course The Complete Hands-On Introduction to Apache Airflow right here. Or, if you already know Airflow and want to go much further, enroll in my 12-hour course here.
