Airflow DAG: Creating your first DAG in 5 minutes

Looking to create your first Airflow DAG? Wondering how to process your data in Airflow? What are the steps to code your own data pipelines? You’ve come to the right place! By the end of this short tutorial, you will be able to code your first Airflow DAG! You might think it’s hard to get started with Apache Airflow, but it is not. The truth is, Airflow is so powerful that the possibilities it brings can be overwhelming. Don’t worry, you are only going to discover what you need to get started now! Before jumping in, if you are looking for a solid and more complete introduction to Airflow, check my course here, you will enjoy it 😀. Ready? You’re sure? Let’s go!

Use Case

As usual, the best way to understand a feature or concept is to have a use case. Let’s say you have the following data pipeline in mind:

Data pipeline

Your goal is to train 3 different machine learning models, then choose the best one and execute either the task “accurate” or the task “inaccurate”, based on the accuracy of the best model. You could even store the value in a database, but let’s keep things simple for now. Now that everything is clear in your head, the first question comes up:

How can I create an Airflow DAG representing my data pipeline?

Well, this is exactly what you are about to find out now!

Airflow DAG? Operators? Terminology

Before jumping into the code, you need to get used to some terminology first. I know, it’s the boring part, but stay with me, it is important.

First and foremost,

What is a DAG?

DAG stands for Directed Acyclic Graph. In simple terms, it is a graph with nodes, directed edges and no cycles. Basically, this:

Valid Airflow DAG

is a DAG.

But this:

Invalid Airflow DAG

is NOT a DAG. Why? Because there is a cycle. Node A depends on Node C, which in turn depends on Node B, which itself depends on Node A. Since each node waits for another one in the loop, none of them can start, so this DAG (which is not one) won’t run at all. A DAG never has cycles. In Apache Airflow, a DAG is a data pipeline, so whenever you read “DAG”, think “data pipeline”. Last but not least, when a DAG is triggered, a DAGRun is created. A DAGRun is an instance of your DAG with an execution date.
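The sketch below makes the “no cycles” rule concrete in plain Python. It needs no Airflow, but graphlib requires Python 3.9 or later. Each graph maps a node to the nodes it depends on. The valid graph is a hypothetical example, and the invalid one is the cycle described above. The standard library’s TopologicalSorter raises a CycleError when it cannot find an order in which to run the nodes, which is exactly why a cyclic graph can’t run. This is only an illustration, not how Airflow itself checks your DAGs.

```python
from graphlib import TopologicalSorter, CycleError

# Each key maps a node to the set of nodes it depends on (its upstream nodes)
valid_dag = {"B": {"A"}, "C": {"A"}, "D": {"B", "C"}}  # hypothetical example
invalid_dag = {"A": {"C"}, "C": {"B"}, "B": {"A"}}     # A -> B -> C -> A


def is_dag(graph):
    """Return True if the dependency graph has no cycles."""
    try:
        # static_order() raises CycleError if the graph contains a cycle
        list(TopologicalSorter(graph).static_order())
        return True
    except CycleError:
        return False


print(is_dag(valid_dag))    # True
print(is_dag(invalid_dag))  # False
```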

Ok, once you know what a DAG is, the next question is: what is a “Node” in the context of Airflow?

What is an Airflow Operator?

In an Airflow DAG, nodes are operators. In other words, a task in your DAG is an operator. An Operator is a class encapsulating the logic of what you want to achieve. For example, if you want to execute a Python function, you use the PythonOperator. If you want to execute a Bash command, you use the BashOperator. Airflow brings a ton of operators that you can find here and here. Once it is added to a DAG, an operator is a task, and when that task runs for a given DAGRun, it becomes a task instance. Here is an example of operators:

from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
# The DummyOperator is a task that does nothing
accurate = DummyOperator(
    task_id='accurate'
)
# The BashOperator is a task that executes a bash command
commands = BashOperator(
    task_id='commands',
    bash_command='sleep 5'
)

As you can see, an Operator has some arguments. The first one is the task_id, the unique identifier of the operator in the DAG: each Operator must have a unique task_id. The other arguments depend on the operator used. For example, with the BashOperator, you have to pass the bash command to execute, while with the DummyOperator, there is nothing else to specify. In the end, to know which arguments your Operator needs, the documentation is your friend.
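Here is a minimal sketch of the unique task_id rule. TaskIdRegistry is a hypothetical class written only for this illustration, not part of Airflow. It records task ids and refuses duplicates with a ValueError. Airflow raises its own error when you add two tasks with the same task_id to a DAG.

```python
class TaskIdRegistry:
    """Hypothetical stand-in for a DAG that only tracks task ids."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.task_ids = set()

    def add_task(self, task_id):
        # A task_id may appear only once per DAG
        if task_id in self.task_ids:
            raise ValueError(f"Task id '{task_id}' already exists in DAG '{self.dag_id}'")
        self.task_ids.add(task_id)


registry = TaskIdRegistry("my_dag")
registry.add_task("accurate")
registry.add_task("commands")
try:
    registry.add_task("accurate")  # second task with the same id
except ValueError as err:
    print(err)  # Task id 'accurate' already exists in DAG 'my_dag'
```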

You now know what a DAG is and what an Operator is. Time to learn how to create the directed edges, or in other words, the dependencies between tasks.

Dependencies?

As you learned, a DAG has directed edges. Those directed edges are the dependencies between the operators/tasks of an Airflow DAG. Basically, if you want to say “Task A is executed before Task B”, you have to define the corresponding dependency. How?

task_a >> task_b
# Or
task_b << task_a

The >> and << operators are Python’s “right bitshift” and “left bitshift” operators. In Airflow, they mean “set downstream task” and “set upstream task” respectively. On the first line, we say that task_b is a downstream task of task_a. On the second line, we say that task_a is an upstream task of task_b. Both lines define the same dependency. Don’t worry, we will come back to dependencies.
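If you are curious how >> and << can mean “downstream” and “upstream”, the trick is Python operator overloading. The sketch below uses a hypothetical ToyTask class (not Airflow’s code) that implements __rshift__, __lshift__ and __rrshift__. The last one is what makes a list of tasks on the left of >> work, since Python lists don’t define >>.

```python
class ToyTask:
    """Hypothetical task that records the ids of its downstream tasks."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # self >> other, where other is a task or a list of tasks
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.append(target.task_id)
        return other  # returning the right operand allows a >> b >> c

    def __lshift__(self, other):
        # self << other is the same as other >> self
        other >> self
        return other

    def __rrshift__(self, other):
        # [t1, t2] >> self: Python falls back to this because lists lack >>
        for upstream in other:
            upstream >> self
        return self


task_a, task_b, task_c = ToyTask("a"), ToyTask("b"), ToyTask("c")
task_a >> task_b  # task_b is downstream of task_a
task_c << task_b  # same as task_b >> task_c
print(task_a.downstream)  # ['b']
print(task_b.downstream)  # ['c']
```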

All right, now that you know the terminology, time to dive into the code! Adios boring part 👋

Coding your first Airflow DAG

There are 4 steps to follow to create a data pipeline. Don’t forget, your goal is to code the following DAG:

Data pipeline

Without further ado, let’s begin!

Step 1: Make the Imports

The first step is to import the classes you need. To create a DAG in Airflow, you always have to import the DAG class. After the DAG class come the imports of Operators: for each Operator you want to use, you have to make the corresponding import. For example, if you want to execute a Python function, you have to import the PythonOperator. If you want to execute a bash command, you have to import the BashOperator. Finally, the last import is usually the datetime class, as you need to specify a start date for your DAG.

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

Step 2: Create the Airflow DAG object

After making the imports, the second step is to create the Airflow DAG object. A DAG object must have two parameters, a dag_id and a start_date. The dag_id is the unique identifier of the DAG across all of your DAGs: each DAG must have a unique dag_id. The start_date defines the date at which your DAG starts being scheduled. A word of warning here:

If the start_date is set in the past and catchup is enabled (the default), the scheduler will try to backfill all the non-triggered DAG Runs between the start_date and the current date. For example, if your start_date is 3 years ago, you might end up with many DAG Runs running at the same time.

In addition to those arguments, two others are usually specified: schedule_interval and catchup.

The schedule_interval defines the interval of time at which your DAG gets triggered: every 10 minutes, every day, every month and so on. There are two ways to define it, either with a CRON expression or with a timedelta object. The first option is the most commonly used. By the way, if you don’t know how to define a CRON expression, take a look at this beautiful website. If you don’t know what a CRON expression is, keep in mind that it is a way to express time intervals.
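For reference, here are Airflow’s most common schedule presets with the CRON expressions they stand for. Next to them is a timedelta object for the “every 10 minutes” case. The PRESETS dictionary is just a lookup table written for this tutorial. The first_runs list is simple date arithmetic to show what a timedelta interval means, not a call to the Airflow scheduler.

```python
from datetime import datetime, timedelta

# Airflow schedule presets and their CRON equivalents
# (fields: minute hour day-of-month month day-of-week)
PRESETS = {
    "@hourly": "0 * * * *",    # at minute 0 of every hour
    "@daily": "0 0 * * *",     # every day at midnight
    "@weekly": "0 0 * * 0",    # every Sunday at midnight
    "@monthly": "0 0 1 * *",   # the 1st of every month at midnight
    "@yearly": "0 0 1 1 *",    # every January 1st at midnight
}

# "Every 10 minutes" as a CRON expression and as a timedelta object
every_10_minutes_cron = "*/10 * * * *"
every_10_minutes = timedelta(minutes=10)

# The first three interval starts after a start date, using the timedelta
start_date = datetime(2021, 1, 1)
first_runs = [start_date + i * every_10_minutes for i in range(3)]
for run in first_runs:
    print(run)
# 2021-01-01 00:00:00
# 2021-01-01 00:10:00
# 2021-01-01 00:20:00
```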

Lastly, the catchup argument lets you prevent the scheduler from automatically backfilling the non-triggered DAG Runs between the start date of your DAG and the current date. If you don’t want to end up with many DAG Runs running at the same time, it’s usually a best practice to set it to False.
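To get a feel for what catchup=True can mean, here is a rough, hypothetical helper that counts the complete schedule intervals between a start_date and “now”. In this simplified model, each complete interval becomes one DAG Run to backfill. With catchup=False, the scheduler only creates a run for the latest interval. The dates are made up for the example.

```python
from datetime import datetime, timedelta


def runs_to_backfill(start_date, now, interval=timedelta(days=1)):
    """Count complete schedule intervals between start_date and now.

    Simplified model: with catchup=True, each complete interval is one
    DAG Run the scheduler would create to catch up.
    """
    if now <= start_date:
        return 0
    # timedelta // timedelta gives the number of whole intervals (an int)
    return (now - start_date) // interval


# A daily DAG whose start_date is 3 years in the past
print(runs_to_backfill(datetime(2021, 1, 1), datetime(2024, 1, 1)))  # 1095
```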

That’s it, no more arguments. Here is the corresponding code:

with DAG("my_dag",  # DAG id
         start_date=datetime(2021, 1, 1),  # start date, the 1st of January 2021
         schedule_interval='@daily',  # Airflow preset for a CRON expression: @daily means once a day, at midnight
         catchup=False  # don't backfill past, non-triggered DAG Runs
         ) as dag:
    ...  # your tasks go here, see Step 3

Notice that to create an instance of a DAG, we use the with statement. Why? Because the DAG acts as a context manager: every operator you instantiate inside the with block is automatically attached to that DAG, so you don’t have to pass dag=dag to each task. I won’t go into more detail here, but I advise you to instantiate your DAGs like that. It’s clearer than assigning the DAG to a variable and passing it to every task.
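Here is a simplified, hypothetical sketch of the idea behind the with statement. ToyDAG and ToyTask are not Airflow classes. ToyDAG remembers which DAG is “open” while the with block runs, and every ToyTask created inside the block attaches itself to that DAG. This only illustrates the principle; Airflow’s actual implementation differs in its details.

```python
class ToyDAG:
    """Hypothetical DAG that collects the tasks created inside its with block."""

    _current = None  # the DAG whose with block is currently active, if any

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []

    def __enter__(self):
        # Remember the previously active DAG so nested blocks restore it on exit
        self._previous = ToyDAG._current
        ToyDAG._current = self
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        ToyDAG._current = self._previous
        return False  # never swallow exceptions raised inside the block


class ToyTask:
    """Hypothetical task that attaches itself to the active ToyDAG."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.dag = ToyDAG._current  # no dag=... argument needed
        if self.dag is not None:
            self.dag.tasks.append(self)


with ToyDAG("my_dag") as dag:
    ToyTask("accurate")
    ToyTask("inaccurate")

print([task.task_id for task in dag.tasks])  # ['accurate', 'inaccurate']
```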

Step 3: Add your tasks!

Once you have made the imports and created your DAG object, you are ready to add your tasks! Remember, a task is an operator. Therefore, based on your DAG, you have to add 6 operators. Let’s dive into the tasks.

Training model tasks

First, the tasks training model A, B and C are implemented with the PythonOperator. Since we are not going to train real machine learning models (too complicated to start with), each task returns a random accuracy, generated by a Python function named _training_model.

from random import randint  # Import to generate random numbers
def _training_model():
    return randint(1, 10)  # return an integer between 1 and 10 (both included)
with DAG(...) as dag:  # same arguments as in Step 2
    # Tasks are implemented under the dag object
    training_model_A = PythonOperator(
        task_id="training_model_A",
        python_callable=_training_model
    )
    training_model_B = PythonOperator(
        task_id="training_model_B",
        python_callable=_training_model
    )
    training_model_C = PythonOperator(
        task_id="training_model_C",
        python_callable=_training_model
    )

That’s great, but you can do better. The 3 tasks are really similar; the only difference lies in the task ids. Since DAGs are coded in Python, you can take advantage of that and generate the tasks dynamically. Take a look at the code below:

    training_model_tasks = [
        PythonOperator(
            task_id=f"training_model_{model_id}",
            python_callable=_training_model,
            op_kwargs={
                "model": model_id
            }
        ) for model_id in ['A', 'B', 'C']
    ]

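By using a list comprehension, you generate the 3 tasks dynamically, which is… much cleaner. The less code, the better 😎 Note that op_kwargs passes model_id to the callable as the model keyword argument. The function can only use it if it declares a model parameter, which is why _training_model takes one in the final code.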

If you are wondering how the PythonOperator works, take a look at my article here, you will learn everything you need about it.

Choosing best model

The next task is “Choosing Best ML”. Since this task executes either the task “accurate” or “inaccurate” based on the best accuracy, the BranchPythonOperator looks like the perfect candidate. It allows you to execute one task or another based on a condition, a value or a criterion. If you want to learn more about it, take a look here. The BranchPythonOperator is one of the most commonly used Operators, so don’t miss it.
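To picture what branching means, here is a hypothetical, plain Python model of the decision. pick_branch applies the same rule as the DAG: a best accuracy above 8 means “accurate”. resolve_branch then splits the downstream tasks into the one that runs and the ones that get skipped. Neither function is part of Airflow; the BranchPythonOperator handles the skipping for you.

```python
def pick_branch(accuracies):
    # Same rule as the DAG: follow "accurate" if the best accuracy is above 8
    if max(accuracies) > 8:
        return 'accurate'
    return 'inaccurate'


def resolve_branch(chosen, downstream_task_ids):
    """Return (followed, skipped) downstream task ids for a chosen branch."""
    # A branch callable may return one task id or a list of task ids
    chosen_ids = [chosen] if isinstance(chosen, str) else list(chosen)
    followed = [t for t in downstream_task_ids if t in chosen_ids]
    skipped = [t for t in downstream_task_ids if t not in chosen_ids]
    return followed, skipped


print(resolve_branch(pick_branch([3, 9, 5]), ["accurate", "inaccurate"]))
# (['accurate'], ['inaccurate'])
print(resolve_branch(pick_branch([3, 7, 5]), ["accurate", "inaccurate"]))
# (['inaccurate'], ['accurate'])
```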

How do you implement it? Here is the answer:

def _choosing_best_model(ti):
    # Fetch the accuracies returned by the 3 training tasks (stored as XComs)
    accuracies = ti.xcom_pull(task_ids=[
        'training_model_A',
        'training_model_B',
        'training_model_C'
    ])
    # Return the task_id of the next task to execute
    if max(accuracies) > 8:
        return 'accurate'
    return 'inaccurate'
with DAG(...) as dag:
    choosing_best_model = BranchPythonOperator(
        task_id="choosing_best_model",
        python_callable=_choosing_best_model
    )

Ok, it looks a little bit more complicated here. Let’s start from the beginning. First, the BranchPythonOperator executes a Python function, here _choosing_best_model. This function must return the task id of the next task to execute. For your DAG, that is either “accurate” or “inaccurate”, as shown by the return statements. Airflow passes the current task instance to the function through the ti parameter. Now, there is something we haven’t talked about yet. What is xcom_pull? 😱

Whenever you want to share data between tasks in Airflow, you have to use XComs. XCom stands for “cross-communication”; it is a mechanism for exchanging small amounts of data between the tasks of a DAG. An XCom is an object encapsulating a key, which serves as an identifier, and a value, which is the data you want to share. I won’t go into the details here, as I wrote a long article about it. Just keep in mind that when the Python function _training_model returns the accuracy, the PythonOperator automatically pushes it as an XCom (under the key return_value), so you don’t need to push it yourself. Then, with xcom_pull in _choosing_best_model, you fetch those XComs back. As you want the accuracy of each training_model task, you specify the task ids of these 3 tasks. That’s all you need to know 😃
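To see the flow of data, here is a hypothetical in-memory stand-in for XComs. ToyXComStore is not Airflow’s API. It only mimics the idea of values stored under a task id and a key (return_value for returned values) and pulled back by task ids. The accuracies are made-up numbers.

```python
class ToyXComStore:
    """Hypothetical in-memory stand-in for Airflow's XCom storage."""

    def __init__(self):
        self._data = {}  # (task_id, key) -> value

    def push(self, task_id, value, key="return_value"):
        self._data[(task_id, key)] = value

    def pull(self, task_ids, key="return_value"):
        # One task id gives one value; a list gives a list in the same order
        if isinstance(task_ids, str):
            return self._data.get((task_ids, key))
        return [self._data.get((task_id, key)) for task_id in task_ids]


store = ToyXComStore()
# What returning a value from each training task amounts to
for task_id, accuracy in [("training_model_A", 4),
                          ("training_model_B", 9),
                          ("training_model_C", 7)]:
    store.push(task_id, accuracy)

# What xcom_pull does in _choosing_best_model
accuracies = store.pull(["training_model_A", "training_model_B", "training_model_C"])
print(accuracies)  # [4, 9, 7]
print('accurate' if max(accuracies) > 8 else 'inaccurate')  # accurate
```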

Accurate or inaccurate?

The last two tasks to implement are “accurate” and “inaccurate”. To do that, you can use the BashOperator and execute a very simple bash command that prints either “accurate” or “inaccurate” on the standard output.

    accurate = BashOperator(
        task_id="accurate",
        bash_command="echo 'accurate'"
    )
    inaccurate = BashOperator(
        task_id="inaccurate",
        bash_command="echo 'inaccurate'"
    )

That’s it, nothing more to add. The BashOperator is used to execute bash commands and that’s exactly what you’re doing here.

All right, that was a lot, time to move to the last step!

Step 4: Defining dependencies

Now that you’ve implemented all of the tasks, the last step is to put the glue between them, or in other words, to define the dependencies between them. How? By using bitshift operators.

In your case, it’s really basic as you want to execute one task after the other.

with DAG(...) as dag:
    training_model_tasks >> choosing_best_model >> [accurate, inaccurate]

Here you say that the training_model_tasks are executed first. Then, once all of them have succeeded, choosing_best_model gets executed. Finally, either accurate or inaccurate runs. Keep in mind that each time you have multiple tasks on the same level, in the same group, that can be executed at the same time, you put them in a list with [ ].
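Here is a plain Python sketch (Python 3.9+, graphlib) that turns the dependencies of your DAG into execution “levels”. Tasks in the same level don’t depend on each other, like the tasks you group in a list. The dependencies dictionary is a hand-written copy of the DAG for illustration, not something Airflow generates. Keep in mind that at run time, the branch only lets one of accurate or inaccurate run.

```python
from graphlib import TopologicalSorter

# task -> set of upstream tasks, mirroring:
# training_model_tasks >> choosing_best_model >> [accurate, inaccurate]
dependencies = {
    "choosing_best_model": {"training_model_A", "training_model_B", "training_model_C"},
    "accurate": {"choosing_best_model"},
    "inaccurate": {"choosing_best_model"},
}


def execution_levels(graph):
    """Group tasks into levels whose tasks could run at the same time."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for a stable printout
        levels.append(ready)
        sorter.done(*ready)  # mark the level as finished to unlock the next one
    return levels


for level in execution_levels(dependencies):
    print(level)
# ['training_model_A', 'training_model_B', 'training_model_C']
# ['choosing_best_model']
# ['accurate', 'inaccurate']
```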

The Final Airflow DAG!

Ok, now you’ve gone through all the steps, time to see the final code:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
from random import randint
def _choosing_best_model(ti):
    # Pull the accuracies pushed (as XComs) by the 3 training tasks
    accuracies = ti.xcom_pull(task_ids=[
        'training_model_A',
        'training_model_B',
        'training_model_C'
    ])
    # Return the task_id of the branch to follow
    if max(accuracies) > 8:
        return 'accurate'
    return 'inaccurate'
def _training_model(model):
    # model ("A", "B" or "C") comes from op_kwargs; the accuracy is random
    return randint(1, 10)
with DAG("my_dag",
         start_date=datetime(2021, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:
    training_model_tasks = [
        PythonOperator(
            task_id=f"training_model_{model_id}",
            python_callable=_training_model,
            op_kwargs={
                "model": model_id
            }
        ) for model_id in ['A', 'B', 'C']
    ]
    choosing_best_model = BranchPythonOperator(
        task_id="choosing_best_model",
        python_callable=_choosing_best_model
    )
    accurate = BashOperator(
        task_id="accurate",
        bash_command="echo 'accurate'"
    )
    inaccurate = BashOperator(
        task_id="inaccurate",
        bash_command="echo 'inaccurate'"
    )
    training_model_tasks >> choosing_best_model >> [accurate, inaccurate]

That’s it, you’ve just created your first Airflow DAG! If you want to test it, put that code into a file named my_dag.py and put that file into Airflow’s dags/ folder. Once you’ve done that, trigger the DAG from the UI and you should obtain the following output:

Graph View of my_dag

Creating your first DAG in action!

Conclusion

That’s it for creating your first Airflow DAG. It wasn’t too difficult, was it? You’ve learned how to create a DAG, generate tasks dynamically, choose one task or another with the BranchPythonOperator, share data between tasks and define dependencies with bitshift operators. If you want to learn more about Apache Airflow, check my course here. Have a wonderful day and see you for another tutorial! 👋

