How to use templates and macros in Apache Airflow

Templates and macros in Apache Airflow are the way to pass dynamic data to your DAGs at runtime. Imagine that you would like to execute a SQL request using the execution date of your DAG. How can you do that? How could you use the DAG id in your bash script to generate data? Maybe you need to know when your next DagRun will be? How could you get this value in your tasks? All of these questions can be answered using macros and templates. While those two concepts work well together, we are going to define them separately first and then combine them to show you how powerful they are. We will finish this tutorial by creating a data pipeline composed of a BashOperator, a PythonOperator and a PostgresOperator using templates and macros.
 
Let’s go!

“Wait, what? I thought we were going to talk about macros and templates, not variables!” Don’t worry, I didn’t lie to you. But before moving on to macros and templates in Apache Airflow, you absolutely need to know what variables are and how to use them. If you take a look at the documentation, variables are defined as a generic way to store and retrieve arbitrary content within Airflow. They are represented as simple key-value pairs stored in the meta database of Airflow.

They can be extremely useful, as all of your DAGs can access the same information at the same location, and you can even use them to pass settings in JSON format. Think about the DockerOperator with its parameters such as cpus, mem_limit, auto_remove and so on. You could configure all of those parameters in one place using a variable containing those settings in JSON. Then, you would just need to fetch that variable each time a task uses the DockerOperator, instead of duplicating the same settings over and over.
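To make the JSON idea concrete, here is a minimal sketch that uses only the standard library. The variable key "docker_settings", its content and the helper docker_task_kwargs are assumptions made up for the illustration. Inside a real DAG file you would typically read the variable with Variable.get("docker_settings", deserialize_json=True) instead of hard coding the string.

```python
import json

# Hypothetical content of an Airflow variable (key "docker_settings"),
# stored as a JSON string in the meta database.
docker_settings_json = '{"cpus": 1.0, "mem_limit": "512m", "auto_remove": true}'

# Parse the JSON once so every task can reuse the same settings.
docker_settings = json.loads(docker_settings_json)


def docker_task_kwargs(task_id, image, command):
    """Merge the shared settings with the task-specific arguments."""
    kwargs = {"task_id": task_id, "image": image, "command": command}
    kwargs.update(docker_settings)
    return kwargs


# These keyword arguments could then be unpacked into an operator call.
print(docker_task_kwargs("spark_job", "my_spark_image", "spark-submit job.py"))
```

If a setting changes, you only update the variable and every task that uses it picks up the new value.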

By the way, if you want to learn more about the DockerOperator, I made a very useful tutorial with a data pipeline using Spark right here

How to define a variable in Apache Airflow

There are two ways of defining variables in Apache Airflow: programmatically or with the user interface. In this article, I’m going to focus on the UI. Defining a variable is really easy. You just have to go to the Airflow UI, then click on “Admin” and “Variables” as shown in the screenshot below.

variables apache airflow

Once that’s done, you should land on the following screen:

variable_view_apache_airflow

This screen contains a table where your variables will be displayed. Notice that this table has three columns:

  • Key: The key is used to get or set the value of the variable from your DAG.
  • Value: Corresponds to the value associated with the key.
  • Is Encrypted: Specifies whether the variable is encrypted in the meta database (notice that the crypto package must be installed).

Alright, now let’s create our first variable, which we are going to use in our data pipeline. To do so, click on “Create” and you should get the following view:

create_variable_view_apache_airflow

In the field “Key” we set “source_path” (without the double quotes) and in the field “Val” we set “/usr/local/airflow” (again without the double quotes). You should end up with the following view:

create_variable_view_filled_apache_airflow

Now click on “Save” and our first variable “source_path” is listed in the table as shown below:

variable_created_view_apache_airflow

And that’s it. You have just created your first variable, which you will be able to get from any task just by using its key. Don’t worry, you are going to discover how to do it soon. Now let’s move on to the topic of templating.

Getting started with templates in Apache Airflow

Let’s first define what templating actually is. Templating allows you to interpolate values at runtime in static files such as HTML or SQL files, by placing special placeholders in them indicating where the values should be and/or how they should be displayed. To make things clearer, imagine that you have the following HTML file:

<!doctype html>

<html lang="en">
<head>
  <meta charset="utf-8">

  <title>{{ title_to_insert }}</title>
  <meta name="description" content="The HTML5 Herald">
  <meta name="author" content="SitePoint">

  <link rel="stylesheet" href="css/styles.css?v=1.0">

</head>

<body>
  <script src="js/scripts.js"></script>
</body>
</html>

Notice the placeholder {{ title_to_insert }}. When the page gets rendered, the HTML code is processed by a template engine, which replaces this placeholder with the value associated with the key title_to_insert. Templating is a really powerful concept, as you can insert data into static files where you don’t know the value yet and so make your code even more dynamic.

By the way, the pair of curly brackets {{ }} indicates where the template engine should render the values.

Finally, the very simple schema below represents the processing flow of a template engine.

template_engine
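To give a feel for what a template engine does, here is a deliberately tiny sketch written with the standard library only. It is not how Jinja is implemented, and it only handles simple {{ key }} placeholders. The names PLACEHOLDER and render are made up for this example.

```python
import re

# Matches placeholders such as {{ title_to_insert }} (spaces are optional).
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template, context):
    """Replace every {{ key }} placeholder with the value of context[key]."""

    def substitute(match):
        key = match.group(1)
        return str(context[key])

    return PLACEHOLDER.sub(substitute, template)


html = "<title>{{ title_to_insert }}</title>"
print(render(html, {"title_to_insert": "My dynamic title"}))
# prints: <title>My dynamic title</title>
```

The static file stays the same and only the context changes from one rendering to the next.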

How to use templates in Apache Airflow

Now that you know the basics, you may ask yourself how you can use templates in Apache Airflow. Well, exactly as I showed you with the HTML example: by putting a pair of curly brackets wherever you want to template a value inside your DAG.

Let’s start with a very simple example:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
            "owner": "Airflow",
            "start_date": datetime(2019, 7, 29),
            "depends_on_past": False,
            "email_on_failure": False,
            "email_on_retry": False,
            "email": "youremail@host.com",
            "retries": 1
        }

with DAG(dag_id="macro_and_template", schedule_interval="*/10 * * * *", default_args=default_args) as dag:
    
    t1 = BashOperator(
            task_id="display",
            bash_command="echo {{ execution_date }}",
            )

    t1

This DAG is composed of only one task using the BashOperator. That task displays the execution date of the DAG. Notice the special notation here, {{ execution_date }}. The curly brackets indicate to Jinja (the template engine used by Airflow) that there is something to interpolate here. In our case, we tell Jinja to replace {{ execution_date }} inside the bash_command parameter with the value of the variable named execution_date.

Once the template is rendered at runtime, the task behaves as if the DAG had been written like this (here for an execution date of 2019-01-01):

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
            "owner": "Airflow",
            "start_date": datetime(2019, 7, 29),
            "depends_on_past": False,
            "email_on_failure": False,
            "email_on_retry": False,
            "email": "youremail@host.com",
            "retries": 1
        }

with DAG(dag_id="macro_and_template", schedule_interval="*/10 * * * *", default_args=default_args) as dag:
    
    t1 = BashOperator(
            task_id="display",
            bash_command="echo 2019-01-01T00:00:00+00:00",
            )

    t1

You can check the execution of that task by using the command below without having to run the DAG:

airflow test macro_and_template display 2019-01-01

And you should end up with an output looking like the following:

output_airflow_test_template


Alright, now that you know how to add templates to your tasks, you may wonder where the variable execution_date comes from and whether we can template parameters other than bash_command.

Predefined variables in Apache Airflow

Apache Airflow provides predefined variables that you can use in your templates. They are very useful since they give you information about the currently executing DAG and task.

Here is a non-exhaustive list:

Variable                  Description
{{ ds }}                  The execution date of the running DAG as YYYY-MM-DD
{{ prev_ds }}             The previous execution date of the running DAG as YYYY-MM-DD
{{ next_ds }}             The next execution date of the running DAG as YYYY-MM-DD
{{ dag }}                 The DAG object
{{ task }}                The Task object
{{ ti }}                  The task_instance object
{{ params }}              A reference to the user-defined params dictionary defined in the calling task
{{ var.value.my_var }}    Globally defined variables represented as a dictionary

If you want the exhaustive list, I strongly recommend that you take a look at the documentation.

Before moving forward, let me stress three important points about the table above.

First, as you may have noticed, some of those variables are objects and not literal values such as a string, date or number. It means that you can access an object’s attributes and methods using the dot notation. For example, if you want to get the dag_id of your DAG, you could type: {{ dag.dag_id }}

Second, do you remember the variable we created earlier with the key “source_path”? If you want to access it from your DAG, you need to type: {{ var.value.source_path }}. Any variable that you create can be retrieved using the notation {{ var.value.var_key }}. Going back to the DAG example, the task “display” will look like this:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
            "owner": "Airflow",
            "start_date": datetime(2019, 7, 29),
            "depends_on_past": False,
            "email_on_failure": False,
            "email_on_retry": False,
            "email": "youremail@host.com",
            "retries": 1
        }

with DAG(dag_id="macro_and_template", schedule_interval="*/10 * * * *", default_args=default_args) as dag:
    
    t1 = BashOperator(
            task_id="display",
            bash_command="echo {{ var.value.source_path }}",
            )

    t1

If you test the task using the command airflow test, you will get the following output:

output_airflow_variable_own

The last thing I want to show you is the predefined variable “params”. Operators allow you to pass custom options using the parameter “params” when you define a task. Again, let’s use a concrete example.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
            "owner": "Airflow",
            "start_date": datetime(2019, 7, 29),
            "depends_on_past": False,
            "email_on_failure": False,
            "email_on_retry": False,
            "email": "youremail@host.com",
            "retries": 1
        }

with DAG(dag_id="macro_and_template", schedule_interval="*/10 * * * *", default_args=default_args) as dag:
    
    t1 = BashOperator(
            task_id="display",
            bash_command="echo {{ params.my_param }}",
            params={"my_param": "Hello world"}
            )

    t1

As you can see from the DAG above, I reused the BashOperator, but this time I added the parameter “params”. It takes a dictionary defining the custom parameters as key-value pairs. Here, I created the custom parameter named my_param with the value ‘Hello world’. Then, in the bash_command, I tell the template engine to get the value from “params.my_param”. If you test the task “display”, you will get the following result:

output_param_apache_airflow

Final note: what do you think you will get if you replace ‘Hello world’ with {{ execution_date }}?

I’ll give you five minutes to make the modification and test the task before showing you the solution.

I hope you tried 🙂 So, after modifying the DAG, you should have this:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
            "owner": "Airflow",
            "start_date": datetime(2019, 7, 29),
            "depends_on_past": False,
            "email_on_failure": False,
            "email_on_retry": False,
            "email": "youremail@host.com",
            "retries": 1
        }

with DAG(dag_id="macro_and_template", schedule_interval="*/10 * * * *", default_args=default_args) as dag:
    
    t1 = BashOperator(
            task_id="display",
            bash_command="echo {{ params.my_param }}",
            params={"my_param": "{{ execution_date }}"}
            )

    t1

Execute the task with the command below:

airflow test macro_and_template display 2019-01-01

And …

output_params_apache_airflow_template

it doesn’t work. As you can observe, the template {{ execution_date }} has not been interpolated. Why? Let’s discover this in the next section.


Templated parameters in Apache Airflow

Just to make a quick recap, we have seen that templates work with the parameter “bash_command” but not with the parameter “params” of the BashOperator. The reason is that Airflow defines which parameters can be templated. Not all parameters can be templated. To know whether you can use templates with a given parameter, you have two ways:

The first way is to check the documentation. For example, if you look up the BashOperator, you obtain the following description of the parameter bash_command:

  • bash_command (str) – The command, set of commands or reference to a bash script (must be ‘.sh’) to be executed. (templated)

The word “(templated)” at the end indicates that you can use templates with this parameter. Notice that the parameter “env” is also templated.

The second way is by looking at the source code of the operator. For example, if we take the source code of the BashOperator, we can see the following lines:

code_bashoperator_template_airflow

The variable we are interested in is “template_fields”. This variable indicates which parameters are templated. You may ask why I show you this when you could simply use the documentation. Well, because you will see that you can “override” this variable for existing operators in order to make some parameters templated that aren’t by default. We will use this technique with the PostgresOperator.

Final note: the “template_ext” variable defines which file extensions to look for in the templated fields. This means the template engine will render files having those extensions if they are used in the bash_command templated parameter. Yes, using templates and macros in Apache Airflow, you will be able to inject data directly into your script files too. You will see that in the data pipeline we are going to build.
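To see why only some parameters get rendered, and what overriding template_fields changes, here is a toy emulation in plain Python. It is a simplified sketch, not Airflow’s actual implementation: the classes ToyPostgresOperator and ToyCustomPostgresOperator, the helper render_value and the regex-based rendering are all assumptions made for the illustration.

```python
import re

# Matches simple placeholders such as {{ ds }}.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_value(value, context):
    """Render placeholders in strings, recursing into dictionary values."""
    if isinstance(value, str):
        return PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {key: render_value(item, context) for key, item in value.items()}
    return value


class ToyPostgresOperator:
    # Only the attributes listed here are passed to the template engine.
    template_fields = ("sql",)

    def __init__(self, sql, parameters=None):
        self.sql = sql
        self.parameters = parameters

    def render_templates(self, context):
        for field in self.template_fields:
            setattr(self, field, render_value(getattr(self, field), context))


class ToyCustomPostgresOperator(ToyPostgresOperator):
    # Overriding the class attribute makes "parameters" templated too.
    template_fields = ("sql", "parameters")


context = {"ds": "2019-01-01"}
default_task = ToyPostgresOperator("SELECT '{{ ds }}'", {"log_dir": "/data/{{ ds }}"})
custom_task = ToyCustomPostgresOperator("SELECT '{{ ds }}'", {"log_dir": "/data/{{ ds }}"})
default_task.render_templates(context)
custom_task.render_templates(context)

print(default_task.parameters)  # {'log_dir': '/data/{{ ds }}'}
print(custom_task.parameters)   # {'log_dir': '/data/2019-01-01'}
```

Both tasks render their sql attribute, but only the subclass renders parameters, because that is the only difference between their template_fields.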

Getting started with macros in Apache Airflow

What are macros? We have seen what variables are and how we can use them in combination with templates. So what is the last part of this recipe to make your DAGs more dynamic? From the documentation of Airflow,

“Macros are a way to expose objects to your templates and live under the macros namespace in your templates.”

In other words, macros are functions that take an input, modify that input and return the modified output. Macros can be used in your templates by calling them with the following notation: macros.macro_func().

As usual, let me show you a quick example to grasp the notion with the following DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
            "owner": "Airflow",
            "start_date": datetime(2019, 7, 29),
            "depends_on_past": False,
            "email_on_failure": False,
            "email_on_retry": False,
            "email": "youremail@host.com",
            "retries": 1
        }

with DAG(dag_id="macro_and_template", schedule_interval="*/10 * * * *", default_args=default_args) as dag:
    
    t1 = BashOperator(
            task_id="display",
            bash_command="echo 'execution date : {{ ds }} modified by macros.ds_add to add 5 days : {{ macros.ds_add(ds, 5) }}'"
            )

    t1

If you execute that task with airflow test, you will get the following output:

output_macro_bashoperator_airflow

As you can observe, the execution date has been modified from 2019-01-01 to 2019-01-06 using the macro “ds_add()”. Notice also that I didn’t use the variable execution_date for this example, as the macro ds_add expects a string as its first parameter, which execution_date is not: it is a date object. If you want the string version, you have to use the variable ds. Airflow ships with its own macros, which are listed in the documentation; don’t hesitate to take a look. Let’s move on to the final section, where you will discover the DAG that brings templates and macros together.
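If you are curious about what such macros do under the hood, here is a standalone sketch of two of them written with the standard library. These are simplified re-implementations for illustration, written under the assumption that they mirror the documented behavior of macros.ds_add and macros.ds_format; they are not the Airflow source code.

```python
from datetime import datetime, timedelta


def ds_add(ds, days):
    """Add (or subtract, if negative) a number of days to a YYYY-MM-DD string."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def ds_format(ds, input_format, output_format):
    """Parse a date string with input_format and write it back with output_format."""
    return datetime.strptime(ds, input_format).strftime(output_format)


print(ds_add("2019-01-01", 5))                          # 2019-01-06
print(ds_format("2019-01-01", "%Y-%m-%d", "%d/%m/%Y"))  # 01/01/2019
```

Both functions work on strings, which is why the template uses ds and not execution_date.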

Templates and Macros in Apache Airflow

Maybe you didn’t even notice it, but you have just used templates and macros in combination. Templates and macros in Apache Airflow are really powerful. You are now able to add and modify data in your DAGs at runtime. What we haven’t seen yet is how to use templates and macros in script files such as SQL or Bash files, and how to extend existing operators to make some parameters compatible with templates.

Workflow of the DAG

Let me show you what our DAG does.

It is composed of 4 tasks, each of them using templates and macros in different ways.

  1. First, a variable named templated_log_dir is built from the variable “source_path” we created earlier plus some macros and predefined variables, so that the path depends on the execution date of the DAG. Once rendered, this path will look like this: <variable source_path>/data/macros(<execution_date>)/
  2. Then, task 1 generates logs using the BashOperator by calling a script named generate_new_logs.sh. This script creates new logs in the templated_log_dir folder given by the variable {{ ds }}. Basically, we will end up with the following path: /usr/local/airflow/data/2019-01-01-00-00/log.csv
  3. Task 2 checks that “log.csv” has been created as expected. To do so, it executes a bash command with the templated path where the file should be located.
  4. Task 3 uses the PythonOperator to execute an external Python script named “process_log.py” in order to process and clean “log.csv” using the Pandas library. This script needs to know where the file is, so the templated path is given as a parameter to the task. Again, this path is rendered at runtime. The task produces a new file named “processed_log.csv”.
  5. Finally, once task 3 is finished, task 4 creates a table corresponding to the data contained in “processed_log.csv”, gets the data and loads it into a PostgreSQL database. All of these steps are described in a script named insert_log.sql. Like the previous task, the SQL script needs to know where “processed_log.csv” is located. To do so, we extend the PostgresOperator and override the variable template_fields to make the parameter “parameters” compatible with templates.
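To make the path from step 1 less abstract, the sketch below reproduces the rendering by hand for an execution date of 2019-01-01 at midnight. It assumes that source_path holds /usr/local/airflow and that {{ ts_nodash }} renders as 20190101T000000. The DAG itself performs the date conversion with macros.ds_format; here it is imitated with the standard library.

```python
from datetime import datetime

# Assumed inputs for the illustration.
source_path = "/usr/local/airflow"  # value of the Airflow variable "source_path"
ts_nodash = "20190101T000000"       # rendered {{ ts_nodash }} for 2019-01-01 00:00

# Same conversion as ds_format(ts_nodash, "%Y%m%dT%H%M%S", "%Y-%m-%d-%H-%M").
folder = datetime.strptime(ts_nodash, "%Y%m%dT%H%M%S").strftime("%Y-%m-%d-%H-%M")

log_dir = f"{source_path}/data/{folder}"
print(log_dir + "/log.csv")
# prints: /usr/local/airflow/data/2019-01-01-00-00/log.csv
```

Since the schedule interval is ten minutes, each DagRun ends up with its own folder.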

The DAG using templates and macros in Apache Airflow

Enough talking, here is the DAG:

import sys
import airflow
from airflow import DAG, macros
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime, timedelta

# Would be cleaner to add the path to the PYTHONPATH variable
sys.path.insert(1, '/usr/local/airflow/dags/scripts')

from process_logs import process_logs_func

class CustomPostgresOperator(PostgresOperator):
    template_fields = ('sql', 'parameters')

default_args = {
            "owner": "Airflow",
            "start_date": datetime(2019, 7, 29),
            "depends_on_past": False,
            "email_on_failure": False,
            "email_on_retry": False,
            "email": "youremail@host.com",
            "retries": 1
        }

templated_log_dir = """{{ var.value.source_path }}/data/{{ macros.ds_format(ts_nodash, "%Y%m%dT%H%M%S", "%Y-%m-%d-%H-%M") }}"""

with DAG(dag_id="macro_and_template", schedule_interval="*/10 * * * *", default_args=default_args) as dag:

    # Notice that passing templated_log_dir to params won't have any effects
    # templated_log_dir won't be templated in the script generate_new_logs.sh
    # as params is not in template_fields.
    t1 = BashOperator(
            task_id="generate_new_logs",
            bash_command="./scripts/generate_new_logs.sh",
            params={'filename': 'log.csv', 'no_effect': templated_log_dir})

    t2 = BashOperator(
            task_id="logs_exist",
            bash_command="test -f " + templated_log_dir + "/log.csv",
            )

    t3 = PythonOperator(
            task_id="process_logs",
            python_callable=process_logs_func,
            provide_context=True,
            templates_dict={'log_dir': templated_log_dir},
            params={'filename': 'log.csv'}
            )

    t4 = CustomPostgresOperator(
            task_id="save_logs",
            sql="./scripts/insert_log.sql",
            parameters={'log_dir': templated_log_dir + '/processed_log.csv'}
            )

    t1 >> t2 >> t3 >> t4

With everything we have seen before, it shouldn’t be difficult to understand what the DAG does and where the templated values are rendered. If it is, please let me know in the comment section 🙂
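The scripts are not included in this tutorial, so here is a purely hypothetical sketch of what the callable behind task 3 could look like. It only illustrates how a function called by the PythonOperator with provide_context=True can read the rendered templates_dict and the params from its keyword arguments. To stay self-contained, it uses the csv module from the standard library instead of Pandas, and the cleaning logic (trimming whitespace, dropping empty rows) is an assumption.

```python
import csv
import os


def process_logs_func(**kwargs):
    """Hypothetical callable: clean log.csv and write processed_log.csv."""
    # By the time the callable runs, templates_dict has already been rendered.
    log_dir = kwargs["templates_dict"]["log_dir"]
    filename = kwargs["params"]["filename"]

    source = os.path.join(log_dir, filename)
    target = os.path.join(log_dir, "processed_log.csv")

    with open(source, newline="") as src, open(target, "w", newline="") as dst:
        writer = csv.writer(dst)
        for row in csv.reader(src):
            cleaned = [cell.strip() for cell in row]
            if any(cleaned):  # skip rows that contain no data at all
                writer.writerow(cleaned)

    return target
```

The function never builds the path itself: it simply receives whatever templated_log_dir was rendered to for the current DagRun.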

Conclusion

Congratulations if you have reached this point! I hope you have learned exciting new concepts about Airflow. Templates and macros in Apache Airflow are really powerful for making your tasks dynamic and idempotent when you need time as an input. You can’t hard code a date, as the task won’t work anymore if you want to run it in the past or in the future. This date must be dynamic and must change according to when the DAG is executed. Templates, variables and macros are the way to do it.

Finally, this tutorial is not fully complete. There are other nice things I haven’t mentioned yet, and I didn’t provide the scripts as I’m still working on them.

Alright, I hope you enjoyed the tutorial and see you for the next one! 

Have a nice day 🙂


3 thoughts on “How to use templates and macros in Apache Airflow”

  1. Hi Marc ,

    I have to use prev_execution_date_success to pass it to sql statement to pick incremental data ,
    but i am getting error prev_execution_date_success is not defined

    :-

    LAST_LOADED_DAY = “{{macros.ds_format(prev_execution_date_success, “%Y/%m/%d”)}}”
    LAST_LOADED_DAY.replace(minute=0, hour=0, second=0, microsecond=0)

    br,
    Jonu Chauhan
