Datasets → Prefect Events
Convert Airflow Dataset/Asset dependencies into Prefect event-based automation.
Use lookup_concept("dataset-to-automation") for translation guidance.
What to Look For
- Dataset("...") and Asset("...") definitions
- @task(outlets=[...]) producers
- @dag(schedule=[...]) consumers
Prefect Equivalents
| Airflow pattern | Prefect equivalent |
|---|---|
| Dataset("s3://bucket/data.csv") | emit_event("dataset.s3.bucket.data.updated") |
| Asset("s3://bucket/data.csv") | Event emission + asset materialization |
| @task(outlets=[dataset_or_asset]) | emit_event(...) at end of task |
| @dag(schedule=[dataset_or_asset]) | Event-based automation trigger in prefect.yaml |
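The event names in this document appear to follow one pattern: a dataset. prefix, the URI scheme, the bucket and path segments joined by dots with any file extension dropped, and an .updated suffix. The following is a minimal sketch of that mapping, inferred only from the two example URIs in this document. The helper name event_name_for is hypothetical, and the actual converter may treat edge cases differently.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse


def event_name_for(uri: str) -> str:
    """Derive an event name from a Dataset/Asset URI (hypothetical helper)."""
    parsed = urlparse(uri)
    # netloc holds the bucket or host; path holds the remaining segments.
    segments = [s for s in [parsed.netloc, *parsed.path.split("/")] if s]
    if segments:
        # Drop a trailing file extension, e.g. "data.csv" -> "data".
        segments[-1] = PurePosixPath(segments[-1]).stem
    parts = ["dataset", parsed.scheme, *segments, "updated"]
    # Skip empty parts (for example, a URI without a scheme).
    return ".".join(p for p in parts if p)
```

Calling event_name_for("s3://sales/daily") reproduces the event name used in the example at the end of this page. Check names produced this way against the generated output before relying on them.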
Behavior Delta: Datasets vs Assets
- Both Datasets and Assets are converted into event-based automation outputs.
- Assets additionally produce materialization_code.
- materialization_code is intentionally a scaffold and must be finalized for your Prefect runtime.
Manual Follow-Up Required
- Replace materialization placeholders with your concrete Prefect asset implementation.
- Finalize naming/ownership/lineage conventions for assets.
- Validate trigger patterns in deployment_yaml against your deployment topology.
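One way to approach the trigger validation step is to compare the events each trigger expects with the events the converted producers actually emit. The sketch below assumes each trigger has already been loaded into a dict with the type, match, and expect keys used by event triggers in prefect.yaml. The function name unmatched_triggers and the sample data are hypothetical.

```python
def unmatched_triggers(triggers: list[dict], emitted_events: set[str]) -> list[dict]:
    """Return triggers that expect at least one event no producer emits."""
    missing = []
    for trigger in triggers:
        expected = set(trigger.get("expect", []))
        # A trigger is satisfiable only if every expected event is emitted.
        if not expected <= emitted_events:
            missing.append(trigger)
    return missing


# Sample data (assumed): the first trigger has a producer, the second does not.
triggers = [
    {
        "type": "event",
        "match": {"prefect.resource.id": "s3://sales/daily"},
        "expect": ["dataset.s3.sales.daily.updated"],
    },
    {
        "type": "event",
        "match": {"prefect.resource.id": "s3://bucket/data.csv"},
        "expect": ["dataset.s3.bucket.data.updated"],
    },
]
emitted = {"dataset.s3.sales.daily.updated"}

stale = unmatched_triggers(triggers, emitted)
```

This check uses exact string comparison only. Triggers that rely on wildcard event names would need pattern matching, which is not shown here.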
Example
Airflow input:
from airflow.assets import Asset
from airflow.decorators import task

sales_asset = Asset("s3://sales/daily")

@task(outlets=[sales_asset])
def build_sales():
    ...
Generated output (simplified):
from prefect.events import emit_event

def emit_sales_asset_updated():
    emit_event(
        event="dataset.s3.sales.daily.updated",
        resource={"prefect.resource.id": "s3://sales/daily"},
    )

from prefect.assets import materialize

@materialize("s3://sales/daily")
def materialize_sales_asset() -> str:
    # TODO: replace with concrete Prefect asset API usage
    return "s3://sales/daily"