Airflow Sensors: What you need to know

Airflow Sensors are one of the most commonly used types of operators. Why? Because they let you wait until a condition is met before moving on. You need to wait for a file? Check if a SQL entry exists? Delay the execution of your DAG? Those are just a few of the possibilities Airflow Sensors offer. If you want to build complex and powerful data pipelines, you have to truly understand how Sensors work. Well, guess what, that’s exactly what you are going to discover now. By the way, if you are new to Airflow, check my course here, you will get it with a special discount. Ready? Let’s go!

Use Case

As a good example is worth 10,000 words, let’s illustrate why Sensors are important for you.

[Figure: DAG example with Airflow Sensors]

A really common use case is when you have multiple partners (A, B and C in this example) and wait for the data coming from them each day at a more or less specific time. For example, Partner A sends you data at 9:00 AM, B at 9:30 AM and C at 10:00 AM. Hopefully without delay, but we will come back to this later. So, your goal is to wait for all files to be available before moving to the task Process. Ok, that being said, what are the tasks Partner A, B and C exactly?

Well, when people are not aware of Sensors, they tend to use the PythonOperator. As they need to wait for a file, they create a Python function, do their stuff in it to wait for that file and call that function with the PythonOperator. This is the worst way to do it. Don’t do this, forget about it. You will not leverage the benefits of Airflow and it will be a nightmare to maintain. Don’t do it. So, what’s the solution?

Airflow Sensors! 😎

Airflow Sensors

What is a Sensor operator? A Sensor is an operator that evaluates, at a time interval, whether a condition is met or not. If yes, it succeeds. If not, it checks again later until it times out. Concretely, say your goal is to verify if a file exists at a specific location. With a Sensor, every 30 seconds it checks if the file exists at that location. If yes it succeeds, otherwise it keeps checking until it times out. It is as simple as that. You need to wait for something? Use an Airflow Sensor.

Airflow ships with many sensors. Here is a non-exhaustive list of the most commonly used:

  • The FileSensor: Waits for a file or folder to land in a filesystem.
  • The S3KeySensor: Waits for a key to be present in an S3 bucket.
  • The SqlSensor: Runs a SQL statement repeatedly until a criterion is met.
  • The HivePartitionSensor: Waits for a partition to show up in Hive.
  • The ExternalTaskSensor: Waits for a different DAG or a task in a different DAG to complete for a specific execution date. (Pretty useful that one 🤓 )
  • The DateTimeSensor: Waits until the specified datetime (Useful to add some delay to your DAGs)
  • The TimeDeltaSensor: Waits for a timedelta after the task’s execution_date + schedule interval (Looks similar to the previous one no?)

And more!

Take a look at the list of Sensors available here.

Regardless of the Sensor used, they are all based on the BaseSensorOperator class, which means they share common attributes that you are going to discover right now!

The King of Airflow Sensors

If you truly want to understand Sensors, you have to understand their base class, the BaseSensorOperator.

When you add a Sensor, the first step is to define the time interval at which the condition is evaluated, every 60 seconds by default. To change it, set the poke_interval argument, which expects a number of seconds as a float, as shown below.

    waiting_for_file = FileSensor(
        task_id='waiting_for_file',
        poke_interval=30
    )

The code above implements a FileSensor that checks every 30 seconds if a file exists (but no file is specified yet). Now you might wonder, what is the best poke interval? What is the optimal value for it? Well, there is no right answer for that. The shorter the poke interval, the sooner the condition is detected and so the sooner the task completes. But be careful. If it is too short, there is no guarantee that Airflow will be able to evaluate each interval in time.

Ultimately it depends on your condition, your criteria. For instance, checking if a record exists in a SQL table means opening a new connection at each interval, plus some network latency. My advice to you: define a “reasonable” value, the default one is usually a good fit. Behind the scenes, the poke process is nothing more than a loop verifying your condition with a sleep in it.
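To make that loop concrete, here is a minimal sketch in plain Python. It is not Airflow's code: the name poke_until and the injectable sleep and clock parameters are assumptions made for illustration, and a real sensor raises an exception on timeout instead of returning False.

```python
import time


def poke_until(condition, poke_interval=60.0, timeout=60 * 60 * 24 * 7,
               sleep=time.sleep, clock=time.monotonic):
    """Evaluate `condition` every `poke_interval` seconds.

    Returns True as soon as the condition holds, or False once `timeout`
    seconds have elapsed. `sleep` and `clock` can be replaced by fakes,
    which makes the loop testable without actually waiting.
    """
    started_at = clock()
    while True:
        if condition():                      # one "poke"
            return True
        if clock() - started_at >= timeout:  # whole-run timeout, not per poke
            return False
        sleep(poke_interval)                 # do nothing until the next poke
```

With os.path.exists as the condition, poke_until(lambda: os.path.exists(path), poke_interval=30) behaves roughly like the file check described above.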

Speaking of the time interval, what if the file never arrives?

Pools and Concurrency

Easy to forget, but super important! You should always define a timeout for your Airflow Sensors. ALWAYS. Why? Let me give you a concrete example. But first, we need to step back a little bit and understand what’s going on when a task is executed in Airflow.

In Airflow, all operators share a common pool soberly called “default_pool”. If you go to Admin -> Pools here is what you get:

[Figure: Airflow default pool]

Airflow pools are used to limit the execution parallelism on arbitrary sets of tasks. Each time a task runs, a slot is given to that task for the whole of its execution. Once the task is finished, the slot is free again and ready to be given to another task. A slot is given regardless of the resources a task needs. It’s really just a slot, 1 task = 1 slot. If there are no more slots available, the tasks are queued and so the number of queued slots increases. By default, you can execute at most 128 tasks at the same time, as the default pool has 128 slots.

In addition, the concurrency, or the maximum number of running tasks for a given DAG, is set to 16 by default (dag_concurrency / concurrency, renamed max_active_tasks_per_dag / max_active_tasks in Airflow 2.2). In other words, you can run 16 tasks at the same time for the same DAG.

To sum up: your tasks share the same pool, and when a task is running a slot is locked until that task is done. The concurrency argument limits the maximum number of running tasks for a given DAG to 16 by default.
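The slot bookkeeping can be pictured with a toy model in plain Python. This is only a sketch of the "1 task = 1 slot" rule described above: the class SlotPool and its methods are hypothetical names, not Airflow's API.

```python
class SlotPool:
    """Toy model of a pool: each running task holds exactly one slot."""

    def __init__(self, slots=128):
        self.slots = slots    # size of the pool (128 for the default pool)
        self.running = 0      # tasks currently holding a slot
        self.queued = 0       # tasks waiting for a slot

    def submit(self):
        """A task asks for a slot: it runs if one is free, else it is queued."""
        if self.running < self.slots:
            self.running += 1
        else:
            self.queued += 1

    def finish(self):
        """A running task ends; its slot goes to a queued task if there is one."""
        if self.running == 0:
            raise RuntimeError("no running task to finish")
        if self.queued:
            self.queued -= 1  # the freed slot is taken over immediately
        else:
            self.running -= 1
```

Submitting 130 tasks to a SlotPool() leaves 128 running and 2 queued, which is what the Pools page would report for the default pool.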

Now, what’s the link with the Sensor timeout?

The Deadlock

The timeout attribute defines the maximum number of seconds a Sensor is allowed to run for. It is not a timeout for each poke interval but for the whole Sensor execution process. By default, this timeout is set to 7 days or 60 * 60 * 24 * 7 seconds. And let’s take a few seconds to understand why it can be a big problem for you. Back to your data pipeline:

[Figure: DAG example with Airflow Sensors]

Let’s say the schedule interval of your DAG is set to daily but the files of Partner A, B and C never come. Also, the maximum number of running tasks for that DAG is limited to 12 (concurrency=12 or dag_concurrency=12). Now, what do you think is going to happen?

  • Day 1: DAG Run 1, Tasks Partner A, B and C are polling, 3 tasks/slots are occupied. 9 slots left.
  • Day 2: DAG Run 2, Tasks Partner A, B and C are polling, 3 tasks/slots are occupied, 6 in total. 6 slots left.
  • Day 3: DAG Run 3, Tasks Partner A, B and C are polling, 3 tasks/slots are occupied, 9 in total. 3 slots left.
  • Day 4: DAG Run 4, Tasks Partner A, B and C are polling, 3 tasks/slots are occupied, 12 in total. 0 slots left.
  • Day 5: DAG Run 5, No more slots available -> Deadlock! 😱

Yes, that’s right. You’ve just discovered the Sensor Deadlock. As the concurrency limit is set to 12, all slots are occupied by your sensors and you can’t execute any more tasks. Now you might say, “Wait, isn’t the job of the timeout to deal with that?” Yes it is! But it is set to 7 days by default. So the sensors of Day 1 only time out on Day 8, and until then no new task of that DAG can run.
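The day-by-day count above can be reproduced with a small simulation. It is a deliberately simplified model, not Airflow's scheduler: the function name simulate_sensor_slots and its parameters are assumptions, each DAG run creates only its three sensors, and a sensor that never sees its file holds a slot until it times out.

```python
def simulate_sensor_slots(days, limit=12, sensors_per_run=3, timeout_days=7):
    """Return one (day, running, queued) tuple per day for stuck sensors."""
    start_days = []  # start day of every sensor currently holding a slot
    queued = 0       # sensors that could not get a slot yet
    history = []
    for day in range(1, days + 1):
        # Sensors that reached their timeout fail and release their slot.
        start_days = [s for s in start_days if day - s < timeout_days]
        # The new DAG run adds its sensors to the waiting list.
        queued += sensors_per_run
        # Start as many waiting sensors as the concurrency limit allows.
        while queued and len(start_days) < limit:
            start_days.append(day)
            queued -= 1
        history.append((day, len(start_days), queued))
    return history
```

With the defaults, the model reports 12 running sensors on Day 4, sensors piling up in the queue from Day 5, and the first slots released on Day 8. Passing timeout_days=0.5 instead keeps it at 3 running sensors and an empty queue every day.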

Airflow Sensor Timeout

Always, always define a meaningful timeout for your Sensors. Always. Basically, it has to be shorter than the schedule interval of your DAG and correspond to the time range in which you expect your condition to be met. For example, if your DAG runs once a day at 8 AM and your files should land at 08:10 AM, a timeout set to 30 minutes might be a good idea. That will protect you from the Sensor Deadlock issue.

    waiting_for_file = FileSensor(
        task_id='waiting_for_file',
        poke_interval=30,
        timeout=60 * 30
    )

Ok, the poke_interval is defined and the timeout is set to avoid deadlocks. What else? Well, did you know that you can optimize your sensor further by defining its mode?

Sensor Modes

A sensor has two modes. “poke”, the default mode, means that when a sensor runs, it takes a slot for its whole execution time and sleeps between pokes. In other words, it holds a slot until it completes. Remember the previous example? That’s why you end up with a deadlock: more and more sensors take up all of your slots and keep them even when they are NOT poking (between each poke_interval). The truth is, 99% of the time, a sensor does nothing but sleep (that’s why Smart Sensors were introduced in Airflow 2.0, but that will be for another article). To avoid having your slot on hold for the duration of the sensor’s runtime, use the “reschedule” mode.

If you set your sensor to the “reschedule” mode, it frees the slot when the condition is not met and is rescheduled for later. Concretely, between each poke_interval, when the sensor is not checking your condition, the slot is released and your sensor gets the status “up_for_reschedule”. Meanwhile, your other tasks can run. “up_for_reschedule” means your sensor is going to be rescheduled at a later time, or more specifically at the current date + poke_interval. However, there is no guarantee! Indeed, if there are no more slots available, your sensor, like any other task, will have to wait until a slot is released.
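To get a feel for the difference, here is a rough back-of-the-envelope sketch in plain Python. The function slot_seconds_held and the one-second poke_duration are assumptions for illustration, and the estimate ignores the scheduling overhead of re-running a rescheduled task.

```python
def slot_seconds_held(mode, runtime, poke_interval, poke_duration=1.0):
    """Estimate how many seconds a sensor occupies a slot.

    runtime: seconds between the first poke and the moment the sensor ends.
    poke_duration: assumed time spent on a single check.
    """
    if mode == "poke":
        # The slot is kept for the whole run, sleeps included.
        return float(runtime)
    if mode == "reschedule":
        # The slot is only held while a check is actually running.
        pokes = int(runtime // poke_interval) + 1
        return pokes * poke_duration
    raise ValueError(f"unknown mode: {mode!r}")
```

For a sensor that runs for 30 minutes with a poke_interval of 120 seconds, this gives 1800 seconds of slot time in “poke” mode against about 16 seconds in “reschedule” mode.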

You can check your Sensors in “up_for_reschedule” in Browse -> Task Reschedules. Notice the Reschedule date.

[Figure: Task Reschedules]

Which mode should you use? Usually, if the expected runtime of your sensor is short, or if the poke_interval is short (less than a minute), go with the “poke” mode. Otherwise, if the expected runtime is quite long, go with the “reschedule” mode.

    waiting_for_file = FileSensor(
        task_id='waiting_for_file',
        poke_interval=120,
        timeout=60 * 30,
        mode="reschedule"
    )

What if an Airflow Sensor times out

It’s great to know that you can define a timeout for your Sensors, but it is even better to know how to take action when one occurs. Well, good news! You can define a callback function that is called when an operator fails. Let me show you an example:

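from airflow.exceptions import AirflowSensorTimeout

def _failure_callback(context):
    # The exception that made the task fail is available in the context
    if isinstance(context['exception'], AirflowSensorTimeout):
        print(context)
        print("Sensor timed out")

with DAG(...) as dag:
    waiting_for_file = FileSensor(
        task_id='waiting_for_file',
        poke_interval=120,
        timeout=10,
        mode="reschedule",
        # Called when the task fails, including on a sensor timeout
        on_failure_callback=_failure_callback
    )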

In the code above, as soon as your FileSensor times out, the function _failure_callback is called with the task instance context object as its parameter. This context object contains a lot of useful information about your DAG, task, etc. Here, you can check the key “exception” and filter on it. If the exception is an instance of AirflowSensorTimeout, then your Sensor timed out and you can do whatever you want next.

Notice that this callback won’t be called if your Airflow Sensor is skipped!

Wait what?! Skipped?

Soft fail or skip your Sensor

Once a timeout occurs, you have the choice of marking the sensor either as failed (the default) or as skipped, by setting the soft_fail argument to True.

Whether to choose one or the other really depends on your use case. For example, let’s say you have multiple sensors waiting for different files, and if one file is not available you still want to process the others. Skipping that sensor might be a good idea, as it is not a real “failure”: it is not critical for your DAG.

One important side note: don’t forget that when you skip a task, by default, all downstream tasks of the skipped task are skipped as well. Therefore, you have to change the trigger rule of the tasks that depend on your Sensors. More about trigger rules in another article 😉

Here is a complete example:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor
from airflow.exceptions import AirflowSensorTimeout

from datetime import datetime

default_args = {
    'start_date': datetime(2021, 1, 1)
}

def _done():
    pass

def _partner_a():
    return False

def _partner_b():
    return True

def _failure_callback(context):
    if isinstance(context['exception'], AirflowSensorTimeout):
        print(context)
        print("Sensor timed out")

with DAG('my_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    partner_a = PythonSensor(
        task_id='partner_a',
        poke_interval=120,
        timeout=10,
        mode="reschedule",
        python_callable=_partner_a,
        on_failure_callback=_failure_callback,
        soft_fail=True
    )

    partner_b = PythonSensor(
        task_id='partner_b',
        poke_interval=120,
        timeout=10,
        mode="reschedule",
        python_callable=_partner_b,
        on_failure_callback=_failure_callback,
        soft_fail=True
    )

    done = PythonOperator(
        task_id="done",
        python_callable=_done,
        trigger_rule='none_failed_or_skipped'
    )

    [partner_a, partner_b] >> done

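Notice the trigger rule of the task done: it runs only if no parent has failed and at least one parent has succeeded. (Since Airflow 2.2, this rule is named none_failed_min_one_success; none_failed_or_skipped is its deprecated alias.)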
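To see why the default trigger rule would not work here, this small sketch evaluates both rules on the states of the parent tasks. It is a simplified model in plain Python, not Airflow's implementation: the function downstream_should_run is a hypothetical name and only the two rules discussed here are covered.

```python
def downstream_should_run(trigger_rule, upstream_states):
    """Decide whether a task runs, given the final states of its parents."""
    failed = any(s in ("failed", "upstream_failed") for s in upstream_states)
    succeeded = sum(s == "success" for s in upstream_states)
    if trigger_rule == "all_success":
        # Default rule: every parent must have succeeded.
        return succeeded == len(upstream_states)
    if trigger_rule == "none_failed_or_skipped":
        # No parent failed and at least one parent succeeded.
        return not failed and succeeded >= 1
    raise ValueError(f"rule not covered by this sketch: {trigger_rule!r}")
```

In the DAG above, partner_a ends up skipped and partner_b succeeds, so the parent states are ["skipped", "success"]: the default rule would not run done, whereas none_failed_or_skipped does.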

All right, there is one last parameter you don’t know yet.

Exponential backoff or Retry your Sensor cleverly

There is a little-known but useful argument that all sensors share: exponential_backoff, which expects a boolean, True or False.

This argument gives you progressively longer waits between pokes by using an exponential backoff algorithm. In general, retrying with exponential backoff is a smarter way of retrying a task, as you wait longer at each new retry. That avoids “hammering” your system, especially if you have a lot of running tasks that interact with external tools. Now, should you use it with your Sensors? Maybe. Keep in mind that each “poke” will be more delayed than the previous one, which may slow down the execution of your tasks/DAGs.
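As an illustration of how quickly the waits grow, here is a simplified sketch of exponential backoff in plain Python. The function backoff_intervals and its max_wait cap are assumptions made for this example, and Airflow's own computation differs in its details.

```python
def backoff_intervals(poke_interval, pokes, max_wait=None):
    """Return the wait, in seconds, before each of the next `pokes` checks.

    Every wait is double the previous one, optionally capped at `max_wait`.
    """
    intervals = []
    for attempt in range(pokes):
        wait = poke_interval * 2 ** attempt
        if max_wait is not None:
            wait = min(wait, max_wait)  # never wait longer than the cap
        intervals.append(wait)
    return intervals
```

Starting from a poke_interval of 60 seconds, the first five waits are 60, 120, 240, 480 and 960 seconds, so a file that lands after ten minutes may only be noticed several minutes later.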

Again, no golden rule here. Try and learn 😁

The Final Code with the FileSensor

Ok, now you know everything you need about Sensors, what would be the final code for the example DAG with the partners?

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor
from airflow.exceptions import AirflowSensorTimeout

from datetime import datetime

default_args = {
    'start_date': datetime(2021, 1, 1)
}

def _process():
    pass

def _store():
    pass

def _failure_callback(context):
    if isinstance(context['exception'], AirflowSensorTimeout):
        print(context)
        print("Sensor timed out")

with DAG('my_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    partners = [
        FileSensor(
            task_id=f'partner_{partner}',
            poke_interval=60,
            timeout=60 * 30,
            mode="reschedule",
            on_failure_callback=_failure_callback,
            filepath=f'partner_{partner}.txt',
            fs_conn_id=f'conn_filesensor_{partner}'
        ) for partner in ['a', 'b', 'c']]

    process = PythonOperator(
        task_id="process",
        python_callable=_process,
    )

    store = PythonOperator(
        task_id="store",
        python_callable=_store,
    )

    partners >> process >> store

And you get:

[Figure: the DAG example]

One important thing to remember: each sensor has its own set of arguments. For example, with the FileSensor you have two arguments to provide. The filepath, which is either the file or the folder you are waiting for (it can be a glob), and fs_conn_id, a connection of the type “File” with a JSON extra value {"path": "/where/you/file/is/"}. Again, each sensor has its specificities, so don’t hesitate to look at the documentation.
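To picture how those two arguments combine, here is an approximation of the check in plain Python. It is a sketch rather than the FileSensor's actual code: the function file_has_landed is a hypothetical name, base_path stands for the “path” value of the connection, and only files (not folders) are handled.

```python
import glob
import os


def file_has_landed(base_path, filepath):
    """Return True if at least one file matches base_path + filepath.

    `filepath` may be a plain name or a glob pattern such as 'partner_*.txt'.
    """
    pattern = os.path.join(base_path, filepath)
    return any(os.path.isfile(match) for match in glob.glob(pattern))
```

Called with the connection path and 'partner_a.txt', it returns False until the file shows up, which is the condition the sensor evaluates at each poke.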

The ExternalTaskSensor

Before finishing the article, I couldn’t leave you without talking about the ExternalTaskSensor. Hold on tight, this special Airflow Sensor allows you to create DAG dependencies 🤯

Yes, you heard it right. If you want to execute DAG B when DAG A is done, you can do that with the ExternalTaskSensor. That Sensor waits for a task in another DAG to complete before completing itself. Want to learn more? I made a video about it. Enjoy 😁

[Video: The ExternalTaskSensor]

Conclusion

Wow, you’re finally done. It was a pretty long and intense article, wasn’t it? Airflow Sensors are super important to understand, as they allow you to build more complex data pipelines and solve very common use cases. I hope you now have a better idea of how they work and what you can do with them. If you want to learn more about Airflow, go check my course The Complete Hands-On Introduction to Apache Airflow right here. Or if you already know Airflow and want to go much further, enrol in my 12-hour course here.

Have a great day! 😁
