Wondering how can we run python code through Airflow ? The Airflow PythonOperator does exactly what you are looking for. It is a very simple but powerful operator, allowing you to execute a Python callable function from your DAG. You may have seen in my course “The Complete Hands-On Course to Master Apache Airflow” that I use this operator extensively in different use cases. Indeed, mastering this operator is a must-have and that’s what we gonna learn in this post by starting with the basics. One more thing, if you like my tutorials, you can support my work by becoming my Patron right here. No obligation but if you want to help me, I will thank you a lot.
Getting started with the PythonOperator in Airflow
Let’s start by looking at the following very short DAG
The DAG “python_dag” is composed of two tasks:
- The task called “dummy_task” which basically does nothing.
- The task “python_task” which actually executes our Python function called call_me.
In order to know if the PythonOperator calls the function as expected, the message “Hello from my_func” will be printed out into the standard output each time my_func is executed. Copy and paste the dag into a file python_dag.py and add it to the dags/ folder of Airflow. Next, start the webserver and the scheduler and go to the Airflow UI. From there, you should have the following screen:
Now, trigger the DAG by clicking on the toggle next to the DAG’s name and let the first DAGRun finish. Once it’s done, click on the Graph Icon as shown by the red arrow:
From the Graph View we can visualise the tasks composing your DAG and how do they relate to each others. Click on the task “python_task”, then in the dialog box, click on View Log.
Finally, if we take a look at the logs produced by the “python_task”, we can see that the message “Hello from my_func” is printed out as expected meaning the function has been well executed using the PythonOperator.
Notice also the log message “Returned value was: None” indicating that since we didn’t return any value from the function my_func, None is returned. We could return a value just by typing below the print instruction, return my_value, where my_value can be a variable of any type we want.
If you run the dag again with this new code, you will get following result in the logs of the task:
How to pass parameters to PythonOperator in Airflow
Now we know how to call a Python function, it would be very useful to know how to pass parameters as well to this function using the PythonOperator. There are actually two ways of passing parameters.
- First, we can use the op_args parameter which is a list of positional arguments that will get unpacked when calling the callable function.
- Second, we can use the op_kwargs parameter which is a dictionary of keyword arguments that will get unpacked in the callable function.
Let’s see an example of both methods using the same DAG
Here, we first modified the PythonOperator by adding the parameter op_args sets to a list of string values (it could be any type) since it only accepts a list of positional arguments. Then, in my_func we have the parameter op_args which is unpacked using the ‘*’. We print the arguments given by the PythonOperator and finally, we return the first argument from the op_args list.
If we execute this DAG and go to the logs view of the task python_task like we did before, we get the following results:
Notice that we could specify each argument in the function’s parameters instead of using unpacking which gives exactly the same results as shown below:
Another way to pass parameters is through the use of op_kwargs. It works exactly as the op_args, the only difference is that instead of passing a list of values, we pass a dictionary of keywords.
Here is an example of a DAG with op_kwargs as you can see in the call of PythonOperator:
We replaced op_args by op_kwargs with a dictionary of key value pairs. Then, in my_func we get back the dictionary through the unpacking of kwargs with the two *. Finally, we display the key value pairs to the standard output and return the value of the key “param_1” which is one.
By triggering this DAG, we obtain the following output:
In this short tutorial we have seen how to call a very basic Python Function with the PythonOperator and how can we pass parameters using the op_args and op_kwargs parameters. We also explored quickly the differences between those two methods. In the next articles, we will discover more advanced use cases of the PythonOperator as it is a very powerful Operator.