Airflow in Lineage

Integrate Monte Carlo with Airflow to see DAGs & Tasks linked to Data Assets (Tables, Views, etc.) and also DAGs & Tasks related to tables in a Monte Carlo incident. Monte Carlo relies on query tagging to ingest DAGs & Tasks related to tables.

Which style of tags/labels?

  • Snowflake query tags
  • BigQuery labels
  • Query comments for warehouses which do not support tagging
  • dbt + Snowflake
  • dbt + other warehouses
  • Using cluster policies

The tagged queries executed via Airflow will be later ingested by Monte Carlo via the usual Query Log job. Then as part Query Log processing Monte Carlo will enhance lineage information with Airflow DAGs & Tasks whenever it is present.

πŸ“˜

Time for Airflow information to show up in lineage

It might take up to 12 hours for the first time query tags are ingested to show up in lineage.

Monte Carlo will look for the following tags in queries:

  • DAG: dag_id (recommended), dag, dag_name, airflow_dag_id, airflow_dag, AIRFLOW_DAG_ID, DAG_ID
  • Task: task_id (recommended), task, task_name, airflow_task, airflow_task_id, AIRFLOW_TASK_ID, TASK_ID

Snowflake

To set query tags on Snowflake, add DAG & task information as a JSON string to session_parameters in the SnowflakeOperator. Example:

task = SnowflakeOperator(
    task_id='MyTask',
    sql="...",
    session_parameters={
        "QUERY_TAG": '{"dag_id": "my_dag", "task_id": "my_task"}'  # Stringified JSON
    },
    ...
)

πŸ“˜

Using Airflow templates

Unfortunately it is not possible to use Airflow templates in session_parameters, so every DAG id and task id need to be hardcoded. See the section Using Cluster Policies below for an alternative approach.

BigQuery

To set labels on BigQuery, add DAG & task information as a Python dict to labels in any of the BigQuery operators. The labels field accepts Airflow templates, so use that instead of hardcoding the DAG id and task id. Example:

task = BigQueryExecuteQueryOperator(
    task_id='MyTask',
    sql="...",
    labels={
		    "dag_id": "{{ dag.dag_id }}",
		    "task_id": "{{ task.task_id }}"
	  },
    ...
)

πŸ“˜

Using Airflow templates

This approach requires that every operator instance is changed. For a more convenient approach, see the section Using Cluster Policies below.

Other Warehouses

For warehouses which do not support query tagging, add DAG & Task information as a JSON string as a query comments. The sql field accepts Airflow templates, so use that instead of hardcoding the DAG id and task id. Example:

task = RedshiftSQLOperator(
    task_id='MyTask',
    sql="""
        -- {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}
        CREATE TABLE IF NOT EXISTS my_table (
            id INTEGER,
            name VARCHAR
        );
        """,
    ...
)
task = SQLExecuteQueryOperator(
    task_id='MyTask',
    sql="""
        -- {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}
        CREATE TABLE IF NOT EXISTS my_table (
            id INTEGER,
            name VARCHAR
        );
        """,
    ...
)

dbt + Snowflake

dbt Core

If you use Airflow to invoke dbt Core for your Snowflake transformations, you can leverage dbt to automatically tag the queries by adding a macro to the dbt project. For more information, check the dbt Snowflake Query Tags documentation.

An example dbt macro might look like:

