# Sensors → Prefect Patterns
Airflow sensors wait for a condition to be met before proceeding. The right Prefect equivalent depends on two factors: how long the wait is, and whether the external system can push events.
## Three Patterns
| Pattern | When to use | Worker held? |
|---|---|---|
| Polling retries | Short waits (seconds–minutes), or system can't push | Yes |
| Webhooks + Automations | Long waits, or system supports event notifications | No |
| `pause_flow_run` / `suspend_flow_run` | Human-in-the-loop, mid-flow approval gates (Prefect 3+) | No |
## Pattern 1: Polling Retries

Convert the sensor to a `@task` that raises an exception when the condition is not yet met. Prefect retries on exception.
```python
from pathlib import Path

from prefect import task

@task(retries=60, retry_delay_seconds=60)
def wait_for_file(filepath: str) -> str:
    if not Path(filepath).exists():
        raise FileNotFoundError(f"{filepath} not yet available")
    return filepath
```
Parameter mapping:

- `poke_interval` → `retry_delay_seconds`
- `timeout / poke_interval` ≈ `retries` count
- `mode="poke"` → this pattern
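The mapping above can be sketched as a small helper (`sensor_to_retry_params` is an illustrative name, not a Prefect API):

```python
def sensor_to_retry_params(poke_interval: int, timeout: int) -> dict:
    """Translate Airflow sensor timing into Prefect task retry settings.

    retries ≈ timeout / poke_interval, so the total wait budget is preserved.
    """
    return {
        "retries": max(1, timeout // poke_interval),
        "retry_delay_seconds": poke_interval,
    }

# An Airflow sensor with poke_interval=300, timeout=7200 becomes
# @task(retries=24, retry_delay_seconds=300)
```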
## Pattern 2: Webhooks + Automations
Configure the external system to POST to a Prefect Webhook when the condition is met. Create an Automation to trigger the deployment when the event fires. No worker is held during the wait.
This is the resource-efficient equivalent of `mode="reschedule"` and `deferrable=True`.
```yaml
# prefect.yaml
deployments:
  - name: process-on-arrival
    entrypoint: flows/process.py:process_flow
    triggers:
      - type: event
        match:
          prefect.resource.id: "webhook.*"
        expect:
          - file.arrived
```
The external system (a cron job, a cloud function, a CI pipeline) calls the Prefect Webhook URL when ready. Prefect emits the event and the Automation fires the deployment.
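For illustration, a minimal notifier the external system could run. The URL and payload shape are assumptions; match them to your webhook's configured template:

```python
import json
import urllib.request


def build_notification(webhook_url: str, payload: dict) -> urllib.request.Request:
    """Build a JSON POST request for a Prefect webhook endpoint (URL is a placeholder)."""
    return urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def notify_prefect(webhook_url: str, payload: dict) -> int:
    """POST the payload; Prefect renders the body into an event via the webhook template."""
    with urllib.request.urlopen(build_notification(webhook_url, payload)) as resp:
        return resp.status
```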
When to use:

- `mode="reschedule"` sensors — don't poll, let the system notify you
- `deferrable=True` sensors — same intent, no Triggerer component needed
- Cloud storage events (S3, GCS) with event notification configured
- APIs that support outbound webhooks
- Any wait measured in hours rather than minutes
## Pattern 3: `pause_flow_run` / `suspend_flow_run` (Prefect 3+)

`pause_flow_run` blocks the flow run until it is resumed or times out; `suspend_flow_run` additionally shuts down the run's infrastructure, so no worker slot is held while waiting. Best for human approval gates or long synchronization waits with a defined completion signal.
```python
from prefect import flow, pause_flow_run

@flow
def approval_workflow():
    result = prepare_report()      # upstream task, defined elsewhere
    pause_flow_run(timeout=86400)  # block here for up to 24h until resumed
    publish_report(result)
```
## Common Sensor Conversions

### S3KeySensor
Airflow:
```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait = S3KeySensor(
    task_id="wait_for_s3",
    bucket_key="s3://my-bucket/data/output.parquet",
    aws_conn_id="aws_default",
    poke_interval=300,
    timeout=7200,
)
```
Prefect (Pattern 1 — polling):
```python
from prefect import task
from prefect_aws import S3Bucket

@task(retries=24, retry_delay_seconds=300)
def wait_for_s3(key: str) -> str:
    bucket = S3Bucket.load("my-bucket")
    objects = bucket.list_objects(folder=key.rsplit("/", 1)[0])
    if not any(key in str(o) for o in objects):
        raise FileNotFoundError(f"S3 key {key} not yet available")
    return key
```
Prefect (Pattern 2 — event-driven):
Configure S3 Event Notifications on your bucket to POST to a Prefect Webhook. Create an Automation: trigger your deployment when the webhook event fires. No task needed — the flow starts automatically when the object arrives.
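As a sketch, the webhook's template could turn the S3 notification body into the `file.arrived` event matched by the Automation. The field paths below follow the standard S3 event notification payload, but verify them against your actual body:

```json
{
    "event": "file.arrived",
    "resource": {
        "prefect.resource.id": "webhook.s3.{{ body.Records[0].s3.object.key }}"
    }
}
```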
### ExternalTaskSensor
Airflow:
```python
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id="wait_for_upstream",
    external_dag_id="upstream_dag",
    external_task_id="final_task",
    timeout=7200,
)
```
Prefect (Automation on flow completion):
```yaml
# prefect.yaml
deployments:
  - name: downstream-flow
    entrypoint: flows/downstream.py:downstream_flow
    triggers:
      - type: event
        match_related:
          - prefect.resource.role: flow
            prefect.resource.id: "prefect.flow.upstream-flow"
        expect:
          - prefect.flow-run.Completed
```
Prefect (explicit dependency):
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrator():
    # Blocks until the upstream deployment's run reaches a terminal state.
    # Note: run_deployment returns a FlowRun object, not the upstream flow's
    # return value; read upstream outputs from shared storage or persisted results.
    upstream_run = run_deployment("upstream-flow/production", as_subflow=True)
    if upstream_run.state.is_completed():
        downstream_flow()
```
### HttpSensor
Airflow:
```python
from airflow.providers.http.sensors.http import HttpSensor

wait = HttpSensor(
    task_id="wait_for_api",
    http_conn_id="api",
    endpoint="status",
    response_check=lambda r: r.json()["ready"],
    poke_interval=30,
    timeout=600,
)
```
Prefect (Pattern 1 — polling):
```python
import httpx

from prefect import task

@task(retries=20, retry_delay_seconds=30)
def wait_for_api(url: str) -> bool:
    resp = httpx.get(f"{url}/status")
    if not resp.json().get("ready"):
        raise RuntimeError("API not ready")
    return True
```
Prefect (Pattern 2 — webhook):
If the API supports outbound webhooks, configure it to POST to a Prefect Webhook when it becomes ready. Create an Automation to trigger the deployment — no polling task needed.
### FileSensor
Airflow:
```python
from airflow.sensors.filesystem import FileSensor

wait = FileSensor(
    task_id="wait_for_file",
    filepath="/data/input.csv",
    poke_interval=60,
    timeout=3600,
)
```
Prefect:
```python
from pathlib import Path

from prefect import task

@task(retries=60, retry_delay_seconds=60)
def wait_for_file(filepath: str) -> str:
    if not Path(filepath).exists():
        raise FileNotFoundError(f"{filepath} not yet available")
    return filepath
```
For glob patterns, use `Path.glob()` inside the task.
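The glob variant's condition check might look like this, shown as a plain function so the logic is clear; wrap it with `@task(retries=..., retry_delay_seconds=...)` exactly as above (`glob_ready` is an illustrative name):

```python
from pathlib import Path


def glob_ready(directory: str, pattern: str) -> list[str]:
    """Return matching paths, or raise so Prefect's retry logic polls again."""
    matches = sorted(Path(directory).glob(pattern))
    if not matches:
        raise FileNotFoundError(f"no files matching {pattern!r} in {directory}")
    return [str(p) for p in matches]
```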
### SqlSensor
Airflow:
```python
from airflow.sensors.sql import SqlSensor

wait = SqlSensor(
    task_id="wait_for_data",
    conn_id="postgres_default",
    sql="SELECT COUNT(*) FROM staging WHERE date = '{{ ds }}'",
    poke_interval=120,
    timeout=3600,
)
```
Prefect:
```python
from prefect import task
from prefect_sqlalchemy import SqlAlchemyConnector
from sqlalchemy import text

@task(retries=30, retry_delay_seconds=120)
def wait_for_data(date: str) -> bool:
    connector = SqlAlchemyConnector.load("postgres-default")
    with connector.get_connection() as conn:
        result = conn.execute(
            text("SELECT COUNT(*) FROM staging WHERE date = :d"),
            {"d": date},
        ).scalar()
    if not result:
        raise RuntimeError(f"No data for {date} yet")
    return True
```
Jinja templates in SQL (`{{ ds }}`) must be replaced with parameterized queries.
### DateTimeSensor
Airflow:
```python
from airflow.sensors.date_time import DateTimeSensor

wait = DateTimeSensor(
    task_id="wait_until_noon",
    target_time="{{ execution_date.replace(hour=12) }}",
)
```
Prefect (short waits):
```python
import time
from datetime import datetime

from prefect import task

@task
def wait_until(target: datetime) -> None:
    now = datetime.now(target.tzinfo)
    if now < target:
        time.sleep((target - now).total_seconds())
```
Prefect (long waits — preferred):
Schedule the deployment to run at the target time. Use a cron schedule in `prefect.yaml` rather than sleeping inside a flow.
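A sketch of that schedule, assuming a hypothetical deployment name and entrypoint:

```yaml
# prefect.yaml — run at 12:00 UTC daily instead of sleeping until noon
deployments:
  - name: noon-report
    entrypoint: flows/report.py:report_flow
    schedules:
      - cron: "0 12 * * *"
        timezone: "UTC"
```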
### GCSObjectExistenceSensor
Airflow:
```python
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

wait = GCSObjectExistenceSensor(
    task_id="wait_for_gcs",
    bucket="my-bucket",
    object="data/output.parquet",
    poke_interval=300,
)
```
Prefect (Pattern 1 — polling):
```python
from prefect import task
from prefect_gcp import GcsBucket

@task(retries=24, retry_delay_seconds=300)
def wait_for_gcs(blob_name: str) -> str:
    bucket = GcsBucket.load("my-gcs-bucket")
    blobs = bucket.list_blobs(folder=blob_name.rsplit("/", 1)[0])
    # list_blobs returns blob objects; compare against each blob's name
    if not any(blob_name in blob.name for blob in blobs):
        raise FileNotFoundError(f"GCS object {blob_name} not found")
    return blob_name
```
Prefect (Pattern 2 — event-driven):
Configure GCS Pub/Sub notifications on the bucket. Route to a Prefect Webhook via a Cloud Function or direct Pub/Sub push. Create an Automation to trigger the deployment — no worker held during the wait.
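A sketch of the forwarding step inside the Cloud Function. GCS Pub/Sub notifications base64-encode a JSON body carrying `bucket` and `name` fields; the argument here is the inner `message` object of a push delivery, and the resulting payload would be POSTed to your Prefect webhook URL:

```python
import base64
import json


def decode_gcs_notification(pubsub_message: dict) -> dict:
    """Extract bucket/object info from a GCS Pub/Sub notification message."""
    body = json.loads(base64.b64decode(pubsub_message["data"]))
    return {"event": "file.arrived", "bucket": body["bucket"], "name": body["name"]}
```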
## Deferrable Operators
Airflow's deferrable=True suspends the operator and frees the worker slot by handing off to the Triggerer service. Prefect has no Triggerer component.
The equivalent architecture is Pattern 2 (Webhooks + Automations): configure the external system to fire an event when ready, and let the Automation trigger the deployment. This achieves the same resource efficiency — no worker held — without requiring special infrastructure.
For cases where the wait is internal to the flow (not driven by an external event), use `pause_flow_run` / `suspend_flow_run` (Pattern 3).
## Decision Guide

1. Is the wait seconds to a few minutes?
   - YES → Pattern 1 (polling retries)
2. Can the external system send a webhook or event notification?
   - YES → Pattern 2 (webhooks + Automations) — preferred for efficiency
   - NO → Pattern 1 (polling retries)
3. Is this a human approval gate or mid-flow suspension?
   - YES → Pattern 3 (pause/suspend the flow run)
4. Was the Airflow sensor using `mode="reschedule"` or `deferrable=True`?
   - Pattern 2 is the correct architectural match