Wondering how to run Python code through Airflow? The Airflow PythonOperator does exactly what you are looking for. It is a simple but powerful operator that allows you to execute a Python callable function from your DAG. You may have seen in my course “The Complete Hands-On Course to Master Apache Airflow” that I use this operator extensively in different use cases. Indeed, mastering this operator is a must-have, and that’s what we are going to learn in this post, starting with the basics. One more thing: if you like my tutorials, you can support my work by becoming my Patron right here. No obligation, but if you want to help me, I will thank you a lot.
Getting started with the PythonOperator in Airflow
Let’s start by looking at the following very simple DAG:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def my_func():
    print('Hello from my_func')

with DAG('python_dag', description='Python DAG',
         schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1),
         catchup=False) as dag:

    dummy_task = DummyOperator(task_id='dummy_task', retries=3)

    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    dummy_task >> python_task
The DAG “python_dag” is composed of two tasks:
- The task called “dummy_task”, which basically does nothing.
- The task called “python_task”, which actually executes our Python function my_func.
In order to know whether the PythonOperator calls the function as expected, the message “Hello from my_func” will be printed to the standard output each time my_func is executed. Copy and paste the DAG into a file named python_dag.py and add it to the dags/ folder of Airflow. Next, start the webserver and the scheduler, and go to the Airflow UI. From there, you should see the following screen:
Now, trigger the DAG by clicking on the toggle next to the DAG’s name and let the first DAGRun finish. Once it’s done, click on the Graph icon as shown by the red arrow:
From the Graph View, we can visualise the tasks composing the DAG and how they depend on each other. Click on the task “python_task”, then, in the dialog box, click on View Log.
Finally, if we take a look at the logs produced by “python_task”, we can see that the message “Hello from my_func” has been printed as expected, meaning the function was executed correctly by the PythonOperator.
Notice also the log message “Returned value was: None”, indicating that since we didn’t return any value from the function my_func, None was returned. We could return a value simply by adding, below the print instruction, return my_value, where my_value can be a variable of any type we want.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def my_func():
    print('Hello from my_func')
    return 'hello from my_func'

with DAG('python_dag', description='Python DAG',
         schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1),
         catchup=False) as dag:

    dummy_task = DummyOperator(task_id='dummy_task', retries=3)

    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    dummy_task >> python_task
If you run the DAG again with this new code, you will get the following result in the logs of the task:
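As a side note, when a Python callable returns a value, the PythonOperator automatically pushes it to XCom, Airflow’s mechanism for exchanging small pieces of data between tasks. Here is a minimal sketch of how a downstream task could read that value (the task read_task and its callable are hypothetical, and assumed to be added to the same DAG as python_task):

def read_return_value(**context):
    # Pull the value returned by my_func from XCom (pushed by python_task)
    value = context['ti'].xcom_pull(task_ids='python_task')
    print('Pulled from XCom: %s' % value)

read_task = PythonOperator(task_id='read_task',
                           python_callable=read_return_value,
                           provide_context=True)  # required in Airflow 1.x to receive the context

python_task >> read_task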
How to pass parameters to PythonOperator in Airflow
Now that we know how to call a Python function, it would be very useful to know how to pass parameters to this function using the PythonOperator. There are actually two ways of passing parameters:
- First, we can use the op_args parameter, which is a list of positional arguments that will get unpacked when calling the callable function.
- Second, we can use the op_kwargs parameter, which is a dictionary of keyword arguments that will get unpacked in the callable function.
Let’s see an example of both methods using the same DAG:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def my_func(*op_args):
    print(op_args)
    return op_args[0]

with DAG('python_dag', description='Python DAG',
         schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1),
         catchup=False) as dag:

    dummy_task = DummyOperator(task_id='dummy_task', retries=3)

    python_task = PythonOperator(task_id='python_task',
                                 python_callable=my_func,
                                 op_args=['one', 'two', 'three'])

    dummy_task >> python_task
Here, we first modified the PythonOperator by adding the parameter op_args set to a list of string values (it could be any type), since it only accepts a list of positional arguments. Then, in my_func, we have the parameter op_args, which is unpacked using the ‘*’. We print the arguments given by the PythonOperator and, finally, we return the first argument from the op_args list.
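To make the unpacking mechanism clearer outside of Airflow, here is a minimal plain-Python sketch of what the PythonOperator effectively does with op_args (the direct call at the bottom is for illustration only):

def my_func(*op_args):
    print(op_args)       # prints the tuple ('one', 'two', 'three')
    return op_args[0]    # returns 'one'

# The PythonOperator effectively calls the callable like this:
my_func(*['one', 'two', 'three'])  # the list is unpacked into positional arguments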
If we execute this DAG and go to the logs view of the task python_task as we did before, we get the following results:
Notice that we could specify each argument in the function’s parameters instead of using unpacking, which gives exactly the same result, as shown below:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def my_func(p1, p2, p3):
    print(p1)
    return p1

with DAG('python_dag', description='Python DAG',
         schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1),
         catchup=False) as dag:

    dummy_task = DummyOperator(task_id='dummy_task', retries=3)

    python_task = PythonOperator(task_id='python_task',
                                 python_callable=my_func,
                                 op_args=['one', 'two', 'three'])

    dummy_task >> python_task
The output:
Another way to pass parameters is through the use of op_kwargs. It works exactly like op_args; the only difference is that instead of passing a list of values, we pass a dictionary of keyword arguments.
Here is an example of a DAG with op_kwargs, as you can see in the call of the PythonOperator:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def my_func(**kwargs):
    print(kwargs)
    return kwargs['param_1']

with DAG('python_dag', description='Python DAG',
         schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1),
         catchup=False) as dag:

    dummy_task = DummyOperator(task_id='dummy_task', retries=3)

    python_task = PythonOperator(task_id='python_task',
                                 python_callable=my_func,
                                 op_kwargs={'param_1': 'one', 'param_2': 'two', 'param_3': 'three'})

    dummy_task >> python_task
We replaced op_args with op_kwargs and a dictionary of key-value pairs. Then, in my_func, we get back the dictionary through the unpacking of kwargs with the two stars ‘**’. Finally, we display the key-value pairs to the standard output and return the value of the key “param_1”, which is ‘one’.
By triggering this DAG, we obtain the following output:
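Note that op_args and op_kwargs are not mutually exclusive: the PythonOperator invokes the callable with both, roughly as python_callable(*op_args, **op_kwargs). Here is a minimal sketch combining the two, assuming the same DAG skeleton as in the previous examples:

def my_func(p1, param_2, param_3):
    # p1 comes from op_args; param_2 and param_3 come from op_kwargs
    print(p1, param_2, param_3)
    return p1

python_task = PythonOperator(task_id='python_task',
                             python_callable=my_func,
                             op_args=['one'],
                             op_kwargs={'param_2': 'two', 'param_3': 'three'})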
Conclusion
In this short tutorial, we have seen how to call a very basic Python function with the PythonOperator and how to pass parameters using the op_args and op_kwargs parameters. We also quickly explored the differences between those two methods. In the next articles, we will discover more advanced use cases of the PythonOperator, as it is a very powerful operator.