The PostgresOperator: All you need to know

One of the first operators I discovered with Airflow was the PostgresOperator. The PostgresOperator allows you to interact with your Postgres database: whether you want to create a table, delete records, or insert records, you will use the PostgresOperator. Nonetheless, you will quickly be faced with some questions. How can I get records from it? How can I pass parameters to my SQL requests? These are some of the questions we are going to answer in this article. By the way, if you want to start mastering Airflow, you should definitely take a look at my course right here: Apache Airflow: The Hands-On Guide.

Ready? Let’s go!

Simple requests

The purpose of the PostgresOperator is to execute SQL requests against a specific Postgres database. Two parameters are required: sql and postgres_conn_id. Below is the most basic way of instantiating a task with the PostgresOperator.

    inserting_data = PostgresOperator(
        task_id='inserting_data',
        postgres_conn_id='postgres_default',
        sql="INSERT INTO my_table VALUES ('val')"
    )

Here, we insert the value “val” in the table “my_table”. Notice that you can execute multiple requests at once by passing a list of SQL requests.

    inserting_data = PostgresOperator(
        task_id='inserting_data',
        postgres_conn_id='postgres_default',
        sql=[
            'TRUNCATE TABLE my_table',
            "INSERT INTO my_table VALUES ('val')",
        ]
    )

In the example above, we execute two requests: one to truncate/empty the table “my_table”, and another to insert a new row. Now, what if you want to create a table?

Creating a table

We have seen how to execute SQL requests like TRUNCATE and INSERT, but what about creating a table? The naive way of creating a table with the PostgresOperator is this:

    request = PostgresOperator(
        task_id='request',
        postgres_conn_id='postgres_default',
        sql='''
            CREATE TABLE my_table (
                table_date DATE NOT NULL,
                table_value TEXT NOT NULL,
                PRIMARY KEY (table_date)
            );
        '''
    )

Can you see the issues here?

The first one is more stylistic: having your long SQL requests embedded in the PostgresOperator makes your DAG harder to read, impacting its maintainability. The second issue, which is definitely more critical, is that you won’t be able to execute that task twice. Why? Because if the table already exists, you will end up with an error, leaving your data pipeline in failure.

Make sure that your SQL requests are idempotent: for a given input, you always get the same output whenever you run the task. This is super important. If for some reason you want to re-execute your data pipeline for past runs, you won’t be able to do so if your SQL requests are not designed for it.

The solution?

Create a folder sql/ in your folder dags/. Then, create a file CREATE_TABLE_my_table.sql with the SQL request in it.

/* CREATE_TABLE_my_table.sql */
CREATE TABLE IF NOT EXISTS my_table (
    table_date DATE NOT NULL,
    table_value TEXT NOT NULL,
    PRIMARY KEY (table_date)
);

Notice the “IF NOT EXISTS” making the SQL request idempotent. Then in the PostgresOperator:

    request = PostgresOperator(
        task_id='request',
        postgres_conn_id='postgres_default',
        sql='sql/CREATE_TABLE_my_table.sql'
    )

This is much cleaner, isn’t it?

Getting records from the PostgresOperator

What about if we want to get records from a SELECT request?

    request = PostgresOperator(
        task_id='request',
        postgres_conn_id='postgres_default',
        sql='SELECT * FROM connection;'
    )

Well, you can’t. There is no way to get records from the PostgresOperator. It hasn’t been designed for that, which makes sense. Don’t forget that Airflow is an orchestrator, not a processing framework. Your data should be processed in a third-party tool and not directly inside Airflow, as I see too many times with HUUUGE pandas dataframes 😉

If you truly want to get some data from your Postgres database, you will need to use the PostgresHook. A long time ago I made the video below to show this in action (what I show is still valid).
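To make the idea concrete, here is a minimal sketch of fetching records with the PostgresHook inside a PythonOperator callable. The table name, connection id and function name are just illustrative assumptions; adapt them to your own setup.

```python
def fetch_connections():
    # Import inside the callable so the scheduler doesn't pay the cost
    # at DAG-parsing time. Depending on your Airflow version, the path
    # may instead be airflow.providers.postgres.hooks.postgres.
    from airflow.hooks.postgres_hook import PostgresHook

    hook = PostgresHook(postgres_conn_id='postgres_default')
    # get_records executes the request and returns the rows as a list.
    records = hook.get_records('SELECT * FROM connection;')
    for record in records:
        print(record)
    return records

# Wire it into your DAG with a PythonOperator, e.g.:
# get_records = PythonOperator(
#     task_id='get_records',
#     python_callable=fetch_connections
# )
```

The hook gives you the records; what you then do with them (push to XCom, write to a file, etc.) is up to you, but remember to keep heavy processing out of Airflow.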

Autocommit?

There is an option in the PostgresOperator called autocommit, with the following description:

autocommit: if True, each command is automatically committed.

What does that mean? In SQL databases there is a very important concept called transactions. A transaction is the propagation of one or more changes to the database. Four standard properties (ACID) are respected:

  1. Atomicity – Ensures that all operations within the transaction complete successfully; otherwise the transaction is aborted (avoiding inconsistency and partial results)
  2. Consistency – Ensures that the database properly changes state upon a committed transaction.
  3. Isolation – Transactions operate independently of each other.
  4. Durability – Ensures that the result of a committed transaction persists in the database in case of a failure.

Basically, there are three commands to control a transaction: BEGIN TRANSACTION, COMMIT and ROLLBACK. The COMMIT command saves to the database all SQL requests executed since the last COMMIT. The point is, COMMIT indicates that the transaction/SQL requests can be applied to the database.

One thing you have to know is that all SQL requests in Postgres are automatically committed for you. So you actually don’t need to set this parameter to True, and that’s why it is set to False by default. This parameter is pretty useless in my opinion.
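To see COMMIT and ROLLBACK in action without a running Postgres instance, here is a small illustration using Python’s standard sqlite3 module (the transaction commands are standard SQL, not Postgres-specific):

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode, so we can
# issue BEGIN/COMMIT/ROLLBACK ourselves.
conn = sqlite3.connect(':memory:', isolation_level=None)
conn.execute('CREATE TABLE my_table (table_value TEXT)')

# A rolled-back transaction leaves no trace in the table.
conn.execute('BEGIN TRANSACTION')
conn.execute("INSERT INTO my_table VALUES ('val')")
conn.execute('ROLLBACK')
print(conn.execute('SELECT COUNT(*) FROM my_table').fetchone()[0])  # 0

# A committed transaction persists.
conn.execute('BEGIN TRANSACTION')
conn.execute("INSERT INTO my_table VALUES ('val')")
conn.execute('COMMIT')
print(conn.execute('SELECT COUNT(*) FROM my_table').fetchone()[0])  # 1
```

The insert only survives once COMMIT is issued; that is exactly what the autocommit flag automates, one COMMIT per command.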

The PostgresOperator: The advanced

Parameters

With the PostgresOperator, it is possible to pass parameters/values to your SQL requests. This is extremely useful for two reasons:

  1. The parameters/values are grouped at the same place increasing the clarity of your DAG
  2. You can inject those parameters/values at runtime and so, avoid hardcoding data in your SQL requests.

Let’s say we would like to pass an age to our SQL request. As shown in the documentation, the way to do this is to use the argument parameters of the PostgresOperator. parameters takes either a list/tuple or a dictionary. Let’s take a look at the example below:

    check_age = PostgresOperator(
        task_id='check_age',
        postgres_conn_id='postgres_default',
        sql="SELECT CASE WHEN %s > 21 THEN 'adult' ELSE 'young' END",
        parameters=[30]
    )

The output is the following:

[Screenshot: task logs showing the executed query]

In this example, the placeholder %s is replaced by the value 30 from parameters. Furthermore, you can see that the value is indeed used, as shown in the log output “with parameters [30]”. What if we want to use a dictionary instead of a list?

    check_age = PostgresOperator(
        task_id='check_age',
        postgres_conn_id='postgres_default',
        sql="SELECT CASE WHEN %(age)s > 21 THEN 'adult' ELSE 'young' END",
        parameters={ 'age': 30 }
    )

Pay attention to the special syntax %(thekey)s.
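The two placeholder styles map directly onto Python’s own %-formatting, which is where psycopg2 (the driver behind the PostgresOperator) borrowed the syntax. Here is a rough illustration of how the placeholders get filled; keep in mind that in reality the driver binds and escapes the values safely rather than doing plain string formatting:

```python
# Positional style: one %s per value, parameters given as a list/tuple.
sql_positional = "SELECT CASE WHEN %s > 21 THEN 'adult' ELSE 'young' END"
print(sql_positional % (30,))
# SELECT CASE WHEN 30 > 21 THEN 'adult' ELSE 'young' END

# Named style: %(key)s placeholders, parameters given as a dictionary.
sql_named = "SELECT CASE WHEN %(age)s > 21 THEN 'adult' ELSE 'young' END"
print(sql_named % {'age': 30})
# SELECT CASE WHEN 30 > 21 THEN 'adult' ELSE 'young' END
```

Both styles produce the same request; the named style is simply easier to read once you have more than a couple of values.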

Now, as we want to follow best practices, let’s put the SQL request in a file SELECT_AGE.sql in the sql/ folder. The content of the file is the following:

/* sql/SELECT_AGE.sql */
SELECT CASE WHEN %(age)s > 21 THEN 'adult' ELSE 'young' END

And the corresponding task:

    check_age = PostgresOperator(
        task_id='check_age',
        postgres_conn_id='postgres_default',
        sql="sql/SELECT_AGE.sql",
        parameters={ 'age': 30 }
    )

As you can see, it’s almost the same code, except that this time the SQL request is in a dedicated file, making your DAG cleaner and better organized. When you create a data pipeline, always think about maintainability; this is super important.

Params

Like any other operator, the PostgresOperator inherits from the BaseOperator. That means all arguments defined in the BaseOperator are accessible from the PostgresOperator, and one very interesting argument is params. params allows you to pass additional values to your operators. Let me give you an example.

If we keep the same SQL request:

/* sql/SELECT_AGE.sql */
SELECT CASE WHEN %(age)s > 21 THEN 'adult' ELSE 'young' END

But instead of using parameters, we use params:

    check_age = PostgresOperator(
        task_id='check_age',
        postgres_conn_id='postgres_default',
        sql="sql/SELECT_AGE.sql",
        params={ 'age': 30 }
    )

You will end up with the following error:

[Screenshot: task logs showing the error]

Why? Because params doesn’t work like the argument parameters. The difference is that parameters is specific to the PostgresOperator, whereas params is common to all operators. In the case of the PostgresOperator, if you want to access params, you need to use the Jinja template syntax. If you don’t know what Jinja is, take a look at my article here.

So, how can we access the params and so the value “age” from the sql file? Like that:

/* sql/SELECT_AGE.sql */
SELECT CASE WHEN {{ params.age }} > 21 THEN 'adult' ELSE 'young' END

Can you see the pair of curly brackets {{ }}? This is the special Jinja syntax to inject data at runtime into your templates. If you execute the task again, it now runs successfully.

The bottom line is, don’t forget the difference between params and parameters. For example, if you try to access parameters with {{ parameters.age }}, it won’t work. In that case, you have to use the Postgres notation %(age)s. If you want to access params, you need to use the curly brackets as shown previously. Those are two distinct arguments, and yes, you could use both of them.

Is one better than the other? No, params and parameters serve the same purpose. Keep in mind that params leverages the Jinja template engine.
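Since params goes through Jinja, you can reproduce the rendering step yourself with the jinja2 package (which Airflow depends on). This is only an illustration of what Airflow does for you before the request is sent:

```python
from jinja2 import Template

sql = "SELECT CASE WHEN {{ params.age }} > 21 THEN 'adult' ELSE 'young' END"

# Airflow passes the operator's params dict into the template context;
# here we do the same by hand.
rendered = Template(sql).render(params={'age': 30})
print(rendered)
# SELECT CASE WHEN 30 > 21 THEN 'adult' ELSE 'young' END
```

Note the difference in timing: Jinja rendering happens before the request reaches Postgres, whereas parameters are bound by the database driver itself.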

Conclusion

That’s it for the PostgresOperator. I hope you really enjoyed what you’ve learned. Airflow is a really powerful orchestrator with many operators to discover. If you want to learn more about Airflow, check out my course The Complete Hands-On Introduction to Apache Airflow right here. Or if you already know Airflow and want to go much further, enrol in my 12-hour course here.

Have a great day! 🙂

Interested in learning more? Stay tuned and get special promotions!

4 thoughts on “The PostgresOperator: All you need to know”

  1. Confused about “There is no way to get records from the PostgresOperator”.
    Early on, you suggest that the way to extract data from Postgres is through the PostgresHook. But in the Advanced topic, you clearly show supplying SQL to the PostgresOperator.

    Can you elaborate on what we CAN do and what we CAN’T?

    1. Sure,
      You can’t get records directly from the PostgresOperator but
      You can get records by using the PostgresHook.
      The PostgresHook is used by the PostgresOperator but the PostgresOperator doesn’t allow you to get records.
      Does that make sense? 🙂

  2. Marc,
    I recently enrolled in your course(Udemy). I have a few questions regarding the KubernetesPodOperator. 1. I’m running airflow on Kubernetes env and to execute PostgresOperator tasks on it I usually end up with quota issues. How do I configure my config in values.yaml to reserve resources for executing base operator tasks?
    2. Is airflow on Kubernetes only reserved for executing DAG’s using KubernetesPodOperator ?
    3. Is there a way I can leverage the base operators (python, bash, postgres etc.) using the KubernetesPodOperator?

  3. There’s a reason many don’t use the Postgres Operator.

    The majority use-case for an insertion in airflow is that you’re inserting data that was generated upstream.

    That means the parameters field needs to be populated by some code that “does work”, and that work needs to be done only when the entire operator starts executing, not when your dag is parsed.

    An example is if you did this:

    inserting_data = PostgresOperator(
        task_id='inserting_data',
        postgres_conn_id='postgres_default',
        sql='INSERT INTO my_table VALUES (%s, %s, %s)',
        parameters=fetch_my_csv_as_list()
    )

    the function would be executed by the scheduler every time the DAG file is parsed, before the task even starts.

    This is why python operator is often preferred, because it can run fetch_my_csv_as_list() at execution time, and then couple those parameters with a postgres request/connection.

    This tutorial (like many out there for PostgresOperator) escapes this by using hardcoded values in the example.
