Sensors → Prefect Patterns

Airflow sensors wait for a condition to be met before proceeding. The right Prefect equivalent depends on two factors: how long the wait is, and whether the external system can push events.

Three Patterns

Pattern | When to use | Worker held?
Polling retries | Short waits (seconds–minutes), or the system can't push | Yes
Webhooks + Automations | Long waits, or the system supports event notifications | No
pause_until | Human-in-the-loop, mid-flow approval gates (Prefect 3+) | No

Pattern 1: Polling Retries

Convert the sensor to a @task that raises an exception when the condition is not yet met. Prefect retries on exception.

from pathlib import Path
from prefect import task

@task(retries=60, retry_delay_seconds=60)
def wait_for_file(filepath: str) -> str:
    if not Path(filepath).exists():
        raise FileNotFoundError(f"{filepath} not yet available")
    return filepath

Parameter mapping:

  • poke_interval → retry_delay_seconds
  • timeout ÷ poke_interval → retries count
  • mode="poke" → this pattern

Pattern 2: Webhooks + Automations

Configure the external system to POST to a Prefect Webhook when the condition is met. Create an Automation to trigger the deployment when the event fires. No worker is held during the wait.

This is the resource-efficient equivalent of mode="reschedule" and deferrable=True.

# prefect.yaml
deployments:
  - name: process-on-arrival
    entrypoint: flows/process.py:process_flow
    triggers:
      - type: event
        match:
          prefect.resource.id: "webhook.*"
        expect:
          - file.arrived

The external system (a cron job, a cloud function, a CI pipeline) calls the Prefect Webhook URL when ready. Prefect emits the event and the Automation fires the deployment.

When to use:

  • mode="reschedule" sensors — don't poll, let the system notify you
  • deferrable=True sensors — same intent, no Triggerer component needed
  • Cloud storage events (S3, GCS) with event notification configured
  • APIs that support outbound webhooks
  • Any wait measured in hours rather than minutes

Pattern 3: pause_until (Prefect 3+)

Pauses the flow run until it is resumed or the timeout expires. Use pause_flow_run when the flow process can stay alive, or suspend_flow_run when the run's infrastructure should be torn down so that no worker slot is held during the wait. Best for human approval gates or long synchronization waits with a defined completion signal.

from prefect import flow, pause_flow_run

@flow
def approval_workflow():
    result = prepare_report()
    pause_flow_run(timeout=86400)  # block here until resumed, up to 24h
    publish_report(result)

Common Sensor Conversions

S3KeySensor

Airflow:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait = S3KeySensor(
    task_id="wait_for_s3",
    bucket_key="s3://my-bucket/data/output.parquet",
    aws_conn_id="aws_default",
    poke_interval=300,
    timeout=7200,
)

Prefect (Pattern 1 — polling):

from prefect_aws import S3Bucket
from prefect import task

@task(retries=24, retry_delay_seconds=300)
def wait_for_s3(key: str) -> str:
    bucket = S3Bucket.load("my-bucket")
    objects = bucket.list_objects(folder=key.rsplit("/", 1)[0])
    if not any(obj["Key"] == key for obj in objects):
        raise FileNotFoundError(f"S3 key {key} not yet available")
    return key

Prefect (Pattern 2 — event-driven):

Route S3 Event Notifications to a Prefect Webhook. S3 cannot POST to an arbitrary URL directly, so relay the notification through a Lambda function, an SNS subscription, or an EventBridge API destination. Create an Automation: trigger your deployment when the webhook event fires. No task is needed; the flow starts automatically when the object arrives.
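A minimal sketch of such a relay as a Lambda function subscribed to the bucket's events, assuming a hypothetical webhook URL (the event parsing follows the standard S3 notification format):

```python
import json
import urllib.request

# Hypothetical: your webhook URL from the Prefect UI
WEBHOOK_URL = "https://api.prefect.cloud/hooks/your-webhook-slug"

def s3_event_to_payload(event: dict) -> dict:
    # Standard S3 event notification: Records[0].s3 holds bucket and key
    record = event["Records"][0]["s3"]
    return {
        "event": "file.arrived",
        "bucket": record["bucket"]["name"],
        "key": record["object"]["key"],
    }

def handler(event, context):
    # Lambda entrypoint: forward the notification to the Prefect Webhook
    body = json.dumps(s3_event_to_payload(event)).encode()
    req = urllib.request.Request(
        WEBHOOK_URL, data=body, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(req) as resp:
        return {"status": resp.status}
```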


ExternalTaskSensor

Airflow:

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id="wait_for_upstream",
    external_dag_id="upstream_dag",
    external_task_id="final_task",
    timeout=7200,
)

Prefect (Automation on flow completion):

# prefect.yaml
deployments:
  - name: downstream-flow
    entrypoint: flows/downstream.py:downstream_flow
    triggers:
      - type: event
        expect:
          - prefect.flow-run.Completed
        match_related:
          # Match by flow name; prefect.flow.<id> resource IDs are UUIDs
          prefect.resource.role: flow
          prefect.resource.name: upstream-flow

Prefect (explicit dependency):

from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrator():
    # run_deployment blocks until the run reaches a terminal state and
    # returns a FlowRun object, not the upstream flow's return value
    upstream_run = run_deployment("upstream-flow/production", as_subflow=True)
    if upstream_run.state.is_completed():
        downstream_flow()

HttpSensor

Airflow:

from airflow.providers.http.sensors.http import HttpSensor

wait = HttpSensor(
    task_id="wait_for_api",
    http_conn_id="api",
    endpoint="status",
    response_check=lambda r: r.json()["ready"],
    poke_interval=30,
    timeout=600,
)

Prefect (Pattern 1 — polling):

import httpx
from prefect import task

@task(retries=20, retry_delay_seconds=30)
def wait_for_api(url: str) -> bool:
    resp = httpx.get(f"{url}/status")
    resp.raise_for_status()
    if not resp.json().get("ready"):
        raise RuntimeError("API not ready")
    return True

Prefect (Pattern 2 — webhook):

If the API supports outbound webhooks, configure it to POST to a Prefect Webhook when it becomes ready. Create an Automation to trigger the deployment — no polling task needed.


FileSensor

Airflow:

from airflow.sensors.filesystem import FileSensor

wait = FileSensor(
    task_id="wait_for_file",
    filepath="/data/input.csv",
    poke_interval=60,
    timeout=3600,
)

Prefect:

from pathlib import Path
from prefect import task

@task(retries=60, retry_delay_seconds=60)
def wait_for_file(filepath: str) -> str:
    if not Path(filepath).exists():
        raise FileNotFoundError(f"{filepath} not yet available")
    return filepath

For glob patterns, use Path.glob() inside the task.


SqlSensor

Airflow:

from airflow.sensors.sql import SqlSensor

wait = SqlSensor(
    task_id="wait_for_data",
    conn_id="postgres_default",
    sql="SELECT COUNT(*) FROM staging WHERE date = '{{ ds }}'",
    poke_interval=120,
    timeout=3600,
)

Prefect:

from prefect_sqlalchemy import SqlAlchemyConnector
from sqlalchemy import text
from prefect import task

@task(retries=30, retry_delay_seconds=120)
def wait_for_data(date: str) -> bool:
    connector = SqlAlchemyConnector.load("postgres-default")
    with connector.get_connection() as conn:
        result = conn.execute(
            text("SELECT COUNT(*) FROM staging WHERE date = :d"),
            {"d": date}
        ).scalar()
    if not result:
        raise ValueError(f"No data for {date} yet")
    return True

Jinja templates in SQL ({{ ds }}) must be replaced with parameterized queries.


DateTimeSensor

Airflow:

from airflow.sensors.date_time import DateTimeSensor

wait = DateTimeSensor(
    task_id="wait_until_noon",
    target_time="{{ execution_date.replace(hour=12) }}",
)

Prefect (short waits):

from datetime import datetime
import time
from prefect import task

@task
def wait_until(target: datetime) -> None:
    now = datetime.now(target.tzinfo)
    if now < target:
        time.sleep((target - now).total_seconds())

Prefect (long waits — preferred):

Schedule the deployment to run at the target time. Use a cron schedule in prefect.yaml rather than sleeping inside a flow.
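For example, the schedule can live in the deployment definition (names here are illustrative):

```yaml
# prefect.yaml -- run at 12:00 daily instead of sleeping in-flow
deployments:
  - name: noon-report
    entrypoint: flows/report.py:report_flow
    schedules:
      - cron: "0 12 * * *"
        timezone: "UTC"
```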


GCSObjectExistenceSensor

Airflow:

from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

wait = GCSObjectExistenceSensor(
    task_id="wait_for_gcs",
    bucket="my-bucket",
    object="data/output.parquet",
    poke_interval=300,
)

Prefect (Pattern 1 — polling):

from prefect_gcp import GcsBucket
from prefect import task

@task(retries=24, retry_delay_seconds=300)
def wait_for_gcs(blob_name: str) -> str:
    bucket = GcsBucket.load("my-gcs-bucket")
    blobs = bucket.list_blobs(folder=blob_name.rsplit("/", 1)[0])
    if not any(b.name == blob_name for b in blobs):
        raise FileNotFoundError(f"GCS object {blob_name} not found")
    return blob_name

Prefect (Pattern 2 — event-driven):

Configure GCS Pub/Sub notifications on the bucket. Route to a Prefect Webhook via a Cloud Function or direct Pub/Sub push. Create an Automation to trigger the deployment — no worker held during the wait.


Deferrable Operators

Airflow's deferrable=True suspends the operator and frees the worker slot by handing off to the Triggerer service. Prefect has no Triggerer component.

The equivalent architecture is Pattern 2 (Webhooks + Automations): configure the external system to fire an event when ready, and let the Automation trigger the deployment. This achieves the same resource efficiency — no worker held — without requiring special infrastructure.

For cases where the wait is internal to the flow (not driven by an external event), use pause_until (Pattern 3).


Decision Guide

Is the wait seconds to a few minutes?
  YES → Pattern 1 (polling retries)

Can the external system send a webhook or event notification?
  YES → Pattern 2 (webhooks + automations) — preferred for efficiency
  NO  → Pattern 1 (polling retries)

Is this a human approval gate or mid-flow suspension?
  YES → Pattern 3 (pause_until)

Was the Airflow sensor using mode='reschedule' or deferrable=True?
  → Pattern 2 is the correct architectural match