Airflow in Lineage

Integrate Monte Carlo with Airflow to see the DAGs & Tasks linked to Data Assets (Tables, Views, etc.), as well as the DAGs & Tasks related to tables involved in a Monte Carlo incident. Monte Carlo relies on query tagging to associate DAGs & Tasks with tables.

Which style of tags/labels?

  • Snowflake query tags
  • BigQuery labels
  • Query comments for warehouses which do not support tagging
  • dbt + Snowflake
  • dbt + other warehouses
  • Using cluster policies

The tagged queries executed via Airflow are later ingested by Monte Carlo through the usual Query Log job. As part of Query Log processing, Monte Carlo enriches lineage information with Airflow DAGs & Tasks whenever tags are present.

πŸ“˜

Time for Airflow information to show up in lineage

It might take up to 12 hours after query tags are first ingested for Airflow information to show up in lineage.

Monte Carlo will look for the following tags in queries:

  • DAG: dag_id (recommended), dag, dag_name, airflow_dag_id, airflow_dag, AIRFLOW_DAG_ID, DAG_ID
  • Task: task_id (recommended), task, task_name, airflow_task, airflow_task_id, AIRFLOW_TASK_ID, TASK_ID
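All of the examples below pass this information as a stringified JSON payload. As a minimal sketch, building one with the recommended key names looks like this (the DAG and task names are placeholders):

```python
import json

# Build the tag payload using the recommended key names (dag_id, task_id).
# "my_dag" and "my_task" are placeholder values.
query_tag = json.dumps({"dag_id": "my_dag", "task_id": "my_task"})
print(query_tag)  # -> {"dag_id": "my_dag", "task_id": "my_task"}
```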

Snowflake

To set query tags on Snowflake, add DAG & task information as a JSON string to session_parameters in the SnowflakeOperator. Example:

task = SnowflakeOperator(
    task_id='MyTask',
    sql="...",
    session_parameters={
        "QUERY_TAG": '{"dag_id": "my_dag", "task_id": "my_task"}'  # Stringified JSON
    },
    ...
)

πŸ“˜

Using Airflow templates

Unfortunately, it is not possible to use Airflow templates in session_parameters, so every DAG id and task id needs to be hardcoded. See the section Using Cluster Policies below for an alternative approach.

BigQuery

To set labels on BigQuery, add DAG & task information as a Python dict to labels in any of the BigQuery operators. The labels field accepts Airflow templates, so use that instead of hardcoding the DAG id and task id. Example:

task = BigQueryExecuteQueryOperator(
    task_id='MyTask',
    sql="...",
    labels={
        "dag_id": "{{ dag.dag_id }}",
        "task_id": "{{ task.task_id }}"
    },
    ...
)

πŸ“˜

Updating every operator

This approach requires changing every operator instance. For a more convenient approach, see the section Using Cluster Policies below.

Other Warehouses

For warehouses which do not support query tagging, add DAG & Task information as a JSON string in a query comment. The sql field accepts Airflow templates, so use that instead of hardcoding the DAG id and task id. Example:

task = RedshiftSQLOperator(
    task_id='MyTask',
    sql="""
        -- {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}
        CREATE TABLE IF NOT EXISTS my_table (
            id INTEGER,
            name VARCHAR
        );
        """,
    ...
)
task = SQLExecuteQueryOperator(
    task_id='MyTask',
    sql="""
        -- {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}
        CREATE TABLE IF NOT EXISTS my_table (
            id INTEGER,
            name VARCHAR
        );
        """,
    ...
)

dbt + Snowflake

If you use Airflow to invoke dbt for your Snowflake transformations, you can leverage dbt to automatically tag the queries by adding a macro to the dbt project. For more information, check the dbt Snowflake Query Tags documentation.

An example dbt macro might look like:

{% macro set_query_tag() -%}
  {% set query_tag = {
    "dbt_model": model.name,
    "task_id": env_var("AIRFLOW_CTX_TASK_ID", "local_run"),
    "dag_id": env_var("AIRFLOW_CTX_DAG_ID", ""),
  } %}
  {% set new_query_tag = tojson(query_tag) %} 
  {% if new_query_tag %}
    {% set original_query_tag = get_current_query_tag() %}
    {{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
    {% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
    {{ return(original_query_tag)}}
  {% endif %}
  {{ return(none)}}
{% endmacro %}

Then add the query_tag to the dbt_project.yml file:

models:
  +query_tag: dbt

dbt + Other Warehouses

If you use Airflow to invoke dbt for your transformations in other warehouses, you can leverage dbt to automatically add a comment to the queries by adding a macro to the dbt project. For more information, check the dbt Query Comment documentation.

An example dbt query comment macro might look like:

{% macro airflow_query_comment(node) %}
    {%- set comment_dict = {} -%}
    {%- do comment_dict.update(
        app='dbt',
        dbt_version=dbt_version,
        profile_name=target.get('profile_name'),
        target_name=target.get('target_name'),
        task_id=env_var("AIRFLOW_CTX_TASK_ID", "local_run"),
        dag_id=env_var("AIRFLOW_CTX_DAG_ID", ""),
    ) -%}
    {%- if node is not none -%}
      {%- do comment_dict.update(
        file=node.original_file_path,
        node_id=node.unique_id,
        node_name=node.name,
        resource_type=node.resource_type,
        package_name=node.package_name,
        relation={
            "database": node.database,
            "schema": node.schema,
            "identifier": node.identifier
        }
      ) -%}
    {% else %}
      {%- do comment_dict.update(node_id='internal') -%}
    {%- endif -%}
    {% do return(tojson(comment_dict)) %}
{% endmacro %}

Then add the query-comment to the dbt_project.yml file:

query-comment: "{{ airflow_query_comment(node) }}"

Version Compatibility

  • For Airflow + dbt + Redshift, dbt must be >= v1.6

Using Cluster Policies

As documented in the Airflow docs, Cluster Policies can be used to check or mutate DAGs or Tasks cluster-wide.

Using a task policy, you can update all operators of a given type to set query tags automatically. For example, for Snowflake:

from airflow.models import BaseOperator
import json

def task_policy(task: BaseOperator):
    if task.task_type == "SnowflakeOperator":
        session_parameters = {
            "QUERY_TAG": json.dumps({
                "dag_id": task.dag_id,
                "task_id": task.task_id,
            }),
        }
        if hasattr(task, "hook_params"):  # Airflow >= v2.5?
            task.hook_params['session_parameters'] = session_parameters
        else:
            task.session_parameters = session_parameters

Save this content in a file named airflow_local_settings.py in the AIRFLOW_HOME/config folder. In managed environments like MWAA or Composer, the file can be stored in the dags folder instead. You might need to restart the Airflow instance for the changes to take effect.
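The same policy technique can cover other operator types. As a hedged sketch, a policy for BigQuery operators would set the labels attribute instead of session parameters; this assumes the operator class name matches and that labels is a plain dict attribute, as on BigQueryExecuteQueryOperator:

```python
def bigquery_task_policy(task):
    # Sketch only: extends the task policy idea to BigQuery operators by
    # merging dag_id/task_id into the operator's `labels` dict.
    # Assumes `task_type` is "BigQueryExecuteQueryOperator" and that
    # `labels` is a mutable dict attribute (or unset).
    if task.task_type == "BigQueryExecuteQueryOperator":
        labels = dict(getattr(task, "labels", None) or {})
        labels.update({"dag_id": task.dag_id, "task_id": task.task_id})
        task.labels = labels
```

Because the policy reads task.dag_id and task.task_id at parse time, no templating is needed here either.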