Airflow Incidents and Task Observability

Overview

Integrate Monte Carlo with Airflow to see the Airflow DAGs and Task Runs that may have led to a Monte Carlo Incident. When a data incident occurs, finding the DAG and Task that caused it can be cumbersome. The Monte Carlo Airflow integration makes finding the root cause simple and decreases your time to resolution.

How Airflow Callbacks Work

Monte Carlo uses Airflow Callbacks to send a webhook back to Monte Carlo when an event occurs in Airflow. The callbacks are lightweight, making just an HTTP call, so the impact on your DAGs and Tasks is negligible.
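To make the mechanism concrete, here is a minimal sketch of what a callback of this kind does: it receives the Airflow context and prepares a single HTTP POST. The endpoint URL, the payload fields, the context keys, and the function name are all hypothetical; this is not the airflow-mcd implementation or Monte Carlo's actual webhook format.

```python
import json
import urllib.request

# Hypothetical endpoint, for illustration only.
EXAMPLE_WEBHOOK_URL = "https://example.invalid/airflow/events"


def build_event_request(context, event_type, url=EXAMPLE_WEBHOOK_URL):
    """Build (but do not send) a POST request describing an Airflow event.

    `context` is treated as a plain dict; the keys read here are assumptions
    chosen for the example.
    """
    payload = {
        "event_type": event_type,
        "dag_id": context.get("dag_id"),
        "task_id": context.get("task_id"),
        "run_id": context.get("run_id"),
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_event_request(
    {"dag_id": "dag_name", "task_id": "task_name", "run_id": "manual__1"},
    event_type="task_success",
)
# Sending it would be a single call: urllib.request.urlopen(request, timeout=5)
```

The point of the sketch is the cost model: one small JSON body and one request per event, which is why the overhead on a DAG or Task is small.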

Callbacks can be set at the DAG or Task level. Callbacks on the DAG can be specified at a granular level (one for each Callback Type) or more broadly:

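from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow_mcd.callbacks import mcd_callbacks

# explicit, per callback type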
dag = DAG(
    'dag_name',
    on_success_callback=mcd_callbacks.mcd_dag_success_callback,
    on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
    sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)

# broad, all callbacks
dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
)

Similarly, Task Callbacks can be explicit or broad:

# explicit, per callback type 
task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
    on_success_callback=mcd_callbacks.mcd_task_success_callback,
    on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
    on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)

# broad, all callbacks
task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    **mcd_callbacks.task_callbacks,
)

The supported Callback Types and their functions are below.

| Callback Type | Description | DAG | Task |
| --- | --- | --- | --- |
| on_success_callback | Invoked when the task succeeds | mcd_dag_success_callback | mcd_task_success_callback |
| on_failure_callback | Invoked when the task fails | mcd_dag_failure_callback | mcd_task_failure_callback |
| sla_miss_callback | Invoked when a task misses its defined SLA | mcd_sla_miss_callback | N/A |
| on_retry_callback | Invoked when the task is up for retry | N/A | mcd_task_retry_callback |
| on_execute_callback | Invoked right before the task begins executing | N/A | mcd_task_execute_callback |

Integration Setup

Callbacks can be configured on individual DAGs or Tasks. We recommend setting up callbacks on one DAG first, then applying them to the rest of your DAGs once you have tested them in your environment.

0. Add Airflow Connection to Monte Carlo

  1. Under Settings -> Integrations, go to the ETL Tool Connection section.
  2. Use the Create button and select Airflow.
  3. Add a Connection Name and Host name for Airflow. The Host name is not currently validated; it is only used to provide external links to Airflow Logs and DAGs from the Monte Carlo application.

Alternatively, you can add an Airflow connection using the Monte Carlo CLI.

1. Create a Monte Carlo API Key

Create a Monte Carlo API Key with at least Editors permissions under Settings > API > Account Service Keys.

2. Install airflow-mcd from PyPI to Airflow

Install airflow-mcd from PyPI. Normally this can be done by adding the Python package to your requirements.txt file for Airflow.
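After Airflow restarts, a quick way to confirm the package is importable is to look it up from the same Python interpreter that Airflow uses. The helper below is a small sketch using only the standard library; the function name is made up for this example, and `airflow_mcd` is the module name used in the imports later on this page.

```python
import importlib.util


def is_package_installed(module_name):
    """Return True if `module_name` can be found for import in this environment."""
    return importlib.util.find_spec(module_name) is not None


# Run this with the same Python interpreter that Airflow uses.
print("airflow_mcd installed:", is_package_installed("airflow_mcd"))
```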

❗️

Restarting Airflow

Installing packages requires restarting Airflow, so it is ideal to plan this ahead of time. Airflow Downtime can be as long as an hour, depending on the size of your other packages for Airflow.

3. Create a Monte Carlo Data Connection in Airflow

  1. In your Airflow UI, go to Admin > Connections.
  2. Click the blue plus button to Create a new connection.
  3. Set the Conn Id, e.g. mcd_default_session.

🚧

Conn ID Naming

You can change the name from mcd_default_session, but it will require you to specify the name of the Conn ID on each callback. Use mcd_default_session for the simplest approach.

  4. Set the Conn Type to Monte Carlo Data.
    1. Note: Before airflow-mcd package version v0.2.0, the Conn Type used was http.
  5. Paste the API Key ID in the Login field.
  6. Paste the API Key Secret in the Password field.
  7. Test the connection by clicking the Test button. You should see a "Connection successfully tested" message.
  8. Save the connection.
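If you manage connections outside the UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased Conn Id, and Airflow 2.3 and later accept a JSON value there. The sketch below builds such a value from the same fields as the steps above. The conn_type string is passed in as a parameter because the identifier that airflow-mcd registers for "Monte Carlo Data" is not stated on this page; the value shown is a placeholder to confirm against your installation. The key ID, secret, and function name are dummy values for illustration.

```python
import json


def connection_env_var(conn_id, conn_type, key_id, key_secret):
    """Return (name, value) for an Airflow connection environment variable."""
    name = "AIRFLOW_CONN_" + conn_id.upper()
    value = json.dumps(
        {
            "conn_type": conn_type,  # placeholder: confirm the real type string
            "login": key_id,         # API Key ID goes in Login
            "password": key_secret,  # API Key Secret goes in Password
        }
    )
    return name, value


name, value = connection_env_var(
    "mcd_default_session", "<monte-carlo-conn-type>", "example-key-id", "example-secret"
)
print(f"export {name}='{value}'")
```

Connections defined through environment variables do not appear in the Airflow UI, so the Test button in the steps above is not available for them.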

4. Add Callbacks to DAGs and Tasks

Option 1: Add Monte Carlo DAG Callbacks, and Task Callbacks for all Tasks as default arguments.

If you are not using callbacks today or prefer to replace your callbacks with Monte Carlo's, the simplest approach is to add Monte Carlo's DAG and Task callbacks at the DAG level. If you are using callbacks already, secondary options are provided below.

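from airflow import DAG
from airflow_mcd.callbacks import mcd_callbacks

dag = DAG(
    'dag_name',
    # DAG-level callbacks
    **mcd_callbacks.dag_callbacks,
    # Task-level callbacks, applied to every Task in the DAG
    default_args={
        **mcd_callbacks.task_callbacks,
    },
)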
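If your DAG already has default_args, the Task callbacks can be merged into them with dictionary unpacking. The sketch below uses a stand-in dictionary in place of mcd_callbacks.task_callbacks so that it runs without Airflow; the stand-in names and the example default_args values are hypothetical.

```python
# Stand-in for mcd_callbacks.task_callbacks so the example runs without Airflow.
def example_mcd_failure_callback(context):
    return "monte carlo"


example_task_callbacks = {"on_failure_callback": example_mcd_failure_callback}

existing_default_args = {
    "owner": "data-eng",
    "retries": 2,
}

# Later entries win on key collisions, so unpack the callbacks last.
default_args = {**existing_default_args, **example_task_callbacks}
```

Because later entries win, a callback already present in existing_default_args under the same key would be replaced rather than combined. If you need to keep it, Option 2 is the better fit.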

If you prefer to add observability only to specific Tasks, add the callbacks on those Tasks instead:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow_mcd.callbacks import mcd_callbacks

dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
)

task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    **mcd_callbacks.task_callbacks,
)

Option 2: Add Monte Carlo Callbacks to your existing Callbacks.

Use this method if you already have Callbacks that you would like to keep.

from airflow import DAG
from airflow_mcd.callbacks import mcd_callbacks

# existing DAG callback function: success, failure, or sla_miss
def dag_callback(context):
    # existing processing code (foo is a placeholder)
    foo(context)

    # keep only the one that matches this callback type:
    # Monte Carlo callback for success
    mcd_callbacks.mcd_dag_success_callback(context)
    # Monte Carlo callback for failure
    mcd_callbacks.mcd_dag_failure_callback(context)
    # Monte Carlo callback for sla_miss
    mcd_callbacks.mcd_sla_miss_callback(context)

# existing Task callback function: execute, success, failure, or retry
def task_callback(context):
    # existing callback code (bar is a placeholder)
    bar(context)

    # keep only the one that matches this callback type:
    # Monte Carlo callback for execute
    mcd_callbacks.mcd_task_execute_callback(context)
    # Monte Carlo callback for success
    mcd_callbacks.mcd_task_success_callback(context)
    # Monte Carlo callback for failure
    mcd_callbacks.mcd_task_failure_callback(context)
    # Monte Carlo callback for retry
    mcd_callbacks.mcd_task_retry_callback(context)
    
# ! only available in Airflow >= 2.6.0
# alternatively, pass a list of callbacks
dag = DAG(
    'dag_name',
    on_success_callback=[existing_dag_success_callback, mcd_callbacks.mcd_dag_success_callback],
)
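On Airflow versions that do not accept a list, the same effect can be had by combining callbacks into one function. The sketch below is one way to do that; chain_callbacks is a hypothetical helper written for this example, not part of airflow-mcd, and the two callbacks are stand-ins so that the code runs without Airflow.

```python
def chain_callbacks(*callbacks):
    """Return one callback that calls each given callback in order."""
    def chained(context):
        for callback in callbacks:
            callback(context)
    return chained


# Stand-ins so the example runs without Airflow or airflow-mcd.
calls = []


def existing_dag_success_callback(context):
    calls.append(("existing", context["run_id"]))


def example_mcd_success_callback(context):
    calls.append(("monte_carlo", context["run_id"]))


on_success = chain_callbacks(existing_dag_success_callback, example_mcd_success_callback)
on_success({"run_id": "manual__1"})
```

In a real DAG you would pass the combined function directly, for example on_success_callback=chain_callbacks(existing_dag_success_callback, mcd_callbacks.mcd_dag_success_callback). Note that in this simple version an exception in an earlier callback prevents the later ones from running.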

Option 3: Explicitly configure each Callback Type.

Use this option if you want to only use specific callbacks.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow_mcd.callbacks import mcd_callbacks

dag = DAG(
    'dag_name',
    on_success_callback=mcd_callbacks.mcd_dag_success_callback,
    on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
    sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)

task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
    on_success_callback=mcd_callbacks.mcd_task_success_callback,
    on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
    on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)

Optional Steps

If you used a custom Conn ID, specify it in the mcd_connection_id param of each DAG that uses the callbacks.

dag = DAG(
    'dag_name',
    params={'mcd_connection_id': 'my_connection_id'},
    **mcd_callbacks.dag_callbacks,
)
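To illustrate how a param like this can override the default, the sketch below shows one plausible lookup: read mcd_connection_id from the params in the callback context and fall back to mcd_default_session when it is absent. This is an assumption about the general pattern, written with plain dictionaries; it is not taken from the airflow-mcd source, and resolve_connection_id is a hypothetical name.

```python
DEFAULT_CONN_ID = "mcd_default_session"


def resolve_connection_id(context):
    """Return the Conn ID from the DAG params, or the default if not set."""
    params = context.get("params") or {}
    return params.get("mcd_connection_id", DEFAULT_CONN_ID)


custom = resolve_connection_id({"params": {"mcd_connection_id": "my_connection_id"}})
fallback = resolve_connection_id({"params": {}})
```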
