Airflow in Lineage
Integrate Monte Carlo with Airflow to see the DAGs & Tasks linked to Data Assets (Tables, Views, etc.), as well as the DAGs & Tasks related to the tables in a Monte Carlo incident. Monte Carlo relies on query tagging to ingest the DAGs & Tasks related to tables.
Which style of tags/labels?
- Snowflake query tags
- BigQuery labels
- Query comments for warehouses which do not support tagging
- dbt + Snowflake
- dbt + other warehouses
- Using cluster policies
The tagged queries executed via Airflow are later ingested by Monte Carlo through the usual Query Log job. As part of Query Log processing, Monte Carlo then enriches lineage information with Airflow DAGs & Tasks whenever this information is present.
Time for Airflow information to show up in lineage
It might take up to 12 hours after query tags are first ingested for them to show up in lineage.
Monte Carlo will look for the following tags in queries:
- DAG: dag_id (recommended), dag, dag_name, airflow_dag_id, airflow_dag, AIRFLOW_DAG_ID, DAG_ID
- Task: task_id (recommended), task, task_name, airflow_task, airflow_task_id, AIRFLOW_TASK_ID, TASK_ID
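For example, the recommended keys produce a payload like the following (illustrative values); the warehouse-specific sections below show where to attach it (query tag, label, or comment):

import json

# Illustrative payload using the recommended keys.
lineage_tag = json.dumps({"dag_id": "my_dag", "task_id": "my_task"})
# -> '{"dag_id": "my_dag", "task_id": "my_task"}'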
Snowflake
To set query tags on Snowflake, add DAG & task information as a JSON string to session_parameters in the SnowflakeOperator. Example:
task = SnowflakeOperator(
    task_id='MyTask',
    sql="...",
    session_parameters={
        "QUERY_TAG": '{"dag_id": "my_dag", "task_id": "my_task"}'  # Stringified JSON
    },
    ...
)
Using Airflow templates
Unfortunately it is not possible to use Airflow templates in session_parameters, so every DAG id and task id needs to be hardcoded. See the section Using Cluster Policies below for an alternative approach.
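Because the DAG id and task id are already known when the DAG file is parsed, the tag can also be built in plain Python at definition time instead of being written out as a literal in every operator. An illustrative sketch (tagged_snowflake_task is a hypothetical helper, not an Airflow or Monte Carlo API):

import json

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator


def tagged_snowflake_task(dag, task_id, sql):
    # dag.dag_id and task_id are plain Python values at DAG-definition time,
    # so no templating is needed to build the QUERY_TAG.
    return SnowflakeOperator(
        task_id=task_id,
        sql=sql,
        session_parameters={
            "QUERY_TAG": json.dumps({"dag_id": dag.dag_id, "task_id": task_id}),
        },
        dag=dag,
    )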
BigQuery
To set labels on BigQuery, add DAG & task information as a Python dict to labels in any of the BigQuery operators. The labels field accepts Airflow templates, so use them instead of hardcoding the DAG id and task id. Example:
task = BigQueryExecuteQueryOperator(
    task_id='MyTask',
    sql="...",
    labels={
        "dag_id": "{{ dag.dag_id }}",
        "task_id": "{{ task.task_id }}"
    },
    ...
)
Using Airflow templates
This approach requires that every operator instance is changed. For a more convenient approach, see the section Using Cluster Policies below.
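If you do update operators individually, the templated labels dict can at least be defined once per DAG file and reused. A small convenience sketch (LINEAGE_LABELS is an illustrative name, not a Monte Carlo or Airflow construct):

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

# Defined once; the Jinja placeholders are rendered per task at runtime.
LINEAGE_LABELS = {
    "dag_id": "{{ dag.dag_id }}",
    "task_id": "{{ task.task_id }}",
}

load_task = BigQueryExecuteQueryOperator(
    task_id="LoadTask",
    sql="...",
    labels=dict(LINEAGE_LABELS),  # copy so tasks don't share a mutable dict
)

transform_task = BigQueryExecuteQueryOperator(
    task_id="TransformTask",
    sql="...",
    labels=dict(LINEAGE_LABELS),
)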
Other Warehouses
For warehouses which do not support query tagging, add DAG & Task information as a JSON string in a query comment. The sql field accepts Airflow templates, so use them instead of hardcoding the DAG id and task id. Examples:
task = RedshiftSQLOperator(
    task_id='MyTask',
    sql="""
    -- {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}
    CREATE TABLE IF NOT EXISTS my_table (
        id INTEGER,
        name VARCHAR
    );
    """,
    ...
)
task = SQLExecuteQueryOperator(
    task_id='MyTask',
    sql="""
    -- {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}
    CREATE TABLE IF NOT EXISTS my_table (
        id INTEGER,
        name VARCHAR
    );
    """,
    ...
)
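To avoid copying the comment into every sql string, the comment can be kept in one place and prepended by a small helper. An illustrative sketch (with_lineage_comment is a hypothetical function, not part of Airflow or Monte Carlo):

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Airflow still renders the Jinja placeholders because the result is passed
# to the templated `sql` field of the operator.
LINEAGE_COMMENT = '-- {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}'


def with_lineage_comment(sql: str) -> str:
    """Prepend the lineage comment to a SQL statement."""
    return f"{LINEAGE_COMMENT}\n{sql}"


task = SQLExecuteQueryOperator(
    task_id="MyTask",
    sql=with_lineage_comment("CREATE TABLE IF NOT EXISTS my_table (id INTEGER, name VARCHAR);"),
    conn_id="my_warehouse",  # illustrative connection id
)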
dbt + Snowflake
If you use Airflow to invoke dbt for your Snowflake transformations, you can leverage dbt to automatically tag the queries by adding a macro to the dbt project. For more information, check the dbt Snowflake Query Tags documentation.
An example dbt macro might look like:
{% macro set_query_tag() -%}
  {% set query_tag = {
    "dbt_model": model.name,
    "task_id": env_var("AIRFLOW_CTX_TASK_ID", "local_run"),
    "dag_id": env_var("AIRFLOW_CTX_DAG_ID", ""),
  } %}
  {% set new_query_tag = tojson(query_tag) %}
  {% if new_query_tag %}
    {% set original_query_tag = get_current_query_tag() %}
    {{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
    {% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
    {{ return(original_query_tag) }}
  {% endif %}
  {{ return(none) }}
{% endmacro %}
And add the query_tag to the dbt_project.yml file:
models:
  +query_tag: dbt
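The env_var() calls in the macro resolve because Airflow exports the task context (including AIRFLOW_CTX_DAG_ID and AIRFLOW_CTX_TASK_ID) as environment variables for the running task, so a dbt invocation started from the task inherits them. A minimal sketch, assuming dbt is called via BashOperator (the project path is illustrative):

from airflow.operators.bash import BashOperator

# The dbt subprocess inherits AIRFLOW_CTX_DAG_ID and AIRFLOW_CTX_TASK_ID,
# which set_query_tag() reads via env_var(); outside Airflow the macro
# falls back to the defaults ("local_run" and "").
run_dbt = BashOperator(
    task_id="run_dbt_models",
    bash_command="cd /opt/dbt/my_project && dbt run",  # illustrative path
)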
dbt + Other Warehouses
If you use Airflow to invoke dbt for your transformations in other warehouses, you can leverage dbt to automatically add a comment to the queries by adding a macro to the dbt project. For more information, check the dbt Query Comment documentation.
An example dbt query comment macro might look like:
{% macro airflow_query_comment(node) %}
  {%- set comment_dict = {} -%}
  {%- do comment_dict.update(
    app='dbt',
    dbt_version=dbt_version,
    profile_name=target.get('profile_name'),
    target_name=target.get('target_name'),
    task_id=env_var("AIRFLOW_CTX_TASK_ID", "local_run"),
    dag_id=env_var("AIRFLOW_CTX_DAG_ID", ""),
  ) -%}
  {%- if node is not none -%}
    {%- do comment_dict.update(
      file=node.original_file_path,
      node_id=node.unique_id,
      node_name=node.name,
      resource_type=node.resource_type,
      package_name=node.package_name,
      relation={
        "database": node.database,
        "schema": node.schema,
        "identifier": node.identifier
      }
    ) -%}
  {% else %}
    {%- do comment_dict.update(node_id='internal') -%}
  {%- endif -%}
  {% do return(tojson(comment_dict)) %}
{% endmacro %}
And add the query-comment to the dbt_project.yml file:
query-comment: "{{ airflow_query_comment(node) }}"
Version Compatibility
- For Airflow + dbt + Redshift, dbt must be version 1.6 or later.
Using Cluster Policies
As documented in the Airflow Cluster Policies documentation, Cluster Policies can be used to check or mutate DAGs or Tasks at a cluster-wide level.
Using a task policy, you can update all operators of a given type so that query tags are set automatically. For example, for Snowflake:
from airflow.models import BaseOperator
import json


def task_policy(task: BaseOperator):
    if task.task_type == "SnowflakeOperator":
        session_parameters = {
            "QUERY_TAG": json.dumps({
                "dag_id": task.dag_id,
                "task_id": task.task_id,
            }),
        }
        if hasattr(task, "hook_params"):  # Airflow >= v2.5
            task.hook_params['session_parameters'] = session_parameters
        else:
            task.session_parameters = session_parameters
Save this content in a file named airflow_local_settings.py in the AIRFLOW_HOME/config folder. In environments like MWAA or Composer, the file can be stored in the dags folder instead. You might need to restart the Airflow instance for the changes to take effect.
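The same policy can be extended to other operator types. A hedged sketch that also sets BigQuery labels, assuming the operator exposes a mutable labels attribute (as BigQueryExecuteQueryOperator does):

from airflow.models import BaseOperator


def task_policy(task: BaseOperator):
    if task.task_type == "SnowflakeOperator":
        ...  # set session_parameters as in the example above
    elif task.task_type == "BigQueryExecuteQueryOperator":
        # Merge the lineage labels with any labels already set on the task.
        labels = dict(getattr(task, "labels", None) or {})
        labels.update({"dag_id": task.dag_id, "task_id": task.task_id})
        task.labels = labels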