How to use the DockerOperator in Apache Airflow

Wondering how to use the DockerOperator in Apache Airflow to kick off a Docker container and run commands? Let’s discover this operator through a practical example. In an increasingly containerized world, it can be very useful to know how to interact with your Docker containers through Apache Airflow. In this article, we are going to learn how to use the DockerOperator in Airflow through a practical example using Spark. We will configure the operator, pass runtime data to it using templating, and execute commands in order to start a Spark job from the container.

The first question you may ask yourself is what the DockerOperator actually does. According to the documentation, the DockerOperator allows you to execute a command inside a Docker container. To illustrate the simplest use case, let’s start with the following DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.docker_operator import DockerOperator

default_args = {
        'owner'                 : 'airflow',
        'description'           : 'Use of the DockerOperator',
        'depends_on_past'       : False,
        'start_date'            : datetime(2018, 1, 3),
        'email_on_failure'      : False,
        'email_on_retry'        : False,
        'retries'               : 1,
        'retry_delay'           : timedelta(minutes=5)
}

with DAG('docker_dag', default_args=default_args, schedule_interval="5 * * * *", catchup=False) as dag:
        t1 = BashOperator(
                task_id='print_current_date',
                bash_command='date'
        )

        t2 = DockerOperator(
                task_id='docker_command',
                image='centos:latest',
                api_version='auto',
                auto_remove=True,
                command="/bin/sleep 30",
                docker_url="unix://var/run/docker.sock",
                network_mode="bridge"
        )

        t3 = BashOperator(
                task_id='print_hello',
                bash_command='echo "hello world"'
        )

        t1 >> t2 >> t3

This DAG is composed of three tasks, t1, t2 and t3. Tasks t1 and t3 use the BashOperator in order to execute bash commands on the host, not in the Docker container. The task t2, in between, uses the DockerOperator in order to execute a command inside a Docker container.

Let’s focus on t2 and the most commonly used parameters to configure the DockerOperator.
  • task_id: Like any other operator in Airflow, you must specify a unique task id for the given task.
  • image: The Docker image from which the container will be created. You can specify the name of the image as well as the version, separated by “:”. Notice that if the image does not exist locally, it will be downloaded first, which will increase the execution time of the task. If you have an SLA, keep this in mind (see the short note right after this list).
  • api_version: The remote API version of the server hosting the Docker daemon. Set it to “auto” to let Airflow automatically detect the server’s version.
  • auto_remove: Removes the Docker container as soon as the task finishes.
  • command: The command that you want to execute inside the Docker container.
  • docker_url: The URL of the host running the Docker daemon. By default it is set to unix://var/run/docker.sock.
  • network_mode: Defines the network mode of the Docker container. For more information, take a look at the Docker documentation. Basically, if your containers run on the same machine, use bridge. If you have multiple Docker daemon hosts, use overlay.
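
One practical note on the image parameter: since a missing image is pulled at run time, pinning a specific tag instead of latest and keeping the image cached locally helps keep task durations predictable. The operator also exposes a force_pull flag if you prefer to refresh the image on every run. Here is a minimal sketch reusing the values from the DAG above (the task id docker_command_pinned is just an illustrative name):

        # This would sit inside the same with DAG(...) block, alongside t1, t2 and t3
        t2_pinned = DockerOperator(
                task_id='docker_command_pinned',
                image='centos:7',                         # pin a specific tag rather than latest for reproducible runs
                force_pull=False,                         # default: reuse the locally cached image instead of pulling on every run
                api_version='auto',
                auto_remove=True,
                command="/bin/sleep 30",
                docker_url="unix://var/run/docker.sock",
                network_mode="bridge"
        )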

Now that you know the basics, let me show you what this DAG does in this short video:

[Video: a short demonstration of this DAG]

If you are interested in learning more about Airflow, do not hesitate to take a look at my course: “The Complete Hands-On Course to Master Apache Airflow”.

Templating with the DockerOperator in Airflow

As you may already know, you can pass runtime data to your operators by using macros and the Jinja templating engine. For example, suppose that you would like to send environment variables or pass the execution date of the task to your Docker container; templated parameters allow you to do that. There are actually many predefined macros and variables in Airflow, which you can find in the documentation.

For the DockerOperator, two parameters can be templated.

  • command: A string representing a bash command, with the execution date of the task for example. Think of a Spark job that saves data into a database, where a date column could be filled with that value.
  • environment: A dictionary of environment variables to set in the container, which will be available to your underlying program. This parameter can be very useful to define your configuration settings.

Let’s modify the DockerOperator of our DAG so that we can benefit from these templated parameters.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.docker_operator import DockerOperator

default_args = {
        'owner'                 : 'airflow',
        'description'           : 'Use of the DockerOperator',
        'depends_on_past'       : False,
        'start_date'            : datetime(2018, 1, 3),
        'email_on_failure'      : False,
        'email_on_retry'        : False,
        'retries'               : 1,
        'retry_delay'           : timedelta(minutes=5)
}

with DAG('docker_dag', default_args=default_args, schedule_interval="5 * * * *", catchup=False) as dag:
        t1 = BashOperator(
                task_id='print_current_date',
                bash_command='date'
        )

        t2 = DockerOperator(
                task_id='docker_command',
                image='centos:latest',
                api_version='auto',
                auto_remove=True,
                environment={
                        'AF_EXECUTION_DATE': "{{ ds }}",
                        'AF_OWNER': "{{ task.owner }}"
                },
                command='/bin/bash -c \'echo "TASK ID (from macros): {{ task.task_id }} - EXECUTION DATE (from env vars): $AF_EXECUTION_DATE"\'',
                docker_url='unix://var/run/docker.sock',
                network_mode='bridge'
        )

        t3 = BashOperator(
                task_id='print_hello',
                bash_command='echo "hello world"'
        )

        t1 >> t2 >> t3

If you trigger this DAG, you will get the following output from the View Log interface of the task docker_command:

[Screenshot: View Log output of the docker_command task, with the output of the templated command encircled in red]

The log line encircled in red corresponds to the output of the command defined in the DockerOperator. This command prints out the task id of t2, which we get using {{ task.task_id }}, as well as its execution date, obtained through the environment parameter with the variable AF_EXECUTION_DATE set to the value of {{ ds }}. Again, I strongly encourage you to take a look at the documentation if you don’t know what these predefined variables are.
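
To make this concrete, here is roughly how a few of these templates would render for this DAG, assuming a hypothetical execution date of 2021-01-01 (the full list of predefined variables is in the Airflow documentation):

# Hypothetical rendering for an execution date of 2021-01-01:
#   {{ ds }}            -> "2021-01-01"     (execution date, YYYY-MM-DD)
#   {{ ds_nodash }}     -> "20210101"       (same date without dashes)
#   {{ task.task_id }}  -> "docker_command"
#   {{ task.owner }}    -> "airflow"        (from default_args)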

An end-to-end pipeline with DockerOperator and Spark

In this last part of the post, I’m going to show you a real data pipeline which performs the following steps:

  1. Check if the repo https://github.com/marclamberti/simple-app exists on the host (where the Airflow task is executed). This repo contains a script named SimpleApp.py, which is a self-contained Spark application that counts the number of lines containing ‘a’ and the number containing ‘b’ in the Spark README (a minimal sketch of this script is shown right after this list).
  2. If the repo exists, a task pulls it to stay up to date.
  3. If not, the repo is cloned on the host machine and the same pull task from the previous step is then executed.
  4. Finally, a last task runs a Docker container with Spark installed in order to kick off a Spark job corresponding to the execution of the script SimpleApp.py.
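
For reference, here is a minimal sketch of what a script like SimpleApp.py typically looks like, based on the classic Spark quick start example; the actual script in the repository may differ slightly. It reads the README.md shipped with Spark (located through the SPARK_HOME environment variable we will set later) and counts the lines containing the letters ‘a’ and ‘b’:

import os
from pyspark.sql import SparkSession

# README.md shipped with the Spark distribution inside the container,
# located through the SPARK_HOME environment variable (assumed to default to /spark)
logFile = os.path.join(os.environ.get("SPARK_HOME", "/spark"), "README.md")

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

# Count the lines containing the letter 'a' and the letter 'b'
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()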

Without further ado, here is the new version of our DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

import os.path

default_args = {
        'owner'                 : 'airflow',
        'description'           : 'Use of the DockerOperator',
        'depends_on_past'       : False,
        'start_date'            : datetime(2018, 1, 3),
        'email_on_failure'      : False,
        'email_on_retry'        : False,
        'retries'               : 1,
        'retry_delay'           : timedelta(minutes=5)
}

def checkIfRepoAlreadyCloned():
        if os.path.exists('/home/airflow/simple-app/.git'):
                return 'dummy'
        return 'git_clone'

with DAG('docker_dag', default_args=default_args, schedule_interval="5 * * * *", catchup=False) as dag:

        t_git_clone = BashOperator(
                task_id='git_clone',
                bash_command='git clone https://github.com/marclamberti/simple-app.git /home/airflow/simple-app'
        )

        # Notice the trigger_rule set to one_success.
        # Why?
        # By default, tasks use the trigger_rule all_success, meaning all parents must succeed for the task to be triggered.
        # Here, t_git_pull depends on both t_git_clone and t_dummy, and the BranchPythonOperator always skips one of them.
        # With all_success, that skipped parent would cause t_git_pull (and its downstream tasks) to be skipped as well.
        # With one_success, t_git_pull runs as soon as either git_clone or dummy succeeds.
        t_git_pull = BashOperator(
                task_id='git_pull',
                bash_command='cd /home/airflow/simple-app && git pull',
                trigger_rule='one_success'
        )

        t_check_repo = BranchPythonOperator(
                task_id='is_repo_exists',
                python_callable=checkIfRepoAlreadyCloned
        )

        # Dummy task used as the "repo already cloned" branch target
        t_dummy = DummyOperator(task_id='dummy')

        t_check_repo >> t_git_clone
        t_check_repo >> t_dummy >> t_git_pull

        t_git_clone >> t_git_pull

        t_docker = DockerOperator(
                task_id='docker_command',
                image='bde2020/spark-master:latest',
                api_version='auto',
                auto_remove=True,
                environment={
                        'PYSPARK_PYTHON': "python3",
                        'SPARK_HOME': "/spark"
                },
                volumes=['/home/airflow/simple-app:/simple-app'],
                command='/spark/bin/spark-submit --master local[*] /simple-app/SimpleApp.py',
                docker_url='unix://var/run/docker.sock',
                network_mode='bridge'
        )

        t_git_pull >> t_docker

You can divide the DAG into two parts. The first one is where the BranchPythonOperator is used to select one branch or the other, depending on whether the repository already exists. The second one is where the DockerOperator is used to start a Docker container with Spark and kick off a Spark job using the SimpleApp.py file.

By the way, I’m not going to explain here what the BranchPythonOperator does or why there is a dummy task, but if you are interested in learning more about Airflow, feel free to check out my course.

Notice the environment and the volumes parameters in the DockerOperator. In this example, the environment variables we set are used by Spark inside the Docker container. The variable PYSPARK_PYTHON is defined so that PySpark uses Python 3 as its default interpreter, and the variable SPARK_HOME contains the path where the script SimpleApp.py goes to fetch the file README.md.

The volumes parameter contains the mapping between the host (“/home/airflow/simple-app”) and the Docker container (“/simple-app”), so that the container has access to the cloned repository and therefore to the SimpleApp.py script.

Running the DAG

If you trigger this DAG, you should end up with the following Graph View if the repository simple-app has not yet been cloned:

[Screenshot: Graph View of the DAG run, with the git_clone branch taken]

All the tasks have succeeded. Don’t hesitate to take a look at the output of the Docker container by clicking on the task “docker_command”, then View Log, and check whether the Spark job did what we expected by giving you the number of lines containing the letter ‘a’ or ‘b’, as shown below:

[Screenshot: log output of the docker_command task showing the number of lines containing ‘a’ and ‘b’]

Conclusion

In this tutorial we have first seen how to use the DockerOperator through its most commonly used parameters. Then, we have taken advantage of its templated parameters, command and environment, to pass runtime data to our Docker container. Finally, we have built an end-to-end data pipeline that pulls a Spark application from a GitHub repository, cloning it first if needed (using the BranchPythonOperator), then runs a Docker container and kicks off a Spark job.

So that’s it for this tutorial. I hope you learned exciting things in this post, and see you in the next one 🙂

