Quick Start
The Workflow
airflow-unfactor reads your DAG and supplies translation knowledge; your LLM then generates the Prefect code.
1. read_dag(path) → Raw DAG source code
2. lookup_concept(name) → Airflow→Prefect mappings
3. search_prefect_docs(q) → Live Prefect docs (optional)
4. LLM generates → Complete Prefect flow
5. validate(orig, flow) → Syntax check + comparison
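Stitched together, a host application drives these tools in roughly this order. The sketch below is illustrative: `call` stands in for your MCP client's tool invoker and `generate` for the LLM codegen step; neither name comes from the tool itself.

```python
def convert_dag(call, generate, dag_path, concepts):
    """Illustrative driver for the 5-step workflow.

    call(tool_name, args)     -> stand-in for an MCP tool invocation
    generate(dag, knowledge)  -> stand-in for the LLM generation step
    """
    # Step 1: read the raw DAG source
    dag = call("read_dag", {"path": dag_path})
    # Step 2: gather translation knowledge for each Airflow concept found
    knowledge = {c: call("lookup_concept", {"concept": c}) for c in concepts}
    # Step 4: the LLM generates the Prefect flow code
    flow_code = generate(dag, knowledge)
    # Step 5: validate the result against the original DAG
    return call("validate", {"original_dag": dag_path, "converted_flow": flow_code})
```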
Step 1: Read the DAG
{
"tool": "read_dag",
"args": { "path": "dags/my_etl.py" }
}
Returns the raw source code, file path, size, and line count. The LLM reads the code directly.
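The response can be pictured as the output of a function like this; the field names here are illustrative assumptions, not the tool's exact schema.

```python
from pathlib import Path

def read_dag(path: str) -> dict:
    """Sketch of read_dag's result shape; key names are assumptions."""
    source = Path(path).read_text()
    return {
        "path": path,
        "source": source,                        # raw DAG source code
        "size_bytes": len(source.encode("utf-8")),
        "line_count": len(source.splitlines()),
    }
```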
Step 2: Look Up Translation Knowledge
For each Airflow concept found in the DAG:
{
"tool": "lookup_concept",
"args": { "concept": "PythonOperator" }
}
Returns:
- Prefect equivalent (@task decorator)
- Translation rules (how to convert)
- Source: "colin" (pre-compiled) or "fallback" (built-in)
Repeat for each concept: connections, patterns, sensors, etc.
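A built-in fallback table can be imagined as a plain dict. The entries and field names below are illustrative assumptions; the tool's real knowledge base is larger.

```python
# Illustrative fallback mapping; entries and field names are assumptions.
FALLBACK_CONCEPTS = {
    "PythonOperator": {
        "prefect_equivalent": "@task decorator",
        "rule": "Move the python_callable body into a @task-decorated function.",
    },
    "BashOperator": {
        "prefect_equivalent": "subprocess call inside a @task",
        "rule": "Run the bash_command from a @task (e.g. subprocess.run).",
    },
}

def lookup_concept(concept: str) -> dict:
    """Return translation knowledge for one Airflow concept (sketch)."""
    entry = FALLBACK_CONCEPTS.get(concept)
    if entry is None:
        return {"found": False, "concept": concept}
    return {"found": True, "concept": concept, "source": "fallback", **entry}
```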
Step 3: Search Prefect Docs (Optional)
For anything not covered by lookup_concept:
{
"tool": "search_prefect_docs",
"args": { "query": "task caching and retries" }
}
Returns live search results from docs.prefect.io.
Step 4: LLM Generates Code
Using the DAG source and translation knowledge, your LLM generates complete Prefect flow code:
from prefect import flow, task

@task(retries=3, retry_delay_seconds=300)
def extract_data(source: str) -> dict:
    """Extract data from source."""
    data = {"source": source, "rows": []}
    return data

@task
def transform_data(raw: dict) -> dict:
    """Transform extracted data."""
    processed = {**raw, "transformed": True}
    return processed

@task
def load_data(processed: dict) -> None:
    """Load transformed data to its destination."""
    print(f"Loaded {processed}")

@flow(name="my-etl", log_prints=True)
def my_etl_flow():
    data = extract_data("s3://bucket/path")
    transformed = transform_data(data)
    load_data(transformed)
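One mapping that recurs in this step: Airflow's default_args retry settings become Prefect @task keyword arguments (as in the retries=3, retry_delay_seconds=300 above). A minimal helper sketch; the function name and behavior are assumptions, not part of the tool.

```python
from datetime import timedelta

def airflow_retries_to_prefect(default_args: dict) -> dict:
    """Map Airflow default_args retry settings to Prefect @task kwargs (sketch)."""
    kwargs = {}
    if "retries" in default_args:
        kwargs["retries"] = int(default_args["retries"])
    delay = default_args.get("retry_delay")
    if isinstance(delay, timedelta):
        # Airflow uses a timedelta; Prefect takes seconds
        kwargs["retry_delay_seconds"] = int(delay.total_seconds())
    return kwargs
```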
Step 5: Validate
{
"tool": "validate",
"args": {
"original_dag": "dags/my_etl.py",
"converted_flow": "from prefect import flow, task\n..."
}
}
Returns:
- Both source files for side-by-side comparison
- Syntax validation of the generated code
- Comparison checklist (tasks, dependencies, config)
Target Project Layout
Generated code should follow prefecthq/flows structure:
deployments/<workspace>/<flow-name>/
├── flow.py            # Main flow code
├── Dockerfile         # If custom deps needed
└── requirements.txt   # Python dependencies
prefect.yaml           # Deployment configuration (at repo root)
Example prefect.yaml
name: flows
prefect-version: 3.0.0
deployments:
- name: My ETL Flow
description: Converted from Airflow DAG 'my_etl'
entrypoint: deployments/data/my-etl/flow.py:my_etl_flow
schedules:
- cron: "0 * * * *"
work_pool:
name: kubernetes-pool