Airflow Timetable. This new concept, introduced in Airflow 2.2, is going to change the way you schedule your data pipelines. Or rather, you’re finally going to have all the freedom and flexibility you ever dreamt of for scheduling your DAGs. What if you want to run your DAG on specific schedule intervals with “holes” in between? What if you want to avoid running your DAG during holidays or weekends? What if you want to trigger your DAG at different times depending on the current day? Airflow timetables give you that power, and it’s truly a game-changing feature. Without further ado, let’s dive into it!
How are Airflow DAGs scheduled?
Before diving into Timetables, let me remind you of some important concepts (and introduce you to a new one 😉)
I don’t know about you, but when I started to use Airflow, the concepts of schedule interval, execution date, start date, end date, catchup and so on were very confusing to me. I spent hours just to understand how everything works. As a gift, here is a quick reminder just for you.
The start date is the date at which your DAG starts being scheduled. This date can be in the past or in the future. Think of the start date as the start of the data interval you want to process. For example, the 01/01/2021 00:00 (dates in this post are written day/month/year).
In addition to the start date, you need a schedule interval. The schedule interval defines how often your DAG gets triggered. It is expressed either as a cron expression (or a preset such as @daily) or as a timedelta object. For example, @daily means every day at midnight.
Now, VERY IMPORTANT, your DAG is triggered after the start date + the schedule interval. So, your DAG is effectively triggered on the 02/01/2021 at 00:00 and not on the 01/01/2021 at 00:00. Once your DAG is triggered, there is another, very confusing, concept to know: the so-called “execution date”. The execution date is NOT the date at which your DAG got triggered. It is the beginning of the data interval you want to process. In this example, the 01/01/2021 00:00.
Why does it work like that?
Because Airflow says, “if you want to process the data of the 01/01/2021, then you need to wait for the 02/01/2021 00:00 in order to have all the data. Then, your DAG processes the last 24 hours of data.”
In the example above, the start_date of DAG Run 1 is the 01/01 00:00, then the scheduler waits for 1 day. On the 02/01 at 00:00, DAG Run 1 is triggered and its execution_date is set to the beginning of this interval, the 01/01 00:00.
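The arithmetic above can be checked with a few lines of plain Python. This is only a sketch using the standard library, not Airflow code, and first_run is a hypothetical helper name.

```python
from datetime import datetime, timedelta


def first_run(start_date: datetime, schedule_interval: timedelta):
    """Return (trigger_time, execution_date) for the first scheduled run."""
    # The run is triggered once the first interval has fully elapsed.
    trigger_time = start_date + schedule_interval
    # The execution date marks the beginning of the interval being processed.
    execution_date = start_date
    return trigger_time, execution_date


trigger_time, execution_date = first_run(datetime(2021, 1, 1), timedelta(days=1))
print("triggered at:", trigger_time)
print("execution date:", execution_date)
```

With a start date of 01/01/2021 and a daily interval, the run is triggered one day later while its execution date stays at the start of the interval.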
Ok, that is still valid, but Airflow 2.2 introduced a new concept and renamed the execution date.
The Airflow Data Intervals
We all agree that the execution date is confusing, don’t we? Guess what, there is no more execution date 🤩 In fact, it still exists, but under another name: the logical date.
In addition to the logical date, Airflow 2.2 introduced another concept, the Data Interval. A data interval is nothing more than the time range that a DAG run covers. All of those concepts can be summed up like this:
The data_interval_start = the logical_date = the execution_date, whereas the data_interval_end is the date at which the DAG is effectively triggered. So, these changes are more semantic than anything else, but they make DAG scheduling much easier to understand than before.
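To make the relationship concrete, here is a minimal standard-library sketch for a daily schedule. Interval and daily_intervals are hypothetical stand-ins made up for this illustration. They are not Airflow's DataInterval class or any Airflow function.

```python
from datetime import datetime, timedelta
from typing import List, NamedTuple


class Interval(NamedTuple):
    """Hypothetical stand-in for a data interval (not Airflow's DataInterval)."""
    start: datetime  # data_interval_start == logical_date (the old execution_date)
    end: datetime    # data_interval_end == moment the run is triggered


def daily_intervals(start_date: datetime, count: int) -> List[Interval]:
    """Build the first `count` back-to-back daily intervals from start_date."""
    step = timedelta(days=1)
    return [
        Interval(start_date + i * step, start_date + (i + 1) * step)
        for i in range(count)
    ]


intervals = daily_intervals(datetime(2021, 1, 1), 3)
for interval in intervals:
    logical_date = interval.start
    print(f"logical_date={logical_date:%d/%m/%Y} triggered_at={interval.end:%d/%m/%Y}")
```

Each interval ends exactly where the next one starts, which is why a regular schedule has no "holes" unless a timetable introduces them.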
From Airflow 2.2, a scheduled DAG run always has a data interval. The DAG methods get_next_data_interval(dag_model) and get_run_data_interval(dag_run) give you the next and current data intervals respectively.
The Airflow Timetable
Now that all the basics and concepts are clear, it’s time to talk about the Airflow Timetable. First things first, what is a Timetable?
A Timetable is a class that defines the schedule interval of your DAG and describes what to do if it is triggered manually or triggered by the scheduler.
Keep in mind that whenever you set a schedule interval on a DAG, there is always a timetable behind the scenes. That being said, let me show you a concrete example.
Let’s imagine that you would like to schedule your DAG for every Monday between 2PM and 9PM UTC, and every Thursday between 4PM and 10PM UTC.
What should you do? 🤔
Create and register your Timetable
The first step is to create a new file, different_times.py for example, in the folder plugins/ of Airflow.
Your timetable MUST be registered as a plugin. A timetable IS A PLUGIN.
Like any other plugin, you need to create a subclass of AirflowPlugin, then define a name and the Timetable to register. Your Timetable must be a subclass of Timetable.
```python
# in plugins/different_times.py
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import Timetable


class DifferentTimesTimetable(Timetable):
    pass


class DifferentTimesTimetablePlugin(AirflowPlugin):
    name = "different_times_timetable_plugin"
    timetables = [DifferentTimesTimetable]
```
In the example above, your Timetable is called DifferentTimesTimetable and your plugin is called DifferentTimesTimetablePlugin, in which the timetables attribute is overridden.
Ok, once your Timetable is registered, there are two methods that you need to implement. Let’s begin with the first one.
Trigger your DAG manually with an Airflow Timetable
What does the scheduler do if you trigger your DAG manually? The first method to implement is infer_manual_data_interval. It is called when a DAG run is manually triggered, to infer a data interval for it.
For example, if you manually trigger your DAG on Sunday, what should be the data interval attached to it? Sunday, last Friday, or next Monday?
This method has one keyword-only argument, run_after, a DateTime corresponding to when the user triggers the run.
Let’s implement it.
```python
from datetime import timedelta
from typing import Optional

from pendulum import Date, DateTime, Time, timezone

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

UTC = timezone("UTC")

# Not defined in the original snippet: assumed here to be
# (start hour, end hour) pairs matching the schedule described above.
MONDAY_TIMERANGE = (14, 21)    # 2PM - 9PM
THURSDAY_TIMERANGE = (16, 22)  # 4PM - 10PM


class DifferentTimesTimetable(Timetable):

    def _compute_delta_time(self, weekday: int):
        """Return how many days to go back and the window that applies."""
        if weekday == 0:    # Monday -> last Thursday
            delta = timedelta(days=4)
            time_range = THURSDAY_TIMERANGE
        elif weekday < 3:   # Tuesday, Wednesday -> Monday
            delta = timedelta(days=weekday)
            time_range = MONDAY_TIMERANGE
        else:               # Thursday, Friday, Saturday, Sunday -> Thursday
            delta = timedelta(days=(weekday - 3) % 7)
            time_range = THURSDAY_TIMERANGE
        return delta, time_range

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        weekday = run_after.weekday()
        delta, time_range = self._compute_delta_time(weekday)
        day = (run_after - delta).date()
        # The interval starts and ends on the same day, at the window's bounds
        start = DateTime.combine(day, Time(hour=time_range[0])).replace(tzinfo=UTC)
        end = DateTime.combine(day, Time(hour=time_range[1])).replace(tzinfo=UTC)
        return DataInterval(start=start, end=end)
```
As shown in the code above, the data interval start is set according to the date at which the DAG is manually triggered (run_after).
- Triggered on Monday -> data_interval_start = last Thursday (previous week) at 4PM.
- Triggered on Tuesday or Wednesday -> data_interval_start = last Monday (current week) at 2PM.
- Triggered on any other day -> data_interval_start = last Thursday (current week) at 4PM.
The data interval ends on the same day as it starts: at 10PM for a Thursday window, or at 9PM for a Monday window.
Notice that the method returns a DataInterval object to describe the interval, and that the dates are “aware” (they carry timezone info).
Finally, don’t forget that the data_interval_start corresponds to the logical date (execution date) and the data_interval_end is when your DAG is effectively triggered.
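If you want to check the three rules above against concrete dates without an Airflow install, here is a rough standard-library sketch of the same logic. manual_window and the two *_WINDOW constants are hypothetical names for this illustration, and the function returns a plain tuple rather than a DataInterval.

```python
from datetime import datetime, time, timedelta, timezone

UTC = timezone.utc
MONDAY_WINDOW = (14, 21)    # assumed (start hour, end hour): 2PM - 9PM
THURSDAY_WINDOW = (16, 22)  # assumed (start hour, end hour): 4PM - 10PM


def manual_window(run_after: datetime):
    """Return the (start, end) window attached to a manual trigger."""
    weekday = run_after.weekday()  # Monday is 0, Sunday is 6
    if weekday == 0:               # Monday -> Thursday of the previous week
        days_back, window = 4, THURSDAY_WINDOW
    elif weekday < 3:              # Tuesday, Wednesday -> Monday of this week
        days_back, window = weekday, MONDAY_WINDOW
    else:                          # Thursday to Sunday -> Thursday of this week
        days_back, window = weekday - 3, THURSDAY_WINDOW
    day = (run_after - timedelta(days=days_back)).date()
    start = datetime.combine(day, time(hour=window[0]), tzinfo=UTC)
    end = datetime.combine(day, time(hour=window[1]), tzinfo=UTC)
    return start, end


# The 07/11/2021 is a Sunday, so the window is Thursday 04/11/2021, 4PM to 10PM
start, end = manual_window(datetime(2021, 11, 7, 9, 30, tzinfo=UTC))
print(start, "->", end)
```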
DAG triggered by the Scheduler with an Airflow Timetable
All right, now that you handle the case in which your DAG gets triggered manually, it’s time to tell Airflow what to do when your DAG gets triggered (automatically) by the scheduler.
Stick with me, as this method is a little more complicated 😅.
def next_dagrun_info(self, *, last_automated_data_interval: Optional[DataInterval], restriction: TimeRestriction) -> Optional[DagRunInfo]:
next_dagrun_info has two important arguments:
last_automated_data_interval that corresponds to the last DAG Run’s data interval. If your DAG is triggered for the first time then this argument will be equal to None.
restriction that corresponds to crucial information about your DAG such as:
- restriction.earliest : the start date
- restriction.latest: the end date (if any)
- restriction.catchup: True if the catchup parameter is set to True, otherwise it is equal to False.
This method returns a DagRunInfo object that contains nothing more than the data interval of the next DAG Run. As with infer_manual_data_interval, you will have to define the start and end dates.
One thing to keep in mind is that, regardless of your schedule interval, you will ALWAYS handle these two cases in your Airflow Timetable:
- When your DAG gets triggered for the first time by the scheduler (start_date = data_interval_start)
- When your DAG has already been triggered (you have some DAG Runs)
All right, ready to implement the method? Let’s go 🚀
DAG triggered for the first time
So, what does the scheduler check when it triggers your DAG for the first time? Well, that’s exactly what you’re required to implement.
First things first, check whether your DAG has a start date. If not, you don’t schedule your DAG, as you don’t know when to start.
Then, what if the catchup parameter is set to False? In that case you don’t want to go back over past intervals, so you start from the later of the two: today or the start_date (if it is set in the future).
Also, what if the start date doesn’t fall on a Monday or a Thursday, or falls at a different time? For example, the 01/11/2021 11 PM is a Monday, but after 2:00 PM. Ideally you don’t want to run the DAG for this date, but for the next one (Thursday) (this condition is specific to the example of this blog post). Note that the simplified code below does not skip forward: it snaps the start back to the most recent Monday or Thursday window.
Finally, a timetable that must avoid weekends would also check whether the next start falls on a weekend and move it accordingly. Here that check isn’t needed, as the windows only ever start on Mondays and Thursdays.
All right, enough talking, let’s see what this means in code for your Airflow timetable:
```python
next_start = restriction.earliest
# No start_date: nothing to schedule
if next_start is None:
    return None
# catchup=False: never start earlier than today at midnight
if not restriction.catchup:
    next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
# Snap back to the most recent Monday or Thursday window;
# time_range is a (start hour, end hour) pair
delta, time_range = self._compute_delta_time(next_start.weekday())
next_start = DateTime.combine((next_start - delta).date(), Time(hour=time_range[0])).replace(tzinfo=UTC)
```
Notice that next_start = restriction.earliest corresponds to the start date of your DAG.
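The snippet above goes back to the most recent window. If you would rather skip forward to the first window that begins at or after the start date, as in the 01/11/2021 11 PM example, one possible approach looks like the sketch below. It uses only the standard library and assumes UTC datetimes. first_window_start and WINDOWS are hypothetical names, not part of Airflow.

```python
from datetime import datetime, time, timedelta, timezone
from typing import Optional

UTC = timezone.utc
# weekday -> assumed (start hour, end hour); Monday is 0, Thursday is 3
WINDOWS = {0: (14, 21), 3: (16, 22)}


def first_window_start(earliest: Optional[datetime], catchup: bool,
                       now: datetime) -> Optional[datetime]:
    """Return the start of the first window beginning at or after `earliest`."""
    if earliest is None:   # no start_date: nothing to schedule
        return None
    if not catchup:        # ignore past windows: start from today at the earliest
        midnight = datetime.combine(now.date(), time.min, tzinfo=UTC)
        earliest = max(earliest, midnight)
    for offset in range(8):  # a Monday or Thursday always occurs within a week
        day = (earliest + timedelta(days=offset)).date()
        if day.weekday() in WINDOWS:
            start_hour = WINDOWS[day.weekday()][0]
            candidate = datetime.combine(day, time(hour=start_hour), tzinfo=UTC)
            if candidate >= earliest:
                return candidate
    return None


# Monday 01/11/2021 at 11PM is past the 2PM start, so the first window is Thursday
late_monday = datetime(2021, 11, 1, 23, tzinfo=UTC)
print(first_window_start(late_monday, catchup=True, now=late_monday))
```

Passing now explicitly, instead of reading the clock inside the function, keeps the logic easy to test.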
DAG triggered by the Scheduler
Now, what if it’s not the first time that your DAG gets triggered?
Well, you just have to specify what to do if the last run was on Monday or on Thursday:
```python
if last_automated_data_interval is not None:
    last_start = last_automated_data_interval.start
    if last_start.weekday() == 0:
        # Previous run was on Monday -> next one is on Thursday
        next_start = last_start + timedelta(days=3)
        time_range = THURSDAY_TIMERANGE
    else:
        # Previous run was on Thursday -> next one is on Monday
        next_start = last_start + timedelta(days=(7 - last_start.weekday()))
        time_range = MONDAY_TIMERANGE
    # time_range is a (start hour, end hour) pair
    next_start = DateTime.combine(next_start.date(), Time(hour=time_range[0])).replace(tzinfo=UTC)
```
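To see the alternation between the two windows on real dates, here is a small standard-library sketch. next_window and the *_WINDOW constants are hypothetical names used only for this illustration, and the function returns plain tuples instead of Airflow objects.

```python
from datetime import datetime, time, timedelta, timezone

UTC = timezone.utc
MONDAY_WINDOW = (14, 21)    # assumed (start hour, end hour)
THURSDAY_WINDOW = (16, 22)  # assumed (start hour, end hour)


def next_window(last_start: datetime):
    """Return the (start, end) window following the one that began at last_start."""
    if last_start.weekday() == 0:   # Monday -> Thursday of the same week
        days_ahead, window = 3, THURSDAY_WINDOW
    else:                           # Thursday -> Monday of the next week
        days_ahead, window = 7 - last_start.weekday(), MONDAY_WINDOW
    day = (last_start + timedelta(days=days_ahead)).date()
    return (datetime.combine(day, time(hour=window[0]), tzinfo=UTC),
            datetime.combine(day, time(hour=window[1]), tzinfo=UTC))


start = datetime(2021, 1, 4, 14, tzinfo=UTC)  # Monday 04/01/2021 at 2PM
for _ in range(3):
    start, end = next_window(start)
    print(f"{start:%d/%m/%Y %H:%M} -> {end:%H:%M}")
```

Starting from a Monday, the loop walks through Thursday, the following Monday, then the following Thursday.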
The final code for the next_dagrun_info method looks like this:
```python
def next_dagrun_info(
    self,
    *,
    last_automated_data_interval: Optional[DataInterval],
    restriction: TimeRestriction,
) -> Optional[DagRunInfo]:
    if last_automated_data_interval is not None:
        # There was a previous run: alternate between the two windows
        last_start = last_automated_data_interval.start
        if last_start.weekday() == 0:
            # Previous run was on Monday -> next one is on Thursday
            next_start = last_start + timedelta(days=3)
            time_range = THURSDAY_TIMERANGE
        else:
            # Previous run was on Thursday -> next one is on Monday
            next_start = last_start + timedelta(days=(7 - last_start.weekday()))
            time_range = MONDAY_TIMERANGE
        next_start = DateTime.combine(next_start.date(), Time(hour=time_range[0])).replace(tzinfo=UTC)
    else:
        # First run ever
        next_start = restriction.earliest
        # No start_date: nothing to schedule
        if next_start is None:
            return None
        # catchup=False: never start earlier than today at midnight
        if not restriction.catchup:
            next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
        delta, time_range = self._compute_delta_time(next_start.weekday())
        next_start = DateTime.combine((next_start - delta).date(), Time(hour=time_range[0])).replace(tzinfo=UTC)
    # time_range is a (start hour, end hour) pair
    next_end = DateTime.combine(next_start.date(), Time(hour=time_range[1])).replace(tzinfo=UTC)
    # Stop scheduling once the next start is beyond the end date
    if restriction.latest is not None and next_start > restriction.latest:
        return None
    return DagRunInfo.interval(start=next_start, end=next_end)
```
The DAG end_date
Notice the following code:
if restriction.latest is not None and next_start > restriction.latest: return None
This check avoids scheduling your DAG when the end_date is earlier than next_start. In other words, you don’t want to schedule your DAG anymore once the next start is beyond the end date.
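The same guard can be written as a tiny predicate, which makes the edge cases easy to see. This is a standard-library sketch, and is_schedulable is a hypothetical name rather than an Airflow function.

```python
from datetime import datetime, timezone
from typing import Optional

UTC = timezone.utc


def is_schedulable(next_start: datetime, latest: Optional[datetime]) -> bool:
    """Return False once the next run would start after the end date."""
    # No end_date (latest is None) means the DAG is scheduled indefinitely.
    return latest is None or next_start <= latest


end_date = datetime(2021, 1, 31, tzinfo=UTC)
print(is_schedulable(datetime(2021, 1, 28, 16, tzinfo=UTC), end_date))
print(is_schedulable(datetime(2021, 2, 1, 14, tzinfo=UTC), end_date))
```

A run starting exactly at the end date is still scheduled, which matches the strict > comparison in the method.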
Adding the Airflow Timetable to your DAG
Once you are done with your Airflow timetable, you are ready to use it, and doing so is extremely simple.
```python
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

from different_times import DifferentTimesTimetable

with DAG(
    'different_times_dag',
    timetable=DifferentTimesTimetable(),
    start_date=datetime(2021, 1, 1),
    catchup=True,
) as dag:
    # Placeholder task (hypothetical task_id) so the DAG has something to run
    DummyOperator(task_id='placeholder')
```
As you can see from the code above you need to:
- Import your Timetable (from plugins)
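- Pass your Timetable to the DAG’s new timetable argument.
That’s it. Do not specify any schedule_interval, as scheduling is handled by your Timetable. Note that in Airflow 2.4 and later, the timetable object is passed through the unified schedule argument instead.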
Now, you are ready to schedule your DAG at any time, in any way you want! 🤩