Airflow Templates and Macros

Templates and macros in Apache Airflow allow you to pass data to your DAGs at runtime. Imagine that you want to execute an SQL request using the execution date of your DAG. How can you do that? How can you use the DAG ID when you send notifications, so you know which DAG to look at? Or what if you need to know when the next DAG run will be? Well, macros and templates answer these questions. While those two concepts work well together, we will first define them separately and then use them in combination to show you how powerful they are. We will finish this tutorial by creating a stunning data pipeline composed of a BashOperator, a PythonOperator, and a PostgresOperator, all using templates and macros. If you want to dive deeper into Airflow, here is a course I made for you.

Why Airflow templating?

Let’s start with a use case. Imagine you have a DAG that extracts data from directories whose names are dates:

[Screenshot: source directories named by date]

How would you implement the task that extracts the data from these directories? I’m sure we all agree the code below won’t work:

from airflow.decorators import task

@task
def extract_data():
  date = '2023-01-01'  # hardcoded date
  extract(date)        # extract() stands for your extraction logic

Hardcoding the date means changing it every day, which makes no sense.

What about that instead:

from datetime import date

@task
def extract_data():
  current_date = date.today()
  extract(current_date)

It is a little better as the date isn’t hardcoded anymore but based on the current day. However, this solution has a severe limitation. What if you miss a day and want to rerun the task for that day? Are you going to hardcode the date for that specific run? What if you missed a week? A month? You get it. We need a better solution.

Jinja templating! You will discover the solution later in the article. Stick with me 😉

What is Jinja templating?

Jinja is a template engine for Python. A template engine is a library that combines templates with data models to produce documents. Templates are widely used for creating dynamic HTML pages. Here is a template:

<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>{{ title_to_insert }}</title>
<meta name="description" content="The HTML5 Herald">
<meta name="author" content="SitePoint">
<link rel="stylesheet" href="css/styles.css?v=1.0">
</head>
<body>
<script src="js/scripts.js"></script>
</body>
</html>

And the data model:

title_to_insert="My Super Page"

Note the placeholder {{ title_to_insert }} in the HTML page that matches the variable title_to_insert in the data model. The two pairs of curly brackets {{ }} are Jinja delimiters that tell the template engine to replace whatever is in between with the corresponding value from the data model. Under the hood, the engine calls render() to combine the template with the data, and we say that the final output is rendered:

<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>My Super Page</title>
<meta name="description" content="The HTML5 Herald">
<meta name="author" content="SitePoint">
<link rel="stylesheet" href="css/styles.css?v=1.0">
</head>
<body>
<script src="js/scripts.js"></script>
</body>
</html>
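You can reproduce this rendering in a few lines of plain Python. Here is a minimal sketch (assuming the jinja2 package is installed):

from jinja2 import Template

# Combine a template with a data model, just like the HTML example above
template = Template("<title>{{ title_to_insert }}</title>")
print(template.render(title_to_insert="My Super Page"))
# Output: <title>My Super Page</title>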

How to use Airflow templates

Now that you know what Jinja templating is, let’s return to the use case. Remember that the goal is to extract data from directories named with dates. However, we can neither hardcode the date (we would always extract the same chunk of data) nor use datetime.now() (we wouldn’t be able to retry or rerun past DAG runs). So what’s the solution? How can we use the date attached to a DAG run to always extract the data from the correct directory, regardless of when we rerun it? Templating!

Here is how:

@task
def extract_data(ds=None):
  current_dag_run_date = ds
  extract(current_dag_run_date)

The parameter ds is a built-in template variable that gives access to the data_interval_start/logical_date of the current DAG run in the YYYY-MM-DD format. To be honest, that’s not templating yet as we don’t use Jinja here 🥹, but this is:

from airflow.operators.python import PythonOperator

def _extract_data(current_dag_run_date):
  print(current_dag_run_date)

PythonOperator(
  task_id="extract_task",
  python_callable=_extract_data,
  op_kwargs={
    "current_dag_run_date": "{{ds}}"
  }
)

This code is equivalent to the previous task but without the TaskFlow API (another topic). The critical part is {{ds}} in op_kwargs. That tells Airflow to replace the placeholder between the curly brackets with the corresponding value at runtime. You can check the output of this template in the Airflow UI:

[Screenshot: task details in the Airflow UI]

Select your task, click “Details” and “Rendered Template”. You will see your task’s template rendered with Jinja:

[Screenshot: the Rendered Template view showing ds replaced by the DAG run’s date]

You can see from the screenshot above that ds is replaced by the actual date of the DAG run. Magic. What if we want to use another Operator like the BashOperator?

Airflow templates with scripts

Suppose we want to run a Bash script instead of a Python function. In that case, we want Airflow to render the Bash script before the BashOperator executes it. Here is an example of a Bash script:

#!/bin/sh
echo "Extract data for the {{ ds }}"

Note that the Airflow template sits inside the Bash script, in the echo command. Here is the corresponding task in the DAG:

extract_data = BashOperator(
    task_id="extract_data",
    bash_command="script.sh",
)

If we run this task, we will see “Extract data for the 2023-10-19” on the standard output, with the date changing depending on when the DAG run runs. That works the same with a Python script or a SQL file. Airflow templates allow you to inject DAG run and task instance metadata into your tasks at runtime.
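By the way, the script file isn’t mandatory; the bash_command string itself is templated. Here is a small inline sketch (the task_id is illustrative) to drop inside your DAG:

from airflow.operators.bash import BashOperator

# Jinja renders {{ ds }} before Bash executes the command
extract_data_inline = BashOperator(
    task_id="extract_data_inline",
    bash_command="echo 'Extract data for the {{ ds }}'",
)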

The next question is, does that work for any Operator’s parameter?

Templateable fields and scripts

Airflow templates are not available for every Operator’s parameter. To know if you can use templating or not, you must look at… the documentation 😅

Let’s take the BashOperator again. Here are its parameters with their descriptions:

Parameters

bash_command (str) – The command, set of commands or reference to a bash script (must be ‘.sh’) to be executed. (templated)

env (dict[str, str] | None) – If env is not None, it must be a dict that defines the environment variables for the new process; these are used instead of inheriting the current process environment, which is the default behavior. (templated)

append_env (bool) – If False (default), uses the environment variables passed in the env parameter and does not inherit the current process environment. If True, inherits the environment variables from the current process, and environment variables passed by the user will either update the existing inherited environment variables or be appended to them.

output_encoding (str) – Output encoding of bash command

skip_on_exit_code (int | Container[int] | None) – If task exits with this exit code, leave the task in skipped state (default: 99). If set to None, any non-zero exit code will be treated as a failure.

cwd (str | None) – Working directory to execute the command in. If None (default), the command is run in a temporary directory.

If you carefully look at the descriptions, some have “(templated)” at the end. That’s what you should look for to know whether or not you can use templating for a parameter. Another way is by looking at the code:

class BashOperator(BaseOperator):
  template_fields = ('bash_command', 'env')
  template_ext = ('.sh', '.bash')

  def __init__(...):

Every Operator has these two class attributes:

  • template_fields: Defines which operator parameters can use templated values.
  • template_ext: Defines which file extensions can use templated values.

By looking at the code of the BashOperator, you can use Airflow templates with scripts that have .sh or .bash extensions, or with the bash_command and env parameters. I prefer checking the code instead of the documentation, but it is up to you.
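If you prefer the code route, a quick sketch like this shows what any Operator exposes (the exact tuples depend on your Airflow version):

from airflow.operators.bash import BashOperator

# Which parameters and file extensions are rendered by Jinja
print(BashOperator.template_fields)  # e.g. ('bash_command', 'env')
print(BashOperator.template_ext)     # e.g. ('.sh', '.bash')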

By the way, you can make a parameter that isn’t templateable, templateable. Subclass the Operator (for example, a CustomBashOperator) and add the parameter to template_fields:

class CustomBashOperator(BashOperator):
  template_fields = ('bash_command', 'env', 'my_other_parameter')
  template_ext = ('.sh', '.bash')

  def __init__(...):

Airflow TemplateNotFound

This is a common error with templates. Suppose you have a script extract.sh in /my/airflow/scripts/ and your DAG with the BashOperator is in /my/airflow/dags/. Sounds great. However, if you run this task, you will get a TemplateNotFound error.

Why?

By default, Airflow searches for templates in directories relative to the directory where the DAG file is. So a relative path like the one below only works if the scripts folder sits next to your DAG file, i.e. at /my/airflow/dags/scripts/extract.sh:

extract_data = BashOperator(
  task_id="extract_data",
  bash_command="scripts/extract.sh",
)

Since our script lives in /my/airflow/scripts/, outside the dags folder, the solution is to define the template_searchpath DAG parameter, which tells Airflow to also look in the specified directories for templates:

with DAG(..., template_searchpath=["/my/airflow/scripts"]):

  extract_data = BashOperator(
    task_id="extract_data",
    bash_command="extract.sh",
  )

And if that still doesn’t work, then you might need to add an extra space after .sh:

with DAG(..., template_searchpath=["/my/airflow/scripts"]):

  extract_data = BashOperator(
    task_id="extract_data",
    bash_command="extract.sh ",
  )

But the last solution is specific to the BashOperator.

The bottom line is if you have template files in a directory that isn’t within the dags directory, you might need to define template_searchpath.

Built-in Template Variables and Macros

Airflow brings many variables and macros you can use in your templates.

The most commonly used are:

  • {{ data_interval_start }} (pendulum.DateTime): Start of the data interval for the current DAG run.
  • {{ data_interval_end }} (pendulum.DateTime): End of the data interval for the current DAG run.
  • {{ ds }} (str): The DAG run’s logical date as YYYY-MM-DD (same as data_interval_start).
  • {{ ds_nodash }} (str): Same as ds but formatted as YYYYMMDD.
  • {{ prev_data_interval_start_success }} (pendulum.DateTime | None): Start of the data interval of the prior successful DAG run. Helpful to prevent running the current DAG run if the previous one failed.
  • {{ params }} (dict[str, Any]): The user-defined params from the DAG object.
  • {{ var.value.my_var }}: Access Airflow Variables.
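For instance, here is a small sketch (the task_id is made up) that combines a few of them, plus the dag object that is also available in the template context, in one Bash command inside a DAG:

from airflow.operators.bash import BashOperator

report = BashOperator(
    task_id="report_interval",
    bash_command=(
        "echo 'DAG {{ dag.dag_id }} covers "
        "{{ data_interval_start }} to {{ data_interval_end }}'"
    ),
)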

In addition, Airflow macros help to modify the output of these variables. Imagine you want to format a date differently; then you can do:

extract_data = BashOperator(
  task_id="extract_data",
  bash_command="echo 'The date is {{ macros.ds_format(ds, '%Y-%m-%d', '%Y/%m/%d') }}'",
)

Don’t hesitate to look at the documentation for an exhaustive list of those Airflow macros and variables.

Last but not least, var.value.my_var allows you to access Airflow variables in your tasks. If you don’t know what Airflow variables are, check out the tutorial here.
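As a quick sketch (my_var is an illustrative Variable name that must exist in your Airflow instance), here is how a task inside your DAG could consume it through a template:

from airflow.operators.python import PythonOperator

def _use_variable(my_value):
    print(f"My variable is {my_value}")

use_variable = PythonOperator(
    task_id="use_variable",
    python_callable=_use_variable,
    # op_kwargs is a templated field, so {{ var.value.my_var }} is rendered at runtime
    op_kwargs={"my_value": "{{ var.value.my_var }}"},
)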

Airflow DAG with Templates and Macros

Finally, here is a DAG mixing everything you’ve learned so far:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

class CustomPostgresOperator(PostgresOperator):
  template_fields = ('sql', 'parameters')

def _process_logs(log_path):
  # Placeholder for the actual processing and cleaning logic
  print(f"Processing {log_path}")

templated_log_dir = """{{ var.value.source_path }}/data/{{ macros.ds_format(ts_nodash, "%Y%m%dT%H%M%S", "%Y-%m-%d-%H-%M") }}"""

with DAG("my_dag", start_date=datetime(2023, 1, 1), schedule="@daily", template_searchpath=["/my/path/scripts", "/my/path/sql"]):

  t1 = BashOperator(
         task_id="generate_new_logs",
         bash_command="generate_new_logs.sh ",
  )

  t2 = BashOperator(
         task_id="logs_exist",
         bash_command=f"test -f {templated_log_dir}/log.csv",
  )

  t3 = PythonOperator(
         task_id="process_logs",
         python_callable=_process_logs,
         op_args=[templated_log_dir + "/log.csv"],
  )

  t4 = CustomPostgresOperator(
         task_id="save_logs",
         sql="insert_log.sql",
         parameters={
           'log_dir': templated_log_dir + '/processed_log.csv'},
  )

  t1 >> t2 >> t3 >> t4

Let me give you a quick explanation:

  1. First, we create a variable templated_log_dir that combines the Airflow variable source_path with a date. With the macro ds_format, we change the output format of ts_nodash. Once rendered, the path looks like this: my_var_value/data/2023-01-01-11-59.
  2. T1 generates fake logs using the BashOperator that calls the script generate_new_logs.sh. This script creates a file with logs at the following location: my_var_value/data/2023-01-01-11-59/log.csv.
  3. T2 checks that log.csv exists by running a Bash command. Note that the command is an f-string: the single pair of curly brackets interpolates the Python variable templated_log_dir, which itself contains the Jinja template that Airflow renders at runtime. (If you wanted a literal Jinja expression inside an f-string, you would have to escape the curly brackets by doubling them.) More information here.
  4. T3 calls the Python function _process_logs to process and clean log.csv. We pass the path to the file through the op_args parameter, which is templateable.
  5. Finally, T4 loads the processed logs into Postgres with the insert_log.sql query. Note that we extend the PostgresOperator with CustomPostgresOperator to override the template_fields variable and make parameters templateable, as it isn’t by default.

Conclusion

That’s it for Airflow templates and macros. I hope you enjoyed this tutorial. These concepts are essential; you will almost always use templates in your DAGs. Whether you are beginning with Airflow or are an advanced user, I made the following courses available at a discount to dive deeper into this fantastic tool!

See you!
