Writing MCPs as Files

What is an MCP?

A MetadataChangeProposal (MCP) represents an atomic unit of change in the DataHub Metadata Graph. Each MCP carries a single aspect in its payload and is used to propose changes to DataHub's metadata.

  • Represents a single aspect change
  • Used for proposing metadata changes to DataHub
  • Serves as the basic building block for metadata ingestion

For more information, see the guide on MCPs.

Basic MCP Structure

Here's the essential structure of an MCP in JSON format:

{
  "entityType": string,      // Type of entity (e.g., "dataset", "chart")
  "entityUrn": string,       // URN of the entity being updated
  "changeType": string,      // Type of change (UPSERT, CREATE, DELETE, etc.)
  "aspectName": string,      // Name of the aspect being modified
  "aspect": {                // The aspect content
    "value": bytes,          // Serialized aspect value
    "contentType": string    // Serialization format (usually "application/json")
  }
}
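
As a quick illustration of the structure above, the following minimal sketch checks a parsed MCP for the five top-level fields. Treating all five as required is a simplifying assumption (some change types may not need an aspect payload), and the helper name missing_mcp_fields is hypothetical, not part of the DataHub SDK.

```python
# Top-level fields from the structure above. Treating them all as
# required is an assumption made for this sketch.
REQUIRED_FIELDS = ("entityType", "entityUrn", "changeType", "aspectName", "aspect")


def missing_mcp_fields(mcp: dict) -> list[str]:
    """Return the names of expected top-level fields absent from an MCP dict."""
    return [field for field in REQUIRED_FIELDS if field not in mcp]


# An incomplete MCP: the "aspect" payload has been left out on purpose.
example = {
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,example_dataset,PROD)",
    "changeType": "UPSERT",
    "aspectName": "datasetProperties",
}

print(missing_mcp_fields(example))  # ['aspect']
```

A check like this only covers the outer envelope; it says nothing about whether the aspect payload itself matches the aspect's schema.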

Example MCP File

Here's a practical example of an MCP for a dataset:

{
  "entityType": "dataset",
  "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,example_dataset,PROD)",
  "changeType": "UPSERT",
  "aspectName": "datasetProperties",
  "aspect": {
    "value": {
      "description": "Example dataset description",
      "customProperties": {
        "encoding": "utf-8"
      }
    },
    "contentType": "application/json"
  }
}
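
To try this example by hand, the MCP can be written to disk with nothing but the standard library. The sketch below assumes an MCP file holds a JSON array of MCP objects; that layout, the helper name write_mcp_file, and the output file name are assumptions for illustration, so check them against the file format of your DataHub version.

```python
import json
import tempfile
from pathlib import Path


def write_mcp_file(mcps: list[dict], path: Path) -> None:
    """Write MCP dicts to `path` as one JSON array (assumed file layout)."""
    path.write_text(json.dumps(mcps, indent=2))


# The example MCP from above, expressed as a Python dict.
example_mcp = {
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,example_dataset,PROD)",
    "changeType": "UPSERT",
    "aspectName": "datasetProperties",
    "aspect": {
        "value": {
            "description": "Example dataset description",
            "customProperties": {"encoding": "utf-8"},
        },
        "contentType": "application/json",
    },
}

# Write to a temporary directory so the sketch leaves the working tree alone.
out_path = Path(tempfile.mkdtemp()) / "example_mcps.json"
write_mcp_file([example_mcp], out_path)

# Read the file back to confirm it round-trips unchanged.
loaded = json.loads(out_path.read_text())
print(len(loaded), loaded[0]["aspectName"])
```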

Why Write MCPs as Files?

MCP files in JSON format are particularly valuable because they capture events at the lowest, most granular level in DataHub. There are two main use cases for working with MCP files:

Testing

MCPs allow you to test your metadata ingestion without setting up complex connector dependencies. You can:

  • Test entity ingestion by running a simple command:
    datahub ingest mcps <file_name>.json
  • Create reproducible test cases for metadata ingestion
  • Write and run tests when contributing to DataHub (see DataHub Testing Guide for more details)
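
One way to make such test cases reproducible is to compare the MCPs a run produces against an expected set, ignoring order. The sketch below is a standalone illustration, not DataHub's own test tooling: index_mcps and diff_mcps are hypothetical helpers, and it assumes each list contains at most one MCP per (entityUrn, aspectName) pair.

```python
def index_mcps(mcps: list[dict]) -> dict[tuple[str, str], dict]:
    """Key each MCP by (entityUrn, aspectName) so ordering does not matter."""
    return {(m["entityUrn"], m["aspectName"]): m for m in mcps}


def diff_mcps(expected: list[dict], actual: list[dict]) -> dict[str, list]:
    """Report which (urn, aspect) pairs are missing, unexpected, or changed."""
    exp, act = index_mcps(expected), index_mcps(actual)
    return {
        "missing": sorted(exp.keys() - act.keys()),
        "unexpected": sorted(act.keys() - exp.keys()),
        "changed": sorted(k for k in exp.keys() & act.keys() if exp[k] != act[k]),
    }


URN = "urn:li:dataset:(urn:li:dataPlatform:hive,example_dataset,PROD)"

# Made-up data: the description changed and a new aspect appeared.
expected = [
    {"entityUrn": URN, "aspectName": "datasetProperties",
     "aspect": {"value": {"description": "old"}}},
]
actual = [
    {"entityUrn": URN, "aspectName": "datasetProperties",
     "aspect": {"value": {"description": "new"}}},
    {"entityUrn": URN, "aspectName": "status",
     "aspect": {"value": {"removed": False}}},
]

result = diff_mcps(expected, actual)
print(result)
```

In a real test, both lists would typically be loaded from MCP files with json.load, and the test would assert that all three result lists are empty.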

Debugging

MCPs are valuable for debugging because they let you:

  • Examine entities in your DataHub instance at a granular level
  • Export existing entities to MCP files for analysis
  • Verify entity structures and relationships before ingestion

For example, if you want to understand the structure of entities in your DataHub instance, you can emit them as MCP files and examine their contents in detail.
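
Once entities are available as MCPs, a short script can summarize them. The following sketch groups aspect names by entity URN; the helper name aspects_by_entity and the sample records are made up for illustration, and in practice the list would come from an exported MCP file loaded with json.load.

```python
from collections import defaultdict


def aspects_by_entity(mcps: list[dict]) -> dict[str, list[str]]:
    """Map each entity URN to the aspect names proposed for it, in file order."""
    summary: dict[str, list[str]] = defaultdict(list)
    for mcp in mcps:
        summary[mcp["entityUrn"]].append(mcp["aspectName"])
    return dict(summary)


# Made-up sample records standing in for the contents of an MCP file.
DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:hive,example_dataset,PROD)"
sample_mcps = [
    {"entityUrn": DATASET_URN, "aspectName": "datasetProperties"},
    {"entityUrn": DATASET_URN, "aspectName": "status"},
]

summary = aspects_by_entity(sample_mcps)
for urn, aspect_names in summary.items():
    print(urn, aspect_names)
```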

How to Write MCPs as Files?

Exporting from a DataHub Instance

You can export MCPs directly from your DataHub instance using a recipe file. This is useful when you want to:

  • Examine existing entities in your DataHub instance
  • Create test cases based on real data
  • Debug entity relationships

First, create a recipe file (e.g., export_mcps.yaml):

source:
  type: datahub
  config:
    # Add your DataHub connection configuration here
    server: "http://localhost:8080"
    token: "your-access-token" # If authentication is required

sink:
  type: "file"
  config:
    filename: "mcps.json"

Run the ingestion:

datahub ingest -c export_mcps.yaml

This will write all the entities from your DataHub instance to mcps.json in MCP format.
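
A full export can be large, so for test cases it may help to cut it down to one entity type. The sketch below assumes the exported file is a JSON array of MCP objects that each carry an entityType field; extract_fixture is a hypothetical helper, and the demo data and file names are made up so the example runs without a DataHub instance.

```python
import json
import tempfile
from pathlib import Path


def extract_fixture(src: Path, dst: Path, entity_type: str) -> int:
    """Copy only the MCPs of one entity type from `src` to `dst`.

    Returns the number of MCPs written.
    """
    mcps = json.loads(src.read_text())
    selected = [m for m in mcps if m.get("entityType") == entity_type]
    dst.write_text(json.dumps(selected, indent=2))
    return len(selected)


# Stand-in for an exported mcps.json, written to a temporary directory.
workdir = Path(tempfile.mkdtemp())
export_path = workdir / "mcps.json"
export_path.write_text(json.dumps([
    {"entityType": "dataset", "aspectName": "datasetProperties",
     "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,example_dataset,PROD)"},
    {"entityType": "chart", "aspectName": "status",
     "entityUrn": "urn:li:chart:(looker,example_chart)"},
]))

fixture_path = workdir / "dataset_fixture.json"
count = extract_fixture(export_path, fixture_path, "dataset")
print(f"wrote {count} MCP(s) to {fixture_path.name}")
```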

Creating MCPs with Python SDK

You can generate MCPs programmatically with a small FileEmitter helper class, defined in the script below as a thin wrapper around the SDK's FileSink:

import argparse
import time
from typing import Optional

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.metadata.schema_classes import DatasetPropertiesClass


class FileEmitter:
    """Writes MCPs to a local JSON file through DataHub's FileSink."""

    def __init__(self, filename: str, run_id: Optional[str] = None) -> None:
        # Compute the default run ID per instance; a default placed in the
        # signature would be evaluated only once, when the class is defined.
        if run_id is None:
            run_id = f"test_{int(time.time() * 1000.0)}"
        self.sink: FileSink = FileSink(
            ctx=PipelineContext(run_id=run_id),
            config=FileSinkConfig(filename=filename),
        )

    def emit(self, event) -> None:
        # Wrap the event in a RecordEnvelope, as the sink expects.
        self.sink.write_record_async(
            record_envelope=RecordEnvelope(record=event, metadata={}),
            write_callback=NoopWriteCallback(),
        )

    def close(self) -> None:
        self.sink.close()


def create_mcps() -> list[MetadataChangeProposalWrapper]:
    # Create dataset MCP
    mcps = [
        MetadataChangeProposalWrapper(
            entityType="dataset",
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example_dataset,PROD)",
            changeType="UPSERT",
            aspectName="datasetProperties",
            aspect=DatasetPropertiesClass(
                description="Example dataset description",
                customProperties={"encoding": "utf-8"},
            ),
        )
    ]
    return mcps


def emit_to_file(mcps: list[MetadataChangeProposalWrapper], filename: str) -> None:
    file_emitter = FileEmitter(filename)
    try:
        for mcp in mcps:
            file_emitter.emit(mcp)
    finally:
        # Always close the sink so the output file is finalized.
        file_emitter.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output_file",
        required=True,
        help="Output file path for MCPs",
    )
    args = parser.parse_args()

    mcps = create_mcps()
    emit_to_file(mcps, args.output_file)

Edit the mcps list in create_mcps to define the events and entities you need.

Run the Python script to generate your defined MCPs and save them to a file:

python <file_name>.py --output_file="mcps.json"