# MCP Examples

Practical examples of using the MCP tools for Airflow→Prefect migration.

## Basic Workflow

### 1. Read the DAG
```json
{
  "tool": "read_dag",
  "args": {
    "path": "dags/etl_pipeline.py"
  }
}
```

Response:

```json
{
  "source": "from airflow import DAG\nfrom airflow.operators.python import PythonOperator\n...",
  "file_path": "/workspace/dags/etl_pipeline.py",
  "file_size_bytes": 1842,
  "line_count": 65
}
```
### 2. Look Up Concepts

For each Airflow concept found in the DAG, query translation knowledge:
```json
{
  "tool": "lookup_concept",
  "args": { "concept": "PythonOperator" }
}
```

Response:

```json
{
  "concept_type": "operator",
  "airflow": "PythonOperator",
  "prefect_equivalent": "@task decorator",
  "translation_rules": [
    "Replace PythonOperator with @task-decorated function",
    "Move python_callable body into the task function",
    "Replace op_kwargs with function parameters",
    "Replace provide_context/ti.xcom_push with return values"
  ],
  "source": "colin"
}
```
```json
{
  "tool": "lookup_concept",
  "args": { "concept": "postgres" }
}
```

Response:

```json
{
  "concept_type": "connection",
  "airflow": "postgres_default",
  "prefect_equivalent": "SqlAlchemyConnector block (prefect-sqlalchemy)",
  "translation_rules": [
    "Install prefect-sqlalchemy",
    "Create SqlAlchemyConnector block with connection URL",
    "Use connector.get_connection() in task code"
  ],
  "source": "colin"
}
```
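The `SqlAlchemyConnector` block referenced by this lookup must be registered before the converted flow runs. A minimal one-time setup sketch, assuming `prefect-sqlalchemy` is installed; the block name `postgres-prod` and all credential values below are placeholders:

```python
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector, SyncDriver

# One-time registration of a connection block; all values are placeholders.
connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,
        username="etl_user",
        password="change-me",
        host="localhost",
        port=5432,
        database="analytics",
    )
)

# Persist the block so tasks can load it by name.
connector.save("postgres-prod", overwrite=True)
```

Tasks then retrieve it with `SqlAlchemyConnector.load("postgres-prod")`, keeping credentials out of flow code.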
### 3. Search Prefect Docs (Optional)

For concepts not covered by `lookup_concept`:
```json
{
  "tool": "search_prefect_docs",
  "args": { "query": "task retries and caching" }
}
```

Response:

```json
{
  "results": [
    "Title: Task Caching\nLink: https://docs.prefect.io/v3/develop/task-caching\nContent: ...",
    "Title: Tasks\nLink: https://docs.prefect.io/v3/develop/write-tasks\nContent: ..."
  ],
  "query": "task retries and caching",
  "source": "https://docs.prefect.io/mcp"
}
```
### 4. LLM Generates Code

Using the source code, translation knowledge, and documentation, the LLM generates:
```python
from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector
from sqlalchemy import text


@task(retries=3, retry_delay_seconds=300)
def extract() -> list[dict]:
    """Extract data from source."""
    return [{"id": 1, "value": 100}]


@task
def transform(raw_data: list[dict]) -> list[dict]:
    """Transform extracted data."""
    return [{"id": r["id"], "value": r["value"] * 2} for r in raw_data]


@task
def load(data: list[dict]) -> None:
    """Load data to Postgres."""
    connector = SqlAlchemyConnector.load("postgres-prod")
    with connector.get_connection() as conn:
        for row in data:
            # SQLAlchemy 2.x requires text() for raw SQL strings
            conn.execute(text("INSERT INTO results VALUES (:id, :value)"), row)


@flow(name="etl_pipeline", log_prints=True)
def etl_pipeline():
    raw = extract()
    transformed = transform(raw)
    load(transformed)
```
### 5. Validate the Conversion
```json
{
  "tool": "validate",
  "args": {
    "original_dag": "dags/etl_pipeline.py",
    "converted_flow": "from prefect import flow, task\n..."
  }
}
```

Response:

```json
{
  "original_source": "from airflow import DAG\n...",
  "converted_source": "from prefect import flow, task\n...",
  "syntax_valid": true,
  "syntax_errors": null,
  "comparison_guidance": "Compare the original Airflow DAG with the generated Prefect flow. Verify:\n1. All tasks from the DAG are represented in the flow\n2. Task dependencies are preserved\n..."
}
```
## Complex Patterns

### TaskFlow API DAG

Input DAG:
```python
from airflow.decorators import dag, task
from datetime import datetime


@dag(schedule="@daily", start_date=datetime(2024, 1, 1))
def taskflow_etl():
    @task
    def extract():
        return {"data": [1, 2, 3]}

    @task
    def transform(data):
        return [x * 2 for x in data["data"]]

    @task
    def load(results):
        print(f"Loading {len(results)} items")

    data = extract()
    transformed = transform(data)
    load(transformed)


taskflow_etl()
```
Lookup:

```json
{ "tool": "lookup_concept", "args": { "concept": "dag-to-flow" } }
```

Returns: `@dag` maps directly to `@flow`, and `@task` maps to `@task`. TaskFlow DAGs are the closest Airflow pattern to Prefect.

Generated Prefect code:
```python
from prefect import flow, task


@task
def extract():
    return {"data": [1, 2, 3]}


@task
def transform(data):
    return [x * 2 for x in data["data"]]


@task
def load(results):
    print(f"Loading {len(results)} items")


@flow(name="taskflow_etl", log_prints=True)
def taskflow_etl():
    data = extract()
    transformed = transform(data)
    load(transformed)
```
### DAG with Sensors

Lookup:

```json
{ "tool": "lookup_concept", "args": { "concept": "FileSensor" } }
```

Returns: Convert to a polling task with `retries` and `retry_delay_seconds`.

Generated Prefect code:
```python
from prefect import flow, task
from pathlib import Path


@task(retries=60, retry_delay_seconds=60)
def wait_for_file(filepath: str) -> bool:
    """Poll for file existence (replaces FileSensor)."""
    if not Path(filepath).exists():
        raise FileNotFoundError(f"Waiting for {filepath}")
    return True


@task
def process_file():
    """Process the detected file."""
    ...


@flow(name="sensor_example")
def sensor_example():
    wait_for_file("/data/input.csv")
    process_file()
```
### DAG with Dynamic Tasks

Lookup:

```json
{ "tool": "lookup_concept", "args": { "concept": "dynamic-mapping" } }
```

Returns: Use `.map()` for parallel execution over iterables.

Generated Prefect code:
```python
from prefect import flow, task


@task
def process_region(region: str):
    """Process data for a specific region."""
    ...


@flow(name="dynamic_tasks")
def dynamic_tasks():
    regions = ["us-east-1", "eu-west-1", "ap-south-1"]
    process_region.map(regions)
```
## Concept Lookups

### Operator Lookup

```json
{ "tool": "lookup_concept", "args": { "concept": "BigQueryInsertJobOperator" } }
```

Returns the Prefect equivalent (`bigquery_query` from `prefect-gcp`), example code, and migration notes.

### Connection Lookup

```json
{ "tool": "lookup_concept", "args": { "concept": "postgres" } }
```

Returns the Prefect block type (`SqlAlchemyConnector`), package, and setup guidance.

### Pattern Lookup

```json
{ "tool": "lookup_concept", "args": { "concept": "trigger-rule" } }
```

Returns how to handle Airflow trigger rules in Prefect (state inspection, `wait_for`, `return_state`).
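A minimal sketch of how those mechanisms combine, emulating Airflow's `trigger_rule="all_done"` (a cleanup step that runs regardless of upstream outcomes); the task names and the deliberately failing upstream are hypothetical illustrations:

```python
from prefect import flow, task


@task
def upstream_a():
    return "ok"


@task
def upstream_b():
    raise RuntimeError("simulated upstream failure")


@task
def cleanup():
    print("runs regardless of upstream outcomes")


@flow
def all_done_example():
    # return_state=True captures each outcome instead of raising immediately,
    # so execution continues past a failed task
    state_a = upstream_a(return_state=True)
    state_b = upstream_b(return_state=True)

    # Both upstream outcomes are captured, so this always runs,
    # mirroring trigger_rule="all_done"
    cleanup()

    # State inspection replaces trigger_rule="one_failed"-style branching
    if state_b.is_failed():
        print("upstream_b failed; cleanup still ran")
```

With concurrent `.submit()` calls, the same effect needs `cleanup(wait_for=[allow_failure(future)])`, since a failed upstream in `wait_for` otherwise cancels the downstream task.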
## Prompt Templates

### Claude Code / Cursor

> Read the DAG at `dags/my_etl.py` using `read_dag`, look up the Airflow concepts with `lookup_concept`, then generate a complete Prefect flow following prefecthq/flows conventions. Validate the result.
### Agentic Workflow

1. Call `read_dag(path="dags/my_etl.py")`
2. Identify Airflow concepts in the source code
3. Call `lookup_concept()` for each concept (`PythonOperator`, XCom, etc.)
4. Call `search_prefect_docs()` for any gaps
5. Generate the Prefect flow using source + translation knowledge
6. Call `validate(original_dag="dags/my_etl.py", converted_flow=generated_code)`
7. If there are syntax errors, fix and re-validate
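The steps above can be sketched as a driver loop. `call_tool` and `generate_flow` are hypothetical names standing in for a real MCP client session and the LLM generation step; the concept markers are an illustrative, incomplete list:

```python
# Illustrative substrings used to spot Airflow concepts in DAG source.
AIRFLOW_MARKERS = ("PythonOperator", "BashOperator", "FileSensor", "xcom", "@dag")


def migrate(dag_path, call_tool, generate_flow, max_attempts=3):
    """Drive read -> lookup -> generate -> validate until the flow parses."""
    dag = call_tool("read_dag", {"path": dag_path})
    concepts = [m for m in AIRFLOW_MARKERS if m in dag["source"]]
    knowledge = {c: call_tool("lookup_concept", {"concept": c}) for c in concepts}

    flow_code = generate_flow(dag["source"], knowledge, errors=None)
    for _ in range(max_attempts):
        report = call_tool(
            "validate",
            {"original_dag": dag_path, "converted_flow": flow_code},
        )
        if report["syntax_valid"]:
            return flow_code, report
        # Feed syntax errors back into the generation step and retry
        flow_code = generate_flow(dag["source"], knowledge, errors=report["syntax_errors"])
    raise RuntimeError(f"no valid flow for {dag_path} after {max_attempts} attempts")
```

The fix-and-revalidate loop is bounded so a persistently broken generation fails loudly instead of looping forever.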