{% macro set_query_tag() -%}
  {% set query_tag = {
    "dbt_model": model.name,
    "task_id": env_var("AIRFLOW_CTX_TASK_ID", "local_run"),
    "dag_id": env_var("AIRFLOW_CTX_DAG_ID", ""),
  } %}
  {% set new_query_tag = tojson(query_tag) %} 
  {% if new_query_tag %}
    {% set original_query_tag = get_current_query_tag() %}
    {{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
    {% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
    {{ return(original_query_tag)}}
  {% endif %}
  {{ return(none)}}
{% endmacro %}

And add the query_tag to the dbt_project.yml file.

models:
  +query_tag: dbt

dbt Cloud

For dbt Cloud the same approach can be used as with dbt Core, but you need to:

  • adjust the macro slightly
  • update the Airflow operator used to invoke dbt Cloud

In the macro, you need to use dbt variables instead of environment variables, for example you can use dag_id and task_id variables:

"task_id": var("task_id", "local_run"),
"dag_id": var("dag_id", ""),

In the Airflow operator you're using to run the dbt Cloud Job, you need to pass the variables expected by the macro. For example, if you're using DbtCloudRunJobOperator to run a job with a single dbt build step, you can pass the dag_id and task_id variables this way:

trigger_dbt_cloud_job_run = DbtCloudRunJobOperator(
	task_id="trigger_dbt_cloud_job_run",
	job_id=456621,
	check_interval=10,
	timeout=300,
	steps_override=[f'dbt build --vars "{{dag_id: \\\"{dag.dag_id}\\\", task_id: \\\"trigger_dbt_cloud_job_run\\\"}}"']
)

dbt + Other Warehouses

dbt Core

If you use Airflow to invoke dbt Core for your transformations in other warehouses, you can leverage dbt to automatically add a comment to the queries by adding a macro to the dbt project. For more information, check the dbt Query Comment documentation.

An example dbt query comment macro might look like

{% macro airflow_query_comment(node) %}
    {%- set comment_dict = {} -%}
    {%- do comment_dict.update(
        app='dbt',
        dbt_version=dbt_version,
        profile_name=target.get('profile_name'),
        target_name=target.get('target_name'),
        task_id=env_var("AIRFLOW_CTX_TASK_ID", "local_run"),
        dag_id=env_var("AIRFLOW_CTX_DAG_ID", ""),
    ) -%}
    {%- if node is not none -%}
      {%- do comment_dict.update(
        file=node.original_file_path,
        node_id=node.unique_id,
        node_name=node.name,
        resource_type=node.resource_type,
        package_name=node.package_name,
        relation={
            "database": node.database,
            "schema": node.schema,
            "identifier": node.identifier
        }
      ) -%}
    {% else %}
      {%- do comment_dict.update(node_id='internal') -%}
    {%- endif -%}
    {% do return(tojson(comment_dict)) %}
{% endmacro %}

And add the query-comment to the dbt_project.yml file.

query-comment: "{{ airflow_query_comment(node) }}"

Version Compatibility

  • For Airflow + dbt + Redshift, the dbt version must be >= v1.6

dbt Cloud

For dbt Cloud the same approach can be used as with dbt Core, but you need to:

  • adjust the macro slightly
  • update the Airflow operator used to invoke dbt Cloud

In the macro, you need to use dbt variables instead of environment variables, for example you can use dag_id and task_id variables:

"task_id": var("task_id", "local_run"),
"dag_id": var("dag_id", ""),

In the Airflow operator you're using to run the dbt Cloud Job, you need to pass the variables expected by the macro. For example, if you're using DbtCloudRunJobOperator to run a job with a single dbt build step, you can pass the dag_id and task_id variables this way:

trigger_dbt_cloud_job_run = DbtCloudRunJobOperator(
	task_id="trigger_dbt_cloud_job_run",
	job_id=456621,
	check_interval=10,
	timeout=300,
	steps_override=[f'dbt build --vars "{{dag_id: \\\"{dag.dag_id}\\\", task_id: \\\"trigger_dbt_cloud_job_run\\\"}}"']
)

Using Cluster Policies

As documented in Airflow docs here Cluster Policies can be used to check or mutate DAGs or Tasks on a cluster-wide level.

Using a task policy you can update all operators of a given type to have the query tags automatically set, for example for Snowflake:

from airflow.models import BaseOperator
import json

def task_policy(task: BaseOperator):
	if task.task_type == "SnowflakeOperator":
		session_parameters = {
			"QUERY_TAG": json.dumps({
				"dag_id": task.dag_id,
				"task_id": task.task_id,
			}),
		}
		if hasattr(task, "hook_params"):  # Airflow >= v2.5?
			task.hook_params['session_parameters'] = session_parameters
		else:
			task.session_parameters = session_parameters

This content needs to be saved in a file named airflow_local_settings.py in AIRFLOW_HOME/config folder, in environments like MWAA or Composer the file can be stored in the dags folder, you might need to restart the Airflow instance for the changes to take effect.

Multiple Airflow Environments

If you have multiple airflow environments, you may specify an environment tag in queries.

The Environment tag is required in the situation where there are shared dag and task ids between those environments. Otherwise Monte Carlo will not know which Dag/Task the query is referring to, and we will not associate that query with any Dag/Task.

Monte Carlo will look for the following environment tags in queries:

airflow_env(recommended), environment, airflow_environment

The value of the environment tag must be the name given to the Airflow Connection in Monte Carlo.

Example:

task = SnowflakeOperator(
    task_id='MyTask',
    sql="...",
    session_parameters={
        "QUERY_TAG": '{"dag_id": "my_dag", "task_id": "my_task", "airflow_env": "Airflow-Prod"}'  # Stringified JSON
    },
    ...
)