Airflow Alerts and Task Observability

Overview

Integrate Monte Carlo with Airflow to see the Airflow DAGs and Task Runs that may have led to a Monte Carlo Alert. When a data incident occurs, finding the right DAG and Task that caused it can be cumbersome. The Monte Carlo Airflow integration simplifies finding the root cause and decreases your time to resolution.

How Airflow Callbacks Work

Monte Carlo uses Airflow Callbacks to send a webhook back to Monte Carlo upon an event in Airflow. The callbacks are lightweight, with just an HTTP call, so there is a negligible impact to your DAGs and Tasks.
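
To make the "one HTTP call per event" idea concrete, here is a minimal sketch of the general shape of such a callback. It is purely illustrative and is not the airflow-mcd implementation: make_webhook_callback, the payload fields, and the flat context dictionary are all hypothetical stand-ins, and the send function is injected so the example runs without a network or an Airflow install.

```python
import json
from typing import Any, Callable, Dict


def make_webhook_callback(
    event_type: str, send: Callable[[str], None]
) -> Callable[[Dict[str, Any]], None]:
    """Build a callback that reports a single event through `send`.

    `send` stands in for the one HTTP POST a real callback would make.
    """

    def callback(context: Dict[str, Any]) -> None:
        # Hypothetical payload: a real Airflow context is richer and nested.
        payload = {
            "event_type": event_type,
            "dag_id": context.get("dag_id"),
            "task_id": context.get("task_id"),
            "run_id": context.get("run_id"),
        }
        send(json.dumps(payload))

    return callback


if __name__ == "__main__":
    # Print instead of posting, to show what would be sent.
    on_failure = make_webhook_callback("task_failure", print)
    on_failure({"dag_id": "dag_name", "task_id": "task_name", "run_id": "manual_1"})
```

The callback does nothing beyond serializing a small payload and handing it off, which is why this pattern adds little overhead to a DAG or Task.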

The callbacks can be set at the DAG or Task level; we recommend having both to get full functionality. Callbacks on the DAG can be specified at a granular level (one for each Callback Type) or more broadly:

# explicit, per callback type 
dag = DAG(
    'dag_name',
    on_success_callback=mcd_callbacks.mcd_dag_success_callback,
    on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
    sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)

# broad, all callbacks
dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
)

Similarly, Task Callbacks can be explicit or broad:

# explicit, per callback type 
task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
    on_success_callback=mcd_callbacks.mcd_task_success_callback,
    on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
    on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)

# broad, all callbacks
task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    **mcd_callbacks.task_callbacks,
)

The supported Callback Types and their functions are below.

| Callback Type | Description | DAG | Task |
| --- | --- | --- | --- |
| on_success_callback | Invoked when the task succeeds | mcd_dag_success_callback | mcd_task_success_callback |
| on_failure_callback | Invoked when the task fails | mcd_dag_failure_callback | mcd_task_failure_callback |
| sla_miss_callback | Invoked when a task misses its defined SLA | mcd_sla_miss_callback | N/A |
| on_retry_callback | Invoked when the task is up for retry | N/A | mcd_task_retry_callback |
| on_execute_callback | Invoked right before the task begins executing | N/A | mcd_task_execute_callback |

Integration Setup

Callbacks can be configured on individual DAGs or Tasks. We recommend setting them up on one DAG first, and then applying them broadly to your DAGs once tested in your environment.

0. Add Airflow Connection to Monte Carlo

  1. Under Settings -> Integrations, go to the ETL Tool Connection section
  2. Use the Create button and select Airflow
  3. Add a Connection Name (for convenience within Monte Carlo)
  4. Specify the Host name for Airflow: the web server at which Airflow runs
    1. Host name is used to provide clickable links into your Airflow Logs and DAGs from the Monte Carlo application; Monte Carlo does not use it for fetching

Alternatively, you can add an Airflow connection using the Monte Carlo CLI.

1. Create a Monte Carlo Gateway Key

Create a Monte Carlo Gateway Key using the Manage keys menu option in the Airflow Connection.

Take note of the resulting ID and Secret values as you'll need them when creating the connection in Airflow.

Alternatively, you can create the integration key using the Monte Carlo CLI:

montecarlo keys add-airflow --description "Airflow Callbacks"

Note: these keys are not compatible with those generated under Settings -> API -> Account Service Keys, which can be used for Airflow Circuit Breakers but not for callbacks.

2. Install airflow-mcd from PyPI to Airflow

Install airflow-mcd from PyPI. Normally this can be done by adding the Python package to your requirements.txt file for Airflow.
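
After Airflow restarts, it can help to confirm that the package is visible to the Python environment that runs your DAGs. The following is a minimal sketch using only the standard library; installed_version is a hypothetical helper name, not part of airflow-mcd.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("airflow-mcd")
    print(f"airflow-mcd: {found or 'not installed'}")
```

Run it in the same environment as your Airflow workers or scheduler, since a package installed in one component is not necessarily installed in the others.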

❗️

Restarting Airflow

Installing packages requires restarting Airflow, so it is ideal to plan this ahead of time. Airflow Downtime can be as long as an hour, depending on the size of your other packages for Airflow.

3. Create a Monte Carlo Data Connection in Airflow

  1. In your Airflow UI, go to Admin > Connections.
  2. Click the blue plus button to Create a new connection.
  3. Set the Conn Id to mcd_gateway_default_session.

🚧

Conn ID Naming

You can change the name from mcd_gateway_default_session, but it will require you to specify the name of the Conn ID on each callback. Use mcd_gateway_default_session for the simplest approach.

  4. Set the Conn Type to Monte Carlo Data Gateway.
    1. Note: If Monte Carlo Data Gateway is not available, please check the FAQs section below.
    2. Note: The Monte Carlo Data connection type does not support callbacks; it is used only for Circuit Breakers. You must select Monte Carlo Data Gateway or HTTP, as described in the FAQs section.
  5. Paste the API Key ID in the Login field.
  6. Paste the API Key Secret in the Password field.

Please note: Host, Schema, and Port are not needed.

  7. Test the connection by clicking the Test button. You should see a "Connection to MCD Gateway successfully tested." message.
  8. Save the connection.

🚧

Test Not Working

Occasionally, the Test button does not work from the Airflow interface. This is ok; we recommend still moving forward implementing callbacks. If data still does not reach Monte Carlo after the implementation below, please reach out to our support team at [email protected], and we'd be happy to help.

4. Add Callbacks to DAGs and Tasks

Option 1: Add Monte Carlo DAG Callbacks, and Task Callbacks for all Tasks as default arguments.

If you are not using callbacks today or prefer to replace your callbacks with Monte Carlo's, the simplest approach is to add Monte Carlo's DAG and Task callbacks at the DAG level. If you are using callbacks already, secondary options are provided below.

from airflow import DAG
from airflow_mcd.callbacks import mcd_callbacks

dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
    default_args={
        **mcd_callbacks.task_callbacks,
    }
)

If you prefer to add observability only to specific Tasks, add the Task callbacks on those Tasks instead of in default_args:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow_mcd.callbacks import mcd_callbacks

dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
)

task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    **mcd_callbacks.task_callbacks,
)

Option 2: Add Monte Carlo Callbacks to your existing Callbacks.

Use this method if you already have Callbacks that you would like to keep.

from airflow import DAG
from airflow_mcd.callbacks import mcd_callbacks

# existing DAG callback function: success, failure, or SLA miss
def dag_callback(context):
    # existing processing code
    foo(context)

    # pick one, matching the callback type this function is registered for:
    # monte carlo callback for success
    mcd_callbacks.mcd_dag_success_callback(context)
    # monte carlo callback for failure
    mcd_callbacks.mcd_dag_failure_callback(context)
    # monte carlo callback for sla_miss
    mcd_callbacks.mcd_sla_miss_callback(context)

# existing task callback function: execute, success, failure, or retry
def task_callback(context):
    # existing callbacks
    bar(context)

    # pick one, matching the callback type this function is registered for:
    # monte carlo callback for execute
    mcd_callbacks.mcd_task_execute_callback(context)
    # monte carlo callback for success
    mcd_callbacks.mcd_task_success_callback(context)
    # monte carlo callback for failure
    mcd_callbacks.mcd_task_failure_callback(context)
    # monte carlo callback for retry
    mcd_callbacks.mcd_task_retry_callback(context)

# ! only available in Airflow >= 2.6.0
# pass a list of callbacks; existing_dag_success_callback is your own function
dag = DAG(
    'dag_name',
    on_success_callback=[existing_dag_success_callback, mcd_callbacks.mcd_dag_success_callback],
)
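
On Airflow versions earlier than 2.6.0, where a list of callbacks is not accepted, the wrapper pattern above can be generalized with a small helper. This is a sketch under stated assumptions: chain_callbacks is a hypothetical name and not part of airflow-mcd, and it assumes every callback takes a single context argument. It also makes one design choice worth noting: a failing callback is logged rather than re-raised, so the remaining callbacks still run.

```python
import logging
from typing import Any, Callable, Dict

Callback = Callable[[Dict[str, Any]], None]
logger = logging.getLogger(__name__)


def chain_callbacks(*callbacks: Callback) -> Callback:
    """Return one callback that invokes each of `callbacks` in order."""

    def chained(context: Dict[str, Any]) -> None:
        for callback in callbacks:
            try:
                callback(context)
            except Exception:
                # Log and continue so one failure does not block the rest.
                name = getattr(callback, "__name__", repr(callback))
                logger.exception("Callback %s failed", name)

    return chained
```

With this helper, a DAG could be given on_success_callback=chain_callbacks(existing_dag_success_callback, mcd_callbacks.mcd_dag_success_callback), where existing_dag_success_callback is your own function.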

Option 3: Explicitly configure each Callback Type.

Use this option if you want to only use specific callbacks.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow_mcd.callbacks import mcd_callbacks

dag = DAG(
    'dag_name',
    on_success_callback=mcd_callbacks.mcd_dag_success_callback,
    on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
    sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)

task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
    on_success_callback=mcd_callbacks.mcd_task_success_callback,
    on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
    on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)

Optional Steps

If you used a custom Conn ID, you will need to pass it to the callbacks through the mcd_connection_id param, in place of the default connection name.

dag = DAG(
    'dag_name',
    params={'mcd_connection_id': 'my_connection_id'},
    **mcd_callbacks.dag_callbacks,
)

FAQs

"Monte Carlo Data Gateway" is not an available connection type. How can I create the connection in Airflow?

First, check that you installed airflow-mcd version 0.2.4 or later. If you did and you are using MWAA, you might be facing a known issue: in some cases PyPI packages are not installed on the web server, and so are not available in the connection editor.

To work around this issue, you can use the HTTP connection type, which is always available in Airflow, and set https://integrations.getmontecarlo.com as the Host for the connection. The rest of the settings are the same as described in the step to create a connection in Airflow. Please note that HTTP connections cannot be tested the way Monte Carlo connections can.
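
If you manage connections outside the UI, the same HTTP connection can in principle be described as JSON and supplied through an AIRFLOW_CONN_<CONN_ID> environment variable. The sketch below assumes Airflow 2.3 or later (which accepts JSON-formatted connection variables) and has not been verified against airflow-mcd itself; http_connection_json is a hypothetical helper, and the key values shown are placeholders.

```python
import json

# Environment variable name Airflow derives from the default Conn ID.
ENV_VAR_NAME = "AIRFLOW_CONN_MCD_GATEWAY_DEFAULT_SESSION"


def http_connection_json(key_id: str, key_secret: str) -> str:
    """Serialize the HTTP fallback connection described above as JSON."""
    return json.dumps(
        {
            "conn_type": "http",
            "host": "https://integrations.getmontecarlo.com",
            "login": key_id,  # Gateway Key ID
            "password": key_secret,  # Gateway Key Secret
        }
    )


if __name__ == "__main__":
    # Placeholder values; substitute the ID and Secret from your Gateway Key.
    print(f"{ENV_VAR_NAME}='{http_connection_json('<key-id>', '<key-secret>')}'")
```

Connections defined through environment variables do not appear in Admin > Connections, so store the secret in your deployment's secret manager rather than in plain text.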

I have updated Airflow to 2.9.0 or greater, and my connection no longer works.

There is a known issue that causes a connection to fail if you are using Airflow 2.9.0 or greater with an airflow-mcd version earlier than 0.3.3. If you have just updated to Airflow 2.9.0 or greater, please update airflow-mcd as well.
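
The version rule above can be expressed as a small check, for example in a deployment script. This is a minimal sketch: both function names are hypothetical, and the parser only handles plain dotted releases such as 2.9.0, not pre-release suffixes.

```python
from typing import Tuple


def parse_version(text: str) -> Tuple[int, ...]:
    """Parse a plain dotted release such as '2.9.0' into (2, 9, 0)."""
    parts = [int(part) for part in text.split(".")[:3]]
    parts += [0] * (3 - len(parts))  # treat '2.9' as '2.9.0'
    return tuple(parts)


def needs_airflow_mcd_upgrade(airflow_version: str, airflow_mcd_version: str) -> bool:
    """True when Airflow is 2.9.0 or greater and airflow-mcd is below 0.3.3."""
    return (
        parse_version(airflow_version) >= (2, 9, 0)
        and parse_version(airflow_mcd_version) < (0, 3, 3)
    )


if __name__ == "__main__":
    print(needs_airflow_mcd_upgrade("2.9.0", "0.3.2"))
```

Tuples of integers are compared instead of strings so that a release such as 2.10.0 is correctly treated as newer than 2.9.0.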

