# Enterprise Migration Guide

Patterns and strategies for migrating large DAG portfolios to Prefect.

## Assessment Phase

Before starting migration, assess your DAG portfolio:

### 1. Inventory Analysis

```bash
# Count DAGs and operators
find dags/ -name "*.py" -exec grep -l "DAG\|@dag" {} \; | wc -l

# Identify unique operators
grep -rh "Operator\|Sensor" dags/ | sort | uniq -c | sort -rn
```
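The grep one-liners give totals; a hypothetical Python variant (not part of airflow-unfactor) tallies per-class counts, which is useful input for the complexity scoring step:

```python
import re
from collections import Counter
from pathlib import Path

# Matches conventional Airflow class names like PythonOperator, S3KeySensor.
OPERATOR_RE = re.compile(r"\b([A-Z]\w*(?:Operator|Sensor))\b")

def inventory_operators(dag_dir: Path) -> Counter:
    """Count operator/sensor class mentions across all DAG files."""
    counts: Counter = Counter()
    for dag_file in dag_dir.glob("**/*.py"):
        counts.update(OPERATOR_RE.findall(dag_file.read_text()))
    return counts
```

This is a textual scan, so imports and instantiations both count; treat the numbers as an ordering signal, not an exact census.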

Use `read_dag` to examine each DAG:

```json
{
  "tool": "read_dag",
  "args": { "path": "dags/etl_pipeline.py" }
}
```

Then use `lookup_concept` for each operator and pattern found in the source.

### 2. Complexity Scoring

Categorize DAGs by migration complexity:

| Complexity | Characteristics | Approach |
|---|---|---|
| Simple | < 10 tasks, standard operators, no XCom | LLM converts with minimal guidance |
| Medium | 10-30 tasks, provider operators, basic XCom | LLM converts with concept lookups |
| Complex | 30+ tasks, custom operators, dynamic patterns | LLM converts with manual review |
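One way to automate the triage is a small scoring function; the thresholds mirror the table, but the function itself is an assumed heuristic, not part of the toolkit:

```python
def score_complexity(task_count: int,
                     has_custom_operators: bool = False,
                     uses_xcom: bool = False) -> str:
    """Bucket a DAG by the thresholds in the table above (assumed heuristic)."""
    if task_count >= 30 or has_custom_operators:
        return "complex"
    if task_count >= 10 or uses_xcom:
        return "medium"
    return "simple"
```

Dynamic patterns and provider operators can be folded in as extra flags; anything ambiguous should round up to the more careful review path.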

### 3. Dependency Mapping

Identify cross-DAG dependencies:

- `TriggerDagRunOperator` calls
- `ExternalTaskSensor` waits
- Shared datasets/assets
- Common connections

These become flow-to-flow orchestration patterns in Prefect.
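A hypothetical helper for surfacing those edges: scan each DAG's source for the standard Airflow argument names `trigger_dag_id` and `external_dag_id`. This is a textual heuristic, so treat hits as leads to verify, not ground truth:

```python
import re

# trigger_dag_id / external_dag_id are the standard argument names on
# TriggerDagRunOperator and ExternalTaskSensor respectively.
CROSS_DAG_RE = re.compile(
    r"(?:trigger_dag_id|external_dag_id)\s*=\s*['\"]([\w.-]+)['\"]"
)

def find_cross_dag_refs(source: str) -> list[str]:
    """Return the DAG ids this source file triggers or waits on."""
    return CROSS_DAG_RE.findall(source)
```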

## Phased Migration Strategy

### Phase 1: Pilot (1-2 weeks)

Goal: Validate tooling and establish patterns.

1. Select 3-5 representative DAGs:
   - 1 simple ETL
   - 1 with provider operators
   - 1 with complex dependencies

2. For each DAG, follow the conversion workflow:
   - `read_dag(path)` → Get the raw source
   - `lookup_concept(name)` → Get translation rules for each pattern
   - `search_prefect_docs(q)` → Look up Prefect docs as needed
   - LLM generates → Complete Prefect flow
   - `validate(orig, flow)` → Syntax check + comparison

3. Deploy to Prefect (staging):

   ```bash
   prefect deploy --name pilot-dag
   ```

4. Run in parallel with Airflow for comparison.

### Phase 2: Batch Migration (2-4 weeks)

Goal: Convert remaining DAGs using proven patterns.

Work through DAGs systematically:

1. Read each DAG with `read_dag`
2. Look up concepts with `lookup_concept` for each operator/pattern
3. Generate the flow: the LLM produces complete Prefect code
4. Validate with `validate` to check syntax and structure
5. Review the comparison guidance checklist

Use `scaffold` to set up the project structure:

```json
{
  "tool": "scaffold",
  "args": {
    "dag_names": ["etl_pipeline", "ml_training", "data_sync"],
    "workspace": "data-team"
  }
}
```

### Phase 3: Validation at Scale (1-2 weeks)

Goal: Verify all conversions before cutover.

```python
from pathlib import Path

# Run inside an async function: validate is the airflow-unfactor tool
# and is awaited here.
for dag_file in Path("dags/").glob("*.py"):
    flow_file = Path("flows/") / dag_file.name
    if flow_file.exists():
        result = await validate(
            original_dag=str(dag_file),
            converted_flow=str(flow_file),
        )
        if not result["syntax_valid"]:
            print(f"SYNTAX ERROR: {dag_file.name}")
            for err in result["syntax_errors"]:
                print(f"  Line {err['line']}: {err['message']}")
        else:
            print(f"OK: {dag_file.name}")
```

### Phase 4: Cutover (1 week)

Goal: Switch production to Prefect.

- Final validation pass: Re-validate all converted flows
- Pause Airflow DAGs: Prevent new runs
- Activate Prefect deployments: Start scheduled runs
- Monitor: Watch for failures, compare outputs
- Archive Airflow: Keep DAGs read-only for reference
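The pause step can be scripted; this sketch builds the `airflow dags pause` commands instead of running them, so the list can be reviewed first. Deriving the dag_id from the flow filename is an assumed convention:

```python
from pathlib import Path

def pause_commands(flow_dir: Path) -> list[list[str]]:
    """One `airflow dags pause <dag_id>` command per converted flow file."""
    return [
        ["airflow", "dags", "pause", flow_file.stem]
        for flow_file in sorted(flow_dir.glob("*.py"))
    ]
```

Once reviewed, each command can be executed with `subprocess.run`.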

## Validation at Scale

### Automated Validation Pipeline

```python
import json
from datetime import datetime
from pathlib import Path


def validate_all(dag_dir: Path, flow_dir: Path) -> dict:
    """Validate every converted flow against its source DAG."""
    results = {
        "timestamp": datetime.now().isoformat(),
        "passed": [],
        "failed": [],
        "skipped": [],
    }
    for dag_file in dag_dir.glob("*.py"):
        flow_file = flow_dir / dag_file.name
        if not flow_file.exists():
            results["skipped"].append(dag_file.name)
            continue
        try:
            result = validate(
                original_dag=str(dag_file),
                converted_flow=str(flow_file),
            )
            if result["syntax_valid"]:
                results["passed"].append(dag_file.name)
            else:
                results["failed"].append({
                    "dag": dag_file.name,
                    "errors": result["syntax_errors"],
                })
        except Exception as e:
            results["failed"].append({
                "dag": dag_file.name,
                "errors": [str(e)],
            })
    return results


# Run validation
results = validate_all(Path("dags/"), Path("flows/"))

# Report
print(f"Passed: {len(results['passed'])}")
print(f"Failed: {len(results['failed'])}")
print(f"Skipped: {len(results['skipped'])}")

# Save report
with open("validation_report.json", "w") as f:
    json.dump(results, f, indent=2)
```

### CI/CD Integration

```yaml
# .github/workflows/validate-migration.yml
name: Validate Migration

on:
  push:
    paths:
      - 'dags/**'
      - 'flows/**'

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install airflow-unfactor

      - name: Validate conversions
        run: python scripts/validate_all.py

      - name: Upload report
        uses: actions/upload-artifact@v4
        with:
          name: validation-report
          path: validation_report.json
```
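The `Validate conversions` step only gates the build if `scripts/validate_all.py` exits nonzero on failure. A possible tail for that script, reusing the `results` dict from the validation pipeline:

```python
def exit_code(results: dict) -> int:
    """Nonzero when any conversion failed, so the CI step fails the build."""
    return 1 if results["failed"] else 0
```

Call `sys.exit(exit_code(results))` at the end of the script. Skipped files are reported but do not fail the build here; that is a policy choice you may want to tighten.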

## Rollback Planning

### Maintain Parallel Operations

During migration, keep both systems running:

```text
+-------------+       +-------------+
|   Airflow   |       |   Prefect   |
|  (Primary)  |  -->  |  (Shadow)   |
+-------------+       +-------------+
       |                     |
       v                     v
   Same data          Compare outputs
```

### Quick Rollback

If issues arise after cutover:

1. Pause Prefect deployments:

   ```bash
   prefect deployment pause "flow-name/deployment"
   ```

2. Unpause Airflow DAGs:

   ```bash
   airflow dags unpause dag_id
   ```

3. Investigate and fix the Prefect flow

4. Resume the migration when ready

### Data Validation

Compare outputs between systems:

```python
def compare_outputs(dag_id: str, execution_date: str):
    # Get Airflow task outputs
    airflow_outputs = get_airflow_xcom(dag_id, execution_date)

    # Get Prefect task results
    prefect_outputs = get_prefect_results(dag_id, execution_date)

    # Compare
    for task_id, airflow_value in airflow_outputs.items():
        prefect_value = prefect_outputs.get(task_id)
        if airflow_value != prefect_value:
            print(f"Mismatch in {task_id}:")
            print(f"  Airflow: {airflow_value}")
            print(f"  Prefect: {prefect_value}")
```
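Strict `!=` flags spurious mismatches when values are semantically equal but differently ordered or carry float noise. A hedged normalization helper (assumed, not part of the toolkit):

```python
import json
import math

def outputs_match(a, b, rel_tol: float = 1e-9) -> bool:
    """Equivalent when floats are close or canonical JSON forms agree."""
    if isinstance(a, float) and isinstance(b, float):
        return math.isclose(a, b, rel_tol=rel_tol)
    # sort_keys canonicalizes dict ordering; default=str covers dates etc.
    return (json.dumps(a, sort_keys=True, default=str)
            == json.dumps(b, sort_keys=True, default=str))
```

Swap it in for the `!=` check above when false positives drown out real drift.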

## Best Practices

- Start small — Pilot with representative DAGs first
- Automate validation — Run validation in CI/CD
- Maintain parallel operations — Keep Airflow running during migration
- Document decisions — Record patterns and exceptions
- Communicate progress — Share dashboards with stakeholders
- Plan rollback — Always have a way back to Airflow
- Test with real data — Validation checks syntax, not runtime correctness