Apache Airflow | With Statement and DAG

If you take a look at some DAG examples in my course “The Complete Hands-On Course to Master Apache Airflow”, you may notice the use of the “with” statement when a DAG object is created. The question is: why is it considered a best practice to use this Python keyword when you instantiate a DAG object? In this article we will quickly recap what the “with” keyword does, and then explore its use in the context of Apache Airflow.

Understanding Python's "with" keyword

Let’s start by looking at the problem the “with” statement tries to solve:

connectToResource()
try:
    doSomethingWithResource()
finally:
    disconnectResource()

In this example, connectToResource() could be anything, such as opening a file or connecting to a database, and disconnectResource() could be closing the file or disconnecting from the database. The try-finally block does something with the resource and closes it in every case, even if an error is raised in the try clause, because the finally clause is always executed.
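As a concrete sketch of this pattern, here is the same structure applied to a file. The function name first_line_as_int and the events list are made up for this illustration; only open(), close(), and the standard os and tempfile modules are real.

```python
import os
import tempfile

events = []  # records what happened, for illustration only

def first_line_as_int(path):
    f = open(path)                   # connectToResource()
    events.append("opened")
    try:
        return int(f.readline())     # doSomethingWithResource(), may raise ValueError
    finally:
        f.close()                    # disconnectResource(), runs on success or error
        events.append("closed")

# write a temporary file whose first line is not a number
fd, path = tempfile.mkstemp(suffix=".txt")
os.write(fd, b"not a number\n")
os.close(fd)

try:
    first_line_as_int(path)
except ValueError:
    events.append("error handled")

os.remove(path)
print(events)  # ['opened', 'closed', 'error handled']
```

The file is closed before the error reaches the caller, which is exactly the guarantee the finally clause gives us.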

Now suppose that we are very cautious coders: we may end up doing this a lot in our programs to keep our external resources correctly managed. You would agree with me that this syntax is quite verbose.

To make this pattern more concise and easier to use, the Python developers came up with a generalised version: the “with” statement, which works with any object that implements two magic methods, __enter__() and __exit__(), to control how an external resource is set up and released.

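class DatabaseConnection(object):

    # db is a placeholder for any client object exposing connect() and close()
    def __init__(self, db):
        self.db = db

    # make a database connection and return it
    def __enter__(self):
        self.db.connect()
        return self.db

    # close the database connection
    def __exit__(self, _type, _value, _tb):
        self.db.close()

# db stands for your own database client object
with DatabaseConnection(db) as conn:
    # do something
    pass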

When the “with” statement is executed, the expression after “with” is evaluated first and gives us the context manager, here a DatabaseConnection instance. Its method __enter__() is then called, and the value it returns is assigned to the variable defined by as, which is conn in our example. When the block ends, the special method __exit__() is called to clean up the resource. Note that conn does not go out of scope: the name is still bound after the block, but the connection it refers to has been closed. Notice also that if an exception is thrown from the “with” block, __exit__() is still called, so the resource is cleaned up in that case too.

To sum up, keep in mind that you can implement your own context manager (in addition to those that already exist, such as file objects) by adding the methods __enter__() and __exit__() to your class definition. You will then be able to use instances of your class with “with” to avoid leaks when you deal with external resources.
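To see the order of the calls for yourself, here is a minimal sketch of a context manager that only records what happens to it. The class ManagedResource and the log list are hypothetical names used for this illustration; no real resource is involved.

```python
class ManagedResource(object):
    """Toy resource that logs when it is acquired and released."""

    def __init__(self, log):
        self.log = log

    def __enter__(self):
        self.log.append("enter")
        return self                      # value bound to the name after "as"

    def __exit__(self, _type, _value, _tb):
        # _type is None if the block finished normally, else the exception class
        self.log.append("exit after %s" % (_type.__name__ if _type else "success"))
        return False                     # do not swallow the exception

log = []

# normal case
with ManagedResource(log) as res:
    log.append("working")

# error case: __exit__() still runs before the exception propagates
try:
    with ManagedResource(log) as res:
        raise RuntimeError("boom")
except RuntimeError:
    log.append("caught")

print(log)
# ['enter', 'working', 'exit after success', 'enter', 'exit after RuntimeError', 'caught']
```

Returning False from __exit__() lets the exception continue to the caller after the clean-up. Also note that res is still a valid name after each block.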

The "with" statement in Apache Airflow

Now that we know the basics, let’s focus on the use of “with” in the context of Apache Airflow. In Airflow, you have two ways of instantiating a DAG.

The first one without using a context manager:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('my_dag',
          schedule_interval='0 12 * * *',
          start_date=datetime(2019, 1, 10), catchup=False)
t_1 = DummyOperator(task_id='t_1_task', dag=dag)
t_2 = DummyOperator(task_id='t_2_task', dag=dag)
t_1 >> t_2

And the second one using a context manager:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG('my_dag',
          schedule_interval='0 12 * * *',
          start_date=datetime(2019, 1, 10), catchup=False) as dag:
    t_1 = DummyOperator(task_id='t_1_task')
    t_2 = DummyOperator(task_id='t_2_task')
    t_1 >> t_2

So, what’s the difference? As you may have noticed, in the second case we didn’t pass the dag variable as a parameter to each operator. Why? Because it is done automatically: inside the “with” block, the DAG object acts as a context manager and registers itself as the current DAG, so every operator created in the block is attached to it. For a better understanding, let’s take a look at what is done in the source code of the DAG class.

Source code from the DAG class of Apache Airflow:

def __enter__(self):
    global _CONTEXT_MANAGER_DAG
    self._old_context_manager_dags.append(_CONTEXT_MANAGER_DAG)
    _CONTEXT_MANAGER_DAG = self
    return self

def __exit__(self, _type, _value, _tb):
    global _CONTEXT_MANAGER_DAG
    _CONTEXT_MANAGER_DAG = self._old_context_manager_dags.pop()

If you remember, when we use the “with” statement, the method __enter__() is called first. In this method:

  1. The keyword global declares that _CONTEXT_MANAGER_DAG refers to the module-level variable, so that assigning to it inside the method updates that variable instead of creating a local one.
  2. The previous value (which can be None) of _CONTEXT_MANAGER_DAG is appended to the private list _old_context_manager_dags. Because a list is used, the previous value can be saved each time a “with” block is entered, so “with” blocks on DAG objects can be nested.
  3. _CONTEXT_MANAGER_DAG is set to the reference (self) of the current instance of the class DAG.
  4. The instance of the DAG class is returned.

The method __exit__() is called when the “with” block ends. In this method:

  1. The keyword global is used again so that the assignment targets the module-level variable _CONTEXT_MANAGER_DAG.
  2. _CONTEXT_MANAGER_DAG is restored to its previous value, popped from the list _old_context_manager_dags.

Now the question is: where is _CONTEXT_MANAGER_DAG used, and why don’t we need to pass the dag parameter to the operator objects?

In the source code of the class BaseOperator, which is the class that every operator must inherit from, we have the following piece of code:

@functools.total_ordering
class BaseOperator(LoggingMixin):
    # some code ...
    def __init__(self, task_id, dag=None, *args, **kwargs):  # signature abridged
        # some code ...
        if not dag and _CONTEXT_MANAGER_DAG:
            dag = _CONTEXT_MANAGER_DAG
        if dag:
            self.dag = dag

The first condition checks whether the dag parameter (the one in DummyOperator(task_id='t_1_task', dag=dag)) is unset while the _CONTEXT_MANAGER_DAG variable is set; if so, the value of _CONTEXT_MANAGER_DAG is assigned to the variable dag. The second condition checks whether the variable dag is set, either from _CONTEXT_MANAGER_DAG (by using “with”) or from the dag parameter, and if so assigns it to the self.dag attribute of the operator. That’s why, when you use the “with” statement, you no longer need to fill in the dag parameter for your operators.

To sum up, you can initialise your DAG with or without the “with” statement, but using it is considered a best practice: you no longer repeat dag=dag for every operator, so there is less code to write and you cannot forget to attach a task to its DAG.

That’s it for today, I hope you have appreciated the article, see you for the next one and have a great learning day!

 
