Apache Airflow Best Practices – Part 1

Since I started creating courses a year ago, I have received many messages asking me about the best practices in Apache Airflow. As engineers, we always look for the best ways to apply what we learn while constantly improving ourselves. In this series of tutorials, I would like to share with you everything I have learned so far to really make Airflow shine in your data ecosystem. You will learn how to create better DAGs, how to optimise Airflow, what you should care about and much more. If you want to start mastering Airflow, you should definitely take a look at my course: Apache Airflow: The Hands-On Guide.

PS: This tutorial will evolve over time, so don’t forget to check back from time to time or to subscribe to my mailing list at the bottom of the page to stay up to date.

DAGs correspond to your data pipelines. Following best practices with your DAGs is crucial, as they will be used daily. They should be optimized, easy to understand, documented and well organized. You can quickly end up with hundreds of DAGs, so don’t underestimate this part. It will save you a lot of pain and trouble. Let’s get started.

Define a clear purpose for your DAG

Before creating a DAG, think carefully about what you expect from it. Here are some questions that can help:

  • What is the input?
  • What is the output?
  • When should it be triggered?
  • At which interval of time?
  • Which third-party tools will it interact with?
  • What will the tasks be? (very important)
  • Can I make it simpler?

The last question is important. Always ask yourself whether you really need a DAG to achieve what you want, or whether another solution would be easier to build and maintain. Do not overcomplicate your data pipeline. A DAG should have a clear purpose, like exporting data into your data warehouse or updating your machine learning model in production when needed.

Defining a clear purpose for your tasks

Have a clear idea of the tasks you want to execute in your DAG. As much as possible, aim for one task = one job, not one task = multiple jobs. Let me give you an example.

# DON'T
def process_data():
    wrangling()
    cleaning()
    transforming()
    validating()

task = PythonOperator(
    task_id="my_task",
    python_callable=process_data,
    dag=dag
)
# DO
t1 = PythonOperator(
    task_id="wrangling",
    python_callable=wrangling,
    dag=dag
)

t2 = PythonOperator(
    task_id="cleaning",
    python_callable=cleaning,
    dag=dag
)

t3 = PythonOperator(
    task_id="transforming",
    python_callable=transforming,
    dag=dag
)

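t4 = PythonOperator(
    task_id="validating",
    python_callable=validating,
    dag=dag
)

# Set the execution order: wrangling -> cleaning -> transforming -> validating
t1 >> t2 >> t3 >> t4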

As shown in the example above, it’s tempting to put everything into a single PythonOperator, but I strongly advise against it. Let’s say the function validating fails whereas wrangling, cleaning and transforming succeed. Even though only one function failed, Airflow considers the whole task as failed. So, instead of retrying only validating, all the functions are retried, because you put all the work into one task. That’s why you should have one task per job (or one operator per task): if something fails, only that part is executed again and not the others. This helps prevent inconsistencies in your data and makes it easier to spot what failed and why.
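To make the retry argument concrete, here is a toy simulation in plain Python (no Airflow involved; the helpers run_monolithic and run_split are hypothetical). It counts the function calls made when validating fails once and is retried, first with all four steps inside one task, then with one task per step.

```python
def run_monolithic(steps, flaky_step):
    """One task runs every step; any failure retries the whole task."""
    calls = []
    failed_once = False
    while True:  # each iteration is one attempt of the single task
        try:
            for name in steps:
                calls.append(name)
                if name == flaky_step and not failed_once:
                    failed_once = True
                    raise RuntimeError(f"{name} failed")
            return calls
        except RuntimeError:
            continue  # retry: start again from the first step


def run_split(steps, flaky_step):
    """One task per step; only the failed task is retried."""
    calls = []
    failed_once = False
    for name in steps:
        while True:
            calls.append(name)
            if name == flaky_step and not failed_once:
                failed_once = True
                continue  # retry this task only
            break
    return calls


steps = ["wrangling", "cleaning", "transforming", "validating"]
print(len(run_monolithic(steps, "validating")))  # 8
print(len(run_split(steps, "validating")))       # 5
```

With a single task, the three steps that already succeeded run a second time; with one task per step, only validating runs again.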

DAGs and context managers

In Python, context managers let you allocate and release resources precisely when you want to. You use them through the with statement. I made a tutorial about that in Airflow, so I won’t explain again how it works, but let me give you an example.

# DON'T
dag = DAG("simple_pipe", default_args=default_args, schedule_interval="*/5 * * * *", catchup=False)

t1 = PythonOperator(
    task_id="t1",
    python_callable=my_func,
    dag=dag)

t2 = PythonOperator(
    task_id="t2",
    python_callable=my_func,
    dag=dag)

t3 = PythonOperator(
    task_id="t3",
    python_callable=my_func,
    dag=dag)
# DO
with DAG("simple_pipe", default_args=default_args, schedule_interval="*/5 * * * *", catchup=False) as dag:
    t1 = PythonOperator(
        task_id="t1",
        python_callable=my_func)

    t2 = PythonOperator(
        task_id="t2",
        python_callable=my_func)

    t3 = PythonOperator(
        task_id="t3",
        python_callable=my_func)

The with statement makes the code cleaner by removing the need to pass the dag variable to each task. It also makes it obvious which tasks belong to the DAG (if you have Python functions in your DAG file, things can quickly become messy), and it lets the context manager deal with the DAG life cycle.
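The idea behind this is that entering the with block registers the DAG as the "current" one, and operators created inside the block attach themselves to it when no dag argument is passed. The sketch below imitates that mechanism with two hypothetical classes, ToyDAG and ToyTask; it is a simplified model, not Airflow’s actual implementation.

```python
class ToyDAG:
    """Hypothetical stand-in for a DAG, reduced to its context-manager behaviour."""

    _current = None  # DAG opened by the innermost active `with` block

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []
        self._previous = None

    def __enter__(self):
        # Remember any outer DAG so that nested blocks restore it on exit
        self._previous = ToyDAG._current
        ToyDAG._current = self
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        ToyDAG._current = self._previous
        return False  # never swallow exceptions raised inside the block


class ToyTask:
    """Hypothetical operator: attaches to an explicit dag or to the current one."""

    def __init__(self, task_id, dag=None):
        self.task_id = task_id
        self.dag = dag if dag is not None else ToyDAG._current
        if self.dag is None:
            raise ValueError(f"task {task_id!r} is not attached to any DAG")
        self.dag.tasks.append(self)


with ToyDAG("simple_pipe") as dag:
    t1 = ToyTask("t1")
    t2 = ToyTask("t2")
    t3 = ToyTask("t3")

print([t.task_id for t in dag.tasks])  # ['t1', 't2', 't3']
```

Outside the with block there is no current DAG anymore, so creating a ToyTask without a dag argument raises an error instead of silently producing an orphan task.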

Default arguments

Operator constructors accept many different arguments such as an email, a number of retries, a start date, a queue and so on. Instead of repeating the same arguments for every task, I’m pretty sure you’ve already seen many DAGs with a dictionary defined just before the DAG object. Compare the two versions below:

# DON'T
with DAG('dag', start_date=datetime(2019, 1, 1), schedule_interval='*/10 * * * *', catchup=False):
    t0 = DummyOperator(task_id='t0', retries=1, retry_delay=timedelta(minutes=5))
    t1 = DummyOperator(task_id='t1', retries=1, retry_delay=timedelta(minutes=5))
    t2 = DummyOperator(task_id='t2', retries=1, retry_delay=timedelta(minutes=5))
    t3 = DummyOperator(task_id='t3', retries=1, retry_delay=timedelta(minutes=5))
# DO
default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
with DAG('dag', start_date=datetime(2019, 1, 1), default_args=default_args, schedule_interval='*/10 * * * *', catchup=False):
    t0 = DummyOperator(task_id='t0')
    t1 = DummyOperator(task_id='t1')
    t2 = DummyOperator(task_id='t2')
    t3 = DummyOperator(task_id='t3')

The purpose of this dictionary, usually called default_args, is to define a set of parameters common to all tasks in your DAG. It avoids repeating the same arguments again and again, making your DAG clearer and less error-prone. Define a set of default arguments in a dictionary for all of your tasks, and if one task needs a specific value, override that argument in the operator’s definition.
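The override rule can be pictured as a plain dictionary merge in which the task’s own arguments win. The function resolve_task_args below is a hypothetical illustration of that precedence, not the code Airflow runs internally.

```python
from datetime import timedelta

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def resolve_task_args(default_args, **task_kwargs):
    """Hypothetical helper: explicit task arguments take precedence over the defaults."""
    return {**default_args, **task_kwargs}


t0_args = resolve_task_args(default_args)
t1_args = resolve_task_args(default_args, retries=3)  # t1 needs more retries

print(t0_args['retries'], t1_args['retries'])  # 1 3
print(t1_args['retry_delay'])                  # 0:05:00
```

The shared dictionary itself is never modified, so every other task keeps the default value.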

The unique identifier or DAG ID

When you instantiate a DAG object, you have to specify a DAG ID. The DAG ID must be unique across all of your DAGs. Never give two DAGs the same DAG ID; otherwise only one of them will show up and you might get unexpected behaviours. Define a meaningful DAG ID along with a description of your DAG.

# DON'T
with DAG('dag_1', start_date=datetime(2019, 1, 1), schedule_interval='*/10 * * * *') as dag:
    ...
# DO
with DAG('csv_to_datawarehouse',
         description='Fetch data from CSV, process and load them in the data warehouse',
         start_date=datetime(2019, 1, 1),
         schedule_interval='*/10 * * * *') as dag:
    ...

Think ahead to when you have hundreds of different DAGs. You definitely want meaningful DAG IDs and descriptions to quickly know which DAG does what. One more thing: if you have multiple DAGs that are more or less related to each other, giving them a common prefix can be a great idea.
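With many DAG files, a small script can catch duplicate IDs and show how a naming prefix groups related pipelines. The DAG IDs and both helper functions below are hypothetical; in practice you would collect the IDs from your own DAG folder.

```python
from collections import Counter, defaultdict


def find_duplicate_dag_ids(dag_ids):
    """Return the DAG IDs that appear more than once, sorted."""
    return sorted(dag_id for dag_id, n in Counter(dag_ids).items() if n > 1)


def group_by_prefix(dag_ids, sep="_"):
    """Group DAG IDs by the part before the first separator."""
    groups = defaultdict(list)
    for dag_id in dag_ids:
        groups[dag_id.split(sep, 1)[0]].append(dag_id)
    return dict(groups)


# Hypothetical DAG IDs, one of them accidentally reused
dag_ids = ["sales_csv_to_dwh", "sales_daily_report", "ml_retrain_model", "sales_daily_report"]

print(find_duplicate_dag_ids(dag_ids))  # ['sales_daily_report']
print(group_by_prefix(sorted(set(dag_ids))))
# {'ml': ['ml_retrain_model'], 'sales': ['sales_csv_to_dwh', 'sales_daily_report']}
```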

The start date

Big topic here. The way DAGs are scheduled in Airflow can be somewhat difficult to understand at first. The start_date defines the date from which your DAG starts being scheduled. One thing you should know is that each task, or operator, can have its own start_date. Yes, tasks within the same DAG can start at different dates. I strongly advise against this: do not define different start dates. Keep things simple and homogeneous. I can’t think of a use case where you would need to do that.

# DON'T
default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
with DAG('dag', start_date=datetime(2019, 1, 1), default_args=default_args, schedule_interval='*/10 * * * *', catchup=False):
    t0 = DummyOperator(task_id='t0', start_date=datetime(2019, 1, 15))
    t1 = DummyOperator(task_id='t1', start_date=datetime(2019, 2, 16))
    t2 = DummyOperator(task_id='t2', start_date=datetime(2019, 3, 6))
# DO
default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2019, 1, 1)
}
with DAG('dag', default_args=default_args, schedule_interval='*/10 * * * *', catchup=False):
    t0 = DummyOperator(task_id='t0')
    t1 = DummyOperator(task_id='t1')
    t2 = DummyOperator(task_id='t2')
    t3 = DummyOperator(task_id='t3')

I will make a tutorial about the start_date, as there are many non-obvious things to know about that parameter. As a best practice, define the start_date in the default arguments.

In addition, your start date should be static. Do not define a dynamic start date with a function like datetime.now(), as it is confusing. Keep in mind that the first DAG Run is triggered once start_date + schedule_interval has passed. Since the DAG file is parsed again and again, a start date set to now() keeps moving forward, so in theory your tasks may never be executed.
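The effect of a moving start date can be simulated in a few lines. This sketch assumes a simplified rule (the first run is due once start_date + schedule_interval has passed) and a hypothetical parse of the DAG file every 30 seconds; it ignores everything else the scheduler does.

```python
from datetime import datetime, timedelta

SCHEDULE_INTERVAL = timedelta(minutes=10)  # same interval as the examples above


def first_run_due(start_date, now):
    """Simplified rule: the first run is due once start_date + interval has passed."""
    return now >= start_date + SCHEDULE_INTERVAL


static_start = datetime(2020, 1, 1)

# Hypothetical: the DAG file is re-parsed every 30 seconds for one hour
first_parse = datetime(2020, 1, 1, 12, 0)
parse_times = [first_parse + timedelta(seconds=30 * i) for i in range(120)]

# With start_date=datetime.now(), the start date equals the parse time at every parse
dynamic_due = any(first_run_due(start_date=t, now=t) for t in parse_times)
static_due = any(first_run_due(start_date=static_start, now=t) for t in parse_times)

print(dynamic_due, static_due)  # False True
```

Because the dynamic start date is always "right now", start_date + interval always lies ten minutes in the future and is never reached.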

# DON'T
default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime.now()
}

with DAG('dag', default_args=default_args, schedule_interval='*/10 * * * *', catchup=False):
    t0 = DummyOperator(task_id='t0')
# DO
default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2020, 1, 1)
}

with DAG('dag', default_args=default_args, schedule_interval='*/10 * * * *', catchup=False):
    t0 = DummyOperator(task_id='t0')

The catchup parameter

By default, Airflow automatically runs the DAG Runs that were not triggered between the latest executed DAG Run and the current date. This is really useful if, for some reason, you had to pause your DAG and want to catch up on the delay. Nonetheless, be careful with this feature. Imagine that your DAG has a start_date set to one year ago with a schedule interval of 10 minutes. As soon as you start scheduling that DAG, tens of thousands of DAG Runs will be queued up to catch up. This can delay other tasks or slow down your Airflow instance. To avoid this, I strongly recommend turning catchup off by default: either set the catchup parameter to False in the DAG definition or set catchup_by_default to False in the configuration file.

# In airflow.cfg
[scheduler]
catchup_by_default = False
with DAG('csv_to_datawarehouse',
         description='Fetch data from CSV, process and load them in the data warehouse',
         start_date=datetime(2019, 1, 1),
         schedule_interval='*/10 * * * *',
         catchup=False) as dag:
    ...

Note that you can still use the airflow backfill command (airflow dags backfill in Airflow 2) from the Airflow CLI. In my opinion, that’s the best way to run the DAG Runs that were skipped.
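To put a number on the "one year ago, every 10 minutes" scenario, you can count the complete schedule intervals between the start date and now. The dates below are hypothetical, and the count ignores max_active_runs and other limits that cap how many runs execute in parallel.

```python
from datetime import datetime, timedelta


def count_catchup_runs(start_date, now, interval):
    """Number of complete schedule intervals between start_date and now."""
    return (now - start_date) // interval  # timedelta // timedelta gives an int


runs = count_catchup_runs(
    start_date=datetime(2019, 1, 1),
    now=datetime(2020, 1, 1),
    interval=timedelta(minutes=10),
)
print(runs)  # 52560
```

That is 365 days × 24 hours × 6 runs per hour, all waiting to be scheduled as soon as the DAG is switched on with catchup enabled.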

The schedule interval

The schedule interval defines the interval of time at which your DAG is triggered. Airflow is NOT a data streaming solution so don’t set a schedule interval of 1 second.

I’m pretty sure you have already seen the schedule_interval defined either with a CRON expression or with a timedelta object.

# CRON EXPRESSION
with DAG('dag', default_args=default_args, schedule_interval='*/10 * * * *', catchup=False):
    t0 = DummyOperator(task_id='t0')

# TIMEDELTA OBJECT
with DAG('dag', default_args=default_args, schedule_interval=timedelta(minutes=10), catchup=False):
    t0 = DummyOperator(task_id='t0')

Cron expressions are extremely powerful, but they can be difficult to understand at first. Use an online cron expression checker to make sure the schedule interval is the one you expect.
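To see what a cron field actually matches, the hypothetical helper below expands a single field into the values it selects. It is a sketch that supports only *, */n, a-b, a-b/n and comma-separated lists; it is not how Airflow parses cron expressions.

```python
def expand_cron_field(field, lo, hi):
    """Expand one cron field (e.g. '*/10', '1-5', '0,30') over the range lo..hi."""
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_str = part.split("/")
            step = int(step_str)
        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            start, end = (int(x) for x in part.split("-"))
        else:
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return sorted(values)


# Minute field of '*/10 * * * *': every 10 minutes within the hour
print(expand_cron_field("*/10", 0, 59))  # [0, 10, 20, 30, 40, 50]
# Day-of-month field '*/3': counted from the 1st of each month
print(expand_cron_field("*/3", 1, 31))   # [1, 4, 7, 10, 13, 16, 19, 22, 25, 28, 31]
```

Note that each field is evaluated independently and restarts with every hour, day or month; cron has no memory of the previous run.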

Now, when should you use a timedelta object instead of a cron expression? Some frequencies can’t be expressed with cron expressions. If you want to trigger a DAG once every three days, define the schedule interval with timedelta(days=3). If you try to do it with a cron expression such as 0 0 */3 * *, the DAG will run on the last matching day of a month, for example the 31st, THEN on the 1st of the next month, breaking the three-day interval.
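The difference shows up clearly if you list the run dates of each approach around the end of a month. This sketch assumes the cron variant uses */3 in the day-of-month field (matching days 1, 4, ..., 31) and the timedelta variant simply adds three days each time; the date range is arbitrary.

```python
from datetime import date, timedelta


def cron_every_3_days(start, end):
    """Dates matched by the day-of-month field '*/3' (days 1, 4, ..., 31)."""
    d, out = start, []
    while d <= end:
        if (d.day - 1) % 3 == 0:
            out.append(d)
        d += timedelta(days=1)
    return out


def timedelta_every_3_days(start, end):
    """Dates reached by repeatedly adding timedelta(days=3)."""
    d, out = start, []
    while d <= end:
        out.append(d)
        d += timedelta(days=3)
    return out


def gaps(dates):
    """Number of days between consecutive runs."""
    return [(b - a).days for a, b in zip(dates, dates[1:])]


start, end = date(2021, 1, 25), date(2021, 2, 10)
print(gaps(cron_every_3_days(start, end)))       # [3, 3, 1, 3, 3, 3]
print(gaps(timedelta_every_3_days(start, end)))  # [3, 3, 3, 3, 3]
```

The cron-based schedule runs on January 31st and again on February 1st, only one day apart, while the timedelta-based schedule keeps a strict three-day rhythm.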

To sum up, whenever each run must happen a fixed amount of time after the previous one, rather than at fixed positions in the calendar, use a timedelta object.

Conclusion

That’s it for the first part of this series of tutorials. I hope you learned some new best practices. Obviously, there are many other Apache Airflow best practices to discover, and I will share them with you on a regular basis. So bookmark this page and, if you want to stay in touch, don’t forget to subscribe to my mailing list and start mastering Apache Airflow!
