Airflow Alerts and Task Observability
Overview
Integrate Monte Carlo with Airflow to see Airflow DAGs & Task Runs that may have led to a Monte Carlo Alert. When a data incident occurs, finding the the right DAG and Task that caused it can be cumbersome. The Monte Carlo Airflow integration makes finding the root cause simple and decrease your time to resolution.
How Airflow Callbacks Work
Monte Carlo uses Airflow Callbacks to send a webhook back to Monte Carlo upon an event in Airflow. The callbacks are lightweight, with just an HTTP call, so there is a negligible impact to your DAGs and Tasks.
The callbacks can be at the DAG or Task level; we recommend having both to get full functionality. Callbacks on the DAG can be specified at a granular-level (one for each Callback Type) or more broadly:
# explicit, per callback type
dag = DAG(
'dag_name',
on_success_callback=mcd_callbacks.mcd_dag_success_callback,
on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)
# broad, all callbacks
dag = DAG(
'dag_name',
**mcd_callbacks.dag_callbacks,
)
Similarly, Task Callbacks can be explicit or broad:
# explicit, per callback type
task = BashOperator(
task_id='task_name',
bash_command='command',
dag=dag,
on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
on_success_callback=mcd_callbacks.mcd_task_success_callback,
on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)
# broad, all callbacks
task = BashOperator(
task_id='task_name',
bash_command='command',
dag=dag,
**mcd_callbacks.task_callbacks,
)
The supported Callback Types and their functions are below.
Callback Type | Description | DAG | Task |
---|---|---|---|
on_success_callback | Invoked when the task succeeds | mcd_dag_success_callback | mcd_task_success_callback |
on_failure_callback | Invoked when the task fails | mcd_dag_failure_callback | mcd_task_failure_callback |
sla_miss_callback | Invoked when a task misses its defined SLA | mcd_sla_miss_callback | N/A |
on_retry_callback | Invoked when the task is up for retry | N/A | mcd_task_retry_callback |
on_execute_callback | Invoked right before the task begins executing. | N/A | mcd_task_execute_callback |
Integration Setup
Callbacks can be configured at the individual DAGs or Tasks. It is recommended to try setting up on one DAG to start, and then broadly applying to your DAGs once tested in your environment.
0. Add Airflow Connection to Monte Carlo
- Under Settings -> Integrations, go to the ETL Tool Connection section
- Use the Create button and select Airflow
- Add a Connection Name (for convenience within Monte Carlo)
- Specify the Host name for Airflow: the web server at which Airflow runs
- Host name is used to provide clickable links into your Airflow Logs and DAGs from the Monte Carlo application; Monte Carlo does not use it for fetching
Alternatively, you can add an Airflow connection using the Monte Carlo CLI.
1. Create a Monte Carlo Gateway Key
Create a Monte Carlo Gateway Key using the Manage keys
menu option in the Airflow Connection:
Take note of the resulting ID and Secret values as you'll need them when creating the connection in Airflow.
Alternatively, you can create the integration key using the Monte Carlo CLI:
montecarlo keys add-airflow --description "Airflow Callbacks"
Note: these keys are not compatible with those generated under Settings -> API -> Account Service Keys
which can be used for Airflow Circuit Breakers but not callbacks.
2. Install airflow-mcd from PyPi to Airflow
Install airflow-mcd from PyPI. Normally this can be done by adding the python package to your requirements.txt
file for Airflow.
Restarting Airflow
Installing packages requires restarting Airflow, so it is ideal to plan this ahead of time. Airflow Downtime can be as long as an hour, depending on the size of your other packages for Airflow.
3. Create a Monte Carlo Data Connection in Airflow
- In your Airflow UI, go to Admin > Connections.
- Click the blue plus button to Create a new connection.
- Name the Conn Id with the name, i.e.
mcd_gateway_default_session
.
Conn ID Naming
You can change the name from
mcd_gateway_default_session
, but it will require you to specify the name of the Conn ID on each callback. Usemcd_gateway_default_session
for the simplest approach.
- Set the Conn Type to
Monte Carlo Data Gateway
. - Paste the API Key ID in the Login field
- Paste the API Key Secret in the Password field.
Please note: Host, Schema, and Port are not needed.
- Test the connection by clicking the Test button. You should see a "Connection to MCD Gateway successfully tested." message.
- Save the connection.
Test Not Working
Occasionally, the Test button does not work from the Airflow interface. This is ok; we recommend still moving forward implementing callbacks. If data still does not reach Monte Carlo after the implementation below, please reach out to our support team at [email protected], and we'd be happy to help.
4. Add Callbacks to DAGs and Tasks
Option 1: Add Monte Carlo DAG Callbacks and for all Tasks as a default argument.
If you are not using callbacks today or prefer to replace your callbacks with Monte Carlo's, the simplest approach is to add Monte Carlo's DAG and Task callbacks at the DAG level. If you are using callbacks already, secondary options are provided below.
from airflow_mcd.callbacks import mcd_callbacks
dag = DAG(
'dag_name',
**mcd_callbacks.dag_callbacks,
default_args={
**mcd_callbacks.task_callbacks,
}
)
If you prefer to only add observability to specific Tasks, you can add as a default at the Task level:
from airflow_mcd.callbacks import mcd_callbacks
dag = DAG(
'dag_name',
**mcd_callbacks.dag_callbacks,
)
task = BashOperator(
task_id='task_name',
bash_command='command',
dag=dag,
**mcd_callbacks.task_callbacks,
)
Option 2: Add Monte Carlo Callbacks to your existing Callbacks.
Use this method if you already have Callbacks that you would like to keep.
from airflow_mcd.callbacks import mcd_callbacks
# existing dag callback function: execute, success, failure, or miss
def dag_callback(context):
# existing processing code
foo(context)
# pick the one:
# monte carlo callback for success
mcd_callbacks.mcd_dag_success_callback(context)
# monte carlo callback for failure
mcd_callbacks.mcd_dag_failure_callback(context)
# monte carlo callback for sla_miss
mcd_callbacks.mcd_sla_miss_callback(context)
# existing task callback function: execute, success, failure, or retry
def task_callback(context):
# existing callbacks
bar(context)
# pick the one:
# monte carlo callback for execute
mcd_callbacks.mcd_task_execute_callback(context)
# monte carlo callback for success
mcd_callbacks.mcd_task_success_callback(context)
# monte carlo callback for failure
mcd_callbacks.mcd_task_failure_callback(context)
# monte carlo callback for retry
mcd_callbacks.mcd_task_retry_callback(context)
# ! only available in Airflow >= 2.6.0
# add to list
dag = DAG(
'dag_name',
on_success_callback=[existing_dag_success_callback, mcd_dag_success_callback],
)
Option 3: Explicitly configure each Callback Type.
Use this option if you want to only use specific callbacks.
from airflow_mcd.callbacks import mcd_callbacks
dag = DAG(
'dag_name',
on_success_callback=mcd_callbacks.mcd_dag_success_callback,
on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)
task = BashOperator(
task_id='task_name',
bash_command='command',
dag=dag,
on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
on_success_callback=mcd_callbacks.mcd_task_success_callback,
on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)
Optional Steps
If you used a custom Conn ID
, you will need to specify the Conn ID
in the default connection name.
dag = DAG(
'dag_name',
params={'mcd_connection_id': 'my_connection_id'}
**mcd_callbacks.dag_callbacks,
)
FAQs
"Monte Carlo Data Gateway" is not an available connection type, how can I create the connection in Airflow?
First check that you installed airflow-mcd
version 0.2.4
or later, if that's the case and you're using MWAA
you might be facing a known issue: in some cases pypi
packages are not installed in the web server (and thus not available in the connection editor).
To workaround this issue you can use the HTTP
connection type that is always available in Airflow and set https://integrations.getmontecarlo.com
as the Host
for the connection. The rest of the settings are the same described in the step to create a connection in Airflow. Please note that when using HTTP
connections you cannot test them as you can do with Monte Carlo connections.
Updated 5 months ago