TaskFlow Conversion

TaskFlow API DAGs map directly to Prefect with minimal changes. This is the cleanest migration path.

Mapping Overview

| Airflow | Prefect |
| --- | --- |
| @dag | @flow |
| @task | @task |
| @task_group | Nested function or subflow |
| Return values | Return values (identical) |
| Task parameters | Task parameters (identical) |

Basic Conversion

Simple TaskFlow DAG

Airflow:

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2024, 1, 1))
def etl_pipeline():
    @task
    def extract():
        return {"data": [1, 2, 3, 4, 5]}

    @task
    def transform(data: dict) -> list:
        return [x * 2 for x in data["data"]]

    @task
    def load(results: list):
        print(f"Loading {len(results)} items")

    data = extract()
    transformed = transform(data)
    load(transformed)

etl_pipeline()

Prefect:

from prefect import flow, task

@task
def extract():
    return {"data": [1, 2, 3, 4, 5]}

@task
def transform(data: dict) -> list:
    return [x * 2 for x in data["data"]]

@task
def load(results: list):
    print(f"Loading {len(results)} items")

@flow(name="etl_pipeline")
def etl_pipeline():
    data = extract()
    transformed = transform(data)
    load(transformed)

Key Differences

  1. Decorators at module level: In Prefect, tasks are typically defined at module level, not nested inside the flow
  2. No schedule in decorator: Schedules are defined in deployment configuration
  3. No start_date: Prefect doesn't use start_date—runs execute when triggered
  4. Flow invocation: No need to call etl_pipeline() at module level
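In practice, the schedule moves into deployment configuration. A minimal prefect.yaml sketch attaching the daily schedule to a deployment — the deployment name and entrypoint path here are hypothetical:

```yaml
deployments:
  - name: etl-pipeline                    # hypothetical deployment name
    entrypoint: flows/etl.py:etl_pipeline # hypothetical module path
    schedules:
      - cron: "0 0 * * *"                 # equivalent of Airflow's @daily
```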

Task Configuration

TaskFlow task decorators translate directly:

Airflow:

from datetime import timedelta

@task(
    retries=3,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=1),
    pool="default_pool",
)
def my_task():
    ...

Prefect:

@task(
    retries=3,
    retry_delay_seconds=300,  # 5 minutes
    timeout_seconds=3600,     # 1 hour
    # pool → use concurrency limits instead
)
def my_task():
    ...

Configuration Mapping

| Airflow Parameter | Prefect Parameter |
| --- | --- |
| retries | retries |
| retry_delay | retry_delay_seconds |
| retry_exponential_backoff | exponential_backoff utility (optionally with retry_jitter_factor) |
| execution_timeout | timeout_seconds |
| pool | Concurrency limits |
| trigger_rule | Explicit control flow |
| on_failure_callback | Automations |
| on_success_callback | Automations |

Data Passing

TaskFlow's return-value-based data passing works identically in Prefect:

Both frameworks:

@task
def extract():
    return {"raw_data": [...]}

@task
def transform(data):
    return process(data)

@flow
def pipeline():
    data = extract()           # Returns dict
    result = transform(data)   # Receives dict
    return result

No XCom needed—data flows naturally through return values and parameters.

Task Groups → Subflows

Airflow:

from airflow.decorators import dag, task, task_group

@dag
def main_dag():
    @task_group
    def extract_group():
        @task
        def extract_users():
            return users

        @task
        def extract_orders():
            return orders

        return extract_users(), extract_orders()

    @task
    def merge(users, orders):
        return combined

    users, orders = extract_group()
    merge(users, orders)

Prefect (Option 1: Logical grouping in the flow):

from prefect import flow, task

@task
def extract_users():
    return users

@task
def extract_orders():
    return orders

@task
def merge(users, orders):
    return combined

@flow
def main_flow():
    # Group tasks logically in code
    users = extract_users()
    orders = extract_orders()

    result = merge(users, orders)
    return result

Prefect (Option 2: Subflows):

from prefect import flow, task

@task
def extract_users():
    return users

@task
def extract_orders():
    return orders

@flow
def extract_group():
    """Subflow for extraction tasks."""
    users = extract_users()
    orders = extract_orders()
    return users, orders

@task
def merge(users, orders):
    return combined

@flow
def main_flow():
    users, orders = extract_group()
    result = merge(users, orders)
    return result

Multiple Outputs

Airflow:

@task(multiple_outputs=True)
def extract():
    return {"users": users, "orders": orders}

@dag
def pipeline():
    data = extract()
    process_users(data["users"])
    process_orders(data["orders"])

Prefect:

@task
def extract():
    return {"users": users, "orders": orders}

@flow
def pipeline():
    data = extract()
    process_users(data["users"])
    process_orders(data["orders"])

In Prefect, any return value — including a dictionary — passes through as-is; no special decorator is needed.

Dynamic Task Mapping

Airflow:

@task
def get_items():
    return [1, 2, 3, 4, 5]

@task
def process(item):
    return item * 2

@dag
def pipeline():
    items = get_items()
    process.expand(item=items)

Prefect:

@task
def get_items():
    return [1, 2, 3, 4, 5]

@task
def process(item):
    return item * 2

@flow
def pipeline():
    items = get_items()
    results = process.map(items)  # .map() instead of .expand()
    return results

Best Practices

  1. Keep task bodies unchanged: The business logic inside tasks rarely needs modification
  2. Move tasks to module level: Better for testing and reuse
  3. Use subflows for grouping: When you need logical grouping with separate run tracking
  4. Configure via deployment: Move schedules and resource configuration to prefect.yaml
  5. Test independently: Tasks and flows can be tested as regular Python functions