Apache Airflow with Kubernetes Executor and MiniKube

In this tutorial, we are going to see how to use Apache Airflow with the Kubernetes Executor. If you are using Airflow in production, there is a good chance that your workload fluctuates over time. Sometimes you have many tasks to execute and sometimes none at all. The problem is that, in both cases, your resources stay allocated to your Airflow cluster and cannot be assigned to another tool since your cluster is “static”. Wasting resources is expensive in terms of time and money. The Kubernetes Executor addresses this problem, among others, so let’s discover this exciting new executor.

Before starting to dive into the Kubernetes Executor, let me first give you a quick reminder about Kubernetes.

So what is Kubernetes? Kubernetes is an open-source platform for managing containerized applications. It orchestrates computing, networking and storage infrastructure and offers very nice features such as deployment, scaling, load balancing, logging and monitoring. In very simple terms, Kubernetes orchestrates the different containers composing your application so that they can work smoothly together.

A very important concept to understand in Kubernetes is the Pod. A Pod is the smallest deployable object in Kubernetes. It encapsulates an application’s container (or multiple containers) as well as storage resources (shared volumes), a unique network IP, and options to set how the container(s) should run. Basically, in the most common Kubernetes use case (and in the case of Airflow), a Pod will run a single Docker container corresponding to a component of your application.

In the context of Airflow and the Kubernetes Executor, you can think of Kubernetes as a pool of resources exposing a simple but powerful API to dynamically launch complex deployments.

Now that your memories of Kubernetes are fresh, let’s move on to Airflow executors.

A quick reminder about Airflow executors

Basically, an executor defines how your tasks are going to be executed. A task corresponds to a node in your DAG where an action must be done, such as executing a bash shell command, running a Python script, kicking off a Spark job and so on. Before getting executed, a task is always scheduled first and pushed into a queue implemented as an OrderedDict in order to keep tasks sorted by their addition order. Then, depending on the executor used, the execution of the task will differ.
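To make the queue idea concrete, here is a minimal sketch of an OrderedDict used as a first-in, first-out task queue. The names queued_tasks, queue_task and next_task are hypothetical and the commands are placeholder strings. It only illustrates the insertion-order property and is not Airflow's actual scheduling code.

```python
from collections import OrderedDict

# Hypothetical, simplified stand-in for an executor's task queue.
# Keys identify a task, values hold whatever is needed to run it.
queued_tasks = OrderedDict()


def queue_task(key, command):
    """Register a task; a key that is already queued keeps its position."""
    if key not in queued_tasks:
        queued_tasks[key] = command


def next_task():
    """Pop the oldest queued task (first in, first out)."""
    return queued_tasks.popitem(last=False)


queue_task(("example_dag", "start_task"), "placeholder command for start_task")
queue_task(("example_dag", "one_task"), "placeholder command for one_task")
queue_task(("example_dag", "start_task"), "duplicate, ignored")

first_key, first_command = next_task()
print(first_key)
```

Because popitem(last=False) removes the oldest entry, tasks leave the queue in the order they were added, whatever the executor does with them afterwards.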

Apache Airflow gives you five types of executors:

  • SequentialExecutor, which is the simplest one and executes your tasks sequentially. Recommended for debugging and testing only.
  • LocalExecutor, which runs multiple subprocesses to execute your tasks concurrently on the same host. It scales quite well (vertical scaling) and can be used in production.
  • CeleryExecutor, which allows you to horizontally scale your Airflow cluster. You basically run your tasks on multiple nodes (Airflow workers) and each task is first queued through a message broker such as RabbitMQ. All the distribution is managed by Celery. It is the recommended way to go in production since you will be able to absorb the workload you need.
  • DaskExecutor. Dask is another Python distributed task processing system, like Celery. It’s up to you to choose either Dask or Celery according to the framework that best fits your needs.
  • KubernetesExecutor, which is quite new and allows you to run your tasks using Kubernetes, making your Airflow cluster elastic to your workload in order to avoid wasting your precious resources.

Introducing Apache Airflow with Kubernetes Executor

The biggest issue that Apache Airflow with the Kubernetes Executor solves is dynamic resource allocation. Before the Kubernetes Executor, all Airflow solutions involved static clusters of workers, so you had to determine ahead of time what cluster size you wanted according to your possible workloads. You ended up either over-provisioning or under-provisioning your cluster, and so either wasting resources or degrading performance. Also, Airflow workers had to be set up with all the dependencies needed to deal with the variety of jobs you may have. Finally, keep in mind that by using the Celery Executor you have to set up multiple technologies (Celery, Redis, Flower, etc.) that you will need to understand, manage and monitor.

The great benefits of using Apache Airflow with Kubernetes Executor

We can sum up the advantages of using Airflow with Kubernetes in 4 points:

  • High level of elasticity: your Airflow cluster is dynamic and not static anymore. It expands and shrinks according to your workload, so there are no more idle nodes.
  • Task-level pod configuration: since the Kubernetes Executor creates a new pod for every task instance, you can specify precisely, for each task, the resources needed (CPU and memory) as well as the Docker image carrying its dependencies.
  • Fault tolerance: a task will not crash your entire Airflow worker as it is isolated in a pod. Also, if the scheduler goes down, the “resourceVersion” feature of Kubernetes allows it, once it is back up, to restart from the state it was in.
  • Simpler deployment: you can specify your whole deployment in a single YAML file. Your dependencies are also offloaded to containers.
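The fault-tolerance point above can be illustrated with a small simulation. This is only a sketch under stated assumptions: EVENTS, watch and Watcher are hypothetical names, the event log is an in-memory list standing in for the Kubernetes API server, and versions are plain integers, whereas real resource versions are opaque strings managed by Kubernetes.

```python
# Hypothetical in-memory event log standing in for the Kubernetes API server.
# Each event carries an increasing version number (an assumption made for
# readability; real resource versions are opaque strings).
EVENTS = [
    (101, "task-a", "Running"),
    (102, "task-a", "Succeeded"),
    (103, "task-b", "Running"),
    (104, "task-b", "Failed"),
]


def watch(since_version):
    """Yield only the events newer than the version the watcher last saw."""
    for version, pod, phase in EVENTS:
        if version > since_version:
            yield version, pod, phase


class Watcher:
    """Tracks pod phases and remembers the last version it processed."""

    def __init__(self, last_seen=0):
        self.last_seen = last_seen
        self.states = {}

    def consume(self, max_events=None):
        events = watch(self.last_seen)
        for count, (version, pod, phase) in enumerate(events, start=1):
            self.states[pod] = phase
            self.last_seen = version
            if max_events is not None and count >= max_events:
                break


# The first scheduler instance processes two events, then "crashes".
before_crash = Watcher()
before_crash.consume(max_events=2)
checkpoint = before_crash.last_seen  # would be stored somewhere durable

# The restarted scheduler resumes from the checkpoint and skips old events.
after_restart = Watcher(last_seen=checkpoint)
after_restart.consume()
print(after_restart.states)
```

The restarted watcher only sees the events recorded after the checkpoint, which is the property that lets a scheduler pick up where it left off instead of replaying everything.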

Distribute your DAGs in Apache Airflow with Kubernetes Executor

You may wonder how you can distribute your DAGs across your cluster since Airflow uses only one DAG folder. Well, with the Kubernetes Executor, you basically have three ways of making your DAGs available:

  • Git clone with init container for each pod (Git-init mode)
  • Mount volume with DAGs (persistent volume mode)
  • Ensure the image already contains the DAG code (“pre-baked” mode)

Notice that the Git-init mode and the “pre-baked” mode are recommended for development and small instances of Airflow (< 1000 jobs) because they do not involve any distributed file system, whereas the persistent volume mode is recommended for large DAG folders.

OK, now that you know the theory, let’s move on to practice and discover how to use Apache Airflow with the Kubernetes Executor.

Installing MiniKube and Apache Airflow with Kubernetes Executor

In this part of the tutorial we are going to set up different tools such as Kubernetes and Airflow. Since working with Kubernetes can be quite tedious as there are many concepts to grasp, we will also use scripts provided in the Airflow repository to make the installation process easier.

To set up our development environment, we need a version of Kubernetes that can run on a single node. As you may already know, several distributions such as MiniKube, MicroK8s or K3s let you run your own experiments more or less easily. In our case, we are going to work with MiniKube since it is a very mature solution and is widely used for testing purposes. Let’s get started!

Installing MiniKube

First, we have to clone the official Airflow repository which contains all the scripts and files required to install MiniKube and Airflow running with Kubernetes Executor. So let’s execute the following command in your terminal:

git clone https://github.com/apache/airflow.git

Once the download is done, we need to set two environment variables in our shell to specify which versions of MiniKube and Kubernetes we want to use.

export KUBERNETES_VERSION=v1.14.0
export MINIKUBE_VERSION=v1.1.1                                     

Then, execute the script named start_minikube.sh as shown by the command below in order to install MiniKube.

./airflow/scripts/ci/kubernetes/minikube/start_minikube.sh  
Warning

You may end up with the following error: sudo: minikube: command not found. Why? Because the script runs MiniKube through the sudo command and, depending on your system configuration, your PATH environment variable can be overridden by the secure_path specified in the /etc/sudoers file. To fix this error (for testing purposes only), you can edit the file with the command sudo visudo and put a # at the beginning of the line starting with Defaults secure_path. Then restart the script.
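Before editing /etc/sudoers, you can check whether this is really the cause. The sketch below compares the directory where minikube is installed with a secure_path value. The helper is_on_path is a hypothetical name and the secure_path string is only an assumed example, so copy the real value from the Defaults secure_path line of your own sudoers file.

```python
import os
import shutil


def is_on_path(binary_path, path_value):
    """Return True if the directory holding binary_path is listed in path_value."""
    directory = os.path.dirname(os.path.abspath(binary_path))
    return directory in path_value.split(os.pathsep)


# Where the current user finds minikube (None if it is not installed).
minikube = shutil.which("minikube")

# Assumed example value; read the real one from your /etc/sudoers file.
secure_path = "/sbin:/bin:/usr/sbin:/usr/bin"

if minikube is None:
    print("minikube is not on your PATH")
elif is_on_path(minikube, secure_path):
    print("sudo should be able to find", minikube)
else:
    print(minikube, "is outside secure_path, so sudo will not find it")
```

If the binary turns out to be outside secure_path, commenting out the line as described above is one option for a test machine.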

The script takes some time to finish, so sit back and relax. Also, pay attention to the pre-flight checks, where you can get multiple warnings or even errors as shown by the following screenshot.

output_minikube_error

For example, you must have at least 2 available CPUs in order to use MiniKube, otherwise it won’t work. Your firewall, if activated, must have the ports 8443 and 10250 open. It is also advised to disable swap with the command swapoff -a, as swap is not supported, and so on. I strongly recommend that you check and, if needed, fix each warning. Unfortunately, I can’t give you the solution for each of them as it really depends on your operating system.

In the ideal case where you don’t get any errors, you should have an output that looks like this:
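Two of these checks, the CPU count and the swap state, can be anticipated before launching the script. The following is a minimal sketch for a Linux host, assuming swap devices are listed in /proc/swaps after a header line. The function names parse_active_swap and preflight_warnings are hypothetical and the firewall ports are not checked here.

```python
import os


def parse_active_swap(proc_swaps_text):
    """Return the swap devices listed in /proc/swaps content (header skipped)."""
    lines = [line for line in proc_swaps_text.splitlines() if line.strip()]
    return [line.split()[0] for line in lines[1:]]


def preflight_warnings(cpu_count, proc_swaps_text, min_cpus=2):
    """Collect human-readable warnings for the CPU and swap requirements."""
    warnings = []
    if cpu_count < min_cpus:
        warnings.append(
            f"only {cpu_count} CPU(s) available, at least {min_cpus} required"
        )
    for device in parse_active_swap(proc_swaps_text):
        warnings.append(f"swap is enabled on {device}, consider swapoff -a")
    return warnings


if __name__ == "__main__":
    try:
        with open("/proc/swaps") as handle:
            swaps = handle.read()
    except OSError:
        swaps = ""  # not a Linux host, or /proc is unavailable
    for warning in preflight_warnings(os.cpu_count() or 1, swaps):
        print("WARNING:", warning)
```

An empty output means neither of the two conditions was detected; the pre-flight checks of the script itself remain the reference.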

output_minikube_good
Warning

You may end up with the following error: The connection to the server localhost:8080 was refused - did you specify the right host or port? In order to fix this, you need to execute the following commands from your terminal:
- sudo cp /etc/kubernetes/admin.conf $HOME/
- sudo chown $(id -u):$(id -g) $HOME/admin.conf
- export KUBECONFIG=$HOME/admin.conf
And it should work 🙂

Finally, we check that our local Kubernetes cluster is running as expected.

kubectl get nodes
sudo minikube status                                
output_minikube_check
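If you prefer to script this check, the table printed by kubectl get nodes can be parsed in a few lines. This is a sketch only: not_ready_nodes is a hypothetical helper and the sample text is illustrative, not captured from a real cluster.

```python
def not_ready_nodes(kubectl_output):
    """Return names of nodes whose STATUS column is not exactly 'Ready'."""
    rows = [line.split() for line in kubectl_output.strip().splitlines()]
    if not rows:
        return []
    header, nodes = rows[0], rows[1:]
    name_col = header.index("NAME")
    status_col = header.index("STATUS")
    return [row[name_col] for row in nodes if row[status_col] != "Ready"]


# Illustrative sample. On a real machine the text could be captured with
# subprocess.run(["kubectl", "get", "nodes"], capture_output=True, text=True).stdout
sample = """\
NAME       STATUS   ROLES    AGE   VERSION
minikube   Ready    master   5m    v1.14.0
"""
print(not_ready_nodes(sample))
```

An empty list means every node reported Ready.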

Installing Apache Airflow with Kubernetes Executor using Docker and MiniKube

Once MiniKube is running, it’s time to move on to building our Docker image. To do so, we just need to execute the script ./docker/build.sh, which builds the Docker image and pushes it to the local registry.

cd airflow/scripts/ci/kubernetes/
./docker/build.sh

This process will take some time to finish as it downloads and updates many dependencies. I strongly encourage you to take a look at the different scripts contained in the docker folder. At the end of the installation process you should get the following output telling you that the Airflow image has been successfully added to the local registry.

output_docker_build

The last script we need to execute is deploy.sh. This script simply deploys the Airflow image on our MiniKube cluster and makes the Airflow UI available from our web browser.

cd airflow/scripts/ci/kubernetes/
./kube/deploy.sh -d persistent_mode                                                

Notice that we specify the option -d persistent_mode in order to create and mount a persistent volume. This makes our DAGs available to Airflow and ensures that they won’t be lost if a pod is restarted. You can also use the option git_mode, in which case a git pull of the DAGs repository defined by CONFIGMAP_GIT_REPO in the script ./kube/deploy.sh is performed and used throughout the lifecycle of the pods.

output_script_deploy

At this point you have a fully functional instance of Apache Airflow with the Kubernetes Executor. By the way, take a look at the persistent volumes associated with our MiniKube cluster and check that you get the same airflow-* volumes, as we are going to interact with them.

kubectl get pv                                                       

You should end up with the following result.

output_kubectl_pv_minikube

So, everything is set up, let’s move on to the fun part 🙂

Apache Airflow with Kubernetes Executor: Practice!

It’s time to discover the Airflow UI. If you are following this tutorial on a VM with CentOS, you will have to bind the port 30809 to your host machine in the VM’s settings. Otherwise, you can directly open your web browser and go to the URL http://localhost:30809 in order to get the login page.

By default the login and password are both set to airflow. Once entered, click on Sign In to arrive on the DAGs page.

airflow_from_minikube_ui_dags

As you can see, Airflow comes with multiple example DAGs allowing you to discover how some operators work and interact with each other. Among those DAGs, we are going to focus on the one named example_kubernetes_executor. As its name implies, it gives an example of how we can benefit from Apache Airflow with the Kubernetes Executor. Basically, the DAG is composed of four tasks using the PythonOperator. Each task shows an example of what is possible with the KubernetesExecutor, such as pulling a special image or limiting the resources used.

Before running the DAG, let’s modify some of its tasks. First, go back to your terminal and list the running pods.

kubectl get pods                                                       
output_pods_minikube

Our Apache Airflow instance is supported by two pods. The postgres-airflow-* pod corresponds to the PostgreSQL metadatabase and the airflow-* pod corresponds to both the web server and the scheduler, each running in its own container. Next, we open an interactive bash shell in the airflow-* pod and specify that we want to access the web server container. The pod name suffix is generated by Kubernetes, so replace the name in the command below with the one from your own output.

kubectl exec -it airflow-7bdfd78b9c-4n5w5 --container webserver -- /bin/bash
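Since the suffix of the pod name differs on every deployment, looking it up can be scripted. The sketch below assumes the pod names come from kubectl get pods -o name, which prints one pod/<name> entry per line. The helper find_pod is a hypothetical name and both pod names in the sample are illustrative.

```python
def find_pod(names_output, prefix="airflow-"):
    """Return the first pod whose name starts with prefix, or None."""
    for line in names_output.splitlines():
        name = line.strip().split("/", 1)[-1]  # drop the leading "pod/"
        if name.startswith(prefix):
            return name
    return None


# Illustrative output of: kubectl get pods -o name
sample = "pod/airflow-7bdfd78b9c-4n5w5\npod/postgres-airflow-84dfd85977-6tpdh\n"

pod = find_pod(sample)
print(f"kubectl exec -it {pod} --container webserver -- /bin/bash")
```

The PostgreSQL pod is skipped because its name starts with postgres-, not airflow-.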

After executing the kubectl exec command, you should be logged in to the web server container as the root user. To make editing the DAGs easier, I recommend that you install vim with the following command:

apt-get install -y vim                                                  

Once Vim is installed, you can directly edit the dag example_kubernetes_executor under the path /root/airflow/dags. Pay attention to the fact that this path actually corresponds to the persistent volume mounted earlier (airflow-dags). This is where the DAG files are stored.

vim /root/airflow/dags/example_kubernetes_executor.py                                                  

Modify the code from this

def use_zip_binary():
    rc = os.system("zip")
    assert rc == 0


# You don't have to use any special KubernetesExecutor configuration if you don't want to
start_task = PythonOperator(
    task_id="start_task", python_callable=print_stuff, dag=dag
)

# But you can if you want to
one_task = PythonOperator(
    task_id="one_task", python_callable=print_stuff, dag=dag,
    executor_config={"KubernetesExecutor": {"image": "airflow/ci:latest"}}
)

# Use the zip binary, which is only found in this special docker image
two_task = PythonOperator(
    task_id="two_task", python_callable=use_zip_binary, dag=dag,
    executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
)

to this

def use_airflow_binary():
    rc = os.system("airflow -h")
    assert rc == 0


# You don't have to use any special KubernetesExecutor configuration if you don't want to
start_task = PythonOperator(
    task_id="start_task", python_callable=print_stuff, dag=dag
)

# But you can if you want to
one_task = PythonOperator(
    task_id="one_task", python_callable=print_stuff, dag=dag,
    executor_config={"KubernetesExecutor": {"image": "airflow:latest"}}
)

# Use the airflow -h binary
two_task = PythonOperator(
    task_id="two_task", python_callable=use_airflow_binary, dag=dag,
    executor_config={"KubernetesExecutor": {"image": "airflow:latest"}}
)

You may ask, what does this code actually do? Well, it’s fairly easy. The excerpt shows three of the four tasks (we will see the last one later), all using the PythonOperator to execute Python callables, either print_stuff or use_airflow_binary. The interesting part here is the parameter executor_config. As you can see, we tell the operator that we are going to use the KubernetesExecutor with a specific Docker image for tasks one and two. It means that when the pods get created, they first pull the Docker image given by this parameter. Why is it so nice? Because you can use an image containing only the dependencies required to execute one task, and not the dependencies of every task in your DAG. This avoids conflicts and makes updates easier.
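If several tasks share similar settings, the executor_config dictionaries can be generated instead of being typed by hand. The helper below is a hypothetical convenience and not part of Airflow. It only uses keys that appear in the example DAG (image, request_memory and limit_memory).

```python
def kubernetes_executor_config(image=None, request_memory=None, limit_memory=None):
    """Build an executor_config dict, leaving out options that are not set."""
    options = {
        "image": image,
        "request_memory": request_memory,
        "limit_memory": limit_memory,
    }
    selected = {key: value for key, value in options.items() if value is not None}
    return {"KubernetesExecutor": selected}


light_task_config = kubernetes_executor_config(image="airflow:latest")
heavy_task_config = kubernetes_executor_config(
    image="airflow:latest", request_memory="128Mi", limit_memory="128Mi"
)
print(light_task_config)
print(heavy_task_config)
```

The resulting dictionaries have the same shape as the ones passed to executor_config in the tasks above, so they could be given to a PythonOperator in the same way.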

There is a very important point to keep in mind when you specify a Docker image as we did in the previous tasks: this Docker image must have Airflow installed, otherwise it won’t work. It is the same as with the Celery Executor, where Airflow and Celery must be installed on the worker nodes. Think of a worker node as being a pod in the context of the Kubernetes Executor. That being said, let’s move on.

Type Ctrl+D to close the shell session and exit the container. Now let’s go back to the Airflow user interface and click on the DAG example_kubernetes_executor. From there, schedule the DAG by turning ON the toggle and trigger it manually by clicking on “Trigger DAG” as shown below.

airflow_from_minikube_ui_example_k8s_new

Once it is running, go back to the terminal. Type the following command to watch the lifecycle of the pods executing the tasks.

kubectl get --watch pods                                                

As the Docker images specified in the tasks don’t exist yet on your computer, the DAG may take some time to finish since they must be downloaded first. After some time you should see pods popping up.

If you go back to the Airflow UI, click on example_kubernetes_executor and go to “Graph View”, you should, after a while, get the following screen:

apache_airflow_with_kubernetes_executor_output

Well done! Our tasks are being executed using the Kubernetes Executor as expected. Each time a task starts, a pod is created and then terminated when the task has finished executing.

Kubernetes Executor and Resource Definition

Before ending this tutorial, let me point out the task named three_task in the DAG example_kubernetes_executor.

affinity = {
    'podAntiAffinity': {
        'requiredDuringSchedulingIgnoredDuringExecution': [
            {
                'topologyKey': 'kubernetes.io/hostname',
                'labelSelector': {
                    'matchExpressions': [
                        {
                            'key': 'app',
                            'operator': 'In',
                            'values': ['airflow']
                        }
                    ]
                }
            }
        ]
    }
}

tolerations = [{
    'key': 'dedicated',
    'operator': 'Equal',
    'value': 'airflow'
}]

# Limit resources on this operator/task with node affinity & tolerations
three_task = PythonOperator(
    task_id="three_task", python_callable=print_stuff, dag=dag,
    executor_config={
        "KubernetesExecutor": {"request_memory": "128Mi",
                               "limit_memory": "128Mi",
                               "tolerations": tolerations,
                               "affinity": affinity}}
)

Let’s define each of the configuration parameters given to the Kubernetes Executor.

Resources Definition

  • request_memory: When a pod is created, the Kubernetes scheduler selects a node for the pod to run on. The scheduler ensures that, for each resource type (CPU/memory), the sum of the resource requests of the scheduled containers (tasks) is less than the capacity of the node. In our case, it means that the node where the task (container) is going to be executed must still have at least 128Mi of memory that is not requested by other containers. If the task exceeds its memory request and the node runs short of memory, it is likely that its pod will be evicted.
  • limit_memory: When the kubelet starts a container of a pod, it passes the CPU and memory limits to the container runtime. If the task exceeds its memory limit, it might be terminated (if the task is restartable, the kubelet will restart it).
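The rule described for request_memory can be sketched in a few lines. This is a simplified illustration, not the real Kubernetes scheduler: to_bytes and fits are hypothetical helpers, only the Ki, Mi and Gi suffixes are handled, and the node sizes are made-up values.

```python
UNITS = {"Ki": 1024, "Mi": 1024 ** 2, "Gi": 1024 ** 3}


def to_bytes(quantity):
    """Convert a quantity such as '128Mi' to bytes (Ki, Mi and Gi only)."""
    for suffix, factor in UNITS.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    return int(quantity)  # plain number of bytes


def fits(node_capacity, scheduled_requests, new_request):
    """True if the new request fits next to the requests already on the node."""
    used = sum(to_bytes(request) for request in scheduled_requests)
    return used + to_bytes(new_request) <= to_bytes(node_capacity)


# A node with 1Gi of memory and two pods already scheduled on it.
print(fits("1Gi", ["512Mi", "256Mi"], "128Mi"))
# The same node once its memory is fully requested.
print(fits("1Gi", ["512Mi", "512Mi"], "128Mi"))
```

Note that the check is based on requests, not on the memory actually used, which is why a task can be placed on a node and still be evicted later if it consumes more than it requested.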

Affinity and Tolerations

  • tolerations: Tolerations work with the concept of taints. Both concepts work together and are a way to keep pods from running on inappropriate nodes. Here, if we set the taint corresponding to the toleration given in the code on a node, the tasks having this toleration are allowed to run on that node while pods without it are kept away. Since we are using MiniKube, we have only one node, but think of the case where we have multiple nodes: you would be able to dedicate certain nodes to certain tasks.
  • affinity: Node affinity allows you to constrain which nodes your pod is eligible to be scheduled on, based on labels on the node. Anti-affinity allows you to constrain which nodes your pod is eligible to be scheduled on, based on the labels of the pods that are already running on the node rather than on the labels of the node itself. With the anti-affinity rule given here, we say that this pod will not be scheduled onto a node (topologyKey set to the hostname) if that node is already running a pod with a label having the key “app” and the value “airflow”.
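The two mechanisms above can be approximated by two small predicates. This is a simplified sketch of the matching rules, not the Kubernetes implementation: tolerates and blocked_by_anti_affinity are hypothetical names, the taint is an assumed example, and special cases such as a toleration with an empty key are ignored.

```python
def tolerates(taint, tolerations):
    """True if at least one toleration matches the taint."""
    for toleration in tolerations:
        if toleration.get("key") != taint["key"]:
            continue
        # An empty or missing effect on the toleration matches every effect.
        if toleration.get("effect") not in (None, "", taint.get("effect")):
            continue
        operator = toleration.get("operator", "Equal")
        if operator == "Exists":
            return True
        if operator == "Equal" and toleration.get("value") == taint.get("value"):
            return True
    return False


def blocked_by_anti_affinity(pods_on_node, key, values):
    """True if the node already runs a pod whose label `key` is in `values`."""
    return any(labels.get(key) in values for labels in pods_on_node)


# Same toleration as in the DAG; the taint is an assumed example.
tolerations = [{"key": "dedicated", "operator": "Equal", "value": "airflow"}]
taint = {"key": "dedicated", "value": "airflow", "effect": "NoSchedule"}

print(tolerates(taint, tolerations))
print(tolerates(taint, []))
print(blocked_by_anti_affinity([{"app": "airflow"}], "app", ["airflow"]))
print(blocked_by_anti_affinity([{"app": "postgres"}], "app", ["airflow"]))
```

A pod can be placed on a node only if it tolerates the taints of that node and none of its required anti-affinity rules is violated there.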

Well done if you have reached this point! I know it can be difficult to grasp everything, but no worries, with practice it will get easier. I strongly recommend that you take a look at the Kubernetes documentation to learn more about affinity, limits and so on.

Conclusion

In this tutorial we have seen how to set up Apache Airflow with the Kubernetes Executor inside MiniKube. The Kubernetes Executor offers you a new way to execute your tasks, making them flexible and resilient to failures. You can now define the resources you want to dedicate to a specific task and specify on which node you want your tasks to run. Also, since the Kubernetes Executor removes the need to keep your worker nodes alive indefinitely, you can drastically reduce the amount of unused resources. This tutorial was just a taste of this very exciting executor. Be prepared for the next article where we will see how to use it in a real cluster of multiple nodes.

So that’s it for this tutorial, I hope you learned exciting things in this post and see you for the next one 🙂
