Quick Start

The Workflow

airflow-unfactor reads your DAG, provides translation knowledge, and the LLM generates the Prefect code.

1. read_dag(path)         → Raw DAG source code
2. lookup_concept(name)   → Airflow→Prefect mappings
3. search_prefect_docs(q) → Live Prefect docs (optional)
4. LLM generates          → Complete Prefect flow
5. validate(orig, flow)   → Syntax check + comparison
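
The five steps can be sketched in Python. The `call_tool` helper below is a hypothetical stand-in for whatever tool-calling mechanism your LLM client provides; airflow-unfactor supplies the tools themselves, not the client.

```python
# Sketch of the workflow above. `call_tool` is a hypothetical stand-in
# for the client's tool-calling mechanism.
calls = []

def call_tool(name, args):
    calls.append(name)  # record the call order for illustration
    return {"tool": name, **args}

def convert_dag(path):
    call_tool("read_dag", {"path": path})                       # 1. raw source
    call_tool("lookup_concept", {"concept": "PythonOperator"})  # 2. mappings
    call_tool("search_prefect_docs", {"query": "task retries"}) # 3. optional
    flow_code = "from prefect import flow, task\n..."           # 4. written by the LLM
    return call_tool("validate",                                # 5. check result
                     {"original_dag": path, "converted_flow": flow_code})

convert_dag("dags/my_etl.py")
```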

Step 1: Read the DAG

{
  "tool": "read_dag",
  "args": { "path": "dags/my_etl.py" }
}

Returns the raw source code, file path, size, and line count. The LLM reads the code directly.
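
A response might be shaped like this (field names are illustrative, not the tool's exact schema):

```json
{
  "path": "dags/my_etl.py",
  "source": "from airflow import DAG\n...",
  "size_bytes": 2048,
  "line_count": 64
}
```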

Step 2: Look Up Translation Knowledge

For each Airflow concept found in the DAG:

{
  "tool": "lookup_concept",
  "args": { "concept": "PythonOperator" }
}

Returns:

  • Prefect equivalent (@task decorator)
  • Translation rules (how to convert)
  • Source: "colin" (pre-compiled) or "fallback" (built-in)

Repeat for each concept: connections, patterns, sensors, etc.
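
For example, a PythonOperator lookup might return something shaped like this (field names and rule text are illustrative, not the tool's exact schema):

```json
{
  "concept": "PythonOperator",
  "prefect_equivalent": "@task decorator",
  "translation_rules": [
    "Wrap the python_callable body in a @task-decorated function",
    "Pass op_kwargs as ordinary function arguments"
  ],
  "source": "colin"
}
```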

Step 3: Search Prefect Docs (Optional)

For anything not covered by lookup_concept:

{
  "tool": "search_prefect_docs",
  "args": { "query": "task caching and retries" }
}

Returns live search results from docs.prefect.io.

Step 4: LLM Generates Code

Using the DAG source and translation knowledge, the LLM generates the complete Prefect flow code:

from prefect import flow, task

@task(retries=3, retry_delay_seconds=300)
def extract_data(source: str) -> dict:
    """Extract data from source."""
    data = {"source": source}  # replace with real extraction logic
    return data

@task
def transform_data(raw: dict) -> dict:
    """Transform extracted data."""
    processed = dict(raw)  # replace with real transformation logic
    return processed

@task
def load_data(processed: dict) -> None:
    """Load transformed data to its destination."""
    ...  # replace with real load logic

@flow(name="my-etl", log_prints=True)
def my_etl_flow():
    data = extract_data("s3://bucket/path")
    transformed = transform_data(data)
    load_data(transformed)
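
One rule such translations apply: Airflow expresses retry_delay as a timedelta, while Prefect's @task takes retry_delay_seconds. A minimal sketch of that conversion (the helper name is my own, not part of the tool):

```python
from datetime import timedelta

def to_retry_delay_seconds(retry_delay: timedelta) -> int:
    """Convert an Airflow-style retry_delay timedelta to Prefect's
    retry_delay_seconds integer. Illustrative helper only."""
    return int(retry_delay.total_seconds())

# An Airflow operator's retries=3, retry_delay=timedelta(minutes=5)
# becomes @task(retries=3, retry_delay_seconds=300) in Prefect.
to_retry_delay_seconds(timedelta(minutes=5))  # → 300
```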

Step 5: Validate

{
  "tool": "validate",
  "args": {
    "original_dag": "dags/my_etl.py",
    "converted_flow": "from prefect import flow, task\n..."
  }
}

Returns:

  • Both source files for side-by-side comparison
  • Syntax validation of the generated code
  • Comparison checklist (tasks, dependencies, config)
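
The syntax-check portion can be approximated with Python's standard library. This is a sketch of the idea, not the validate tool's actual implementation:

```python
import ast

def check_syntax(code: str) -> tuple[bool, str]:
    """Return (ok, message) for generated flow code.
    Illustrative only; the validate tool's real checks may differ."""
    try:
        ast.parse(code)
        return True, "syntax OK"
    except SyntaxError as e:
        return False, f"line {e.lineno}: {e.msg}"

check_syntax("from prefect import flow, task")  # → (True, "syntax OK")
check_syntax("def broken(:\n    pass")          # → (False, ...)
```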

Target Project Layout

Generated code should follow the prefecthq/flows repository structure:

deployments/<workspace>/<flow-name>/
├── flow.py           # Main flow code
├── Dockerfile        # If custom deps needed
└── requirements.txt  # Python dependencies

prefect.yaml          # Deployment configuration

Example prefect.yaml

name: flows
prefect-version: 3.0.0

deployments:
  - name: My ETL Flow
    description: Converted from Airflow DAG 'my_etl'
    entrypoint: deployments/data/my-etl/flow.py:my_etl_flow
    schedules:
      - cron: "0 * * * *"
    work_pool:
      name: kubernetes-pool