# TaskFlow Conversion

TaskFlow API DAGs map directly to Prefect with minimal changes. This is the cleanest migration path.
## Mapping Overview

| Airflow | Prefect |
|---|---|
| `@dag` | `@flow` |
| `@task` | `@task` |
| `@task_group` | Nested function or subflow |
| Return values | Return values (identical) |
| Task parameters | Task parameters (identical) |
## Basic Conversion

### Simple TaskFlow DAG

Airflow:

```python
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2024, 1, 1))
def etl_pipeline():
    @task
    def extract():
        return {"data": [1, 2, 3, 4, 5]}

    @task
    def transform(data: dict) -> list:
        return [x * 2 for x in data["data"]]

    @task
    def load(results: list):
        print(f"Loading {len(results)} items")

    data = extract()
    transformed = transform(data)
    load(transformed)

etl_pipeline()
```
Prefect:

```python
from prefect import flow, task

@task
def extract():
    return {"data": [1, 2, 3, 4, 5]}

@task
def transform(data: dict) -> list:
    return [x * 2 for x in data["data"]]

@task
def load(results: list):
    print(f"Loading {len(results)} items")

@flow(name="etl_pipeline")
def etl_pipeline():
    data = extract()
    transformed = transform(data)
    load(transformed)
```
## Key Differences

- Decorators at module level: In Prefect, tasks are typically defined at module level, not nested inside the flow
- No schedule in decorator: Schedules are defined in deployment configuration
- No `start_date`: Prefect doesn't use `start_date`—runs execute when triggered
- Flow invocation: No need to call `etl_pipeline()` at module level to register the flow
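
To illustrate where the schedule moves, a deployment entry in `prefect.yaml` might look like the sketch below. The deployment name, entrypoint path, and work pool name are assumptions for illustration, and depending on your Prefect version the key may be `schedule` (singular) rather than `schedules`:

```yaml
deployments:
  - name: etl-pipeline              # hypothetical deployment name
    entrypoint: etl.py:etl_pipeline # module path : flow function
    schedules:
      - cron: "0 0 * * *"           # equivalent of Airflow's "@daily"
    work_pool:
      name: default                 # hypothetical work pool name
```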
## Task Configuration

TaskFlow task decorators translate directly:

Airflow:

```python
from airflow.decorators import task
from datetime import timedelta

@task(
    retries=3,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=1),
    pool="default_pool",
)
def my_task():
    ...
```
Prefect:

```python
from prefect import task

@task(
    retries=3,
    retry_delay_seconds=300,  # 5 minutes
    timeout_seconds=3600,     # 1 hour
    # pool → use concurrency limits instead
)
def my_task():
    ...
```
### Configuration Mapping

| Airflow Parameter | Prefect Parameter |
|---|---|
| `retries` | `retries` |
| `retry_delay` | `retry_delay_seconds` |
| `retry_exponential_backoff` | `retry_jitter_factor` |
| `execution_timeout` | `timeout_seconds` |
| `pool` | Concurrency limits |
| `trigger_rule` | Explicit control flow |
| `on_failure_callback` | Automations |
| `on_success_callback` | Automations |
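
For the `pool` row, one common replacement (a sketch, with the tag name `database` assumed for illustration) is to tag the task, e.g. `@task(tags=["database"])`, and create a matching tag-based concurrency limit with the Prefect CLI:

```shell
# Limit concurrent runs of tasks tagged "database" to 10.
# The tag name and limit value are assumptions for illustration.
prefect concurrency-limit create database 10
```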
## Data Passing

TaskFlow's return-value-based data passing works identically in Prefect:

Both frameworks:

```python
@task
def extract():
    return {"raw_data": [...]}

@task
def transform(data):
    return process(data)

@flow
def pipeline():
    data = extract()          # Returns dict
    result = transform(data)  # Receives dict
    return result
```

No XCom needed—data flows naturally through return values and parameters.

## Task Groups → Subflows

Airflow:

```python
from airflow.decorators import dag, task, task_group

@dag
def main_dag():
    @task_group
    def extract_group():
        @task
        def extract_users():
            return users

        @task
        def extract_orders():
            return orders

        return extract_users(), extract_orders()

    @task
    def merge(users, orders):
        return combined

    users, orders = extract_group()
    merge(users, orders)
```
Prefect (Option 1: Logical grouping in the flow):

```python
from prefect import flow, task

@task
def extract_users():
    return users

@task
def extract_orders():
    return orders

@task
def merge(users, orders):
    return combined

@flow
def main_flow():
    # Group tasks logically in code
    users = extract_users()
    orders = extract_orders()
    result = merge(users, orders)
    return result
```
Prefect (Option 2: Subflows):

```python
from prefect import flow, task

@task
def extract_users():
    return users

@task
def extract_orders():
    return orders

@flow
def extract_group():
    """Subflow for extraction tasks."""
    users = extract_users()
    orders = extract_orders()
    return users, orders

@task
def merge(users, orders):
    return combined

@flow
def main_flow():
    users, orders = extract_group()
    result = merge(users, orders)
    return result
```
## Multiple Outputs

Airflow:

```python
@task(multiple_outputs=True)
def extract():
    return {"users": users, "orders": orders}

@dag
def pipeline():
    data = extract()
    process_users(data["users"])
    process_orders(data["orders"])
```
Prefect:

```python
@task
def extract():
    return {"users": users, "orders": orders}

@flow
def pipeline():
    data = extract()
    process_users(data["users"])
    process_orders(data["orders"])
```

In Prefect, all task returns can be dictionaries—no special decorator needed.

## Dynamic Task Mapping

Airflow:

```python
@task
def get_items():
    return [1, 2, 3, 4, 5]

@task
def process(item):
    return item * 2

@dag
def pipeline():
    items = get_items()
    process.expand(item=items)
```
Prefect:

```python
@task
def get_items():
    return [1, 2, 3, 4, 5]

@task
def process(item):
    return item * 2

@flow
def pipeline():
    items = get_items()
    results = process.map(items)  # .map() instead of .expand()
    return results
```

## Best Practices

- Keep task bodies unchanged: The business logic inside tasks rarely needs modification
- Move tasks to module level: Better for testing and reuse
- Use subflows for grouping: When you need logical grouping with separate run tracking
- Configure via deployment: Move schedules and resource configuration to `prefect.yaml`
- Test independently: Tasks and flows can be tested as regular Python functions