Airflow in Lineage
Integrate Monte Carlo with Airflow to see the DAGs & Tasks linked to Data Assets (Tables, Views, etc.), as well as the DAGs & Tasks related to tables in a Monte Carlo incident. Monte Carlo relies on query tagging to ingest the DAGs & Tasks related to tables.
Airflow Connection Required for Airflow in Lineage
An Airflow connection in Monte Carlo is necessary to display Airflow lineage. If you have a single connection, you do not need to send an environment tag. If you have multiple connections, refer to the "Multiple Airflow Environments" section in this document.
Which style of tags/labels?
- Snowflake query tags
- BigQuery labels
- Query comments for warehouses which do not support tagging
- dbt + Snowflake
- dbt + other warehouses
- Using cluster policies
The tagged queries executed via Airflow will later be ingested by Monte Carlo via the usual Query Log job. Then, as part of Query Log processing, Monte Carlo will enhance lineage information with Airflow DAGs & Tasks whenever that information is present.
Time for Airflow information to show up in lineage
It might take up to 12 hours after query tags are first ingested for the Airflow information to show up in lineage.
Monte Carlo will look for the following tags in queries:
- DAG: dag_id (recommended), dag, dag_name, airflow_dag_id, airflow_dag, AIRFLOW_DAG_ID, DAG_ID
- Task: task_id (recommended), task, task_name, airflow_task, airflow_task_id, AIRFLOW_TASK_ID, TASK_ID
Snowflake
To set query tags on Snowflake, add DAG & Task information as a JSON string to session_parameters in the SnowflakeOperator. Example:
task = SnowflakeOperator(
    task_id='MyTask',
    sql="...",
    session_parameters={
        "QUERY_TAG": '{"dag_id": "my_dag", "task_id": "my_task"}'  # Stringified JSON
    },
    ...
)
Using Airflow templates
Unfortunately, it is not possible to use Airflow templates in session_parameters, so every DAG ID and task ID needs to be hardcoded. See the section Using Cluster Policies below for an alternative approach.
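If you'd rather not hand-write the JSON string in every operator, a small helper in your DAG code can build the hardcoded tag for you. The snippet below is only an illustrative sketch (the helper name mc_query_tag is hypothetical, not part of Airflow or Monte Carlo); it simply wraps json.dumps around the hardcoded IDs:

import json

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator


def mc_query_tag(dag_id: str, task_id: str) -> dict:
    # Hypothetical convenience helper: builds session_parameters with a
    # stringified-JSON QUERY_TAG in the shape Monte Carlo looks for.
    return {"QUERY_TAG": json.dumps({"dag_id": dag_id, "task_id": task_id})}


task = SnowflakeOperator(
    task_id='MyTask',
    sql="...",
    session_parameters=mc_query_tag("my_dag", "MyTask"),
)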
BigQuery
To set labels on BigQuery, add DAG & Task information as a Python dict to labels in any of the BigQuery operators. The labels field accepts Airflow templates, so use templates instead of hardcoding the DAG ID and task ID. Example:
task = BigQueryExecuteQueryOperator(
    task_id='MyTask',
    sql="...",
    labels={
        "dag_id": "{{ dag.dag_id }}",
        "task_id": "{{ task.task_id }}"
    },
    ...
)
Using Airflow templates
This approach requires that every operator instance is changed. For a more convenient approach, see the section Using Cluster Policies below.
Other Warehouses
For warehouses that do not support query tagging, add DAG & Task information as a JSON string in a query comment. The sql field accepts Airflow templates, so use templates instead of hardcoding the DAG ID and task ID. Example:
task = RedshiftSQLOperator(
    task_id='MyTask',
    sql="""
        -- {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}
        CREATE TABLE IF NOT EXISTS my_table (
            id INTEGER,
            name VARCHAR
        );
    """,
    ...
)

task = SQLExecuteQueryOperator(
    task_id='MyTask',
    sql="""
        -- {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}
        CREATE TABLE IF NOT EXISTS my_table (
            id INTEGER,
            name VARCHAR
        );
    """,
    ...
)
dbt + Snowflake
dbt Core
If you use Airflow to invoke dbt Core for your Snowflake transformations, you can leverage dbt to automatically tag the queries by adding a macro to the dbt project. For more information, check the dbt Snowflake Query Tags documentation.
An example dbt macro might look like:
{% macro set_query_tag() -%}
  {% set query_tag = {
    "dbt_model": model.name,
    "task_id": env_var("AIRFLOW_CTX_TASK_ID", "local_run"),
    "dag_id": env_var("AIRFLOW_CTX_DAG_ID", ""),
  } %}
  {% set new_query_tag = tojson(query_tag) %}
  {% if new_query_tag %}
    {% set original_query_tag = get_current_query_tag() %}
    {{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
    {% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
    {{ return(original_query_tag) }}
  {% endif %}
  {{ return(none) }}
{% endmacro %}
And add the query_tag to the dbt_project.yml file:
models:
  +query_tag: dbt
dbt Cloud
For dbt Cloud, the same approach can be used as with dbt Core, but you need to:
- adjust the macro slightly
- update the Airflow operator used to invoke dbt Cloud
In the macro, you need to use dbt variables instead of environment variables; for example, you can use dag_id and task_id variables:
"task_id": var("task_id", "local_run"),
"dag_id": var("dag_id", ""),
In the Airflow operator you're using to run the dbt Cloud job, you need to pass the variables expected by the macro. For example, if you're using DbtCloudRunJobOperator to run a job with a single dbt build step, you can pass the dag_id and task_id variables this way:
trigger_dbt_cloud_job_run = DbtCloudRunJobOperator(
    task_id="trigger_dbt_cloud_job_run",
    job_id=456621,
    check_interval=10,
    timeout=300,
    steps_override=[f'dbt build --vars "{{dag_id: \\\"{dag.dag_id}\\\", task_id: \\\"trigger_dbt_cloud_job_run\\\"}}"']
)
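With this escaping, the rendered step looks like dbt build --vars "{dag_id: \"...\", task_id: \"trigger_dbt_cloud_job_run\"}" (with the DAG's actual ID filled in), which lets the macro read both values via var("dag_id") and var("task_id").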
dbt + Other Warehouses
dbt Core
If you use Airflow to invoke dbt Core for your transformations in other warehouses, you can leverage dbt to automatically add a comment to the queries by adding a macro to the dbt project. For more information, check the dbt Query Comment documentation.
An example dbt query comment macro might look like:
{% macro airflow_query_comment(node) %}
  {%- set comment_dict = {} -%}
  {%- do comment_dict.update(
    app='dbt',
    dbt_version=dbt_version,
    profile_name=target.get('profile_name'),
    target_name=target.get('target_name'),
    task_id=env_var("AIRFLOW_CTX_TASK_ID", "local_run"),
    dag_id=env_var("AIRFLOW_CTX_DAG_ID", ""),
  ) -%}
  {%- if node is not none -%}
    {%- do comment_dict.update(
      file=node.original_file_path,
      node_id=node.unique_id,
      node_name=node.name,
      resource_type=node.resource_type,
      package_name=node.package_name,
      relation={
        "database": node.database,
        "schema": node.schema,
        "identifier": node.identifier
      }
    ) -%}
  {% else %}
    {%- do comment_dict.update(node_id='internal') -%}
  {%- endif -%}
  {% do return(tojson(comment_dict)) %}
{% endmacro %}
And add the query-comment to the dbt_project.yml file:
query-comment: "{{ airflow_query_comment(node) }}"
Version Compatibility
- For Airflow + dbt + Redshift, the dbt version must be >= v1.6
dbt Cloud
For dbt Cloud, the same approach can be used as with dbt Core, but you need to:
- adjust the macro slightly
- update the Airflow operator used to invoke dbt Cloud
In the macro, you need to use dbt variables instead of environment variables; for example, you can use dag_id and task_id variables:
"task_id": var("task_id", "local_run"),
"dag_id": var("dag_id", ""),
In the Airflow operator you're using to run the dbt Cloud job, you need to pass the variables expected by the macro. For example, if you're using DbtCloudRunJobOperator to run a job with a single dbt build step, you can pass the dag_id and task_id variables this way:
trigger_dbt_cloud_job_run = DbtCloudRunJobOperator(
    task_id="trigger_dbt_cloud_job_run",
    job_id=456621,
    check_interval=10,
    timeout=300,
    steps_override=[f'dbt build --vars "{{dag_id: \\\"{dag.dag_id}\\\", task_id: \\\"trigger_dbt_cloud_job_run\\\"}}"']
)
Using Cluster Policies
As documented in the Airflow Cluster Policies documentation, cluster policies can be used to check or mutate DAGs or Tasks at a cluster-wide level.
Using a task policy, you can update all operators of a given type to have the query tags set automatically, for example for Snowflake:
import json

from airflow.models import BaseOperator


def task_policy(task: BaseOperator):
    if task.task_type == "SnowflakeOperator":
        session_parameters = {
            "QUERY_TAG": json.dumps({
                "dag_id": task.dag_id,
                "task_id": task.task_id,
            }),
        }
        if hasattr(task, "hook_params"):  # Airflow >= v2.5?
            task.hook_params['session_parameters'] = session_parameters
        else:
            task.session_parameters = session_parameters
Save this content in a file named airflow_local_settings.py in the AIRFLOW_HOME/config folder. In environments like MWAA or Composer, the file can be stored in the dags folder. You might need to restart the Airflow instance for the changes to take effect.
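The same mechanism can cover other operator types. As a sketch (an assumption, not taken from the Airflow or Monte Carlo docs), a branch for BigQuery operators could set the labels field instead of a query tag; in practice it would live inside the same task_policy function as the Snowflake branch above:

from airflow.models import BaseOperator


def task_policy(task: BaseOperator):
    # Sketch only: mirrors the Snowflake branch above, but BigQuery operators
    # carry DAG & Task information in `labels` rather than a query tag.
    if task.task_type == "BigQueryExecuteQueryOperator":
        labels = dict(getattr(task, "labels", None) or {})
        labels.setdefault("dag_id", task.dag_id)
        labels.setdefault("task_id", task.task_id)
        # Note: BigQuery label values have formatting restrictions (e.g. lowercase),
        # so DAG/Task names may need normalizing before being used as labels.
        task.labels = labels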
Multiple Airflow Environments
If you have multiple Airflow environments, you may specify an environment tag in queries.
The environment tag is required when DAG and task IDs are shared between those environments. Otherwise, Monte Carlo will not know which DAG/Task the query refers to and will not associate that query with any DAG/Task.
Monte Carlo will look for the following environment tags in queries:
airflow_env (recommended), environment, airflow_environment
The value of the environment tag must be the name given to the Airflow Connection in Monte Carlo.
Example:
task = SnowflakeOperator(
    task_id='MyTask',
    sql="...",
    session_parameters={
        "QUERY_TAG": '{"dag_id": "my_dag", "task_id": "my_task", "airflow_env": "Airflow-Prod"}'  # Stringified JSON
    },
    ...
)