Push Ingest API

What is the Ingest API?

The Ingest API lets you push observability data directly to Monte Carlo from any data source, without waiting for a standard (pull-based) collection job to complete. Use it to:

  • Fill gaps your integration does not cover natively (column lineage, query history, freshness timestamps)
  • Send lineage and metadata you already have in your own systems (dbt, Airflow, custom ETL)
  • Complement Monte Carlo's collected data with authoritative lineage from sources the pull model cannot reach

The primary use case is augmenting an existing integration. If your Monte Carlo integration does not support metadata collection, table or column lineage, or query log collection, the Push Ingest API fills the gap so that the data still reaches Monte Carlo.

Three endpoints cover everything:

| Endpoint | What it ingests | Typical latency |
| --- | --- | --- |
| POST /ingest/v1/metadata | Table/view schema, columns, row counts, byte counts, last-update timestamps | Normally under 15 minutes |
| POST /ingest/v1/lineage | Table-level and column-level data lineage | Normally under 15 minutes |
| POST /ingest/v1/querylogs | SQL query history | Normally under 1 hour |

Every accepted request returns an invocation_id. Save it — it is your primary handle for tracing a push through the downstream systems. The Monte Carlo engineering team will ask for it to investigate any issue.
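One way to keep invocation IDs at hand is to append each one to a local log as soon as the push returns. The sketch below uses only the standard library; the JSON Lines layout and the function names record_invocation and find_invocations are illustrative assumptions, not part of the Ingest API or pycarlo.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def record_invocation(log_path, endpoint, invocation_id, note=""):
    """Append one push record to a JSON Lines file and return the record."""
    record = {
        "recorded_at": datetime.now(timezone.utc).isoformat(),
        "endpoint": endpoint,
        "invocation_id": invocation_id,
        "note": note,
    }
    with Path(log_path).open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(record) + "\n")
    return record


def find_invocations(log_path, endpoint):
    """Return every recorded invocation_id for one endpoint, oldest first."""
    ids = []
    with Path(log_path).open(encoding="utf-8") as handle:
        for line in handle:
            entry = json.loads(line)
            if entry["endpoint"] == endpoint:
                ids.append(entry["invocation_id"])
    return ids
```

Calling record_invocation right after each push means the ID is still available later if you need to share it with Monte Carlo support.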

📘

Build your data collector and pusher using AI

The mcd-agent-toolkit lets you generate a complete, working push integration in minutes. Install our plugin in Claude and use these slash commands:

  • /mc-build-metadata-collector — generate a metadata push script for your warehouse
  • /mc-build-lineage-collector — generate a lineage push script
  • /mc-build-query-log-collector — generate a query log push script
  • /mc-validate-metadata — validate what you've pushed

The skills also explain how to convert a one-off push into a scheduled recurring script.

Architecture overview

flowchart LR
    A["Your script\npycarlo SDK or direct HTTP"] --> B["Integration Gateway\nhttps://integrations.getmontecarlo.com"]
    B --> C{"Auth + schema validation"}
    C -->|"202 + invocation_id"| A
    C --> D["Processing pipeline"]
    D --> E["Monte Carlo\ncatalog · lineage graph · query logs"]

End-to-end workflow

Before you push data, you need a Monte Carlo integration in place. Here is the full flow:

  1. Set up your integration — Go to Settings → Integrations and create a connection for your data source. This establishes the warehouse record and gives you a warehouse UUID.
  2. Create an Ingestion key — See Prerequisites below. Push requests require a dedicated Ingestion key, not a standard API key.
  3. Push data — Use the pycarlo SDK or direct HTTP. See Getting started.
  4. Create monitors — Once data is in Monte Carlo, create freshness, volume, or custom SQL monitors on your pushed tables from the Monitors UI.

Prerequisites

1. Create an Ingestion key

Push requests use a dedicated integration key with scope Ingestion. A standard Monte Carlo API key will not work. See

Option A — GraphQL mutation (recommended)

Use the API Explorer with a GraphQL API key:

mutation {
  createIntegrationKey(
    description: "Push ingestion key"
    scope: Ingestion
    warehouseIds: ["<warehouse-uuid>"]   # optional — omit to allow all warehouses
  ) {
    key { id secret }
  }
}

Option B — CLI

pip install montecarlodata
montecarlo configure            # enter your GraphQL API key when prompted
montecarlo integrations create-key \
  --scope Ingestion \
  --description "Push ingestion key"

Output:

Key id:     <id>
Key secret: <secret>    ← shown only once — store it immediately
❗️

Store the key secret immediately

The key secret is shown only once at creation time. Save it to a secure secrets manager before closing the terminal.

2. Create a GraphQL API key (for verification)

You also need a separate GraphQL API key to verify pushed data. Go to Settings → API in the Monte Carlo UI, click Add, and copy the key ID and secret immediately.

GraphQL endpoint: https://api.getmontecarlo.com/graphql

Go to API Authentication for more details.

3. Find your warehouse UUID

Every push request must include resource.uuid — the UUID of the Monte Carlo resource you are pushing data for.

Query it via the GraphQL API:

query {
  getUser {
    account {
      warehouses {
        uuid
        name
        connectionType
      }
    }
  }
}

Or in the UI: Settings → Integrations → [click your warehouse] — the UUID appears in the URL.
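If you prefer to script the lookup, the query above can be sent from Python with the standard library. This is a minimal sketch: it assumes the GraphQL endpoint accepts the same x-mcd-id and x-mcd-token headers as the ingest endpoints (check API Authentication for the authoritative details), and the helper names are hypothetical.

```python
import json
import urllib.request

GRAPHQL_URL = "https://api.getmontecarlo.com/graphql"

WAREHOUSES_QUERY = """
query {
  getUser {
    account {
      warehouses { uuid name connectionType }
    }
  }
}
"""


def build_warehouse_request(key_id, key_secret):
    """Build (but do not send) the POST request for the warehouses query."""
    body = json.dumps({"query": WAREHOUSES_QUERY}).encode("utf-8")
    return urllib.request.Request(
        GRAPHQL_URL,
        data=body,
        method="POST",
        headers={
            # Assumption: same header names as the ingest endpoints.
            "x-mcd-id": key_id,
            "x-mcd-token": key_secret,
            "Content-Type": "application/json",
        },
    )


def warehouse_uuids_by_name(response_json):
    """Map warehouse name to UUID from a decoded GraphQL response."""
    warehouses = response_json["data"]["getUser"]["account"]["warehouses"]
    return {w["name"]: w["uuid"] for w in warehouses}
```

To run it, pass the request to urllib.request.urlopen, decode the body with json.load, and hand the result to warehouse_uuids_by_name. Use the GraphQL API key here, not the Ingestion key.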

4. Install pycarlo (optional)

The pycarlo SDK simplifies push calls. Minimum required version: v0.12.251.

pip install "pycarlo>=0.12.251"

You can also call the APIs directly over HTTPS without the SDK — see the Direct HTTP API section.


Getting started — first push

Initialize the client

import os
from pycarlo.core import Client, Session
from pycarlo.features.ingestion import IngestionService

service = IngestionService(mc_client=Client(session=Session(
    mcd_id=os.environ["MC_INGEST_KEY_ID"],
    mcd_token=os.environ["MC_INGEST_KEY_TOKEN"],
    scope="Ingestion",
)))

Recommended environment variables:

export MC_INGEST_KEY_ID="<ingestion-key-id>"
export MC_INGEST_KEY_TOKEN="<ingestion-key-secret>"
export MC_RESOURCE_UUID="<warehouse-uuid>"
❗️

Keep credentials out of source control

Replace placeholder values with your actual credentials. Never commit real key secrets to version-controlled files or CI configuration.
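A script that reads these variables can fail early, with a clear message, when one of them is missing. The sketch below shows one way to do that; load_ingest_config is a hypothetical helper, not a pycarlo function.

```python
import os

REQUIRED_VARS = ("MC_INGEST_KEY_ID", "MC_INGEST_KEY_TOKEN", "MC_RESOURCE_UUID")


def load_ingest_config(env=None):
    """Return the three settings as a dict, or raise listing what is missing."""
    env = os.environ if env is None else env
    # Treat unset and empty values the same way.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling load_ingest_config() with no arguments reads os.environ; passing a dict makes the function easy to exercise in tests.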

Send metadata

📘

Illustrative example

The table name, database, schema, field names, volume, and freshness values below are illustrative. Replace them with your actual values.

from pycarlo.features.ingestion.models import (
    AssetField, AssetFreshness, AssetMetadata, AssetVolume, RelationalAsset,
)

result = service.send_metadata(
    resource_uuid=os.environ["MC_RESOURCE_UUID"],
    resource_type="snowflake",   # lowercase — e.g. snowflake, bigquery, data-lake
    events=[
        RelationalAsset(
            type="TABLE",
            metadata=AssetMetadata(
                name="orders",
                database="analytics",
                schema="public",
                description="Customer orders",
            ),
            fields=[
                AssetField(name="id",         type="INTEGER"),
                AssetField(name="amount",     type="DECIMAL(10,2)"),
                AssetField(name="created_at", type="TIMESTAMP_NTZ"),
            ],
            volume=AssetVolume(row_count=1_000_000, byte_count=111_111_111),
            freshness=AssetFreshness(last_update_time="2024-03-01T14:30:00Z"),
        )
    ],
)
print("invocation_id:", service.extract_invocation_id(result))

A 202 response with an invocation_id means the push was accepted. Metadata appears in Monte Carlo within a few minutes.


API reference

Base URL (production): https://integrations.getmontecarlo.com

Authentication

All three endpoints use the same headers:

| Header | Value |
| --- | --- |
| x-mcd-id | Integration key ID |
| x-mcd-token | Integration key secret |
| Content-Type | application/json |

Success response

All three endpoints return 202 Accepted on success:

{ "invocation_id": "<uuid>" }

Error codes

| Status | Meaning |
| --- | --- |
| 400 | Schema validation failed. Check details in the response body for field-level errors. |
| 401 | Authentication failed: wrong key ID or secret. |
| 403 | The key is not authorized for the resource.uuid in the payload. |
| 413 | Payload too large: compressed body exceeds 1 MB. Split into smaller batches. |
| 5xx | Server error. Retry with exponential backoff. |
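The retry guidance for 5xx responses can be sketched as a small wrapper. This is an illustration under stated assumptions: send is a hypothetical zero-argument callable that performs one push and returns a (status, body) tuple, and the attempt count and delays are arbitrary defaults, not values recommended by Monte Carlo.

```python
import time


def push_with_retry(send, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call send() until it returns 202, fails permanently, or attempts run out.

    send is a hypothetical zero-argument callable returning (status, body).
    """
    for attempt in range(1, max_attempts + 1):
        status, body = send()
        if status == 202:
            return body["invocation_id"]
        if status < 500:
            # 400, 401, 403, 413: retrying the same request will not help.
            raise ValueError(f"push rejected with HTTP {status}: {body}")
        if attempt < max_attempts:
            # Wait 1s, 2s, 4s, ... between attempts (with the default base_delay).
            sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError(f"push still failing after {max_attempts} attempts")
```

Only 5xx responses are retried. The other error codes describe a problem with the request itself, so the wrapper raises immediately instead of resending it.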

POST /ingest/v1/metadata

Push table or view metadata, column definitions, volume, and freshness.

Table Schema
{
  "event_type": {
    "type": "string",
    "required": true,
    "description": "Always \"METADATA\""
  },
  "resource": {
    "uuid": {
      "type": "string",
      "required": true,
      "description": "Warehouse UUID"
    },
    "resource_type": {
      "type": "string",
      "required": true,
      "description": "Warehouse type — e.g. snowflake, bigquery, data-lake"
    }
  },
  "events": [
    {
      "relational_asset": {
        "type": {
          "type": "string",
          "required": true,
          "description": "One of: TABLE, VIEW, EXTERNAL, WILDCARD"
        },
        "metadata": {
          "name": { "type": "string", "required": true, "description": "Table or view name" },
          "database": { "type": "string", "required": true, "description": "Database name" },
          "schema": { "type": "string", "required": true, "description": "Schema name" },
          "description": { "type": "string", "required": false, "description": "Human-readable description" },
          "view_query": { "type": "string", "required": false, "description": "SQL definition — views only" },
          "created_on": { "type": "string", "required": false, "description": "Creation timestamp (ISO 8601)" }
        },
        "tags": {
          "type": "array",
          "required": false,
          "description": "Key-value pairs: [{ \"key\": \"...\", \"value\": \"...\" }]"
        },
        "fields": [
          {
            "name": { "type": "string", "required": true, "description": "Column name" },
            "type": { "type": "string", "required": true, "description": "Column data type" },
            "description": { "type": "string", "required": false, "description": "Column description" }
          }
        ],
        "volume": {
          "row_count": { "type": "integer", "required": false, "description": "Number of rows" },
          "byte_count": { "type": "integer", "required": false, "description": "Table size in bytes" }
        },
        "freshness": {
          "last_update_time": {
            "type": "string",
            "required": true,
            "description": "ISO 8601 timestamp of most recent data modification. Required if the freshness block is included. See Anomaly detection for detector requirements."
          }
        }
      }
    }
  ]
}

Example table payload:

{
  "event_type": "METADATA",
  "resource": {
    "uuid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "resource_type": "snowflake"
  },
  "events": [
    {
      "relational_asset": {
        "type": "TABLE",
        "metadata": {
          "name": "orders",
          "database": "analytics",
          "schema": "public",
          "description": "Customer orders",
          "created_on": "2023-01-01T00:00:00Z"
        },
        "tags": [
          { "key": "team", "value": "data-eng" }
        ],
        "fields": [
          { "name": "id",         "type": "INTEGER",       "description": "Primary key" },
          { "name": "amount",     "type": "DECIMAL(10,2)" },
          { "name": "created_at", "type": "TIMESTAMP_NTZ" }
        ],
        "volume": {
          "row_count": 1000000,
          "byte_count": 111111111
        },
        "freshness": {
          "last_update_time": "2024-03-01T14:30:00Z"
        }
      }
    }
  ]
}

View Schema
{
  "event_type": {
    "type": "string",
    "required": true,
    "description": "Always \"METADATA\""
  },
  "resource": {
    "uuid": { "type": "string", "required": true, "description": "Warehouse UUID" },
    "resource_type": { "type": "string", "required": true, "description": "Warehouse type — e.g. snowflake, bigquery, data-lake" }
  },
  "events": [
    {
      "relational_asset": {
        "type": {
          "type": "string",
          "required": true,
          "description": "Always \"VIEW\" for views"
        },
        "metadata": {
          "name": { "type": "string", "required": true, "description": "View name" },
          "database": { "type": "string", "required": true, "description": "Database name" },
          "schema": { "type": "string", "required": true, "description": "Schema name" },
          "description": { "type": "string", "required": false, "description": "Human-readable description" },
          "view_query": { "type": "string", "required": false, "description": "SQL definition of the view" }
        }
      }
    }
  ]
}

Example view payload:

{
  "event_type": "METADATA",
  "resource": {
    "uuid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "resource_type": "snowflake"
  },
  "events": [
    {
      "relational_asset": {
        "type": "VIEW",
        "metadata": {
          "name": "orders_view",
          "database": "analytics",
          "schema": "public",
          "description": "View over the orders table",
          "view_query": "SELECT id, amount, created_at FROM analytics.public.orders WHERE amount > 0"
        }
      }
    }
  ]
}
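Because a 400 response only arrives after the request is sent, it can help to check events locally first. The sketch below applies the required-field rules from the schemas above to one event in its JSON (dict) form. It is a hypothetical pre-flight helper, not the server's validator, and the server may enforce rules that are not listed here.

```python
ASSET_TYPES = {"TABLE", "VIEW", "EXTERNAL", "WILDCARD"}


def validate_metadata_event(event):
    """Return a list of problems found in one metadata event (empty if none)."""
    problems = []
    asset = event.get("relational_asset")
    if asset is None:
        return ["missing relational_asset"]
    if asset.get("type") not in ASSET_TYPES:
        problems.append("type must be one of " + ", ".join(sorted(ASSET_TYPES)))
    metadata = asset.get("metadata", {})
    for key in ("name", "database", "schema"):
        if not metadata.get(key):
            problems.append(f"metadata.{key} is required")
    for index, field in enumerate(asset.get("fields", [])):
        for key in ("name", "type"):
            if not field.get(key):
                problems.append(f"fields[{index}].{key} is required")
    # last_update_time is only required when a freshness block is present.
    freshness = asset.get("freshness")
    if freshness is not None and not freshness.get("last_update_time"):
        problems.append("freshness.last_update_time is required when freshness is set")
    return problems
```

Running this over every event before a push lets you report all field-level problems at once rather than discovering them one request at a time.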

POST /ingest/v1/lineage

Push table-level or column-level data lineage. Both use the same endpoint; event_type determines the shape.

Table Lineage Schema
{
  "event_type": {
    "type": "string",
    "required": true,
    "description": "Always \"LINEAGE\" for table lineage"
  },
  "resource": {
    "uuid": { "type": "string", "required": true, "description": "Warehouse UUID" },
    "resource_type": { "type": "string", "required": true, "description": "Warehouse type" }
  },
  "events": [
    {
      "destination": {
        "type": { "type": "string", "required": true, "description": "One of: TABLE, VIEW, EXTERNAL, WILDCARD" },
        "database": { "type": "string", "required": true },
        "schema": { "type": "string", "required": true },
        "name": { "type": "string", "required": true }
      },
      "sources": [
        {
          "type": { "type": "string", "required": true, "description": "One of: TABLE, VIEW, EXTERNAL, WILDCARD" },
          "database": { "type": "string", "required": true },
          "schema": { "type": "string", "required": true },
          "name": { "type": "string", "required": true }
        }
      ]
    }
  ]
}

Example table lineage payload:

{
  "event_type": "LINEAGE",
  "resource": {
    "uuid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "resource_type": "snowflake"
  },
  "events": [
    {
      "destination": { "type": "TABLE", "database": "analytics", "schema": "public", "name": "orders_curated" },
      "sources": [
        { "type": "TABLE", "database": "analytics", "schema": "public", "name": "orders_raw" }
      ]
    }
  ]
}

Column Lineage Schema
{
  "event_type": {
    "type": "string",
    "required": true,
    "description": "Always \"COLUMN_LINEAGE\" for column lineage"
  },
  "resource": {
    "uuid": { "type": "string", "required": true, "description": "Warehouse UUID" },
    "resource_type": { "type": "string", "required": true, "description": "Warehouse type" }
  },
  "events": [
    {
      "destination": {
        "type": { "type": "string", "required": true, "description": "One of: TABLE, VIEW, EXTERNAL, WILDCARD" },
        "database": { "type": "string", "required": true },
        "schema": { "type": "string", "required": true },
        "name": { "type": "string", "required": true },
        "asset_id": { "type": "string", "required": true, "description": "Local alias for cross-referencing within the payload — not stored externally" }
      },
      "sources": [
        {
          "type": { "type": "string", "required": true },
          "database": { "type": "string", "required": true },
          "schema": { "type": "string", "required": true },
          "name": { "type": "string", "required": true },
          "asset_id": { "type": "string", "required": true, "description": "Local alias for cross-referencing" }
        }
      ],
      "fields": [
        {
          "name": { "type": "string", "required": true, "description": "Destination column name" },
          "source_fields": [
            {
              "asset_id": { "type": "string", "required": true, "description": "Matches the asset_id on the source asset" },
              "field_name": { "type": "string", "required": true, "description": "Source column name" }
            }
          ]
        }
      ]
    }
  ]
}

Example column lineage payload:

{
  "event_type": "COLUMN_LINEAGE",
  "resource": { "uuid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", "resource_type": "snowflake" },
  "events": [
    {
      "destination": { "type": "TABLE", "database": "analytics", "schema": "public", "name": "orders_curated", "asset_id": "dest" },
      "sources": [
        { "type": "TABLE", "database": "analytics", "schema": "public", "name": "orders_raw", "asset_id": "src" }
      ],
      "fields": [
        {
          "name": "order_id",
          "source_fields": [ { "asset_id": "src", "field_name": "id" } ]
        }
      ]
    }
  ]
}
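In a column lineage event, each source_fields entry points at a source asset through its asset_id. A mismatch is easy to introduce when payloads are generated, so a local check can be useful. The function below is a hypothetical helper that works on one event in its JSON (dict) form.

```python
def unresolved_source_fields(event):
    """Return (destination_column, asset_id) pairs that match no source asset."""
    known_ids = {source["asset_id"] for source in event["sources"]}
    unresolved = []
    for field in event.get("fields", []):
        for source_field in field["source_fields"]:
            if source_field["asset_id"] not in known_ids:
                unresolved.append((field["name"], source_field["asset_id"]))
    return unresolved
```

An empty list means every column mapping refers to an asset_id declared under sources in the same event.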
❗️

Column lineage expires after 10 days

Column lineage pushed via this endpoint expires after 10 days — the same retention as our data collector pull-model column lineage. Table lineage never expires.

Pushed table lineage nodes and edges are stored with expireAt: "9999-12-31", so they never expire. If you manage custom lineage through the GraphQL mutations instead (createOrUpdateLineageNode or createOrUpdateLineageEdge), set expireAt: "9999-12-31" yourself when you do not want those nodes or edges to expire.

TTL summary:

| Lineage type | Expiration |
| --- | --- |
| Table lineage (push API) | Never expires |
| Column lineage (push API) | 10 days |
| Custom nodes/edges (createOrUpdateLineageEdge) | 7 days by default; set expireAt: "9999-12-31" for permanent |

POST /ingest/v1/querylogs

Push SQL query history.

❗️

log_type, not resource_type

This endpoint uses log_type instead of resource_type on the resource object. This is the only endpoint where the field name differs.

Query logs Schema
{
  "event_type": {
    "type": "string",
    "required": true,
    "description": "Always \"QUERY_LOG\""
  },
  "resource": {
    "uuid": { "type": "string", "required": true, "description": "Warehouse UUID" },
    "log_type": {
      "type": "string",
      "required": true,
      "description": "Warehouse type — snowflake, redshift, bigquery, athena, teradata, clickhouse, databricks-metastore-sql-warehouse, s3, presto-s3, hive-s3"
    }
  },
  "events": [
    {
      "start_time": { "type": "string", "required": true, "description": "Query start time (ISO 8601)" },
      "end_time": { "type": "string", "required": true, "description": "Query end time (ISO 8601)" },
      "query_text": { "type": "string", "required": true, "description": "SQL query text" },
      "query_id": { "type": "string", "required": false, "description": "Warehouse-assigned query ID" },
      "user": { "type": "string", "required": false, "description": "User who ran the query" },
      "returned_rows": { "type": "integer", "required": false, "description": "Number of rows returned" },
      "error_code": { "type": "string", "required": false, "description": "Error code if the query failed; null if successful" },
      "error_text": { "type": "string", "required": false, "description": "Error message if the query failed; null if successful" },
      "extra": { "type": "object", "required": false, "description": "Additional metadata" }
    }
  ]
}

Example query log payload:

{
  "event_type": "QUERY_LOG",
  "resource": {
    "uuid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "log_type": "snowflake"
  },
  "events": [
    {
      "start_time":    "2024-03-01T12:00:00Z",
      "end_time":      "2024-03-01T12:00:05Z",
      "query_text":    "SELECT * FROM analytics.public.orders LIMIT 100",
      "query_id":      "01b2c3d4-0000-1234-0000-ab00ef567890",
      "user":          "<user-email>",
      "returned_rows": 100,
      "error_code":    null,
      "error_text":    null,
      "extra":         {}
    }
  ]
}

Query logs are processed asynchronously. Allow 45–60 minutes after pushing before checking results.
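When query history comes from your own job logs, the main work is converting timestamps to ISO 8601 and wrapping the rows in the envelope shown above. The sketch below builds that payload with the standard library. The row layout (start, end, SQL text, user) and both function names are assumptions chosen for illustration.

```python
from datetime import datetime, timezone


def to_iso8601_utc(moment):
    """Format an aware datetime as ISO 8601 in UTC with a trailing Z."""
    if moment.tzinfo is None:
        raise ValueError("naive datetime: attach a timezone before converting")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_query_log_payload(warehouse_uuid, log_type, rows):
    """Wrap rows of (start, end, sql, user) tuples in a QUERY_LOG payload."""
    events = [
        {
            "start_time": to_iso8601_utc(start),
            "end_time": to_iso8601_utc(end),
            "query_text": sql,
            "user": user,
        }
        for start, end, sql, user in rows
    ]
    return {
        "event_type": "QUERY_LOG",
        # This endpoint uses log_type, not resource_type.
        "resource": {"uuid": warehouse_uuid, "log_type": log_type},
        "events": events,
    }
```

Rejecting naive datetimes avoids silently pushing local times as if they were UTC.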


pycarlo SDK guide

For the full pycarlo API reference, see apidocs.getmontecarlo.com.

📘

Illustrative examples

The table name, database, schema, field names, volume, and freshness values below are illustrative. Replace them with your actual values.

Push table lineage

from pycarlo.features.ingestion.models import LineageAssetRef, LineageEvent

result = service.send_lineage(
    resource_uuid=os.environ["MC_RESOURCE_UUID"],
    resource_type="snowflake",
    events=[
        LineageEvent(
            destination=LineageAssetRef(
                type="TABLE", name="orders_curated",
                database="analytics", schema="public",
            ),
            sources=[
                LineageAssetRef(
                    type="TABLE", name="orders_raw",
                    database="analytics", schema="public",
                )
            ],
        )
    ],
)
print("invocation_id:", service.extract_invocation_id(result))

Push column lineage

from pycarlo.features.ingestion.models import (
    ColumnLineageField, ColumnLineageSourceField, LineageAssetRef, LineageEvent,
)

result = service.send_lineage(
    resource_uuid=os.environ["MC_RESOURCE_UUID"],
    resource_type="snowflake",
    events=[
        LineageEvent(
            destination=LineageAssetRef(
                type="TABLE", name="orders_curated",
                database="analytics", schema="public",
                asset_id="dest",
            ),
            sources=[
                LineageAssetRef(
                    type="TABLE", name="orders_raw",
                    database="analytics", schema="public",
                    asset_id="src",
                )
            ],
            fields=[
                ColumnLineageField(
                    name="order_id",
                    source_fields=[ColumnLineageSourceField(asset_id="src", field_name="id")],
                )
            ],
        )
    ],
)
print("invocation_id:", service.extract_invocation_id(result))

Push query logs

from datetime import datetime, timezone, timedelta
from pycarlo.features.ingestion.models import QueryLogEntry

now = datetime.now(timezone.utc)

result = service.send_query_logs(
    resource_uuid="<warehouse-uuid>",
    log_type="snowflake",
    events=[
        QueryLogEntry(
            start_time=now - timedelta(minutes=5),
            end_time=now - timedelta(minutes=4),
            query_text="SELECT * FROM analytics.public.orders LIMIT 100",
            query_id="<optional-warehouse-query-id>",
            user="<user-email>",
            returned_rows=100,
        )
    ],
)
print("invocation_id:", service.extract_invocation_id(result))

Batching large payloads

The compressed request body must not exceed 1 MB. For large collections of tables, lineage edges, or query log entries, split your events list into batches of ~500. Each batch returns its own invocation_id.

BATCH_SIZE = 500
for i in range(0, len(all_events), BATCH_SIZE):
    batch = all_events[i : i + BATCH_SIZE]
    result = service.send_metadata(
        resource_uuid=resource_uuid,
        resource_type=resource_type,
        events=batch,
    )
    print(f"Batch {i // BATCH_SIZE + 1} invocation_id: {service.extract_invocation_id(result)}")
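A fixed batch size does not guarantee that a batch stays under the size limit when individual events are large. The sketch below adds a size check on top of the count limit. It makes two assumptions that this page does not confirm: that the body is gzip-compressed, and that "1 MB" means 1,000,000 bytes. It also measures only the events list, not the full request envelope, so treat the result as an estimate.

```python
import gzip
import json

MAX_COMPRESSED_BYTES = 1_000_000  # assumption: "1 MB" read as 10**6 bytes
MAX_EVENTS = 500


def compressed_size(events):
    """Size in bytes of the gzip-compressed JSON encoding of events."""
    return len(gzip.compress(json.dumps(events).encode("utf-8")))


def split_into_batches(events, max_events=MAX_EVENTS, max_bytes=MAX_COMPRESSED_BYTES):
    """Yield batches that respect both the event-count and the size limit."""
    for start in range(0, len(events), max_events):
        pending = [events[start : start + max_events]]
        while pending:
            batch = pending.pop(0)
            if len(batch) > 1 and compressed_size(batch) > max_bytes:
                # Too large: halve the batch and check both halves again.
                middle = len(batch) // 2
                pending[:0] = [batch[:middle], batch[middle:]]
            else:
                yield batch
```

Event order is preserved across batches. A single event that exceeds the limit on its own is still yielded, since it cannot be split further. This version takes events as plain dicts; with pycarlo model objects you would need to serialize them first.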

Direct HTTP API

The pycarlo SDK is optional. You can call the push APIs directly over HTTPS.

❗️

Keep credentials out of source control

Replace placeholder values with your actual credentials. Never commit real key secrets to version-controlled files or CI configuration.

Metadata:

curl -X POST "https://integrations.getmontecarlo.com/ingest/v1/metadata" \
  -H "Content-Type: application/json" \
  -H "x-mcd-id: <key-id>" \
  -H "x-mcd-token: <key-secret>" \
  -d '{
    "event_type": "METADATA",
    "resource": { "uuid": "<warehouse-uuid>", "resource_type": "snowflake" },
    "events": [{
      "relational_asset": {
        "type": "TABLE",
        "metadata": { "name": "orders", "database": "analytics", "schema": "public" }
      }
    }]
  }'

Table lineage:

curl -X POST "https://integrations.getmontecarlo.com/ingest/v1/lineage" \
  -H "Content-Type: application/json" \
  -H "x-mcd-id: <key-id>" \
  -H "x-mcd-token: <key-secret>" \
  -d '{
    "event_type": "LINEAGE",
    "resource": { "uuid": "<warehouse-uuid>", "resource_type": "snowflake" },
    "events": [{
      "destination": { "type": "TABLE", "database": "analytics", "schema": "public", "name": "orders_curated" },
      "sources": [{ "type": "TABLE", "database": "analytics", "schema": "public", "name": "orders_raw" }]
    }]
  }'

Query logs:

curl -X POST "https://integrations.getmontecarlo.com/ingest/v1/querylogs" \
  -H "Content-Type: application/json" \
  -H "x-mcd-id: <key-id>" \
  -H "x-mcd-token: <key-secret>" \
  -d '{
    "event_type": "QUERY_LOG",
    "resource": { "uuid": "<warehouse-uuid>", "log_type": "snowflake" },
    "events": [{
      "start_time": "2024-03-01T12:00:00Z",
      "end_time": "2024-03-01T12:00:05Z",
      "query_text": "SELECT * FROM analytics.public.orders LIMIT 100",
      "user": "<user-email>",
      "returned_rows": 100
    }]
  }'
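The same calls can be made from Python without pycarlo. The sketch below mirrors the curl commands above using only the standard library; build_ingest_request and send are hypothetical helper names.

```python
import json
import urllib.request

BASE_URL = "https://integrations.getmontecarlo.com"


def build_ingest_request(path, payload, key_id, key_secret):
    """Build (but do not send) a POST request for one ingest endpoint."""
    return urllib.request.Request(
        BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        headers={
            "Content-Type": "application/json",
            "x-mcd-id": key_id,
            "x-mcd-token": key_secret,
        },
    )


def send(request, timeout=30):
    """Send the request and return the invocation_id from the 202 response."""
    # urlopen raises urllib.error.HTTPError for 4xx and 5xx responses.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)["invocation_id"]
```

For example, build_ingest_request("/ingest/v1/metadata", payload, key_id, key_secret) followed by send(request) corresponds to the first curl command. Read the key ID and secret from environment variables rather than hard-coding them.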

Best practices

Push frequency

Push freshness and volume once per hour. Do not push more frequently — the training pipeline aggregates data into hourly buckets. Sub-hourly pushes do not improve coverage and may produce unpredictable detector behavior.
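A scheduler that may fire more than once per hour can guard against extra pushes with a simple bucket check. The sketch below assumes the hourly buckets are aligned to UTC clock hours, which this page does not state, so treat the alignment as an assumption. The function names are hypothetical.

```python
from datetime import datetime, timezone


def hour_bucket(moment):
    """Truncate an aware datetime to the start of its UTC hour."""
    return moment.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)


def should_push(last_push, now):
    """True if no push has happened yet in the hour bucket containing now."""
    return last_push is None or hour_bucket(last_push) < hour_bucket(now)
```

Store the time of the last successful push and call should_push(last_push, datetime.now(timezone.utc)) before sending freshness and volume.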

Batching

Keep batches to ~500 events per request. The compressed body must not exceed 1 MB. Smaller batches also make debugging easier — each batch returns its own invocation_id.

Recurring scripts

A one-off push is useful for testing, but real observability requires consistent, scheduled delivery. To convert your push script into a recurring job:

  • Cron / task scheduler: Schedule your Python script with cron, AWS EventBridge, or Airflow.
  • Claude skills: The mcd-agent-toolkit includes /mc-build-metadata-collector and related commands that generate complete, schedulable collectors for your warehouse.

Push table deletion

Monte Carlo's periodic cleanup does not delete push-ingested tables automatically. They are customer-managed, so Monte Carlo leaves their lifecycle to you: when a table is removed from your source system, you are responsible for removing it from Monte Carlo. Use the deletePushIngestedTables mutation to delete tables explicitly:

mutation DeletePushTables($mcons: [String!]!) {
  deletePushIngestedTables(mcons: $mcons) {
    success
    deletedCount
  }
}

Maximum 1,000 MCONs per call. The mutation also removes associated lineage nodes from the graph.
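When more than 1,000 tables need to be removed, the MCON list has to be split across several calls. The sketch below prepares one GraphQL request body per chunk; delete_request_bodies is a hypothetical helper, and sending the bodies to the GraphQL endpoint is left to your HTTP client.

```python
DELETE_MUTATION = """
mutation DeletePushTables($mcons: [String!]!) {
  deletePushIngestedTables(mcons: $mcons) {
    success
    deletedCount
  }
}
"""

MAX_MCONS_PER_CALL = 1000


def delete_request_bodies(mcons, chunk_size=MAX_MCONS_PER_CALL):
    """Return one GraphQL request body per chunk of at most chunk_size MCONs."""
    unique = list(dict.fromkeys(mcons))  # drop duplicates, keep order
    return [
        {"query": DELETE_MUTATION, "variables": {"mcons": unique[i : i + chunk_size]}}
        for i in range(0, len(unique), chunk_size)
    ]
```

Each returned dict can be JSON-encoded and posted as-is. Checking deletedCount in each response against the chunk length is a quick way to spot MCONs that were not found.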


Real-world examples

Example 1: Postgres metastore — push metadata and lineage for non-native tables

Monte Carlo's Postgres integration crawls the native Postgres catalog. If your data lives in S3, its metadata is registered in a Postgres metastore, and Monte Carlo cannot query those tables directly, push the metadata and lineage instead.

Push metadata:

1. Setup

Follow the steps in Prerequisites to obtain an Ingestion key, a GraphQL API key, and your warehouse UUID.

2. Install AI skills (Optional)

Automatic skill installation is currently supported only for Claude.

To install the skills, follow the mcd-agent-toolkit instructions for installing the Monte Carlo skills.

In Claude Code / Cowork, typing / should then list the skills.

Choose /mc-build-metadata-collector. On a best-effort basis, it helps you build a script that collects your table metadata and pushes it.

The skill asks a series of questions. Answer them, and it generates the collection and push script.

When the script finishes, it returns an invocation_id, or a list of invocation_ids if it pushed in batches. Keep them: if you run into problems after pushing table metadata, they help Monte Carlo engineers diagnose the issue faster.

If you prefer not to use AI, the push script should look like this:

# Initialize service as shown in "Getting started — first push"
from pycarlo.features.ingestion.models import (
    AssetField, AssetFreshness, AssetMetadata, AssetVolume, RelationalAsset,
)

result = service.send_metadata(
    resource_uuid=os.environ["MC_RESOURCE_UUID"],
    resource_type="data-lake",
    events=[
        RelationalAsset(
            type="TABLE",
            metadata=AssetMetadata(
                name="events_raw",
                database="metastore_db",
                schema="raw",
                description="Raw clickstream events from S3",
            ),
            fields=[
                AssetField(name="event_id",   type="VARCHAR"),
                AssetField(name="user_id",    type="BIGINT"),
                AssetField(name="event_type", type="VARCHAR"),
                AssetField(name="ts",         type="TIMESTAMP"),
            ],
            volume=AssetVolume(row_count=50_000_000, byte_count=8_000_000_000),
            freshness=AssetFreshness(last_update_time="2024-03-01T06:00:00Z"),
        )
    ],
)
print("invocation_id:", service.extract_invocation_id(result))

Push lineage (for example, a Spark job that reads events_raw and writes events_enriched):

To use the AI skills, run /mc-build-lineage-collector. On a best-effort basis, it builds a script that collects the lineage (where possible) and pushes it to Monte Carlo.

If you prefer not to use AI, the script should look similar to this one:

# Initialize service as shown in "Getting started — first push"
from pycarlo.features.ingestion.models import LineageAssetRef, LineageEvent

result = service.send_lineage(
    resource_uuid=os.environ["MC_RESOURCE_UUID"],
    resource_type="data-lake",
    events=[
        LineageEvent(
            destination=LineageAssetRef(type="TABLE", name="events_enriched", database="metastore_db", schema="curated"),
            sources=[LineageAssetRef(type="TABLE", name="events_raw", database="metastore_db", schema="raw")],
        )
    ],
)
print("invocation_id:", service.extract_invocation_id(result))

Example 2: Data lake query logs — push Spark/custom ETL query history

If your data lake processes don't emit query logs to a supported warehouse connector, push them directly. This powers Monte Carlo's query-based lineage and usage analytics.

To use AI, run the /mc-build-query-log-collector skill. On a best-effort basis, it helps you build a script that collects the query logs (where possible) and pushes them to Monte Carlo.

If you prefer not to use AI, the script would look similar to this one:

# Initialize service as shown in "Getting started — first push"
from datetime import datetime, timezone, timedelta
from pycarlo.features.ingestion.models import QueryLogEntry

now = datetime.now(timezone.utc)

# Push a batch of Spark job executions
spark_queries = [
    QueryLogEntry(
        start_time=now - timedelta(hours=1, minutes=5),
        end_time=now - timedelta(hours=1),
        query_text="INSERT INTO metastore_db.curated.events_enriched SELECT * FROM metastore_db.raw.events_raw WHERE ts > '2024-03-01'",
        user="<user-email>",
        returned_rows=1_234_567,
    ),
    QueryLogEntry(
        start_time=now - timedelta(minutes=30),
        end_time=now - timedelta(minutes=25),
        query_text="SELECT user_id, COUNT(*) as event_count FROM metastore_db.curated.events_enriched GROUP BY user_id",
        user="<user-email>",
        returned_rows=5_000,
    ),
]

result = service.send_query_logs(
    resource_uuid=os.environ["MC_RESOURCE_UUID"],
    log_type="s3",
    events=spark_queries,
)
print("invocation_id:", service.extract_invocation_id(result))

Verifying pushed data

All verification queries use the GraphQL API key at https://api.getmontecarlo.com/graphql — not the Ingestion key.

Confirm a table was ingested

fullTableId format: <database>:<schema>.<table> — e.g. analytics:public.orders

query GetTable($fullTableId: String!, $dwId: UUID!) {
  getTable(fullTableId: $fullTableId, dwId: $dwId) {
    mcon
    fullTableId
    displayName
    versions { edges { node { fields { name fieldType } } } }
  }
}
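The variables for this query can be assembled from the same names you used in the push. A minimal sketch, assuming $dwId takes the warehouse UUID from Prerequisites; both helper names are hypothetical.

```python
def full_table_id(database, schema, table):
    """Build the <database>:<schema>.<table> identifier used by getTable."""
    return f"{database}:{schema}.{table}"


def get_table_variables(database, schema, table, warehouse_uuid):
    """Variables for the GetTable query: $fullTableId and $dwId."""
    return {
        "fullTableId": full_table_id(database, schema, table),
        # Assumption: dwId is the warehouse UUID used in resource.uuid.
        "dwId": warehouse_uuid,
    }
```

For the orders table pushed earlier, full_table_id("analytics", "public", "orders") gives analytics:public.orders, matching the format described above.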

Check volume and freshness metrics

query GetMetrics($mcon: String!, $metricName: String!, $startTime: DateTime!, $endTime: DateTime!) {
  getMetricsV4(dwId: null, mcon: $mcon, metricName: $metricName, startTime: $startTime, endTime: $endTime) {
    metricsJson
  }
}

Useful metricName values: "total_row_count", "total_byte_count".

Check table lineage

Table lineage is typically visible within seconds to a few minutes after a push.

query GetTableLineage($mcon: String!) {
  getTableLineage(mcon: $mcon, direction: "upstream", hops: 1) {
    connectedNodes { objectId displayName mcon }
    flattenedEdges { directlyConnectedMcons }
  }
}

Check query logs

Allow 45–60 minutes before querying.

query GetAggregatedQueries($mcon: String!, $startTime: DateTime!, $endTime: DateTime!) {
  getAggregatedQueries(
    mcon: $mcon, queryType: "read",
    startTime: $startTime, endTime: $endTime,
    first: 100
  ) {
    edges { node { queryHash queryCount lastSeen } }
    pageInfo { hasNextPage endCursor }
  }
}
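
Because the response carries pageInfo with hasNextPage and endCursor, results beyond the first 100 need a paging loop. The sketch below shows one way to write it. It assumes the field accepts a Relay-style after cursor argument, which the query above does not show, and fetch_page is a hypothetical callable that you supply: it runs the query for a given cursor and returns the getAggregatedQueries object.

```python
from typing import Callable, Iterator, Optional


def iter_aggregated_queries(
    fetch_page: Callable[[Optional[str]], dict],
    max_pages: int = 50,
) -> Iterator[dict]:
    """Yield every node across pages, following endCursor until exhausted.

    fetch_page(cursor) is hypothetical: it should execute the query with
    the given cursor (None for the first page) and return a dict with
    "edges" and "pageInfo" keys.
    """
    cursor = None
    for _ in range(max_pages):  # hard stop so a bad cursor cannot loop forever
        page = fetch_page(cursor)
        for edge in page["edges"]:
            yield edge["node"]
        info = page["pageInfo"]
        if not info["hasNextPage"]:
            return
        cursor = info["endCursor"]
```

The max_pages cap is a defensive choice for scripts that run unattended; raise it if you expect more than a few thousand distinct queries in the time window.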

Troubleshooting

Push returns 401 — The key ID or secret is wrong, or the key does not exist. Check x-mcd-id and x-mcd-token.

Push returns 403 — The key is scoped to specific warehouse UUIDs and the resource.uuid in your payload is not in the allowed list. Use an unscoped key or add the warehouse UUID to the key's allowlist.

Push returns 400 with "Unsupported ingest query-log log_type" — You used resource_type on the query-log endpoint. Query logs use log_type; metadata and lineage use resource_type. Double-check the field name and that the value is in the supported list.

Push returns 413 — The compressed payload exceeds 1 MB. Split your events list into smaller batches.
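
One way to stay under the limit is to split the events before pushing. The following is a rough sketch that estimates size by gzip-compressing a JSON dump of plain dicts and halving any batch that is too large. It assumes the 1 MB limit applies to a gzip-compressed JSON body; how pycarlo actually serializes and compresses events is not shown here, so treat the estimate as approximate and leave some headroom. The function names are illustrative.

```python
import gzip
import json
from typing import Iterator, List

# Assumption: a conservative reading of "1 MB".
MAX_COMPRESSED_BYTES = 1_000_000


def compressed_size(events: List[dict]) -> int:
    """Approximate the compressed payload size of a list of events."""
    return len(gzip.compress(json.dumps(events).encode("utf-8")))


def split_into_batches(
    events: List[dict], limit: int = MAX_COMPRESSED_BYTES
) -> Iterator[List[dict]]:
    """Yield batches, in order, whose compressed size fits within limit.

    A batch that is too large is split in half recursively. A single
    event that exceeds the limit on its own is yielded unchanged,
    because it cannot be split further.
    """
    if not events:
        return
    if len(events) == 1 or compressed_size(events) <= limit:
        yield events
        return
    mid = len(events) // 2
    yield from split_into_batches(events[:mid], limit)
    yield from split_into_batches(events[mid:], limit)
```

Each yielded batch can then be sent as its own push; save the invocation_id returned for every batch.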

Data isn't appearing after a push — Expected timing: metadata (a few minutes), table lineage (seconds to a few minutes), column lineage (a few minutes), query logs (45–60 minutes). If data still hasn't appeared after these windows, note the invocation_id and contact Monte Carlo support.

Anomaly detectors aren't firing — Detectors need consistent historical data: push once per hour over several weeks. A detector in training status has not yet reached its minimum sample count. A detector in inactive status has hit a deactivation condition, most commonly too few samples, coverage that is too low, or a gap of 14+ days without any push. See Anomaly detection in the FAQ for full thresholds.


FAQ

Anomaly detection

Pushed freshness and volume data feeds the same anomaly detectors as the pull model, but only once the detectors have enough historical data to train on.

Recommended push frequency: once per hour. Do not push more frequently — the training pipeline aggregates into hourly buckets and sub-hourly pushes produce unpredictable detector behavior.
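
A scheduled script can guard against accidental sub-hourly pushes with a small check such as the sketch below. It assumes the hourly buckets align with UTC clock hours, which this page does not state, and the function names are illustrative.

```python
from datetime import datetime, timezone
from typing import Optional


def hour_bucket(ts: datetime) -> datetime:
    """Truncate a timezone-aware timestamp to the start of its UTC hour."""
    return ts.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)


def should_push(now: datetime, last_push: Optional[datetime]) -> bool:
    """Allow at most one push per hourly bucket.

    Assumption: buckets are UTC clock hours. Under this rule a push at
    10:59 followed by one at 11:01 is allowed, since they fall into
    different buckets.
    """
    return last_push is None or hour_bucket(now) > hour_bucket(last_push)
```

Persist the timestamp of the last successful push (for example in a small state file) and pass it as last_push on the next run.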

Freshness detector

| Parameter | Value |
| --- | --- |
| Training window | 35 days |
| Minimum samples to activate | 7 pushes with a changed last_update_time |
| Minimum coverage to activate | 0.15 |
| Hard deactivation gap | 14 days without any push |
| Supported update cycle | 5 minutes – ~7.7 days |

Each push must carry a freshness.last_update_time that has actually changed since the previous push. Repeated identical timestamps do not count as new samples.
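
To estimate how far a table is from the activation threshold, you can count the qualifying samples in your own push history. The sketch below counts pushes whose last_update_time differs from the previous push. Treating the very first push as a sample is an assumption, and the function name is illustrative; the detector's real accounting, including coverage, happens inside Monte Carlo.

```python
from datetime import datetime
from typing import Iterable

# Minimum samples to activate, from the freshness detector parameters above.
FRESHNESS_MIN_SAMPLES = 7


def count_freshness_samples(last_update_times: Iterable[datetime]) -> int:
    """Count pushes whose last_update_time changed since the previous push.

    Input is the last_update_time of each push, in push order.
    Assumption: the first push counts as a sample.
    """
    count = 0
    previous = None
    for ts in last_update_times:
        if ts != previous:
            count += 1
        previous = ts
    return count
```

A result below FRESHNESS_MIN_SAMPLES suggests the detector is still training because the source table has not changed often enough, not because pushes are missing.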

Volume detector

| Parameter | Value |
| --- | --- |
| Minimum samples to activate | 10 (daily) / 48 (sub-daily) / 5 (weekly) |
| Minimum coverage to activate | 0.30 |
| Training window | 42 days |
| Deactivation | Coverage degrades as the 42-day window rolls forward with no new data |

Both row_count and byte_count in the volume object contribute to volume anomaly detection.

What to expect

Detectors show training or inactive status during the ramp-up period; this is expected. Freshness detectors need roughly 7 samples spread over ~2 weeks. Volume detectors need 5 to 48 samples, depending on the table's update cadence, over up to 42 days.

Check detector status via GraphQL:

query GetDetectorStatus($mcon: String!) {
  getTable(mcon: $mcon) {
    thresholds {
      freshness { lower { value } upper { value } status }
      size      { lower { value } upper { value } status }
    }
  }
}

Are Network Access Controls (NAC) supported by the Push Ingest API endpoints?

Yes. The Ingest API (integrations.getmontecarlo.com/ingest/v1/) supports Network Access Controls. You can use the APIs to restrict access to these endpoints to the IP addresses you choose.

Can I exclude some tables from the normal (pull) data collection and push them instead?

Yes. Configure the Ingestion Rules with exception rules that exclude those tables from pull collection, then push them using the push ingestion APIs described above.