Apache Airflow monitoring with TIG: Part 1

Monitoring Apache Airflow should be your top priority once you are in production. Without monitoring, you have no way to know when something goes wrong. Imagine that Airflow stops working for any reason: since it is your orchestrator, your data pipelines won’t be scheduled anymore and your data won’t be processed as expected. If this happens over the weekend, the consequences can be dramatic for the company you work for. In this tutorial, I’m going to show you how to monitor Apache Airflow with StatsD, Telegraf, InfluxDB and Grafana using Docker. At the end of this series of tutorials, you will be able to monitor Apache Airflow and be alerted via a Slack message if it has any technical issue. Let’s begin!

Since Apache Airflow is instrumented by StatsD in order to expose the metrics you can monitor, it may be a good idea to start by presenting this tool. If you have never heard about StatsD, let me give you a quick introduction of this very simple but powerful tool.

StatsD is a simple daemon developed and released by Etsy in order to aggregate and summarize application metrics. You can think of StatsD as a push-based monitoring system: it receives metrics from an application and pushes them somewhere else, such as ElasticSearch or InfluxDB.

How StatsD works is pretty simple. From an application, like Airflow, the developer has to use one of the many StatsD libraries available, in order to produce the metrics he/she wants to capture. Basically, StatsD has multiple metric types which are:

  • Counters: simple counter, increase or decrease a number. At each flush the current count is sent and reset to 0. StatsD sends both the rate and the count at each flush.
  • Timing Summary Statistics: gives the time for something to finish. StatsD figures out percentiles, average (mean), standard deviation, sum, lower and upper bounds for the flush interval. For each percentile threshold, the mean, upper and sum are computed.
  • Gauges: takes an arbitrary value assigned to it and maintains that value until it is next set. Adding a sign to the gauge value will change it rather than setting it.
  • Sets: used to count unique occurrences of events between flushes.

If you want to learn more about them, check the documentation right here.
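To make these metric types concrete, here is a minimal sketch in Python of what a StatsD client puts on the wire, using only the standard library. The metric names (demo.requests and so on) and the helpers format_metric and send_metric are hypothetical and only serve the illustration; the payload layout name:value|type is the plain-text StatsD format.

```python
import socket


def format_metric(name, value, metric_type, sample_rate=None):
    """Build one StatsD payload: <name>:<value>|<type>[|@<sample_rate>]."""
    payload = f"{name}:{value}|{metric_type}"
    if sample_rate is not None:
        payload += f"|@{sample_rate}"
    return payload


def send_metric(payload, host="127.0.0.1", port=8125):
    """Send one UDP datagram; no acknowledgement is expected."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload.encode("ascii"), (host, port))


# Hypothetical metric names, one per metric type.
examples = [
    format_metric("demo.requests", 1, "c"),          # counter: increment by 1
    format_metric("demo.load_time", 320, "ms"),      # timer: 320 milliseconds
    format_metric("demo.queue_size", 42, "g"),       # gauge: set the value to 42
    format_metric("demo.queue_size", "+3", "g"),     # gauge: signed value changes it
    format_metric("demo.visitors", "user-17", "s"),  # set: count unique values
]

for payload in examples:
    print(payload)
```

Calling send_metric(payload) would push each line to a daemon listening on the given host and port (the defaults here are assumptions).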

Once the metrics are added to the application, they are sent to the StatsD daemon as UDP datagrams. Why UDP? Because, unlike TCP, this protocol doesn’t wait for an acknowledgement, so it is extremely fast and doesn’t block your application. The StatsD daemon listens to the UDP traffic, aggregates the metrics received over time and flushes them at a defined interval to the backend we set. The backend could be ElasticSearch or InfluxDB for example.
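The aggregation step can be sketched in a few lines of Python. This is a toy model of what a daemon does with the datagrams received during one flush interval, not the code of StatsD or Telegraf; the function aggregate and the metric names are hypothetical, and signed gauge deltas and sample rates are deliberately ignored.

```python
from collections import defaultdict


def aggregate(datagrams):
    """Aggregate the raw payloads received during one flush interval."""
    counters = defaultdict(float)
    gauges = {}
    timers = defaultdict(list)
    for payload in datagrams:
        name, rest = payload.split(":", 1)
        value, metric_type = rest.split("|")[:2]
        if metric_type == "c":
            counters[name] += float(value)     # counters are summed
        elif metric_type == "g":
            gauges[name] = float(value)        # the last gauge value wins
        elif metric_type == "ms":
            timers[name].append(float(value))  # timings are kept for statistics
    return dict(counters), gauges, dict(timers)


# Hypothetical datagrams received between two flushes.
received = [
    "airflow.demo_counter:1|c",
    "airflow.demo_counter:1|c",
    "airflow.demo_gauge:5|g",
    "airflow.demo_timer:12|ms",
    "airflow.demo_timer:20|ms",
]

counters, gauges, timers = aggregate(received)
print(counters, gauges, timers)
```

At flush time, a real daemon would send these aggregates to the backend and start over with empty counters and timers.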

The good things with StatsD are its simplicity, its tiny footprint in terms of resources and the fact that your application is decoupled from StatsD. Indeed, since the daemon runs outside of the monitored application, StatsD can’t cause a crash or any slow down. They are totally independent.

Alright, now that we have had a quick introduction to StatsD, let’s discover how Airflow and StatsD work together.

Apache Airflow with StatsD

StatsD support is built into Apache Airflow, which uses the Python statsd client to expose its metrics. Basically, Airflow pushes three types of metrics to the StatsD daemon:

  • The Counters
  • The Gauges
  • The Timers

Each type of metric brings different information such as the number of overall task failures, the scheduler heartbeat to know if it is up or down, the number of queued tasks on the executor, the seconds to load a given DAG and much more. You can find all the metrics available by looking at the documentation right here.

You may wonder how exactly Apache Airflow and StatsD interact with each other. Well, in order to answer this question, let’s dive into the source code. If you take a look at the file stats.py (which you can find here) and scroll down a little bit, you will see a global variable named Stats initialised as a DummyStatsLogger object by default. The class DummyStatsLogger, implemented at the top of the file, does nothing. Its only purpose is to avoid having to check whether the variable Stats is initialised each time we want to push a metric, such as incrementing a counter, setting a gauge and so on. Here is the source code of that class:

class DummyStatsLogger:
    @classmethod
    def incr(cls, stat, count=1, rate=1):
        pass

    @classmethod
    def decr(cls, stat, count=1, rate=1):
        pass

    @classmethod
    def gauge(cls, stat, value, rate=1, delta=False):
        pass

    @classmethod
    def timing(cls, stat, dt):
        pass

Notice the use of the Python keyword pass in each of the class methods incr(), decr(), gauge() and timing(). As you may guess, incr() and decr() are related to counter metrics, while gauge() and timing() are related to gauge and timing metrics respectively.

So, we have seen that the variable Stats is set as a DummyStatsLogger object by default, but what is the alternative? If you go to the bottom of the stats.py file, you will find the following try clause:

try:
    if conf.getboolean('scheduler', 'statsd_on'):
        from statsd import StatsClient

        statsd = StatsClient(
            host=conf.get('scheduler', 'statsd_host'),
            port=conf.getint('scheduler', 'statsd_port'),
            prefix=conf.get('scheduler', 'statsd_prefix'))
        Stats = SafeStatsdLogger(statsd)
except (socket.gaierror, ImportError) as e:
    log.warning("Could not configure StatsClient: %s, using DummyStatsLogger instead.", e)

Let me describe what this snippet of code does:

  1. It checks in the configuration file airflow.cfg if the parameter statsd_on is set to True under the section scheduler.
  2. If so, it imports the StatsClient class from the Python package statsd and initialises the client.
  3. To be initialised, StatsClient requires the following three configuration parameters: statsd_host, statsd_port and statsd_prefix.
  4. Once the client is set, it is passed to the class SafeStatsdLogger, and the resulting object is assigned to the global variable Stats. The class SafeStatsdLogger has the same methods as DummyStatsLogger, except that they really do something since they interact with the statsd client given as a parameter.
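The pattern behind these steps (a no-op object by default, replaced by a real client only when StatsD is enabled and reachable) can be sketched as follows. This is a simplified illustration, not Airflow’s actual code: the names NoOpStats, UdpStats and build_stats are hypothetical, and a tiny UDP sender stands in for the statsd package so that the example runs with the standard library only.

```python
import socket


class NoOpStats:
    """Same interface as the real logger, but every call is silently dropped."""

    def incr(self, stat, count=1, rate=1):
        pass

    def gauge(self, stat, value, rate=1, delta=False):
        pass

    def timing(self, stat, dt):
        pass


class UdpStats:
    """Very small StatsD sender (hypothetical stand-in for the statsd client)."""

    def __init__(self, host, port, prefix):
        # Resolve the host once; raises socket.gaierror if it is unknown.
        info = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_DGRAM)
        self._addr = info[0][4]
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._prefix = prefix

    def _send(self, stat, value, kind):
        payload = f"{self._prefix}.{stat}:{value}|{kind}"
        self._sock.sendto(payload.encode("ascii"), self._addr)

    def incr(self, stat, count=1, rate=1):
        self._send(stat, count, "c")

    def gauge(self, stat, value, rate=1, delta=False):
        self._send(stat, value, "g")

    def timing(self, stat, dt):
        self._send(stat, dt, "ms")


def build_stats(enabled, host="localhost", port=8125, prefix="airflow"):
    """Return a real sender when possible, the no-op object otherwise."""
    if not enabled:
        return NoOpStats()
    try:
        return UdpStats(host, port, prefix)
    except socket.gaierror as exc:
        print(f"Could not configure the StatsD client: {exc}; metrics disabled")
        return NoOpStats()


Stats = build_stats(enabled=False)
Stats.incr("demo_counter")  # safe to call whether or not StatsD is enabled
```

The benefit of this design is that the calling code never has to test whether monitoring is enabled: it always calls Stats.incr(), Stats.gauge() or Stats.timing().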

So, in order to use StatsD and monitor Apache Airflow, you have to set the following configuration parameters in airflow.cfg:

[Screenshot: the parameters statsd_on, statsd_host, statsd_port and statsd_prefix under the [scheduler] section of airflow.cfg]
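As a rough illustration, the snippet below embeds a fragment of airflow.cfg with these four parameters and reads it back with Python’s configparser. The values are examples only (8125 is the port mentioned in this tutorial, the host and prefix are assumptions), and Airflow uses its own configuration wrapper rather than a bare ConfigParser.

```python
import configparser

# Illustrative fragment of airflow.cfg; the values are assumptions.
AIRFLOW_CFG_FRAGMENT = """
[scheduler]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
"""

conf = configparser.ConfigParser()
conf.read_string(AIRFLOW_CFG_FRAGMENT)

# The same accessors that appear in the try clause of stats.py.
statsd_on = conf.getboolean("scheduler", "statsd_on")
statsd_host = conf.get("scheduler", "statsd_host")
statsd_port = conf.getint("scheduler", "statsd_port")
statsd_prefix = conf.get("scheduler", "statsd_prefix")

print(statsd_on, statsd_host, statsd_port, statsd_prefix)
```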

statsd_host and statsd_port correspond to the host and port of the machine where the StatsD daemon is running (it listens on port 8125 here). To see how metrics are pushed in practice, take the file dagbag.py (you can find it here): under the method collect_dags() you can see the following lines:

[Screenshot: the Stats calls in collect_dags() pushing the metrics collect_dags, dagbag_size and dag_import_errors]

Each time the method collect_dags is called, those three metrics, collect_dags, dagbag_size and dag_import_errors, are pushed to the StatsD daemon which in turn, at a certain defined interval, will flush the aggregated metrics to a backend. Notice the use of the global variable Stats as well as the methods given by the class SafeStatsdLogger.
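Here is a hedged sketch of what such instrumentation looks like from the caller’s side. Only the three metric names come from Airflow; the class RecordingStats, the function collect_dags_sketch and the fake loading logic are hypothetical, and pushing all three as gauges is an assumption that may not match every Airflow version.

```python
import time


class RecordingStats:
    """Hypothetical stand-in for Stats that records calls instead of sending them."""

    def __init__(self):
        self.calls = []

    def gauge(self, stat, value):
        self.calls.append(("gauge", stat, value))


Stats = RecordingStats()


def collect_dags_sketch(dag_files):
    """Pretend to load DAG files and report three metrics about the run."""
    start = time.monotonic()
    loaded, errors = [], 0
    for path in dag_files:
        if path.endswith(".py"):
            loaded.append(path)  # stands in for a successful import
        else:
            errors += 1          # stands in for an import error
    duration = time.monotonic() - start

    Stats.gauge("collect_dags", duration)     # seconds spent collecting
    Stats.gauge("dagbag_size", len(loaded))   # number of DAGs found
    Stats.gauge("dag_import_errors", errors)  # number of failed imports
    return loaded


collect_dags_sketch(["tutorial.py", "example_xcom.py", "broken.txt"])
print([name for _, name, _ in Stats.calls])
```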

Alright, now that you know how StatsD is implemented and used by Apache Airflow, let’s discover the tools we are going to use to set up our monitoring platform.

TIG stack or Telegraf, InfluxDB and Grafana

The TIG stack, for Telegraf, InfluxDB and Grafana, is a common stack used in production for monitoring purposes. That’s exactly the stack we are going to use for monitoring Apache Airflow. Let me briefly introduce each component.

What is Telegraf?

Telegraf is an open source agent for collecting, processing and aggregating metrics. Once data are collected, they are pushed to various outputs such as InfluxDB, ElasticSearch, Syslog and so on.  Telegraf is very lightweight and works with a system of plugins that you can add or remove from each phase your metrics go through (collecting, processing, aggregating, pushing). You can find the exhaustive list of plugins right here.

What is InfluxDB?

InfluxDB is an open source time series database written in Go, built from the ground up to handle high write and query loads. InfluxDB is meant to be used as a backing store for any use case involving large amounts of timestamped data, including DevOps monitoring, application metrics, IoT sensor data, and real-time analytics.

What is Grafana?

Grafana is an open source data visualization and monitoring suite. It supports many databases such as ElasticSearch, Prometheus or InfluxDB. The tool gives you the ability to create very complex and beautiful dashboards mixing different data sources at one place. It is also widely used to set alerts according to the metrics and thresholds you have.

To sum up, we basically have Telegraf in order to collect and process the metrics, InfluxDB to store them in an optimized way and Grafana to build our dashboards upon the metrics stored into InfluxDB. Now, let’s monitor Apache Airflow using the TIG stack.

Monitoring Apache Airflow with the TIG stack

In order to set up the TIG stack for monitoring Apache Airflow, we are going to use Docker as well as Docker Compose extensively. If you don’t know what Docker is, I strongly advise you to get familiar with this technology by taking a look at their website here. For the rest of the tutorial, I assume that Docker is already installed on your computer and that you have some familiarity with the tool.

Creating a shared network for Telegraf and Apache Airflow

If you remember, Apache Airflow sends its metrics through the StatsD client to the StatsD daemon. So basically, we need a StatsD daemon running on a machine in order to collect the metrics sent from Airflow. To do this, we are going to use Telegraf and, more specifically, one of its input plugins named statsd. The statsd plugin is a special type of plugin which runs a background statsd listener service (a daemon) while Telegraf is running.

In order to set up our environment, we are going to use Docker and Docker Compose. First things first, we need to create a network shared between the two Docker containers where Airflow and Telegraf are going to run. In your terminal, execute the following command:

docker network create tig

Then, you can check that the network named “tig” has been created by typing:

docker network ls                                                       

Your output won’t be the same as mine but you should have the same line about the network named tig as you can see at the bottom of the following screenshot.

[Screenshot: output of docker network ls with the tig network at the bottom]

Now that the network is set, we have to start the Docker container running Telegraf. Why not run Airflow before Telegraf? Because the StatsD client resolves the host name of the daemon when Airflow starts. If Telegraf is not running, that name can’t be resolved and, depending on your Airflow version, Airflow either fails to start or falls back to the DummyStatsLogger and sends no metrics at all. Always start the StatsD daemon (which is an input plugin of Telegraf in our case) before running Airflow. Let’s begin with Telegraf using Docker.

Configuring and running Telegraf with Docker

First, generate a sample configuration and save it as telegraf.conf on the host (your machine) by executing this command:

docker run --rm telegraf telegraf config > telegraf.conf   

Once the telegraf.conf file is generated, open it with your favorite editor and go to the [[inputs.statsd]] section. On every line of the section (including the [[inputs.statsd]] line itself), remove only the first # at the beginning of the line. Like so:

[Screenshot: beginning of the uncommented [[inputs.statsd]] section in telegraf.conf]

Notice that this screenshot only shows the beginning of the [[inputs.statsd]] section. You should remove the first # of every line until you reach the [[inputs.syslog]] section.

Next, before sending the metrics into InfluxDB, let’s use another output to see how those metrics are formatted when they are sent by Airflow through StatsD. Still in telegraf.conf, comment out the line where [[outputs.influxdb]] is by adding a # in front of it, like so:

[Screenshot: the [[outputs.influxdb]] line commented out in telegraf.conf]

Then, go to the section [[outputs.file]] and uncomment all the lines as we did previously with the section [[inputs.statsd]]. You should obtain the following lines:

[Screenshot: beginning of the uncommented [[outputs.file]] section in telegraf.conf]

Again, this screenshot only shows the beginning of the section, you should uncomment every line until the end. Last thing, remove the value “stdout” from the array “files”. You should end up with the following array:

[Screenshot: the files array of [[outputs.file]] containing only "/tmp/metrics.out"]
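Since these edits are done by hand in a very long file, a quick sanity check can help. The sketch below lists the plugin sections that are not commented out; the function active_plugins is a hypothetical helper and the sample text is a heavily abbreviated stand-in for the real telegraf.conf.

```python
import re


def active_plugins(conf_text):
    """Return the names of the [[plugin]] headers that are not commented out."""
    pattern = re.compile(r"^[ \t]*\[\[([\w.]+)\]\]", re.MULTILINE)
    return pattern.findall(conf_text)


# Abbreviated sample of what the edited file should look like.
sample = """
# [[outputs.influxdb]]
#   urls = ["http://127.0.0.1:8086"]
[[outputs.file]]
  files = ["/tmp/metrics.out"]
[[inputs.statsd]]
  service_address = ":8125"
"""

print(active_plugins(sample))
```

To check your own file, you could pass it the content of telegraf.conf, for example active_plugins(open("telegraf.conf").read()), and make sure outputs.file and inputs.statsd are listed while outputs.influxdb is not. The generated file also enables other input plugins by default, so expect a longer list.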

By using the file plugin, Telegraf will write all the metrics received from Airflow into the file /tmp/metrics.out. It’s perfect for debugging purposes. Now, save the file telegraf.conf and, in your terminal, execute the command below:

docker run \
--rm \
-d \
--hostname telegraf \
--name telegraf \
-v $PWD/telegraf.conf:/etc/telegraf/telegraf.conf:ro \
--network tig \
telegraf

This command basically starts a Docker container running Telegraf in the background (-d). The hostname and the name of the container are set to “telegraf” (you will see later why). The -v parameter binds the configuration file telegraf.conf from your machine to /etc/telegraf/telegraf.conf in the Docker container. It means that any modification of $PWD/telegraf.conf is reflected in /etc/telegraf/telegraf.conf as well (the :ro suffix makes the file read-only inside the container). The --network option specifies that we want the container to be part of the network “tig” we created earlier. If everything goes well, by typing the command:

docker ps                                                                   

You should see a line corresponding to the Docker container named “telegraf”.

[Screenshot: output of docker ps showing the telegraf container]

Now it’s time to start the Docker container running Apache Airflow.

Configuring and running Airflow with Docker in order to send metrics into Telegraf

Configuring and running Airflow with Docker is quite simple. You just have to execute the following command:

docker run \
--rm \
-d \
-p 8080:8080 \
-e LOAD_EX=y \
-e AIRFLOW__SCHEDULER__STATSD_ON=True \
-e AIRFLOW__SCHEDULER__STATSD_HOST=telegraf \
-e AIRFLOW__SCHEDULER__STATSD_PORT=8125 \
-e AIRFLOW__SCHEDULER__STATSD_PREFIX=airflow \
--hostname airflow \
--name airflow \
--network tig \
puckel/docker-airflow

Let me briefly explain what this command does. The -p 8080:8080 parameter binds the port 8080 where the Airflow web server is running inside the Docker container to the port 8080 of your machine. Without this parameter, you wouldn’t be able to access the UI from your web browser. The -e parameters set different environment variables. Apache Airflow allows you to modify its configuration either via the configuration file airflow.cfg or by overriding the values of this file with environment variables. This is exactly what we do here. LOAD_EX=y loads the DAG examples, and the AIRFLOW__SCHEDULER__STATSD variables define the different values, such as hostname, port and prefix, required for sending metrics to the StatsD daemon (Telegraf). Notice that the environment variable AIRFLOW__SCHEDULER__STATSD_HOST is set to “telegraf”. If you remember, that’s exactly the value we gave earlier to the hostname parameter of the Docker container running Telegraf.
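The naming convention of these variables is AIRFLOW__{SECTION}__{KEY}, with double underscores and upper case. The following sketch mimics the lookup order (environment variable first, file value otherwise); the helpers airflow_env_var and resolve are hypothetical and much simpler than Airflow’s real configuration code.

```python
import os


def airflow_env_var(section, key):
    """Name of the environment variable overriding [section] key in airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def resolve(section, key, file_value, environ=os.environ):
    """Return the environment override if present, else the value from the file."""
    return environ.get(airflow_env_var(section, key), file_value)


# Simulated environment of the container started above.
fake_environ = {"AIRFLOW__SCHEDULER__STATSD_HOST": "telegraf"}

print(airflow_env_var("scheduler", "statsd_host"))
print(resolve("scheduler", "statsd_host", "localhost", fake_environ))
print(resolve("scheduler", "statsd_port", "8125", fake_environ))
```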

If you list the running docker containers with the following command:

docker ps                                                                   

You should end up with two running docker containers being airflow and telegraf as shown by the screenshot below:

[Screenshot: output of docker ps showing the airflow and telegraf containers]

That’s it! Apache Airflow is currently sending its metrics through the use of its StatsD client to the StatsD daemon running with Telegraf. We have just set up the basics for monitoring Apache Airflow. Now, let’s see what the metrics sent by Airflow look like.


Understanding the metrics for monitoring Apache Airflow

For monitoring Apache Airflow, you have to understand the metrics used. To do so, we are going to open the file metrics.out where the metrics are flushed from the StatsD daemon and take a look at the data.

First, connect to the docker container “Telegraf” with the following command:

docker exec -it c21398ac3264 /bin/bash                                        

Obviously, you have to replace my container ID with the ID of your container running Telegraf (since we named the container, docker exec -it telegraf /bin/bash works too). Once you are connected to the Docker container, type:

cat /tmp/metrics.out | grep airflow | head                                              

After executing the command, you should obtain this kind of output (without the same values):

[Screenshot: sample of the Airflow metrics written to /tmp/metrics.out]

Don’t be afraid, I’m going to explain exactly what those metrics mean. If you remember, we have seen three types of metrics that can be used for monitoring Apache Airflow: Counters, Gauges and Timers. Let’s begin with the Counters.

Example of a Counter’s metric:

airflow_scheduler_heartbeat,host=telegraf,metric_type=counter value=2i 1567699473000000000  

This metric tells us if the Scheduler of Airflow is up. Let’s decompose this metric:

  • airflow: StatsD prefix we set with AIRFLOW__SCHEDULER__STATSD_PREFIX (the statsd_prefix parameter of airflow.cfg).
  • scheduler_heartbeat: Name of the metric sent by the StatsD client.
  • telegraf: Hostname of the machine the metric was sent to (the Telegraf container).
  • counter: Type of this metric.
  • 2i: Current value of the counter (i: for int).
  • 1567699473000000000: Unix timestamp in nanoseconds.
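The same decomposition can be done programmatically. Here is a minimal parser for this kind of line, a sketch which assumes that the line contains no escaped spaces or commas and only numeric fields (true for the examples of this tutorial, not for the format in general). The function name parse_line is hypothetical.

```python
def parse_line(line):
    """Split one metric line into measurement, tags, fields and timestamp."""
    head, fields_part, timestamp = line.strip().split(" ")

    # "measurement,tag1=a,tag2=b"
    measurement, *tag_pairs = head.split(",")
    tags = dict(pair.split("=", 1) for pair in tag_pairs)

    # "field1=2i,field2=0.5": a trailing "i" marks an integer
    fields = {}
    for pair in fields_part.split(","):
        key, raw = pair.split("=", 1)
        fields[key] = int(raw[:-1]) if raw.endswith("i") else float(raw)

    return measurement, tags, fields, int(timestamp)


line = "airflow_scheduler_heartbeat,host=telegraf,metric_type=counter value=2i 1567699473000000000"
measurement, tags, fields, timestamp_ns = parse_line(line)

print(measurement)
print(tags)
print(fields)
print(timestamp_ns // 1_000_000_000)  # timestamp in seconds
```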

We will see later with Grafana how to display this metric, but basically, the counter should continuously change. If there is a lack of change, it may indicate an issue with the Scheduler. Now that we have seen a counter, let’s take a look at a Gauge’s metric.

airflow_executor_running_tasks,host=telegraf,metric_type=gauge value=0 1567702629000000000  

This metric tells us the number of tasks the executor is running. Again, let’s decompose this metric:

  • airflow: StatsD prefix we set with AIRFLOW__SCHEDULER__STATSD_PREFIX (the statsd_prefix parameter of airflow.cfg).
  • executor_running_tasks: Name of the metric sent by the StatsD client.
  • telegraf: Hostname of the machine the metric was sent to (the Telegraf container).
  • gauge: Type of this metric.
  • 0: Current value of the gauge.
  • 1567702629000000000: Unix timestamp in nanoseconds.

Gauge metrics are really simple to interpret since they only hold an arbitrary value. Here, the value 0 indicates that there are currently no running tasks. Alright, the last metric example we are going to see is a Timing’s metric.

airflow_dag_loading-duration_example_xcom,host=telegraf,metric_type=timing lower=0.015018,count=1i,90_percentile=0.015018,mean=0.015018,stddev=0,sum=0.015018,upper=0.015018 1567702969000000000  

This metric tells us the number of seconds taken to load the DAG example_xcom. As we did previously, let’s see the metric in detail:

  • airflow: StatsD prefix we set with AIRFLOW__SCHEDULER__STATSD_PREFIX (the statsd_prefix parameter of airflow.cfg).
  • dag_loading-duration: Name of the metric sent by the StatsD client.
  • example_xcom: Name of the DAG for which the metric is sent.
  • telegraf: Hostname of the machine the metric was sent to (the Telegraf container).
  • timing: Type of this metric.
  • 0.015018 (lower): Lower bound among all timings recorded during the flush interval.
  • 1i (count): Number of timings recorded during the flush interval.
  • 0.015018 (90_percentile): 90% of timings recorded during the flush interval are below 0.015018 seconds.
  • 0.015018 (mean): Average of the timings recorded during the flush interval.
  • 0 (stddev): Standard deviation of the timings recorded during the flush interval.
  • 0.015018 (sum): Sum of seconds corresponding to the timings recorded in the flush interval.
  • 0.015018 (upper): Upper bound among all timings recorded during the flush interval.
  • 1567702969000000000: Unix timestamp in nanoseconds.
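To see where these numbers come from, here is a small sketch computing the same kind of summary from the timings collected during one flush interval. The function summarize is hypothetical, the percentile is left out because its exact computation depends on the daemon, and the use of the population standard deviation is an assumption on my side.

```python
import statistics


def summarize(samples):
    """Summary statistics over the timings recorded in one flush interval."""
    return {
        "count": len(samples),
        "lower": min(samples),
        "upper": max(samples),
        "sum": sum(samples),
        "mean": statistics.mean(samples),
        "stddev": statistics.pstdev(samples),  # population standard deviation
    }


# Only one timing was recorded in the interval, as in the example above.
stats = summarize([0.015018])
print(stats)
```

With a single sample, lower, upper, sum and mean are necessarily identical and the standard deviation is 0.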

Now, you may want to tell me: “OK, that’s good, but what’s the point if all the stats share the same value, 0.015018?” Well, good news, we can fix this. But first, let me tell you why we get the same value for every stat. The flush interval, if you remember, is the interval of time defining when the StatsD daemon pushes the metrics to one or multiple outputs, which can be a file or a database for example. This parameter is defined in the file telegraf.conf under the name flush_interval:

## Default flushing interval for all outputs. Maximum flush_interval will be
## flush_interval + flush_jitter
flush_interval = "10s"

By default the flush interval is set to 10 seconds. So, every 10 seconds, the StatsD daemon will flush the metrics and push them into the given outputs. Back to Apache Airflow, if you type the following command in your terminal (the container id should correspond to the id of your container running Airflow):

docker logs 2c2ea5276bba                                                   

You will get this output:

[Screenshot: Airflow logs with the “Filling up the DagBag” lines highlighted in red]

From the screenshot above, let’s focus only on the logs surrounded in red. If you look carefully at the time each log is produced, you will notice an interval of around 30 seconds between each “Filling up the DagBag”.

This log means that Apache Airflow scans the folder where the DAGs are located every 30 seconds and so the metric dag_loading-duration is produced every 30 seconds as well. Now, since this metric is produced every 30 seconds and the flush interval of the StatsD daemon is set to 10 seconds, it’s impossible to have more than one metric in the interval of time before the flush happens. That’s why we get the same value for all the stats of the timing metric dag_loading-duration. In order to fix this, you can stop the docker container running Airflow like so:

docker stop 2c2ea5276bba

And start a new Docker container running Airflow with the following command:

docker run \
--rm \
-d \
-p 8080:8080 \
-e LOAD_EX=y \
-e AIRFLOW__SCHEDULER__STATSD_ON=True \
-e AIRFLOW__SCHEDULER__STATSD_HOST=telegraf \
-e AIRFLOW__SCHEDULER__STATSD_PORT=8125 \
-e AIRFLOW__SCHEDULER__STATSD_PREFIX=airflow \
-e AIRFLOW__WEBSERVER__WORKER_REFRESH_INTERVAL=5 \
--hostname airflow \
--name airflow \
--network tig \
puckel/docker-airflow

The only modification is the addition of a new environment variable, AIRFLOW__WEBSERVER__WORKER_REFRESH_INTERVAL, in order to reduce the interval of time between each filling of the DagBag. If you execute the commands below:

docker ps
docker logs <container_id_of_airflow>

You will get the following output:

[Screenshot: Airflow logs with “Filling up the DagBag” appearing about every 5 seconds]

Now, the interval of “Filling up the DagBag” has been reduced to around 5 seconds. The modification done, we can go back to our Docker container running Telegraf. In the file “metrics.out”, if you take the metric dag_loading-duration, you will see that the values for the stats are now different as shown below:

airflow_dag_loading-duration_tutorial,host=telegraf,metric_type=timing sum=0.00939,upper=0.007784,lower=0.001606,count=2i,90_percentile=0.007784,mean=0.004695,stddev=0.003089 1567718677000000000  

Furthermore, the stat “count” is equal to two, meaning that two metrics have been received and aggregated in the flush interval. In the current configuration, we won’t be able to have more than two metrics per flush. At this point, you should know why and be able to increase this number. If not, you should read this section again 🙂
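If you want to check your reasoning, the arithmetic can be sketched as follows. The function max_samples_per_flush is a hypothetical helper which assumes that the metric is emitted at a perfectly regular period (in practice it is only “around” 30 or 5 seconds); the second part simply recomputes the mean and standard deviation shown above from the lower and upper values.

```python
import math
import statistics


def max_samples_per_flush(flush_interval_s, emit_period_s):
    """Upper bound on the samples of one metric received between two flushes."""
    return math.ceil(flush_interval_s / emit_period_s)


print(max_samples_per_flush(10, 30))  # 1: DagBag filled every 30 s, flush every 10 s
print(max_samples_per_flush(10, 5))   # 2: DagBag filled every 5 s, flush every 10 s

# The two timings of the last example (its lower and upper values).
samples = [0.001606, 0.007784]
mean = statistics.mean(samples)
stddev = statistics.pstdev(samples)  # population standard deviation (assumption)
print(round(mean, 6), round(stddev, 6))
```

To get more samples per flush, you can either make Airflow emit the metric more often or increase flush_interval in telegraf.conf.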

Conclusion

In the first part of this series of tutorials about monitoring Apache Airflow, we have seen:

  • What is StatsD
  • How StatsD is integrated in Apache Airflow
  • How Apache Airflow sends its metrics
  • What is the TIG stack for monitoring Apache Airflow
  • How to configure and run Telegraf for receiving metrics from Airflow using Docker
  • How to configure and run Apache Airflow for sending its metrics to Telegraf using Docker
  • What are the metrics sent by Apache Airflow in details

It was a pretty dense first part and the others should be lighter, but you now have a solid knowledge of monitoring Apache Airflow.

I hope you enjoyed this tutorial and see you for the next one!
