Apache Airflow | How to use the PythonOperator

Wondering how to run Python code through Airflow? The Airflow PythonOperator does exactly what you are looking for. It is a simple yet powerful operator, allowing you to execute a Python callable function from your DAG. You may have seen in my course “The Complete Hands-On Course to Master Apache Airflow” that I use this operator extensively in different use cases. Indeed, mastering this operator is a must-have, and that’s what we are going to learn in this post, starting with the basics. One more thing: if you like my tutorials, you can support my work by becoming my Patron right here. No obligation, but if you want to help me, thank you a lot.

Getting started with the PythonOperator in Airflow

Let’s start by looking at the following very short DAG:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from datetime import datetime

def my_func():
    print('Hello from my_func')
 
with DAG('python_dag', description='Python DAG', schedule_interval='*/5 * * * *', start_date=datetime(2018, 11, 1), catchup=False) as dag:
    dummy_task = DummyOperator(task_id='dummy_task', retries=3)
    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    dummy_task >> python_task

The DAG “python_dag” is composed of two tasks:

  • The task called “dummy_task” which basically does nothing.
  • The task “python_task” which actually executes our Python function called my_func.

In order to know whether the PythonOperator calls the function as expected, the message “Hello from my_func” will be printed to the standard output each time my_func is executed. Copy and paste the DAG into a file named python_dag.py and add it to the dags/ folder of Airflow. Next, start the webserver and the scheduler, then go to the Airflow UI. From there, you should see the following screen:

airflow_ui

Now, trigger the DAG by switching on the toggle next to the DAG’s name and let the first DAG Run finish. Once it’s done, click on the Graph icon as shown by the red arrow:

From the Graph View, we can visualise the tasks composing the DAG and how they relate to each other. Click on the task “python_task”, then, in the dialog box, click on View Log.

airflow_ui_dialog_box

Finally, if we take a look at the logs produced by “python_task”, we can see that the message “Hello from my_func” is printed out as expected, meaning the function was executed correctly by the PythonOperator.

airflow_ui_logs

Notice also the log message “Returned value was: None”, indicating that since we didn’t return any value from the function my_func, None was returned. We could return a value simply by adding return my_value below the print statement, where my_value can be a variable of any type.
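Under the hood, the operator essentially calls the callable and then logs whatever it returns; the returned value is also pushed to XCom so downstream tasks can retrieve it. Here is a simplified plain-Python sketch of that behaviour (not the actual Airflow source; execute_callable is a hypothetical name used only for illustration):

```python
# Simplified sketch (not the actual Airflow source) of how the operator
# invokes the callable and handles its return value.
def execute_callable(python_callable, op_args=None, op_kwargs=None):
    return_value = python_callable(*(op_args or []), **(op_kwargs or {}))
    # Airflow logs a message like this one; the value is also pushed to XCom.
    print('Returned value was: %s' % return_value)
    return return_value

def my_func():
    print('Hello from my_func')
    return 'hello from my_func'

execute_callable(my_func)
```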

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from datetime import datetime

def my_func():
    print('Hello from my_func')
    return 'hello from my_func'
 
with DAG('python_dag', description='Python DAG', schedule_interval='*/5 * * * *', start_date=datetime(2018, 11, 1), catchup=False) as dag:
    dummy_task = DummyOperator(task_id='dummy_task', retries=3)
    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    dummy_task >> python_task

If you run the DAG again with this new code, you will get the following result in the logs of the task:

airflow_ui_pythonoperator_return

How to pass parameters to PythonOperator in Airflow

Now that we know how to call a Python function, it would be very useful to know how to pass parameters to this function as well, using the PythonOperator. There are actually two ways of passing parameters.

  • First, we can use the op_args parameter which is a list of positional arguments that will get unpacked when calling the callable function.
  • Second, we can use the op_kwargs parameter which is a dictionary of keyword arguments that will get unpacked in the callable function.
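To make the difference concrete, here is a plain-Python sketch (no Airflow needed) of how these parameters ultimately reach the callable: the operator essentially calls python_callable(*op_args, **op_kwargs).

```python
# Plain-Python illustration of how op_args and op_kwargs reach the callable.
def my_func(a, b, c=None):
    return (a, b, c)

op_args = ['one', 'two']    # positional arguments
op_kwargs = {'c': 'three'}  # keyword arguments

# Conceptually, this is the call the PythonOperator ends up making:
result = my_func(*op_args, **op_kwargs)
print(result)  # ('one', 'two', 'three')
```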

Let’s see an example of both methods using the same DAG.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from datetime import datetime

def my_func(*op_args):
    print(op_args)
    return op_args[0]

with DAG('python_dag', description='Python DAG', schedule_interval='*/5 * * * *', start_date=datetime(2018, 11, 1), catchup=False) as dag:
    dummy_task = DummyOperator(task_id='dummy_task', retries=3)
    python_task = PythonOperator(task_id='python_task', python_callable=my_func, op_args=['one', 'two', 'three'])

    dummy_task >> python_task

Here, we first modified the PythonOperator by adding the parameter op_args, set to a list of string values (it could be any type), since it only accepts a list of positional arguments. Then, in my_func, the parameter op_args is unpacked using the ‘*’ operator. We print the arguments given by the PythonOperator and, finally, we return the first argument from the op_args list.

If we execute this DAG and go to the logs view of the task python_task like we did before, we get the following results:

pythonoperator_op_args_airflow

Notice that we could declare each argument explicitly in the function’s signature instead of using unpacking, which gives exactly the same result, as shown below:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from datetime import datetime

def my_func(p1, p2, p3):
    print(p1)
    return p1

with DAG('python_dag', description='Python DAG', schedule_interval='*/5 * * * *', start_date=datetime(2018, 11, 1), catchup=False) as dag:
    dummy_task = DummyOperator(task_id='dummy_task', retries=3)
    python_task = PythonOperator(task_id='python_task', python_callable=my_func, op_args=['one', 'two', 'three'])

    dummy_task >> python_task

The output:

pythonoperator_op_args_airflow_2

Another way to pass parameters is through op_kwargs. It works exactly like op_args; the only difference is that instead of passing a list of values, we pass a dictionary of keyword arguments.

Here is an example of a DAG using op_kwargs, as you can see in the call of the PythonOperator:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from datetime import datetime

def my_func(**kwargs):
    print(kwargs)
    return kwargs['param_1']

with DAG('python_dag', description='Python DAG', schedule_interval='*/5 * * * *', start_date=datetime(2018, 11, 1), catchup=False) as dag:
    dummy_task = DummyOperator(task_id='dummy_task', retries=3)
    python_task = PythonOperator(task_id='python_task', python_callable=my_func, op_kwargs={'param_1': 'one', 'param_2': 'two', 'param_3': 'three'})

    dummy_task >> python_task

We replaced op_args with op_kwargs, set to a dictionary of key-value pairs. Then, in my_func, we get the dictionary back by unpacking kwargs with the two ‘*’. Finally, we print the key-value pairs to the standard output and return the value of the key “param_1”, which is ‘one’.

By triggering this DAG, we obtain the following output:

pythonoperator_op_kwargs_airflow
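Note that nothing prevents you from combining both: op_args and op_kwargs can be set on the same PythonOperator, and the callable receives them together. A small plain-Python sketch (the argument values here are made up for illustration):

```python
# Sketch: a callable receiving both positional (op_args) and keyword
# (op_kwargs) parameters at once.
def my_func(first, second, **kwargs):
    print(first, second, kwargs)
    return kwargs['param_1']

# Equivalent of PythonOperator(..., op_args=['a', 'b'],
#                              op_kwargs={'param_1': 'one'})
value = my_func('a', 'b', param_1='one')
print(value)  # one
```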

Conclusion

In this short tutorial, we have seen how to call a very basic Python function with the PythonOperator and how to pass parameters using the op_args and op_kwargs parameters. We also quickly explored the differences between those two methods. In the next articles, we will discover more advanced use cases of the PythonOperator, as it is a very powerful operator.

Interested in learning more? Stay tuned and get special promotions!

Liked it? Join the Patreon Community and get an access to exclusive content now!
