Airflow Incidents and Task Observability
Overview
Integrate Monte Carlo with Airflow to see the Airflow DAGs and Task Runs that may have led to a Monte Carlo Incident. When a data incident occurs, finding the DAG and Task that caused it can be cumbersome. The Monte Carlo Airflow integration simplifies root-cause analysis and decreases your time to resolution.
How Airflow Callbacks Work
Monte Carlo uses Airflow Callbacks to send a webhook back to Monte Carlo upon an event in Airflow. The callbacks are lightweight, with just an HTTP call, so there is a negligible impact to your DAGs and Tasks.
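To make the mechanism concrete, here is a minimal sketch of what a webhook-style callback does in principle. It is not Monte Carlo's implementation: the payload fields, the `send` function, and the name `make_webhook_callback` are assumptions for illustration, and a plain dict stands in for the Airflow context.

```python
import json
from typing import Any, Callable, Dict, List


def make_webhook_callback(
    event_type: str, send: Callable[[str], None]
) -> Callable[[Dict[str, Any]], None]:
    """Return a callback that serializes a small event and hands it to `send`."""

    def callback(context: Dict[str, Any]) -> None:
        # Hypothetical payload: only a few identifying fields, no task data.
        payload = {
            "event_type": event_type,
            "dag_id": context.get("dag_id"),
            "task_id": context.get("task_id"),
            "run_id": context.get("run_id"),
        }
        send(json.dumps(payload, sort_keys=True))

    return callback


# In a real setup `send` would perform the HTTP call; here it appends to a list.
sent: List[str] = []
on_failure = make_webhook_callback("task_failure", sent.append)
on_failure({"dag_id": "dag_name", "task_id": "task_name", "run_id": "manual__1"})
```

The callback does one small serialization and one send per event, which is why the overhead on a DAG or Task stays small.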
Callbacks can be set at the DAG or Task level. On a DAG, they can be specified granularly (one per Callback Type) or broadly (all at once):
# explicit, per callback type
dag = DAG(
    'dag_name',
    on_success_callback=mcd_callbacks.mcd_dag_success_callback,
    on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
    sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)

# broad, all callbacks
dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
)
Similarly, Task Callbacks can be explicit or broad:
# explicit, per callback type
task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
    on_success_callback=mcd_callbacks.mcd_task_success_callback,
    on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
    on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)

# broad, all callbacks
task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    **mcd_callbacks.task_callbacks,
)
The supported Callback Types and their functions are below.
Callback Type | Description | DAG function | Task function |
---|---|---|---|
on_success_callback | Invoked when the DAG or Task succeeds | mcd_dag_success_callback | mcd_task_success_callback |
on_failure_callback | Invoked when the DAG or Task fails | mcd_dag_failure_callback | mcd_task_failure_callback |
sla_miss_callback | Invoked when a Task misses its defined SLA | mcd_sla_miss_callback | N/A |
on_retry_callback | Invoked when the Task is up for retry | N/A | mcd_task_retry_callback |
on_execute_callback | Invoked right before the Task begins executing | N/A | mcd_task_execute_callback |
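The broad form works through ordinary Python keyword unpacking. The sketch below illustrates this with stand-ins so it runs without Airflow: the three functions are empty placeholders, `fake_dag` is a hypothetical substitute for the DAG constructor, and the contents of `dag_callbacks` are an assumption inferred from the table above rather than copied from the package.

```python
# Stand-ins for the functions in airflow_mcd.callbacks.mcd_callbacks.
def mcd_dag_success_callback(context):
    pass


def mcd_dag_failure_callback(context):
    pass


def mcd_sla_miss_callback(*args, **kwargs):
    pass


# Assumed shape of mcd_callbacks.dag_callbacks, inferred from the table.
dag_callbacks = {
    "on_success_callback": mcd_dag_success_callback,
    "on_failure_callback": mcd_dag_failure_callback,
    "sla_miss_callback": mcd_sla_miss_callback,
}


def fake_dag(dag_id, **kwargs):
    """Hypothetical stand-in for DAG(...) that records its keyword arguments."""
    return {"dag_id": dag_id, **kwargs}


explicit = fake_dag(
    "dag_name",
    on_success_callback=mcd_dag_success_callback,
    on_failure_callback=mcd_dag_failure_callback,
    sla_miss_callback=mcd_sla_miss_callback,
)
broad = fake_dag("dag_name", **dag_callbacks)
```

Under these assumptions both calls receive identical keyword arguments, so the broad form is shorthand for listing every supported Callback Type.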
Integration Setup
Callbacks can be configured on individual DAGs or Tasks. We recommend setting up a single DAG first, then applying the callbacks more broadly once they are tested in your environment.
0. Add Airflow Connection to Monte Carlo
- Under Settings -> Integrations, go to the ETL Tool Connection section.
- Use the Create button and select Airflow.
- Add a Connection Name and Host name for Airflow. The Host name is not currently validated; it is only used to provide external links to Airflow Logs and DAGs from the Monte Carlo application.
Alternatively, you can add an Airflow connection using the Monte Carlo CLI.
1. Create a Monte Carlo API Key
Create a Monte Carlo API Key with at least Editors permissions under Settings > API > Account Service Keys.
2. Install airflow-mcd from PyPI to Airflow
Install airflow-mcd from PyPI. Normally this can be done by adding the Python package to your requirements.txt file for Airflow.
Restarting Airflow
Installing packages requires restarting Airflow, so it is ideal to plan this ahead of time. Airflow Downtime can be as long as an hour, depending on the size of your other packages for Airflow.
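Once Airflow has restarted, a quick import check can confirm that the package is visible to the Python environment Airflow runs in. This is a minimal sketch: the helper name `is_installed` is hypothetical, and the module name `airflow_mcd` is taken from the import statements used later in this guide.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if the top-level module can be found on the import path."""
    return importlib.util.find_spec(module_name) is not None


if not is_installed("airflow_mcd"):
    print("airflow_mcd not found; check requirements.txt and restart Airflow")
```

Run it with the same interpreter that the Airflow scheduler and workers use, since a different environment may give a different answer.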
3. Create a Monte Carlo Data Connection in Airflow
- In your Airflow UI, go to Admin > Connections.
- Click the blue plus button to Create a new connection.
- Set the Conn Id to mcd_default_session.
Conn ID Naming
You can change the name from mcd_default_session, but you will then need to specify the Conn ID on each callback. Use mcd_default_session for the simplest approach.
- Set the Conn Type to Monte Carlo Data.
  - Note: Before airflow-mcd package version v0.2.0, the Conn Type used was http.
- Paste the API Key ID in the Login field.
- Paste the API Key Secret in the Password field.
- Test the connection by clicking the Test button. You should see a "Connection successfully tested" message.
- Save the connection.
4. Add Callbacks to DAGs and Tasks
Option 1: Add Monte Carlo DAG Callbacks, and Task Callbacks for all Tasks as default arguments.
If you are not using callbacks today or prefer to replace your callbacks with Monte Carlo's, the simplest approach is to add Monte Carlo's DAG and Task callbacks at the DAG level. If you are using callbacks already, alternative options are provided below.
from airflow_mcd.callbacks import mcd_callbacks

dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
    default_args={
        **mcd_callbacks.task_callbacks,
    }
)
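One consequence of passing the Task callbacks through default_args is worth keeping in mind: in Airflow, an argument set directly on an operator takes precedence over the same key in default_args. The sketch below mimics that merge with plain dicts to show that a Task defining its own on_failure_callback would not receive the Monte Carlo one. The function `resolve_task_args` and all three callbacks are hypothetical stand-ins, not Airflow or airflow-mcd code.

```python
# Stand-ins for two of the Monte Carlo Task callbacks and one of your own.
def mcd_task_success_callback(context):
    pass


def mcd_task_failure_callback(context):
    pass


def my_alerting_callback(context):
    pass


default_args = {
    "on_success_callback": mcd_task_success_callback,
    "on_failure_callback": mcd_task_failure_callback,
}


def resolve_task_args(defaults, **task_kwargs):
    """Mimic the precedence rule: explicit task arguments win over defaults."""
    return {**defaults, **task_kwargs}


plain = resolve_task_args(default_args)
overridden = resolve_task_args(default_args, on_failure_callback=my_alerting_callback)
```

Here `plain` keeps both Monte Carlo callbacks, while `overridden` keeps only the success callback. Tasks that already set their own callbacks are better served by Option 2.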
If you prefer to add observability only to specific Tasks, you can add the callbacks directly on those Tasks:
from airflow_mcd.callbacks import mcd_callbacks

dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
)

task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    **mcd_callbacks.task_callbacks,
)
Option 2: Add Monte Carlo Callbacks to your existing Callbacks.
Use this method if you already have Callbacks that you would like to keep.
from airflow_mcd.callbacks import mcd_callbacks

# existing dag callback function: success, failure, or sla miss
def dag_callback(context):
    # existing processing code
    foo(context)
    # pick the one that matches the callback type:
    # monte carlo callback for success
    mcd_callbacks.mcd_dag_success_callback(context)
    # monte carlo callback for failure
    mcd_callbacks.mcd_dag_failure_callback(context)
    # monte carlo callback for sla_miss
    mcd_callbacks.mcd_sla_miss_callback(context)

# existing task callback function: execute, success, failure, or retry
def task_callback(context):
    # existing callbacks
    bar(context)
    # pick the one that matches the callback type:
    # monte carlo callback for execute
    mcd_callbacks.mcd_task_execute_callback(context)
    # monte carlo callback for success
    mcd_callbacks.mcd_task_success_callback(context)
    # monte carlo callback for failure
    mcd_callbacks.mcd_task_failure_callback(context)
    # monte carlo callback for retry
    mcd_callbacks.mcd_task_retry_callback(context)

# ! only available in Airflow >= 2.6.0
# add to list
dag = DAG(
    'dag_name',
    on_success_callback=[existing_dag_success_callback, mcd_callbacks.mcd_dag_success_callback],
)
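On Airflow versions older than 2.6.0, where a list of callbacks is not available, the same effect can be approximated with a small wrapper that calls several callbacks in turn. This is a sketch rather than part of airflow-mcd: `chain_callbacks` is a hypothetical helper, and both callbacks below are stand-ins that only record that they ran.

```python
from typing import Any, Callable, Dict, List

Callback = Callable[[Dict[str, Any]], None]


def chain_callbacks(*callbacks: Callback) -> Callback:
    """Return one callback that invokes each given callback in order."""

    def chained(context: Dict[str, Any]) -> None:
        for callback in callbacks:
            callback(context)

    return chained


# Stand-ins so the sketch runs without Airflow installed.
calls: List[str] = []


def existing_dag_success_callback(context):
    calls.append("existing")


def mcd_dag_success_callback(context):
    calls.append("monte_carlo")


on_success = chain_callbacks(existing_dag_success_callback, mcd_dag_success_callback)
on_success({"run_id": "example"})
```

In a DAG definition, the chained function would be passed as on_success_callback in place of the list. Note that in this simple form an exception raised by an earlier callback prevents the later ones from running.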
Option 3: Explicitly configure each Callback Type.
Use this option if you want to only use specific callbacks.
from airflow_mcd.callbacks import mcd_callbacks

# DAG-level callbacks (on_execute_callback is Task-only, see the table above)
dag = DAG(
    'dag_name',
    on_success_callback=mcd_callbacks.mcd_dag_success_callback,
    on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
    sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)

# Task-level callbacks
task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
    on_success_callback=mcd_callbacks.mcd_task_success_callback,
    on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
    on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)
Optional Steps
If you used a custom Conn ID instead of the default connection name, you will need to specify it through the mcd_connection_id parameter on the DAG.
dag = DAG(
    'dag_name',
    params={'mcd_connection_id': 'my_connection_id'},
    **mcd_callbacks.dag_callbacks,
)
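The idea behind this parameter is a lookup with a fallback to the default name. The sketch below illustrates that pattern only; `resolve_conn_id` is a hypothetical function, a plain dict stands in for the Airflow context, and the way airflow-mcd reads the parameter internally is an assumption here.

```python
from typing import Any, Dict

DEFAULT_CONN_ID = "mcd_default_session"


def resolve_conn_id(context: Dict[str, Any]) -> str:
    """Return the custom Conn ID from params, or the default if none is set."""
    params = context.get("params") or {}
    return params.get("mcd_connection_id", DEFAULT_CONN_ID)


custom = resolve_conn_id({"params": {"mcd_connection_id": "my_connection_id"}})
fallback = resolve_conn_id({"params": {}})
```

With the default Conn ID no params entry is needed, which is why mcd_default_session is the simplest setup.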