Datasets → Prefect Events

Convert Airflow Dataset/Asset dependencies into Prefect event-based automation.

Use lookup_concept("dataset-to-automation") for translation guidance.

What to Look For

  • Dataset("...") and Asset("...") definitions
  • @task(outlets=[...]) producers
  • @dag(schedule=[...]) consumers

Prefect Equivalents

Airflow patternPrefect equivalent
Dataset("s3://bucket/data.csv")emit_event("dataset.s3.bucket.data.updated")
Asset("s3://bucket/data.csv")Event emission + asset materialization
@task(outlets=[dataset_or_asset])emit_event(...) at end of task
@dag(schedule=[dataset_or_asset])Event-based automation trigger in prefect.yaml

Behavior Delta: Datasets vs Assets

  • Both Datasets and Assets are converted into event-based automation outputs.
  • Assets additionally produce materialization_code.
  • materialization_code is intentionally a scaffold and must be finalized for your Prefect runtime.

Manual Follow-Up Required

  • Replace materialization placeholders with your concrete Prefect asset implementation.
  • Finalize naming/ownership/lineage conventions for assets.
  • Validate trigger patterns in deployment_yaml against your deployment topology.

Example

Airflow input:

from airflow.assets import Asset
from airflow.decorators import task

sales_asset = Asset("s3://sales/daily")

@task(outlets=[sales_asset])
def build_sales():
    ...

Generated output (simplified):

from prefect.events import emit_event

def emit_sales_asset_updated():
    emit_event(
        event="dataset.s3.sales.daily.updated",
        resource={"prefect.resource.id": "s3://sales/daily"},
    )
from prefect.assets import materialize

@materialize("s3://sales/daily")
def materialize_sales_asset() -> str:
    # TODO: replace with concrete Prefect asset API usage
    return "s3://sales/daily"