Prefect Cloud Integration
Leverage Prefect Cloud features for production deployments of migrated flows.
Work Pools and Workers
Migrated runbooks include work pool recommendations. Choose based on your needs:
Process Work Pool
Best for: Local development, simple flows, quick iteration.
# Create the work pool
prefect work-pool create "my-flow-pool" --type process
# Start a worker
prefect worker start --pool "my-flow-pool"
Docker Work Pool
Best for: Production deployments, isolated environments, reproducible builds.
# Create the work pool
prefect work-pool create "my-flow-pool" --type docker
# Start a worker
prefect worker start --pool "my-flow-pool"
Configure in prefect.yaml:
deployments:
- name: my-etl
work_pool:
name: "my-flow-pool"
build:
- prefect_docker.deployments.steps.build_docker_image:
image_name: "my-registry/my-flow"
dockerfile: "Dockerfile"
Kubernetes Work Pool
Best for: Kubernetes clusters, auto-scaling, cloud-native deployments.
# Create the work pool
prefect work-pool create "my-flow-pool" --type kubernetes
# Deploy workers via Helm
helm install prefect-worker prefect/prefect-worker \
--set worker.config.workPool="my-flow-pool"
Automations for Callbacks
Replace Airflow callbacks with Prefect Automations for robust alerting.
Failure Notifications (on_failure_callback)
from prefect.automations import Automation
from prefect.events.schemas.automations import EventTrigger
from prefect.events.actions import SendNotification
failure_automation = Automation(
name="my-flow-failure-alert",
description="Alert on flow failures",
trigger=EventTrigger(
match={"prefect.resource.id": "prefect.flow-run.*"},
expect=["prefect.flow-run.Failed"],
posture="Reactive",
threshold=1,
),
actions=[
SendNotification(
block_document_id="your-slack-webhook-block-id",
subject="Flow Failed",
body="{{ flow_run.name }} has failed. Check logs for details.",
)
],
)
Via Prefect UI:
- Navigate to Automations -> Create Automation
- Trigger: Flow run enters "Failed" state
- Action: Send notification (Slack, Email, PagerDuty, etc.)
Success Notifications (on_success_callback)
success_automation = Automation(
name="my-flow-success-alert",
description="Notify on successful completion",
trigger=EventTrigger(
match={"prefect.resource.id": "prefect.flow-run.*"},
expect=["prefect.flow-run.Completed"],
posture="Reactive",
threshold=1,
),
actions=[
SendNotification(
block_document_id="your-slack-webhook-block-id",
subject="Flow Completed",
body="{{ flow_run.name }} completed successfully.",
)
],
)
SLA Monitoring (sla_miss_callback)
from datetime import timedelta
sla_automation = Automation(
name="my-flow-sla-monitor",
description="Alert when flow exceeds expected duration",
trigger=EventTrigger(
match={"prefect.resource.id": "prefect.flow-run.*"},
expect=["prefect.flow-run.Running"],
posture="Proactive",
threshold=1,
within=timedelta(hours=2), # SLA threshold
),
actions=[
SendNotification(
block_document_id="your-slack-webhook-block-id",
subject="SLA Breach Warning",
body="Flow has been running longer than expected.",
)
],
)
Concurrency Management
Replace Airflow's pool and pool_slots with Prefect concurrency controls.
Work Pool Concurrency
Limit concurrent flow runs in a work pool:
prefect work-pool update my-pool --concurrency-limit 5
Global Concurrency Limits
For cross-flow resource management:
# Create a concurrency limit
prefect concurrency-limit create "database-connections" 10
Use in your flow:
from prefect import flow
from prefect.concurrency.sync import concurrency
@flow
def my_flow():
with concurrency("database-connections", occupy=1):
# Database operations here
pass
Task Concurrency
Limit concurrent task executions:
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
@task
def process_item(item):
return transform(item)
@flow(task_runner=ConcurrentTaskRunner(max_workers=3))
def my_flow():
items = get_items()
results = process_item.map(items)
return results
Secrets and Blocks
Replace Airflow connections with Prefect Blocks.
Database Connections
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents
# Create the block
db_block = SqlAlchemyConnector(
connection_info=ConnectionComponents(
driver="postgresql+psycopg2",
host="your-host",
port=5432,
database="your-db",
username="your-user",
password="your-password", # Or use Secret block
)
)
db_block.save("postgres-prod")
# Use in your flow
@task
def query_database():
connector = SqlAlchemyConnector.load("postgres-prod")
with connector.get_connection() as conn:
return conn.execute("SELECT * FROM users").fetchall()
Cloud Provider Credentials
AWS:
from prefect_aws import AwsCredentials
aws_creds = AwsCredentials(
aws_access_key_id="your-key-id",
aws_secret_access_key="your-secret-key",
region_name="us-east-1",
)
aws_creds.save("aws-prod")
GCP:
from prefect_gcp import GcpCredentials
gcp_creds = GcpCredentials(
service_account_file="/path/to/service-account.json",
)
gcp_creds.save("gcp-prod")
Secret Values
For sensitive values that shouldn't be in code:
from prefect.blocks.system import Secret
# Create via UI or API
api_key = Secret(value="your-secret-api-key")
api_key.save("api-key")
# Use in flow
@task
def call_api():
api_key = Secret.load("api-key")
return requests.get(url, headers={"Authorization": api_key.get()})
Observability and Alerting
Flow Run Logs
All flow/task logs are automatically captured:
from prefect import flow, task, get_run_logger
@task
def my_task():
logger = get_run_logger()
logger.info("Processing started")
logger.warning("Something to watch")
logger.error("Something went wrong")
View logs in Prefect Cloud UI or via CLI:
prefect flow-run logs <flow-run-id>
Custom Metrics
Emit custom metrics with artifacts:
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact
@task
def process_data():
results = {"processed": 1000, "errors": 5}
create_markdown_artifact(
key="processing-summary",
markdown=f"""
# Processing Summary
- Processed: {results['processed']}
- Errors: {results['errors']}
- Error rate: {results['errors']/results['processed']:.2%}
""",
)
return results
Dashboard Integration
Prefect Cloud provides built-in dashboards. For external tools:
# Send metrics to Datadog, Prometheus, etc.
from prefect import flow
import datadog
@flow
def my_flow():
result = do_work()
# Report to Datadog
datadog.statsd.gauge("prefect.flow.items_processed", result["count"])
datadog.statsd.increment("prefect.flow.runs", tags=["status:success"])
Deployment Configuration
prefect.yaml
Full deployment configuration:
name: my-migrated-flows
deployments:
- name: etl-daily
entrypoint: "flows/etl.py:etl_flow"
work_pool:
name: "production-pool"
schedules:
- cron: "0 2 * * *"
timezone: "UTC"
parameters:
batch_size: 1000
tags: ["production", "etl"]
- name: ml-training
entrypoint: "flows/ml.py:training_flow"
work_pool:
name: "gpu-pool"
schedules:
- cron: "0 0 * * 0" # Weekly
timezone: "UTC"
tags: ["production", "ml"]
Deploy All
# Deploy all deployments
prefect deploy --all
# Deploy specific deployment
prefect deploy --name "etl-daily"
Best Practices
- Use work pools — Match pool type to execution environment
- Configure automations — Don't rely on polling for alerts
- Use Blocks for secrets — Never hardcode credentials
- Set concurrency limits — Protect shared resources
- Tag deployments — Enable filtering and organization
- Monitor with artifacts — Create visibility into flow execution
- Use prefect.yaml — Version control your deployment config