Airflow Alerts and Task Observability

Overview

Integrate Monte Carlo with Airflow to see the Airflow DAGs and Task Runs that may have led to a Monte Carlo Alert. When a data incident occurs, finding the DAG and Task that caused it can be cumbersome. The Monte Carlo Airflow integration makes finding the root cause simple and decreases your time to resolution.

How Airflow Callbacks Work

Monte Carlo uses Airflow Callbacks to send a webhook to Monte Carlo when an event occurs in Airflow. Each callback is lightweight, making just an HTTP call, so the impact on your DAGs and Tasks is negligible.
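To make the mechanism concrete, here is a minimal, hypothetical sketch of what a webhook-style callback does: Airflow calls the function with the run's context dictionary, and the function sends one HTTP POST. The endpoint URL, payload fields, and helper names are illustrative assumptions, not the airflow-mcd implementation or Monte Carlo's API; in practice, use the mcd_callbacks functions described below.

```python
import json
import urllib.request

# Hypothetical endpoint for illustration only: NOT Monte Carlo's real API.
WEBHOOK_URL = "https://example.com/airflow-webhook"


def build_payload(event, context):
    """Collect a few identifiers from an Airflow callback context."""
    ti = context.get("ti")  # TaskInstance, if present in this context
    return {
        "event": event,
        "run_id": context.get("run_id"),
        "dag_id": getattr(ti, "dag_id", None),
        "task_id": getattr(ti, "task_id", None),
        "try_number": getattr(ti, "try_number", None),
    }


def make_webhook_callback(event, url=WEBHOOK_URL, timeout=10):
    """Return a callback that POSTs one JSON payload per Airflow event."""
    def callback(context):
        body = json.dumps(build_payload(event, context)).encode("utf-8")
        request = urllib.request.Request(
            url,
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        # One short HTTP call per event keeps the overhead on the DAG small.
        with urllib.request.urlopen(request, timeout=timeout):
            pass
    return callback
```

A Task would then use, for example, on_failure_callback=make_webhook_callback("task_failure").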

Callbacks can be set at the DAG level or the Task level; we recommend using both for full functionality. DAG callbacks can be specified individually (one for each Callback Type) or all at once:

from airflow import DAG
from airflow_mcd.callbacks import mcd_callbacks

# explicit, per callback type
dag = DAG(
    'dag_name',
    on_success_callback=mcd_callbacks.mcd_dag_success_callback,
    on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
    sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)

# broad, all callbacks
dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
)

Similarly, Task Callbacks can be explicit or broad:

from airflow.operators.bash import BashOperator
from airflow_mcd.callbacks import mcd_callbacks

# explicit, per callback type (assumes `dag` is defined as above)
task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
    on_success_callback=mcd_callbacks.mcd_task_success_callback,
    on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
    on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)

# broad, all callbacks
task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    **mcd_callbacks.task_callbacks,
)
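The broad form relies on Python keyword-argument unpacking. The stdlib-only sketch below uses a stand-in fake_operator function and a stand-in task_callbacks dict, assuming (not confirmed on this page) that mcd_callbacks.task_callbacks is a plain dict keyed by callback argument name. It shows that the broad and explicit forms pass the same arguments, and that passing the same callback argument both ways raises a TypeError, so you cannot simply add one of your own callbacks next to **mcd_callbacks.task_callbacks.

```python
def fake_operator(task_id, **kwargs):
    """Stand-in for an Airflow operator: returns the callback arguments it received."""
    return {name: value for name, value in kwargs.items() if name.endswith("_callback")}


# Stand-ins for the Monte Carlo task callback functions.
def mc_execute(context): ...
def mc_success(context): ...
def mc_failure(context): ...
def mc_retry(context): ...


# Hypothetical stand-in for mcd_callbacks.task_callbacks.
task_callbacks = {
    "on_execute_callback": mc_execute,
    "on_success_callback": mc_success,
    "on_failure_callback": mc_failure,
    "on_retry_callback": mc_retry,
}

# The broad form expands to the same keyword arguments as the explicit form.
broad = fake_operator("task_name", **task_callbacks)
explicit = fake_operator(
    "task_name",
    on_execute_callback=mc_execute,
    on_success_callback=mc_success,
    on_failure_callback=mc_failure,
    on_retry_callback=mc_retry,
)
print(broad == explicit)  # True


def my_failure_alert(context): ...


# Supplying the same callback argument twice is rejected by Python itself.
conflict_error = None
try:
    fake_operator("task_name", on_failure_callback=my_failure_alert, **task_callbacks)
except TypeError as err:
    conflict_error = err
print(conflict_error)  # ... got multiple values for keyword argument 'on_failure_callback'
```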

The supported Callback Types and the corresponding Monte Carlo functions are listed below.

| Callback Type | Description | DAG | Task |
|---|---|---|---|
| on_success_callback | Invoked when the DAG run or task succeeds | mcd_dag_success_callback | mcd_task_success_callback |
| on_failure_callback | Invoked when the DAG run or task fails | mcd_dag_failure_callback | mcd_task_failure_callback |
| sla_miss_callback | Invoked when a task misses its defined SLA | mcd_sla_miss_callback | N/A |
| on_retry_callback | Invoked when the task is up for retry | N/A | mcd_task_retry_callback |
| on_execute_callback | Invoked right before the task begins executing | N/A | mcd_task_execute_callback |

Integration Setup

Callbacks are configured on individual DAGs or Tasks. We recommend setting them up on one DAG first, then applying them broadly to your DAGs once you have tested them in your environment.

0. Add Airflow Connection to Monte Carlo

  1. Under Settings -> Integrations, go to the ETL Tool Connection section
  2. Use the Create button and select Airflow
  3. Add a Connection Name (for convenience within Monte Carlo)
  4. Specify the Host name for Airflow: the URL of the web server where Airflow runs
    1. The Host name is used to provide clickable links to your Airflow logs and DAGs from the Monte Carlo application; Monte Carlo does not use it to fetch data

Alternatively, you can add an Airflow connection using the Monte Carlo CLI.

1. Create a Monte Carlo Gateway Key

Create a Monte Carlo Gateway Key using the Manage keys menu option on the Airflow connection you added in Monte Carlo.

Take note of the resulting ID and Secret values as you'll need them when creating the connection in Airflow.

Alternatively, you can create the integration key using the Monte Carlo CLI:

montecarlo keys add-airflow --description "Airflow Callbacks"

Note: These keys are not interchangeable with the keys generated under Settings -> API -> Account Service Keys. Account Service Keys can be used for Airflow Circuit Breakers, but not for callbacks.

2. Install airflow-mcd from PyPI in Airflow

Install the airflow-mcd package from PyPI. Typically, you do this by adding airflow-mcd to your Airflow requirements.txt file.


Restarting Airflow

Installing packages requires restarting Airflow, so plan for this ahead of time. Downtime can be as long as an hour, depending on the size of the other packages installed in your Airflow environment.

3. Create a Monte Carlo Data Connection in Airflow

  1. In your Airflow UI, go to Admin > Connections.
  2. Click the blue plus button to Create a new connection.
  3. Set the Conn Id to mcd_gateway_default_session.


Conn ID Naming

You can use a Conn ID other than mcd_gateway_default_session, but you will then need to specify it for your callbacks (see Optional Steps below). Use mcd_gateway_default_session for the simplest setup.

  4. Set the Conn Type to Monte Carlo Data Gateway.
    1. Note: If Monte Carlo Data Gateway is not available, check the FAQs section below.
    2. Note: The Monte Carlo Data connection type does not support callbacks; it is used only for Circuit Breakers. You must select Monte Carlo Data Gateway, or HTTP as described in the FAQs section.
  5. Paste the Gateway Key ID from step 1 in the Login field.
  6. Paste the Gateway Key Secret from step 1 in the Password field.

Please note: Host, Schema, and Port are not needed.

  7. Test the connection by clicking the Test button. You should see a "Connection to MCD Gateway successfully tested." message.
  8. Save the connection.


Test Not Working

Occasionally, the Test button does not work from the Airflow interface. This is OK; we recommend moving forward with implementing callbacks anyway. If data still does not reach Monte Carlo after you implement the callbacks below, please contact the Monte Carlo support team, and we'd be happy to help.
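If the Test button is unavailable, one quick check you can run from an Airflow worker is whether it can open a TCP connection to the Monte Carlo integrations host at all. This is a generic network sketch, not a Monte Carlo tool: it assumes the callbacks reach integrations.getmontecarlo.com over HTTPS on port 443 (the host the FAQ below gives for HTTP connections), and it does not validate your keys.

```python
import socket


def can_reach(host, port=443, timeout=5.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # DNS failures, refused connections, and timeouts all raise OSError subclasses.
        return False
```

Running can_reach("integrations.getmontecarlo.com") on a worker and getting False points to a network or firewall problem rather than a key problem.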

4. Add Callbacks to DAGs and Tasks

Option 1: Add Monte Carlo DAG callbacks, and Task callbacks for all Tasks as a default argument.

If you are not using callbacks today, or prefer to replace your callbacks with Monte Carlo's, the simplest approach is to add Monte Carlo's DAG and Task callbacks at the DAG level. If you already use callbacks, see Options 2 and 3 below.

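from airflow import DAG
from airflow_mcd.callbacks import mcd_callbacks

# DAG callbacks on the DAG; Task callbacks applied to every Task via default_args
dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
    default_args={
        **mcd_callbacks.task_callbacks,
    }
)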
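Airflow applies default_args only to arguments that a Task does not set itself. So with Option 1, a Task that passes its own on_failure_callback replaces Monte Carlo's failure callback for that Task rather than adding to it. The stdlib-only sketch below models that precedence with a hypothetical resolve_task_kwargs helper and string stand-ins for the callbacks; it is a simplification of Airflow's behavior, not its code.

```python
def resolve_task_kwargs(default_args, task_kwargs):
    """Model Airflow's rule: arguments set on the Task win over default_args."""
    resolved = dict(default_args)
    resolved.update(task_kwargs)
    return resolved


# String stand-ins for the Monte Carlo task callbacks placed in default_args.
default_args = {
    "on_success_callback": "mcd_task_success_callback",
    "on_failure_callback": "mcd_task_failure_callback",
}

plain_task = resolve_task_kwargs(default_args, {"task_id": "load"})
custom_task = resolve_task_kwargs(
    default_args,
    {"task_id": "notify", "on_failure_callback": "my_pager_callback"},  # hypothetical
)

print(plain_task["on_failure_callback"])   # mcd_task_failure_callback
print(custom_task["on_failure_callback"])  # my_pager_callback (Monte Carlo's is dropped)
print(custom_task["on_success_callback"])  # mcd_task_success_callback
```

For Tasks that set their own callbacks, combine them with Monte Carlo's as described in Option 2.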

If you prefer to add observability only to specific Tasks, add the Task callbacks directly to those Tasks instead of using default_args:

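from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow_mcd.callbacks import mcd_callbacks

dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
)

# Task callbacks only on the Tasks you choose
task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    **mcd_callbacks.task_callbacks,
)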

Option 2: Add Monte Carlo Callbacks to your existing Callbacks.

Use this method if you already have Callbacks that you would like to keep.

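from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow_mcd.callbacks import mcd_callbacks

# foo, bar, and existing_dag_success_callback stand for your own existing code.


# Existing DAG success callback, extended to also notify Monte Carlo.
# For a DAG failure callback, call mcd_callbacks.mcd_dag_failure_callback(context) instead.
def dag_success_callback(context):
    # existing processing code
    foo(context)
    # Monte Carlo callback for DAG success
    mcd_callbacks.mcd_dag_success_callback(context)


# Airflow calls sla_miss_callback with
# (dag, task_list, blocking_task_list, slas, blocking_tis) rather than a context,
# so forward the same arguments to Monte Carlo.
def dag_sla_miss_callback(*args):
    # existing SLA-miss processing code goes here
    mcd_callbacks.mcd_sla_miss_callback(*args)


# Existing task failure callback, extended to also notify Monte Carlo.
# For other task callback types, call the matching function instead:
#   on_execute_callback -> mcd_callbacks.mcd_task_execute_callback(context)
#   on_success_callback -> mcd_callbacks.mcd_task_success_callback(context)
#   on_retry_callback   -> mcd_callbacks.mcd_task_retry_callback(context)
def task_failure_callback(context):
    # existing processing code
    bar(context)
    # Monte Carlo callback for task failure
    mcd_callbacks.mcd_task_failure_callback(context)


dag = DAG(
    'dag_name',
    on_success_callback=dag_success_callback,
    sla_miss_callback=dag_sla_miss_callback,
)

task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    on_failure_callback=task_failure_callback,
)

# ! only available in Airflow >= 2.6.0: pass a list of callbacks instead
dag = DAG(
    'dag_name',
    on_success_callback=[existing_dag_success_callback, mcd_callbacks.mcd_dag_success_callback],
)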
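On Airflow versions earlier than 2.6.0, where a list of callbacks is not accepted, a small wrapper can run several callbacks in order. The chain_callbacks helper below is hypothetical (it is not part of airflow-mcd or Airflow), it only fits callbacks that take a single context argument (so not sla_miss_callback), and the stand-in callbacks exist only to make the sketch runnable without Airflow.

```python
import logging

log = logging.getLogger(__name__)


def chain_callbacks(*callbacks):
    """Return one callback that runs each given callback with the same context.

    An exception in one callback is logged and does not stop the remaining
    callbacks, so a failing existing callback cannot block the Monte Carlo one.
    """
    def chained(context):
        for callback in callbacks:
            try:
                callback(context)
            except Exception:
                log.exception("Callback %r failed", callback)
    return chained


# Stand-ins so the sketch runs without Airflow or airflow-mcd.
calls = []


def existing_dag_success_callback(context):
    calls.append("existing")


def monte_carlo_stand_in(context):
    calls.append("monte_carlo")


on_success = chain_callbacks(existing_dag_success_callback, monte_carlo_stand_in)
on_success({"run_id": "manual__example"})
print(calls)  # ['existing', 'monte_carlo']
```

In a DAG, this would look like on_success_callback=chain_callbacks(existing_dag_success_callback, mcd_callbacks.mcd_dag_success_callback). Swallowing exceptions means a failing callback only shows up in the logs; re-raise after the loop if you prefer failures to surface.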

Option 3: Explicitly configure each Callback Type.

Use this option if you only want to use specific callbacks.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow_mcd.callbacks import mcd_callbacks

dag = DAG(
    'dag_name',
    on_success_callback=mcd_callbacks.mcd_dag_success_callback,
    on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
    sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)

task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
    on_success_callback=mcd_callbacks.mcd_task_success_callback,
    on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
    on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)
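If you would rather start from the broad dictionaries and keep only some callback types, you can filter them. This sketch assumes, without confirmation from this page, that mcd_callbacks.task_callbacks is a plain dict keyed by callback argument name (for example on_failure_callback); select_callbacks is a hypothetical helper, and the stand-in dict keeps the example runnable without airflow-mcd.

```python
def select_callbacks(callbacks, wanted):
    """Keep only the callback types named in `wanted`; fail loudly on typos."""
    unknown = set(wanted) - set(callbacks)
    if unknown:
        raise ValueError(f"unknown callback types: {sorted(unknown)}")
    return {name: fn for name, fn in callbacks.items() if name in wanted}


def noop(context):
    """Stand-in callback."""


# Stand-in for mcd_callbacks.task_callbacks.
task_callbacks = {
    "on_execute_callback": noop,
    "on_success_callback": noop,
    "on_failure_callback": noop,
    "on_retry_callback": noop,
}

failure_and_retry = select_callbacks(
    task_callbacks, {"on_failure_callback", "on_retry_callback"}
)
print(sorted(failure_and_retry))  # ['on_failure_callback', 'on_retry_callback']
```

A Task could then use **select_callbacks(mcd_callbacks.task_callbacks, {"on_failure_callback", "on_retry_callback"}) in place of the explicit arguments.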

Optional Steps

If you used a custom Conn ID, specify it in the DAG's params under the mcd_connection_id key:

from airflow import DAG
from airflow_mcd.callbacks import mcd_callbacks

dag = DAG(
    'dag_name',
    params={'mcd_connection_id': 'my_connection_id'},
    **mcd_callbacks.dag_callbacks,
)
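Because the Conn ID travels in the DAG's params, a callback can read it from the params entry of the Airflow context and fall back to the default name. The sketch below illustrates that lookup order; resolve_mcd_conn_id is a hypothetical helper, and it is an assumption that airflow-mcd resolves the Conn ID exactly this way.

```python
DEFAULT_CONN_ID = "mcd_gateway_default_session"


def resolve_mcd_conn_id(context):
    """Return the Conn ID from the DAG params, or the default Monte Carlo Conn ID."""
    params = context.get("params") or {}
    return params.get("mcd_connection_id", DEFAULT_CONN_ID)


print(resolve_mcd_conn_id({"params": {"mcd_connection_id": "my_connection_id"}}))  # my_connection_id
print(resolve_mcd_conn_id({"params": {}}))  # mcd_gateway_default_session
```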

FAQs

"Monte Carlo Data Gateway" is not an available connection type. How can I create the connection in Airflow?

First, check that you installed airflow-mcd version 0.2.4 or later. If you did and you are using MWAA, you might be facing a known issue: in some cases, PyPI packages are not installed on the web server, so they are not available in the connection editor.

To work around this issue, use the HTTP connection type, which is always available in Airflow, and set https://integrations.getmontecarlo.com as the connection's Host. The rest of the settings are the same as described in the step for creating a connection in Airflow. Note that HTTP connections cannot be tested the way Monte Carlo Data Gateway connections can.
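If you manage Airflow connections through environment variables rather than the UI, the HTTP fallback connection can also be expressed that way. This sketch assumes Airflow 2.3.0 or later, which accepts JSON-serialized connections in AIRFLOW_CONN_<CONN_ID> variables, and a deployment where you can set environment variables; the key values are placeholders and mcd_http_connection_env is a hypothetical helper.

```python
import json
import shlex


def mcd_http_connection_env(conn_id, key_id, key_secret):
    """Return (variable name, JSON value) for the HTTP fallback connection."""
    value = json.dumps({
        "conn_type": "http",
        "host": "https://integrations.getmontecarlo.com",
        "login": key_id,         # Monte Carlo Gateway Key ID
        "password": key_secret,  # Monte Carlo Gateway Key Secret
    })
    # Airflow looks up connections in variables named AIRFLOW_CONN_<CONN_ID upper-cased>.
    return f"AIRFLOW_CONN_{conn_id.upper()}", value


name, value = mcd_http_connection_env(
    "mcd_gateway_default_session", "<KEY_ID>", "<KEY_SECRET>"
)
# Shell-quoted so the JSON is exported as a single value.
print(f"export {name}={shlex.quote(value)}")
```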

I have updated Airflow to 2.9.0 or greater, and my connection no longer works.

There is a known issue that causes the connection to fail when using Airflow 2.9.0 or later with an airflow-mcd version earlier than 0.3.3. If you have just updated to Airflow 2.9.0 or later, update airflow-mcd to 0.3.3 or later as well.
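To see which versions an Airflow environment actually has, you can run a short check inside it. The thresholds come from the two answers above: airflow-mcd 0.2.4 or later for the Monte Carlo Data Gateway connection type, and airflow-mcd 0.3.3 or later with Airflow 2.9.0 or later. The sketch reads installed versions with importlib.metadata, assumes the distributions are named apache-airflow and airflow-mcd, and compares only the leading numeric parts of each version (pre-release suffixes are ignored).

```python
import re
from importlib import metadata


def numeric_version(version):
    """Turn '2.9.1' or '0.3.3rc1' into a comparable (major, minor, patch) tuple."""
    parts = []
    for piece in version.split(".")[:3]:
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    while len(parts) < 3:
        parts.append(0)  # so '2.9' compares equal to '2.9.0'
    return tuple(parts)


def version_problems(airflow_version, mcd_version):
    """Return the known version issues described in the FAQ answers above."""
    problems = []
    mcd = numeric_version(mcd_version)
    if mcd < (0, 2, 4):
        problems.append("airflow-mcd < 0.2.4: Monte Carlo Data Gateway type may be missing")
    if numeric_version(airflow_version) >= (2, 9, 0) and mcd < (0, 3, 3):
        problems.append("Airflow >= 2.9.0 needs airflow-mcd >= 0.3.3")
    return problems


def installed_version_problems():
    """Check the versions installed in the current Python environment."""
    return version_problems(
        metadata.version("apache-airflow"), metadata.version("airflow-mcd")
    )
```

Calling installed_version_problems() inside your Airflow environment returns an empty list when neither issue applies. Comparing integer tuples rather than strings matters here: as strings, "2.10.0" sorts before "2.9.0".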

How do I migrate from the legacy Monte Carlo Airflow (GraphQL) connection to the Integration Gateway connection?

  1. In Monte Carlo, go to your EXISTING Airflow integration and click the three dots. A pop-up with several options appears.

  2. Select Manage Keys and click Generate integration key.

NOTE: If you already have keys under “Previously generated keys”, you are likely already using the Integration Gateway connection, and this migration might not be necessary. To double-check, look at your connection in Airflow and see whether the Key ID is used in its Login field.

  3. A new Key ID and Key Secret are generated and displayed. Keep this screen open or write down these keys, as you will need them in the next steps.

  4. In the Airflow UI, click Admin in the menu and select Connections.

  5. Search for the connection to Monte Carlo. It is probably called mcd_default_session; otherwise, look for the one with Host https://api.getmontecarlo.com/graphql.

  6. Edit the connection and make the following changes:
    1. Change the Connection Type to Monte Carlo Data Gateway (if this option does not appear, select HTTP).
    2. Change the Host from https://api.getmontecarlo.com/graphql to https://integrations.getmontecarlo.com.
    3. Set the Login to the Key ID generated in step 3.
    4. Set the Password to the Key Secret generated in step 3.
    5. Change the Connection Id from mcd_default_session to mcd_gateway_default_session. Note: Only do this if you are not overriding the connection name in your DAGs.
    6. Click Save. The Test button will be disabled, as the connection cannot be tested this way. If you were able to set the Connection Type to Monte Carlo Data Gateway, the saved connection shows it as its Connection Type.

  7. To test that the connection works, trigger a DAG in the Airflow UI. Use a DAG that already exists in Monte Carlo, not a new one: a new DAG can take several hours to appear on the Assets page, which makes the next step harder because you won't be able to find it yet.

  8. In the Monte Carlo UI, confirm that the DAG run is shown: on the Assets page, enter the dag_id in the search box and select the Jobs tab.

  9. Click the DAG you were looking for to open the Runs screen. If the DAG is running or the run has just completed, the updated connection is working.
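The connection edits in step 6 can be summarized as a small transformation from the legacy connection's fields to the new ones. This is an illustrative sketch only: the field names mirror the Airflow connection form (Conn Id, Conn Type, Host, Login, Password), migrate_connection is a hypothetical helper, and it uses the HTTP connection type because the identifier Airflow uses internally for the Monte Carlo Data Gateway type is not given on this page.

```python
LEGACY_HOST = "https://api.getmontecarlo.com/graphql"
GATEWAY_HOST = "https://integrations.getmontecarlo.com"


def migrate_connection(old, key_id, key_secret, conn_id_overridden_in_dags=False):
    """Return the Integration Gateway version of a legacy Monte Carlo connection."""
    if old.get("host") != LEGACY_HOST:
        raise ValueError("not a legacy GraphQL connection; it may already be migrated")
    new = dict(old)  # leave the original mapping untouched
    new.update(conn_type="http", host=GATEWAY_HOST, login=key_id, password=key_secret)
    # Rename only if your DAGs do not override the connection name (step 6.5).
    if not conn_id_overridden_in_dags and old.get("conn_id") == "mcd_default_session":
        new["conn_id"] = "mcd_gateway_default_session"
    return new
```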

