Integrate Apache Airflow with Tiger Cloud

Author, schedule, and monitor workflows to orchestrate your data pipelines

A DAG (Directed Acyclic Graph) is the core concept of Airflow: it collects tasks together and organizes them with dependencies and relationships that determine how they should run. You declare a DAG in a Python file in the $AIRFLOW_HOME/dags folder of your Airflow instance.
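To make the dependency idea concrete, the following is a minimal sketch that uses only the Python standard library (graphlib, Python 3.9 or later), not Airflow itself. The task names are hypothetical, and Airflow's scheduler is far more involved than this; the sketch only shows how declared dependencies determine a valid run order and why cycles are not allowed.

```python
from graphlib import TopologicalSorter

# Hypothetical task names. Each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}


def run_order(deps):
    """Return one valid execution order for the tasks.

    Raises graphlib.CycleError if the dependencies contain a cycle,
    which is why the graph must be acyclic.
    """
    return list(TopologicalSorter(deps).static_order())


print(run_order(dependencies))
# ['extract', 'transform', 'load', 'notify']
```

In an Airflow DAG file, you express the same relationships between operators rather than between plain strings.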

This page shows you how to use a Python connector in a DAG to integrate Apache Airflow with a Tiger Cloud service.

To follow the procedure on this page you need to:

This example DAG uses the company table you create in Optimize time-series data in hypertables.

To install the Python libraries required to connect to Tiger Cloud:

  1. Enable PostgreSQL connections between Airflow and Tiger Cloud

    pip install psycopg2-binary
  2. Enable PostgreSQL connection types in the Airflow UI

    pip install apache-airflow-providers-postgres
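After installing, you may want to confirm that both libraries are importable from the Python environment that Airflow runs in. The sketch below is one way to do that with the standard library. The helper name missing_modules is hypothetical; psycopg2 and airflow.providers.postgres are the import names provided by the two packages above.

```python
import importlib.util


def missing_modules(names):
    """Return the module names from `names` that cannot be found."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except ImportError:
            # Raised when a parent package of a dotted name is not installed.
            found = False
        if not found:
            missing.append(name)
    return missing


# Import names provided by psycopg2-binary and apache-airflow-providers-postgres.
REQUIRED = ["psycopg2", "airflow.providers.postgres"]

print(missing_modules(REQUIRED) or "all required modules found")
```

An empty result means both packages are visible to the interpreter you ran the check with.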

Create a connection between Airflow and your service

In your Airflow instance, securely connect to your Tiger Cloud service:

  1. Run Airflow

    On your development machine, run the following command:

    airflow standalone

    The username and password for the Airflow UI are displayed in the standalone | Login with username line of the output.

  2. Add a connection from Airflow to your Tiger Cloud service

    1. In your browser, navigate to localhost:8080, then select Admin > Connections.
    2. Click + (Add a new record), then use your connection info to fill in the form. The Connection Type is Postgres.
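As an alternative to the form, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the connection ID in upper case, with the connection expressed as a URI. The following sketch builds such a URI. The function name, the connection ID tiger_cloud, and every credential value are placeholders for illustration; the sslmode extra is an assumption, so check it against your own connection info. Connections defined this way do not appear in the Admin > Connections list.

```python
from urllib.parse import quote, urlencode


def airflow_postgres_uri(user, password, host, port, dbname, extras=None):
    """Build an Airflow-style connection URI for a Postgres connection.

    User, password, and database name are percent-encoded so that
    characters such as '@' or '/' do not break the URI.
    """
    uri = (
        f"postgres://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(dbname, safe='')}"
    )
    if extras:
        # Query parameters are stored as connection extras by Airflow.
        uri += "?" + urlencode(extras)
    return uri


# Placeholder values only; substitute your own connection info.
uri = airflow_postgres_uri(
    user="example_user",
    password="p@ss/word",
    host="example.host.invalid",
    port=5432,
    dbname="example_db",
    extras={"sslmode": "require"},  # assumption: adjust to your service
)

# Hypothetical connection ID "tiger_cloud" becomes AIRFLOW_CONN_TIGER_CLOUD.
print(f"export AIRFLOW_CONN_TIGER_CLOUD='{uri}'")
```

With this variable set in the environment where Airflow runs, a DAG would refer to the connection by the ID tiger_cloud.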

Exchange data between Airflow and your service

To exchange data between Airflow and your Tiger Cloud service:

  1. Create and execute a DAG

    To insert data in your Tiger Cloud service from Airflow:

    1. In $AIRFLOW_HOME/dags/timescale_dag.py, add the following code:

      from datetime import datetime

      from airflow import DAG
      from airflow.operators.python import PythonOperator
      from airflow.providers.postgres.hooks.postgres import PostgresHook


      def insert_data_to_timescale():
          # Replace the placeholder with the ID of the connection you created.
          hook = PostgresHook(postgres_conn_id='the ID of the connection you created')
          conn = hook.get_conn()
          cursor = conn.cursor()
          # This could be any query. This example inserts data into the table
          # you create in:
          # https://www.tigerdata.com/docs/getting-started/latest/try-key-features-timescale-products/#optimize-time-series-data-in-hypertables-with-hypercore
          cursor.execute(
              "INSERT INTO company (symbol, name) VALUES (%s, %s)",
              ('NEW/Asset', 'New Asset Name'),
          )
          conn.commit()
          cursor.close()
          conn.close()


      default_args = {
          'owner': 'airflow',
          'start_date': datetime(2023, 1, 1),
          'retries': 1,
      }

      dag = DAG('timescale_dag', default_args=default_args, schedule_interval='@daily')

      insert_task = PythonOperator(
          task_id='insert_data',
          python_callable=insert_data_to_timescale,
          dag=dag,
      )

      This DAG uses the company table created in Create regular PostgreSQL tables for relational data.

    2. In your browser, refresh the Airflow UI.

    3. In Search DAGS, type timescale_dag and press ENTER.

    4. Press the play icon and trigger the DAG:

      [Image: Apache Airflow DAG results showing daily ETH transaction volume]

  2. Verify that the data appears in Tiger Cloud

    1. In Tiger Console, navigate to your service and click SQL editor.

    2. Run a query to view your data. For example: SELECT symbol, name FROM company;

      You see the new rows inserted in the table.
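If you adapt the task for real workloads, you may want the insert to roll back on failure and always release the cursor. The following is a minimal sketch of that pattern, not part of the procedure above. The function insert_company and the RecordingConnection stand-in are hypothetical names; the stand-in exists only so the sketch runs without a database. In a DAG you would pass the connection returned by hook.get_conn() instead.

```python
def insert_company(conn, symbol, name):
    """Insert one row into the company table using a DB-API connection.

    Commits on success, rolls back and re-raises on any error, and
    closes the cursor in both cases.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(
            "INSERT INTO company (symbol, name) VALUES (%s, %s)",
            (symbol, name),
        )
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()


class RecordingConnection:
    """Stand-in that records calls instead of talking to a database."""

    def __init__(self):
        self.calls = []

    def cursor(self):
        # Acts as its own cursor to keep the stand-in small.
        return self

    def execute(self, sql, params):
        self.calls.append(("execute", sql, params))

    def commit(self):
        self.calls.append(("commit",))

    def rollback(self):
        self.calls.append(("rollback",))

    def close(self):
        self.calls.append(("close",))


demo = RecordingConnection()
insert_company(demo, "NEW/Asset", "New Asset Name")
print([call[0] for call in demo.calls])
```

Keeping the SQL in a function that takes a connection, rather than looking up the hook inside it, also makes the logic testable outside Airflow.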

You have successfully integrated Apache Airflow with Tiger Cloud and created a data pipeline.