Enterprise Migration Guide

Patterns and strategies for migrating large DAG portfolios to Prefect.

Assessment Phase

Before starting migration, assess your DAG portfolio:

1. Inventory Analysis

# Count files that define DAGs
find dags/ -name "*.py" -exec grep -l "DAG\|@dag" {} \; | wc -l

# Identify unique operators
grep -rh "Operator\|Sensor" dags/ | sort | uniq -c | sort -rn

Use read_dag to examine each DAG:

{
  "tool": "read_dag",
  "args": { "path": "dags/etl_pipeline.py" }
}

Then use lookup_concept for each operator and pattern found in the source.
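
For example, a lookup for PythonOperator might look like the following (assuming, as in the conversion workflow later in this guide, that lookup_concept takes a single name argument; the exact schema may differ):

```json
{
  "tool": "lookup_concept",
  "args": { "name": "PythonOperator" }
}
```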

2. Complexity Scoring

Categorize DAGs by migration complexity:

Complexity | Characteristics                               | Approach
-----------|-----------------------------------------------|-----------------------------------
Simple     | < 10 tasks, standard operators, no XCom       | LLM converts with minimal guidance
Medium     | 10-30 tasks, provider operators, basic XCom   | LLM converts with concept lookups
Complex    | 30+ tasks, custom operators, dynamic patterns | LLM converts with manual review
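
Scoring can be automated with a rough heuristic. The thresholds below come from the table; the regexes are illustrative approximations (task count is estimated by counting operator instantiations, which will miscount dynamically generated tasks):

```python
import re
from pathlib import Path

def score_dag(path: Path) -> str:
    """Rough complexity score using the thresholds from the table above.

    Heuristic only: approximates task count by counting operator/decorator
    instantiations, so dynamic DAGs may be under- or over-counted.
    """
    source = path.read_text()
    tasks = len(re.findall(r"Operator\(|Sensor\(|@task\b", source))
    custom = bool(re.search(r"class \w+\((?:Base)?Operator\)", source))
    dynamic = "expand(" in source  # Airflow dynamic task mapping
    if tasks >= 30 or custom or dynamic:
        return "complex"
    if tasks >= 10:
        return "medium"
    return "simple"
```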

3. Dependency Mapping

Identify cross-DAG dependencies:

  • TriggerDagRunOperator calls
  • ExternalTaskSensor waits
  • Shared datasets/assets
  • Common connections

These become flow-to-flow orchestration patterns in Prefect.
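
A quick scan for these couplings can be sketched in Python. The pattern list mirrors the bullets above and is illustrative, not exhaustive:

```python
import re
from collections import defaultdict
from pathlib import Path

# Patterns for the cross-DAG couplings listed above (illustrative, not exhaustive).
CROSS_DAG_PATTERNS = {
    "trigger": r"TriggerDagRunOperator",
    "external_sensor": r"ExternalTaskSensor",
    "dataset": r"Dataset\(",
}

def map_dependencies(dag_dir: Path) -> dict:
    """Return {dag_filename: [coupling kinds found]} for a DAG folder."""
    deps = defaultdict(list)
    for dag_file in dag_dir.glob("*.py"):
        source = dag_file.read_text()
        for kind, pattern in CROSS_DAG_PATTERNS.items():
            if re.search(pattern, source):
                deps[dag_file.name].append(kind)
    return dict(deps)
```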

Phased Migration Strategy

Phase 1: Pilot (1-2 weeks)

Goal: Validate tooling and establish patterns.

  1. Select 3-5 representative DAGs:

    • 1 simple ETL
    • 1 with provider operators
    • 1 with complex dependencies
  2. For each DAG, follow the conversion workflow:

    read_dag(path)         → Get raw source
    lookup_concept(name)   → Get translation rules for each pattern
    search_prefect_docs(q) → Look up Prefect docs as needed
    LLM generates          → Complete Prefect flow
    validate(orig, flow)   → Syntax check + comparison
    
  3. Deploy to Prefect (staging):

    prefect deploy --name pilot-dag
    
  4. Run in parallel with Airflow for comparison.

Phase 2: Batch Migration (2-4 weeks)

Goal: Convert remaining DAGs using proven patterns.

Work through DAGs systematically:

  1. Read each DAG with read_dag
  2. Look up concepts with lookup_concept for each operator/pattern
  3. Generate the flow — the LLM produces complete Prefect code
  4. Validate with validate to check syntax and structure
  5. Review the comparison guidance checklist

Use scaffold to set up the project structure:

{
  "tool": "scaffold",
  "args": {
    "dag_names": ["etl_pipeline", "ml_training", "data_sync"],
    "workspace": "data-team"
  }
}

Phase 3: Validation at Scale (1-2 weeks)

Goal: Verify all conversions before cutover.

from pathlib import Path

for dag_file in Path("dags/").glob("*.py"):
    flow_file = Path("flows/") / dag_file.name
    if flow_file.exists():
        result = validate(
            original_dag=str(dag_file),
            converted_flow=str(flow_file)
        )
        if not result["syntax_valid"]:
            print(f"SYNTAX ERROR: {dag_file.name}")
            for err in result["syntax_errors"]:
                print(f"  Line {err['line']}: {err['message']}")
        else:
            print(f"OK: {dag_file.name}")

Phase 4: Cutover (1 week)

Goal: Switch production to Prefect.

  1. Final validation pass: Re-validate all converted flows
  2. Pause Airflow DAGs: Prevent new runs
  3. Activate Prefect deployments: Start scheduled runs
  4. Monitor: Watch for failures, compare outputs
  5. Archive Airflow: Keep DAGs read-only for reference
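
Step 2 can be scripted against the Airflow CLI. A minimal sketch, where dry_run returns the commands for review instead of executing them:

```python
import subprocess

def bulk_pause(dag_ids: list, dry_run: bool = True) -> list:
    """Build (and optionally run) `airflow dags pause` for each DAG (step 2 above).

    With dry_run=True the commands are only returned for review;
    dry_run=False executes them via the Airflow CLI.
    """
    cmds = [["airflow", "dags", "pause", dag_id] for dag_id in dag_ids]
    if not dry_run:
        for cmd in cmds:
            subprocess.run(cmd, check=True)
    return cmds
```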

Validation at Scale

Automated Validation Pipeline

import json
from pathlib import Path
from datetime import datetime

def validate_all(dag_dir: Path, flow_dir: Path) -> dict:
    results = {
        "timestamp": datetime.now().isoformat(),
        "passed": [],
        "failed": [],
        "skipped": [],
    }

    for dag_file in dag_dir.glob("*.py"):
        flow_file = flow_dir / dag_file.name

        if not flow_file.exists():
            results["skipped"].append(dag_file.name)
            continue

        try:
            result = validate(
                original_dag=str(dag_file),
                converted_flow=str(flow_file)
            )

            if result["syntax_valid"]:
                results["passed"].append(dag_file.name)
            else:
                results["failed"].append({
                    "dag": dag_file.name,
                    "errors": result["syntax_errors"]
                })
        except Exception as e:
            results["failed"].append({
                "dag": dag_file.name,
                "errors": [str(e)]
            })

    return results

# Run validation
results = validate_all(Path("dags/"), Path("flows/"))

# Report
print(f"Passed: {len(results['passed'])}")
print(f"Failed: {len(results['failed'])}")
print(f"Skipped: {len(results['skipped'])}")

# Save report
with open("validation_report.json", "w") as f:
    json.dump(results, f, indent=2)

CI/CD Integration

# .github/workflows/validate-migration.yml
name: Validate Migration

on:
  push:
    paths:
      - 'dags/**'
      - 'flows/**'

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install airflow-unfactor

      - name: Validate conversions
        run: python scripts/validate_all.py

      - name: Upload report
        uses: actions/upload-artifact@v4
        with:
          name: validation-report
          path: validation_report.json
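
The "Validate conversions" step assumes scripts/validate_all.py exits nonzero on failure so the job goes red. A minimal mapping from the report produced by validate_all to an exit status:

```python
def ci_exit_code(results: dict) -> int:
    """Map a validation report to a process exit status.

    Nonzero when any conversion failed, which makes the CI step
    fail the build.
    """
    return 1 if results.get("failed") else 0

# In scripts/validate_all.py, after building the report:
#   sys.exit(ci_exit_code(results))
```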

Rollback Planning

Maintain Parallel Operations

During migration, keep both systems running:

+-------------+     +-------------+
|   Airflow   |     |   Prefect   |
|  (Primary)  | --> |  (Shadow)   |
+-------------+     +-------------+
        |                  |
        v                  v
   Same data         Compare outputs

Quick Rollback

If issues arise after cutover:

  1. Pause Prefect deployments:

    prefect deployment pause "flow-name/deployment"
    
  2. Unpause Airflow DAGs:

    airflow dags unpause dag_id
    
  3. Investigate and fix the Prefect flow

  4. Resume migration when ready

Data Validation

Compare outputs between systems:

def compare_outputs(dag_id: str, execution_date: str):
    # Placeholders: implement get_airflow_xcom and get_prefect_results
    # against your Airflow metadata/REST API and Prefect's results API.
    airflow_outputs = get_airflow_xcom(dag_id, execution_date)
    prefect_outputs = get_prefect_results(dag_id, execution_date)

    # Compare
    for task_id, airflow_value in airflow_outputs.items():
        prefect_value = prefect_outputs.get(task_id)
        if airflow_value != prefect_value:
            print(f"Mismatch in {task_id}:")
            print(f"  Airflow: {airflow_value}")
            print(f"  Prefect: {prefect_value}")

Best Practices

  1. Start small — Pilot with representative DAGs first
  2. Automate validation — Run validation in CI/CD
  3. Maintain parallel operations — Keep Airflow running during migration
  4. Document decisions — Record patterns and exceptions
  5. Communicate progress — Share dashboards with stakeholders
  6. Plan rollback — Always have a way back to Airflow
  7. Test with real data — Validation checks syntax, not runtime correctness