Circuit breakers

Alpha / early access

Circuit breakers stop (or “circuit break”) pipelines when data does not meet a set of quality or integrity thresholds. This is useful, for example, for checking that data meets your requirements between transformation steps, or after ETL/ELT jobs execute but before BI dashboards are updated.

Example of an integrity test preventing a downstream task from executing using our [airflow provider](https://pypi.org/project/airflow-mcd/).

The following guide explains how to get started with circuit breakers. This includes:

  1. Finding all existing compatible monitors
  2. Creating a new compatible monitor
  3. Adding circuit breakers to your pipeline
  4. Advanced details
  5. Limitations

❗️

Word of caution: Prevent unintended data downtime

Circuit breakers provide an excellent way to prevent data issues from occurring in the first place. However, circuit breakers can also wreak havoc on pipelines when delayed jobs set off a chain reaction of data failures downstream.

Keep the following in mind when considering how you implement circuit breakers:

  • When considering which SQL Rules to implement, select only rules with a run history where you have a good understanding of how often Incidents are triggered
  • Start small by implementing circuit breakers in a handful of Airflow DAGs with a lot of visibility to catch when the breaker is triggered
  • Select SQL Rules where the underlying SQL executes quickly to avoid long query timeouts

Finding all existing compatible monitors

You can use any Custom SQL Monitor (AKA a Rule) to circuit break in your pipelines.

This includes both scheduled and unscheduled rules. See here for more details on the difference between the two.

Via the Dashboard

To find a compatible rule in Monte Carlo, please visit the monitors page, and update the filter to only display "Rules".

You can find the rule UUID by viewing the monitor details for the rule you want to use. Save this value, as you will need it to add the rule to your pipeline.

Example of dashboard filtering

Via the CLI

📘

Follow this guide to install and configure the CLI. Requires >= 0.18.0

The monitors list command with circuit_breaker_compatible as the rule type can be used to conveniently list all relevant rules with their description and UUIDs.

You should save the UUIDs for any monitors you want to use, as they will be needed to add the rule to your pipeline.

For reference, see help for this command below:

$ montecarlo monitors list --help
Usage: montecarlo monitors list [OPTIONS]

  List monitors ordered by update recency.

Options:
  --limit INTEGER                 Max number of monitors to list.  [default:
                                  100]

  --rule-type [circuit_breaker_compatible|custom_sql|table_metric|freshness]
                                  The type of rule to list.  [required]
  --help                          Show this message and exit.

# List all circuit breaker compatible rules
$ montecarlo monitors list --rule-type circuit_breaker_compatible

Creating a new compatible monitor

You can create a compatible monitor via the Dashboard, CLI (AKA monitors as code) or the API/SDK.

These Custom SQL Monitors (or rules) can be either scheduled or unscheduled. The latter are intended to be used only for circuit breakers (or another manual trigger), while the former also run outside of your pipelines at the interval you specify. Using scheduled rules for circuit breaking does not affect their normal execution, and can be helpful if you also want your quality checks to run outside of your pipeline. Generally, unscheduled rules are recommended to prevent unnecessary queries on your warehouse outside of your transformation pipeline.

Via the Dashboard

Follow this guide to create a compatible rule in Monte Carlo.

An unscheduled rule is created by selecting "Manually triggered only" in the wizard.

Via the CLI

Follow this guide to create a rule in Monte Carlo via monitors as code (CLI + IaC).

The compatible configuration is custom_sql. See here for details on how to configure it. For instance, the following defines an unscheduled (manual) rule and then the same rule on a fixed schedule:

montecarlo:
  custom_sql:
    - description: Test rule
      sql: |
         select foo from project.dataset.my_table
      comparisons:
        - type: threshold
          operator: GT
          threshold_value: 0
      schedule:
        type: manual

montecarlo:
  custom_sql:
    - description: Test rule
      sql: |
         select foo from project.dataset.my_table
      comparisons:
        - type: threshold
          operator: GT
          threshold_value: 0
      schedule:
        type: fixed
        interval_minutes: 60
        start_time: "2021-07-27T19:00:00"

Adding circuit breakers to your pipeline

📘

Prerequisites

These steps require creating an API token. See here for details.

Using circuit breakers in your pipeline requires a Data Collector with a version newer than v2360. See here for details on how to upgrade.

How you add a circuit breaker to your pipeline depends on your environment and orchestrator. Monte Carlo offers multiple options. See the compatibility matrix below:

| | SimpleCircuitBreakerOperator | pycarlo (features) | pycarlo (core) | API |
| --- | --- | --- | --- | --- |
| Airflow orchestrator | Yes | Yes | Yes | Yes |
| Other python orchestrator | No | Yes | Yes | Yes |
| Non-python orchestrator | No | No | No | Yes |

SimpleCircuitBreakerOperator

You can leverage the MCD Airflow provider to add a circuit breaker to your existing Airflow DAGs.

This operator handles triggering, polling and breach evaluation for a rule. It raises an AirflowFailException if the rule condition is in breach when using an Airflow version newer than 1.10.11; older Airflow versions raise an AirflowException.

This operator expects the following parameters:

  • mcd_session_conn_id: A SessionHook compatible connection. See basic usage below.
  • rule_uuid: UUID of the rule (custom SQL monitor) to execute.

The following parameters can also be passed:

  • timeout_in_minutes [default=5]: Polling timeout in minutes. Note that the Data Collector Lambda has a maximum timeout of 15 minutes when executing a query. Queries that take longer to execute are not supported, so we recommend narrowing the query to improve performance (e.g. by limiting it with a WHERE clause). If you expect a query to take the full 15 minutes, we recommend padding the timeout to 20 minutes.
  • fail_open [default=True]: Prevents errors or timeouts that occur while executing a rule from stopping your pipeline. If set to True and any such issue is encountered, the operator raises an AirflowSkipException. In this case we recommend setting the trigger_rule param of any downstream tasks to none_failed.

🚧

On failing open

By default this operator fails open (fail_open=True). This means that if any error occurs while executing, processing or evaluating the rule, your pipeline is not halted. Only actual breaches should circuit-break (stop) your pipelines.

If you want to guarantee a rule passes (i.e. does not breach) set this parameter to False.
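
The combinations described above can be summarized as a small decision table. The following is a plain-Python sketch, not the operator's actual source: the Outcome enum and the resolve function are hypothetical names used only for illustration, and treating an error as a failure when fail_open=False is an assumption inferred from the description above.

```python
from enum import Enum


class Outcome(Enum):
    """Hypothetical result of running a rule; not part of airflow-mcd."""
    NO_BREACH = "no_breach"
    BREACH = "breach"
    ERROR = "error"  # any error or timeout while executing the rule


def resolve(outcome: Outcome, fail_open: bool = True) -> str:
    """Return what happens to the circuit breaker task: 'pass', 'fail' or 'skip'."""
    if outcome is Outcome.NO_BREACH:
        return "pass"
    if outcome is Outcome.BREACH:
        # A breach stops the pipeline regardless of fail_open.
        return "fail"
    # Errors and timeouts: skipped when failing open, otherwise assumed to fail.
    return "skip" if fail_open else "fail"


# Print every combination of fail_open and outcome.
for fail_open in (True, False):
    for outcome in Outcome:
        print(f"fail_open={fail_open!s:<5} {outcome.value:<9} -> {resolve(outcome, fail_open)}")
```

The "skip" case is why downstream tasks need trigger_rule set to none_failed: a skipped breaker task would otherwise cause them to be skipped too.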

Basic Usage:

  1. Create a new Airflow HTTP Connection that is compatible with the SessionHook. This hook expects an Airflow HTTP connection with the MCD API ID and Token in "extra" with the following format:
{
    "mcd_id": "<ID>",
    "mcd_token": "<SECRET>"
}

👍

If you prefer you can specify the mcd_token in the "password" field instead.

Creating a connection example

  2. Install airflow-mcd from PyPI.
    Normally this can be done by adding the python package to your requirements.txt file for Airflow.

  3. Add the SimpleCircuitBreakerOperator operator to your Airflow DAG. For instance,

from datetime import datetime, timedelta

from airflow import DAG

try:
    from airflow.operators.bash import BashOperator
except ImportError:
    # For Airflow versions < 2.0.0. The bash_operator module was deprecated in 2.0.0.
    from airflow.operators.bash_operator import BashOperator

from airflow_mcd.operators import SimpleCircuitBreakerOperator

mcd_connection_id = 'mcd_default_session'

with DAG('sample-dag', start_date=datetime(2022, 2, 8), catchup=False, schedule_interval=timedelta(1)) as dag:
    task1 = BashOperator(
        task_id='example_elt_job_1',
        bash_command='echo I am transforming a very important table!',
    )
    breaker = SimpleCircuitBreakerOperator(
        task_id='example_circuit_breaker',
        mcd_session_conn_id=mcd_connection_id,
        rule_uuid='<RULE_UUID>'
    )
    task2 = BashOperator(
        task_id='example_elt_job_2',
        bash_command='echo I am building a very important dashboard from the table created in task1!',
        trigger_rule='none_failed'
    )

    task1 >> breaker >> task2

If you prefer, you can also extend the BaseMcdOperator and SessionHook to include your own logic or processing, using our SDK or any other dependencies. This operator and hook are also useful beyond circuit breaking (e.g. creating custom lineage after a task completes).

Pycarlo

📘

Follow this guide to install and configure the SDK. Requires >= 0.0.8.

We highly recommend you review the basic usage from the link above before proceeding.

You can leverage the SDK (features or core) in any python orchestrator or tool.

For instance, with Airflow, you can use a PythonOperator to execute python callables.

See Pycarlo (features) and Pycarlo (core) for more information.

Pycarlo (features)

Pycarlo (features) are an extension of our core SDK, which provides additional convenience by performing operations like combining queries, polling, paging and more.

For circuit breakers we offer methods to trigger, poll and chain both. For instance,

from pycarlo.features.circuit_breakers import CircuitBreakerService

service = CircuitBreakerService(print_func=print)
if service.trigger_and_poll(rule_uuid='<RULE_UUID>'):
    raise SystemExit

You can use pydoc to retrieve any documentation on these methods. For instance,

pydoc pycarlo.features.circuit_breakers.service
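
For orchestrators that run Python callables (such as Airflow's PythonOperator, mentioned earlier), the call above can be wrapped in a function. This is a minimal sketch: circuit_break is a hypothetical helper name, the service argument exists only so the function can be tested without pycarlo, and it assumes, as the example above suggests, that a truthy return value from trigger_and_poll means the rule is in breach.

```python
def circuit_break(rule_uuid, service=None):
    """Raise if the rule is in breach so the orchestrator marks the task as failed.

    `service` is injectable for testing; by default a CircuitBreakerService is created.
    """
    if service is None:
        # Imported lazily so this module can be loaded without pycarlo installed.
        from pycarlo.features.circuit_breakers import CircuitBreakerService
        service = CircuitBreakerService(print_func=print)
    if service.trigger_and_poll(rule_uuid=rule_uuid):
        # Any uncaught exception fails the task and stops downstream work.
        raise RuntimeError(f"Circuit breaker rule {rule_uuid} is in breach")
```

In Airflow this could be attached with PythonOperator(task_id='example_circuit_breaker', python_callable=circuit_break, op_kwargs={'rule_uuid': '<RULE_UUID>'}).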

Pycarlo (core)

Pycarlo (core) provides convenient programmatic access to execute all MC queries and mutations. This can be used to further customize usage of circuit breakers vs what is already provided out of the box in features.

For circuit breakers the following two operations allow you to trigger and check the status of rule executions. See Advanced details for more information.

  1. Trigger - trigger_circuit_breaker_rule.
from pycarlo.core import Mutation, Client

mutation = Mutation()
mutation.trigger_circuit_breaker_rule(rule_uuid=str('<RULE_UUID>')).__fields__('job_execution_uuid')
job_execution_uuid = Client()(mutation).trigger_circuit_breaker_rule.job_execution_uuid

  2. Check state - get_circuit_breaker_rule_state.
# Note that this example only checks the state once; it needs to be repeated until the status is in a terminal state.
# See Advanced details for an explanation of states.

from pycarlo.core import Client, Query

query = Query()
query.get_circuit_breaker_rule_state(job_execution_uuid=str('<JOB_UUID>')).__fields__('status', 'log')
circuit_breaker_state = Client()(query).get_circuit_breaker_rule_state
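
Because the state query has to be repeated until a terminal state is reached, a polling loop is usually needed around it. The sketch below is one way to write that loop; poll_until_terminal is a hypothetical helper (not part of pycarlo), the 10 second interval is an arbitrary assumption, and the 300 second default simply mirrors the 5 minute default of the Airflow operator.

```python
import time

# Terminal states as described under Advanced details.
TERMINAL_STATES = {"PROCESSING_COMPLETE", "HAS_ERROR"}


def poll_until_terminal(get_state, timeout_seconds=300, interval_seconds=10, sleep=time.sleep):
    """Call get_state() until it reports a terminal status or the timeout elapses.

    get_state is any zero-argument callable returning an object with a `status`
    attribute, e.g. a function wrapping the get_circuit_breaker_rule_state query.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = get_state()
        if state.status in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Rule still in state {state.status} after {timeout_seconds}s")
        sleep(interval_seconds)


# Usage with pycarlo (core), reusing the query shown above:
#   def get_state():
#       query = Query()
#       query.get_circuit_breaker_rule_state(job_execution_uuid='<JOB_UUID>').__fields__('status', 'log')
#       return Client()(query).get_circuit_breaker_rule_state
#   final_state = poll_until_terminal(get_state)
```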

API

For any non-python environments you can always leverage our APIs. Like pycarlo (core) this consists of a trigger mutation and a series of status check queries. See Advanced details for more information.

  1. Trigger - triggerCircuitBreakerRule
curl --location --request POST 'https://api.getmontecarlo.com/graphql' \
--header 'x-mcd-id: <MCD_ID>' \
--header 'x-mcd-token: <MCD_TOKEN>' \
--header 'Content-Type: application/json' \
--data-raw '{"query":"mutation triggerCircuitBreakerRule {\n  triggerCircuitBreakerRule(ruleUuid:\"<RULE_UUID>\"){\n    jobExecutionUuid\n  }\n}","variables":{}}'

  2. Check state - getCircuitBreakerRuleState
# Note that this example only checks the state once; it needs to be repeated until the status is in a terminal state.
# See Advanced details for an explanation of states.

curl --location --request POST 'https://api.getmontecarlo.com/graphql' \
--header 'x-mcd-id: <MCD_ID>' \
--header 'x-mcd-token: <MCD_TOKEN>' \
--header 'Content-Type: application/json' \
--data-raw '{"query":"query getCircuitBreakerRuleState {\n  getCircuitBreakerRuleState(jobExecutionUuid: \"<JOB_UUID>\"){\n    status\n    log\n  }\n}","variables":{}}'
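
For readers who want to script these calls without the SDK, the two curl requests can be reproduced with the Python standard library. This is only a sketch: build_request, trigger_query, state_query and execute are hypothetical helper names, and the queries are copied from the curl examples above with the UUIDs interpolated directly into the query string.

```python
import json
import urllib.request

API_URL = "https://api.getmontecarlo.com/graphql"


def trigger_query(rule_uuid):
    """GraphQL mutation that triggers a circuit breaker rule."""
    return (
        'mutation triggerCircuitBreakerRule { triggerCircuitBreakerRule(ruleUuid: "%s") '
        "{ jobExecutionUuid } }" % rule_uuid
    )


def state_query(job_execution_uuid):
    """GraphQL query that fetches the state of a triggered rule execution."""
    return (
        'query getCircuitBreakerRuleState { getCircuitBreakerRuleState(jobExecutionUuid: "%s") '
        "{ status log } }" % job_execution_uuid
    )


def build_request(query, mcd_id, mcd_token):
    """Build (but do not send) a POST request mirroring the curl calls above."""
    body = json.dumps({"query": query, "variables": {}}).encode("utf-8")
    headers = {
        "x-mcd-id": mcd_id,
        "x-mcd-token": mcd_token,
        "Content-Type": "application/json",
    }
    return urllib.request.Request(API_URL, data=body, headers=headers, method="POST")


def execute(request):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Following the usual GraphQL response convention, the job UUID would be expected under data.triggerCircuitBreakerRule.jobExecutionUuid in the result of execute(build_request(trigger_query('<RULE_UUID>'), '<MCD_ID>', '<MCD_TOKEN>')).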

Advanced details

This section is for users of pycarlo (core) or the API, or anyone curious about how to interpret the results from the underlying requests.

When triggering
Triggering a circuit breaker rule returns a job UUID. This ID should be used in state polls until the status is in a terminal state.

When getting the state
The response to the state query contains two core fields:

  1. The status as a string
  2. The log as a list

The status field can take multiple values, most of which are used for internal tracing. From a user's perspective only the terminal states need to be tracked: PROCESSING_COMPLETE and HAS_ERROR. The former indicates the rule has finished executing, processing and evaluating, while the latter indicates an error has occurred.

The last value of the log list always contains details about the state. In HAS_ERROR you should see details about the exception, while in PROCESSING_COMPLETE you should see details about the breach.

A breach count of 0 indicates no breach has occurred.

Example log in PROCESSING_COMPLETE:

[{...}, {
    "stage": "EVALUATOR",
    "timestamp": "2022-02-11T19:36:37.947347",
    "payload": {
        "breach_count": 1
    }
}]

Example log in HAS_ERROR:

[{...}, {
    "stage": "DATA_COLLECTOR_CHILD",
    "timestamp": "2022-02-11T19:36:37.947347",
    "payload": {
        "error": "...",
        "traceback": ["..."]
    }
}]
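
Putting the status and log rules together, a terminal state can be reduced to a simple breached / not breached answer. The sketch below uses a hypothetical helper, is_breached, and assumes the log has already been parsed into a list of dictionaries shaped like the examples above (if your client returns the log as a JSON string, decode it with json.loads first).

```python
def is_breached(status, log):
    """Interpret a terminal circuit breaker state.

    Returns True when the rule breached and False when it did not. Raises if the
    rule ended in HAS_ERROR or has not reached a terminal state yet.
    """
    if status == "HAS_ERROR":
        # The last log entry is expected to carry the exception details.
        error = log[-1].get("payload", {}).get("error", "unknown error") if log else "unknown error"
        raise RuntimeError(f"Rule execution failed: {error}")
    if status != "PROCESSING_COMPLETE":
        raise ValueError(f"Status {status!r} is not terminal; keep polling")
    # The last log entry carries the evaluation result; 0 means no breach.
    return log[-1]["payload"]["breach_count"] > 0
```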

Limitations

Rules with the same UUID executed within 5 minutes of each other share the same execution. If you need to run the same rule in parallel, we recommend creating multiple rules with the same query and threshold.

Query execution delays are primarily determined by the concurrency of the Data Collector or by cluster load. Please reach out to your Monte Carlo representative if you are affected by this.

