MCP Examples

Practical examples of using the MCP tools for Airflow→Prefect migration.

Basic Workflow

1. Read the DAG

{
  "tool": "read_dag",
  "args": {
    "path": "dags/etl_pipeline.py"
  }
}

Response:

{
  "source": "from airflow import DAG\nfrom airflow.operators.python import PythonOperator\n...",
  "file_path": "/workspace/dags/etl_pipeline.py",
  "file_size_bytes": 1842,
  "line_count": 65
}
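The metadata fields in this response can be reproduced locally; a minimal sketch of the same shape using only the standard library (not the tool's actual implementation):

```python
from pathlib import Path

def read_dag(path: str) -> dict:
    """Return DAG source plus the metadata fields shown above."""
    p = Path(path)
    source = p.read_text()
    return {
        "source": source,
        "file_path": str(p.resolve()),
        "file_size_bytes": p.stat().st_size,
        "line_count": len(source.splitlines()),
    }
```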

2. Look Up Concepts

For each Airflow concept found in the DAG, query translation knowledge:

{
  "tool": "lookup_concept",
  "args": { "concept": "PythonOperator" }
}

Response:

{
  "concept_type": "operator",
  "airflow": "PythonOperator",
  "prefect_equivalent": "@task decorator",
  "translation_rules": [
    "Replace PythonOperator with @task-decorated function",
    "Move python_callable body into the task function",
    "Replace op_kwargs with function parameters",
    "Replace provide_context/ti.xcom_push with return values"
  ],
  "source": "colin"
}
Connections can be looked up the same way:

{
  "tool": "lookup_concept",
  "args": { "concept": "postgres" }
}

Response:

{
  "concept_type": "connection",
  "airflow": "postgres_default",
  "prefect_equivalent": "SqlAlchemyConnector block (prefect-sqlalchemy)",
  "translation_rules": [
    "Install prefect-sqlalchemy",
    "Create SqlAlchemyConnector block with connection URL",
    "Use connector.get_connection() in task code"
  ],
  "source": "colin"
}
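The concept names passed to lookup_concept can be harvested from the DAG source first. A rough sketch using stdlib regexes — the pattern list here is an illustrative assumption, not the tool's actual matching logic:

```python
import re

# Heuristic patterns for common Airflow concepts; extend as needed.
CONCEPT_PATTERNS = {
    "PythonOperator": r"\bPythonOperator\b",
    "BashOperator": r"\bBashOperator\b",
    "FileSensor": r"\bFileSensor\b",
    "XCom": r"\bxcom_(push|pull)\b",
    "postgres": r"\bpostgres\w*\b",
}

def find_concepts(dag_source: str) -> list[str]:
    """Return concept names worth passing to lookup_concept."""
    return [name for name, pattern in CONCEPT_PATTERNS.items()
            if re.search(pattern, dag_source)]
```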

3. Search Prefect Docs (Optional)

For concepts not covered by lookup_concept:

{
  "tool": "search_prefect_docs",
  "args": { "query": "task retries and caching" }
}

Response:

{
  "results": [
    "Title: Task Caching\nLink: https://docs.prefect.io/v3/develop/task-caching\nContent: ...",
    "Title: Tasks\nLink: https://docs.prefect.io/v3/develop/write-tasks\nContent: ..."
  ],
  "query": "task retries and caching",
  "source": "https://docs.prefect.io/mcp"
}

4. LLM Generates Code

Using the source code, translation knowledge, and documentation, the LLM generates:

from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector
from sqlalchemy import text

@task(retries=3, retry_delay_seconds=300)
def extract() -> list[dict]:
    """Extract data from source."""
    return [{"id": 1, "value": 100}]

@task
def transform(raw_data: list[dict]) -> list[dict]:
    """Transform extracted data."""
    return [{"id": r["id"], "value": r["value"] * 2} for r in raw_data]

@task
def load(data: list[dict]) -> None:
    """Load data to Postgres."""
    connector = SqlAlchemyConnector.load("postgres-prod")
    with connector.get_connection() as conn:
        for row in data:
            conn.execute(text("INSERT INTO results VALUES (:id, :value)"), row)

@flow(name="etl_pipeline", log_prints=True)
def etl_pipeline():
    raw = extract()
    transformed = transform(raw)
    load(transformed)

5. Validate the Conversion

{
  "tool": "validate",
  "args": {
    "original_dag": "dags/etl_pipeline.py",
    "converted_flow": "from prefect import flow, task\n..."
  }
}

Response:

{
  "original_source": "from airflow import DAG\n...",
  "converted_source": "from prefect import flow, task\n...",
  "syntax_valid": true,
  "syntax_errors": null,
  "comparison_guidance": "Compare the original Airflow DAG with the generated Prefect flow. Verify:\n1. All tasks from the DAG are represented in the flow\n2. Task dependencies are preserved\n..."
}

Complex Patterns

TaskFlow API DAG

Input DAG:

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2024, 1, 1))
def taskflow_etl():
    @task
    def extract():
        return {"data": [1, 2, 3]}

    @task
    def transform(data):
        return [x * 2 for x in data["data"]]

    @task
    def load(results):
        print(f"Loading {len(results)} items")

    data = extract()
    transformed = transform(data)
    load(transformed)

taskflow_etl()

Lookup:

{ "tool": "lookup_concept", "args": { "concept": "dag-to-flow" } }

Returns: @dag maps directly to @flow, @task maps to @task. TaskFlow DAGs are the Airflow pattern closest to Prefect's flow/task model.

Generated Prefect code:

from prefect import flow, task

@task
def extract():
    return {"data": [1, 2, 3]}

@task
def transform(data):
    return [x * 2 for x in data["data"]]

@task
def load(results):
    print(f"Loading {len(results)} items")

@flow(name="taskflow_etl", log_prints=True)
def taskflow_etl():
    data = extract()
    transformed = transform(data)
    load(transformed)

DAG with Sensors

Lookup:

{ "tool": "lookup_concept", "args": { "concept": "FileSensor" } }

Returns: Convert to a polling task with retries and retry_delay_seconds.

Generated Prefect code:

from prefect import flow, task
from pathlib import Path

@task(retries=60, retry_delay_seconds=60)
def wait_for_file(filepath: str) -> bool:
    """Poll for file existence (replaces FileSensor)."""
    if not Path(filepath).exists():
        raise FileNotFoundError(f"Waiting for {filepath}")
    return True

@task
def process_file():
    """Process the detected file."""
    ...

@flow(name="sensor_example")
def sensor_example():
    wait_for_file("/data/input.csv")
    process_file()

DAG with Dynamic Tasks

Lookup:

{ "tool": "lookup_concept", "args": { "concept": "dynamic-mapping" } }

Returns: Use .map() for parallel execution over iterables.

Generated Prefect code:

from prefect import flow, task

@task
def process_region(region: str):
    """Process data for a specific region."""
    ...

@flow(name="dynamic_tasks")
def dynamic_tasks():
    regions = ["us-east-1", "eu-west-1", "ap-south-1"]
    process_region.map(regions)
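Conceptually, .map fans the task out over the iterable, similar in spirit to submitting each item to a worker pool. A stdlib analogy (not Prefect's implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def process_region(region: str) -> str:
    """Stand-in for the @task above."""
    return f"processed {region}"

regions = ["us-east-1", "eu-west-1", "ap-south-1"]
# pool.map preserves input order, like collecting mapped task results
with ThreadPoolExecutor() as pool:
    results = list(pool.map(process_region, regions))
```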

Concept Lookups

Operator Lookup

{ "tool": "lookup_concept", "args": { "concept": "BigQueryInsertJobOperator" } }

Returns the Prefect equivalent (bigquery_query from prefect-gcp), example code, and migration notes.

Connection Lookup

{ "tool": "lookup_concept", "args": { "concept": "postgres" } }

Returns the Prefect block type (SqlAlchemyConnector), package, and setup guidance.

Pattern Lookup

{ "tool": "lookup_concept", "args": { "concept": "trigger-rule" } }

Returns how to handle Airflow trigger rules in Prefect (state inspection, wait_for, return_state).

Prompt Templates

Claude Code / Cursor

Read the DAG at dags/my_etl.py using read_dag, look up the Airflow concepts with lookup_concept, then generate a complete Prefect flow following prefecthq/flows conventions. Validate the result.

Agentic Workflow

1. Call read_dag(path="dags/my_etl.py")
2. Identify Airflow concepts in the source code
3. Call lookup_concept() for each concept (PythonOperator, XCom, etc.)
4. Call search_prefect_docs() for any gaps
5. Generate Prefect flow using source + translation knowledge
6. Call validate(original_dag="dags/my_etl.py", converted_flow=generated_code)
7. If syntax errors, fix and re-validate
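The loop above can be sketched end to end. Tool calls are passed in as callables here because the real MCP client API is not shown in this document; the concept list and retry policy are illustrative assumptions:

```python
def convert_dag(read_dag, lookup_concept, generate, validate,
                path: str, max_attempts: int = 3) -> str:
    """Drive the agentic workflow: read, look up, generate, validate, retry."""
    dag = read_dag(path)
    # Step 2-3: identify concepts and gather translation knowledge
    concepts = [c for c in ("PythonOperator", "XCom", "FileSensor")
                if c in dag["source"]]
    knowledge = {c: lookup_concept(c) for c in concepts}
    # Step 5: generate the Prefect flow from source + knowledge
    flow_code = generate(dag["source"], knowledge)
    # Steps 6-7: validate, and regenerate on syntax errors
    for _ in range(max_attempts):
        result = validate(path, flow_code)
        if result["syntax_valid"]:
            return flow_code
        flow_code = generate(dag["source"], knowledge)
    raise RuntimeError("could not produce a syntactically valid flow")
```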