# Provider Operator Mappings
Airflow provider operators map to Prefect integrations. This guide covers common providers and their Prefect equivalents.
## Integration Packages
| Provider | Prefect Package | Install |
|---|---|---|
| AWS | `prefect-aws` | `pip install prefect-aws` |
| GCP | `prefect-gcp` | `pip install prefect-gcp` |
| Azure | `prefect-azure` | `pip install prefect-azure` |
| Databricks | `prefect-databricks` | `pip install prefect-databricks` |
| Snowflake | `prefect-snowflake` | `pip install prefect-snowflake` |
| dbt | `prefect-dbt` | `pip install prefect-dbt` |
| Slack | `prefect-slack` | `pip install prefect-slack` |
| Email (SMTP) | `prefect-email` | `pip install prefect-email` |
| SQL (Postgres, MySQL, etc.) | `prefect-sqlalchemy` | `pip install prefect-sqlalchemy` |
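Each package provides blocks that replace Airflow connections. The examples throughout this guide load blocks by names such as `aws-prod`; those names are arbitrary and must be registered once beforehand. A minimal sketch of registering one in code (all credential values are placeholders):

```python
from prefect_aws import AwsCredentials

# One-time setup; "aws-prod" is the arbitrary block name the examples
# below assume. For real credentials, prefer the Prefect UI or your
# secret manager over hardcoded values.
AwsCredentials(
    aws_access_key_id="AKIA-PLACEHOLDER",
    aws_secret_access_key="PLACEHOLDER",
    region_name="us-east-1",
).save("aws-prod")
```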
## AWS Operators

### S3 Operations
Airflow:

```python
import json

from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateObjectOperator,
    S3DeleteObjectsOperator,
    S3CopyObjectOperator,
)

upload = S3CreateObjectOperator(
    task_id="upload",
    s3_bucket="my-bucket",
    s3_key="data/output.json",
    data=json.dumps(data),
    aws_conn_id="aws_default",
)
```
Prefect:

```python
import json

from prefect import task
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_upload, s3_download, s3_copy

@task
def upload_to_s3(data: dict, bucket: str, key: str):
    aws_creds = AwsCredentials.load("aws-prod")
    s3_upload(
        data=json.dumps(data).encode(),
        bucket=bucket,
        key=key,
        aws_credentials=aws_creds,
    )
```
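Unlike an Airflow operator, the Prefect task is called directly from a flow. A minimal usage sketch (the flow name and payload are illustrative):

```python
from prefect import flow

@flow
def export_pipeline():
    data = {"status": "ok"}  # illustrative payload
    upload_to_s3(data, bucket="my-bucket", key="data/output.json")

if __name__ == "__main__":
    export_pipeline()
```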
### Lambda
Airflow:

```python
import json

from airflow.providers.amazon.aws.operators.lambda_function import (
    LambdaInvokeFunctionOperator,
)

invoke = LambdaInvokeFunctionOperator(
    task_id="invoke_lambda",
    function_name="my-function",
    payload=json.dumps({"key": "value"}),
    aws_conn_id="aws_default",
)
```
Prefect:

```python
from prefect import task
from prefect_aws import AwsCredentials
from prefect_aws.lambda_function import LambdaFunction

@task
def invoke_lambda(function_name: str, payload: dict):
    aws_creds = AwsCredentials.load("aws-prod")
    lambda_fn = LambdaFunction(
        function_name=function_name,
        aws_credentials=aws_creds,
    )
    return lambda_fn.invoke(payload=payload)
```
### ECS
Airflow:

```python
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

run_task = EcsRunTaskOperator(
    task_id="run_ecs_task",
    cluster="my-cluster",
    task_definition="my-task-def",
    aws_conn_id="aws_default",
)
```
Prefect:

```python
from prefect import task
from prefect_aws import AwsCredentials

@task
def run_ecs_task(cluster: str, task_definition: str):
    # prefect-aws's ECSTask block is infrastructure for running flow runs;
    # to launch an arbitrary ECS task from inside a task, use a boto3
    # client built from the credentials block instead.
    aws_creds = AwsCredentials.load("aws-prod")
    ecs_client = aws_creds.get_boto3_session().client("ecs")
    return ecs_client.run_task(
        cluster=cluster,
        taskDefinition=task_definition,
    )
```
## GCP Operators

### BigQuery
Airflow:

```python
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

query = BigQueryInsertJobOperator(
    task_id="run_query",
    configuration={
        "query": {
            "query": "SELECT * FROM dataset.table",
            "useLegacySql": False,
        }
    },
    gcp_conn_id="google_cloud_default",
)
```
Prefect:

```python
from prefect import task
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_query

@task
def run_bigquery(query: str):
    gcp_creds = GcpCredentials.load("gcp-prod")
    return bigquery_query(
        query=query,
        gcp_credentials=gcp_creds,
    )
```
### Cloud Storage
Airflow:

```python
from airflow.providers.google.cloud.transfers.gcs_to_gcs import (
    GCSToGCSOperator,
)

copy = GCSToGCSOperator(
    task_id="copy_files",
    source_bucket="source-bucket",
    source_object="data/*",
    destination_bucket="dest-bucket",
    gcp_conn_id="google_cloud_default",
)
```
Prefect:

```python
from prefect import task
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import GcsBucket

@task
def copy_gcs_files(source_bucket: str, dest_bucket: str, prefix: str):
    gcp_creds = GcpCredentials.load("gcp-prod")
    source = GcsBucket(bucket=source_bucket, gcp_credentials=gcp_creds)
    dest = GcsBucket(bucket=dest_bucket, gcp_credentials=gcp_creds)
    # list_blobs takes a folder prefix rather than a glob pattern
    for blob in source.list_blobs(folder=prefix):
        data = source.read_path(blob.name)
        dest.write_path(blob.name, data)
```
### Dataproc
Airflow:

```python
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocSubmitJobOperator,
)

spark_job = DataprocSubmitJobOperator(
    task_id="spark_job",
    job={
        "reference": {"project_id": "my-project"},
        "placement": {"cluster_name": "my-cluster"},
        "pyspark_job": {"main_python_file_uri": "gs://bucket/script.py"},
    },
    gcp_conn_id="google_cloud_default",
)
```
Prefect:

prefect-gcp does not ship a Dataproc module, so submit the job with the google-cloud-dataproc client library, authenticated via the `GcpCredentials` block (the `project` and `region` parameters below are illustrative):

```python
from google.cloud import dataproc_v1
from prefect import task
from prefect_gcp import GcpCredentials

@task
def run_spark_job(project: str, region: str, cluster_name: str, script_uri: str):
    gcp_creds = GcpCredentials.load("gcp-prod")
    client = dataproc_v1.JobControllerClient(
        credentials=gcp_creds.get_credentials_from_service_account(),
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"},
    )
    job = {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {"main_python_file_uri": script_uri},
    }
    return client.submit_job(project_id=project, region=region, job=job)
```
## Database Operators

### PostgreSQL / MySQL
Airflow:

```python
from airflow.providers.postgres.operators.postgres import PostgresOperator

query = PostgresOperator(
    task_id="run_query",
    sql="INSERT INTO table VALUES (%s, %s)",
    parameters=[value1, value2],
    postgres_conn_id="postgres_default",
)
```
Prefect:

```python
from typing import Optional

from prefect import task
from prefect_sqlalchemy import SqlAlchemyConnector

@task
def run_postgres_query(query: str, params: Optional[dict] = None):
    connector = SqlAlchemyConnector.load("postgres-prod")
    # fetch_all for queries that return rows; execute for DML.
    # prefect-sqlalchemy uses named (:name) parameters rather than
    # Airflow's positional %s placeholders.
    if query.lstrip().lower().startswith("select"):
        return connector.fetch_all(query, parameters=params)
    connector.execute(query, parameters=params)
```
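For example, the Airflow `%s` placeholders above become named parameters (the table and values are illustrative):

```python
# Called from within a flow:
run_postgres_query(
    "INSERT INTO events (name, value) VALUES (:name, :value)",
    params={"name": "signup", "value": 42},
)
```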
### Snowflake
Airflow:

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

query = SnowflakeOperator(
    task_id="run_query",
    sql="SELECT * FROM table",
    snowflake_conn_id="snowflake_default",
)
```
Prefect:

```python
from prefect import task
from prefect_snowflake import SnowflakeConnector

@task
def run_snowflake_query(query: str):
    connector = SnowflakeConnector.load("snowflake-prod")
    return connector.fetch_all(query)
```
## Databricks Operators
Airflow:

```python
from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
)

run_job = DatabricksRunNowOperator(
    task_id="run_job",
    job_id=12345,
    databricks_conn_id="databricks_default",
)
```
Prefect:

```python
from prefect import task
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_run_now

@task
def run_databricks_job(job_id: int):
    creds = DatabricksCredentials.load("databricks-prod")
    return jobs_run_now(
        job_id=job_id,
        databricks_credentials=creds,
    )
```
## dbt Operators
Airflow:

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

dbt_run = DbtCloudRunJobOperator(
    task_id="dbt_run",
    job_id=12345,
    dbt_cloud_conn_id="dbt_cloud_default",
)
```
Prefect:

```python
from prefect import task
from prefect_dbt import DbtCloudCredentials
from prefect_dbt.cloud import DbtCloudJob

@task
def run_dbt_cloud_job(job_id: int):
    creds = DbtCloudCredentials.load("dbt-cloud-prod")
    job = DbtCloudJob(
        job_id=job_id,
        dbt_cloud_credentials=creds,
    )
    run = job.trigger()
    run.wait_for_completion()
    return run.fetch_result()
```
## Notification Operators

### Slack
Airflow:

```python
from airflow.providers.slack.operators.slack import SlackAPIPostOperator

notify = SlackAPIPostOperator(
    task_id="notify",
    channel="#alerts",
    text="Pipeline completed",
    slack_conn_id="slack_default",
)
```
Prefect:

```python
from prefect import task
from prefect_slack import SlackWebhook

@task
def send_slack_notification(message: str):
    # The webhook URL determines the channel, so no channel argument
    # is needed here (unlike the Airflow operator).
    webhook = SlackWebhook.load("slack-alerts")
    webhook.notify(body=message)
```
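The target channel is baked into the webhook URL, which is stored when the block is saved. A sketch, with a placeholder URL:

```python
from prefect_slack import SlackWebhook

# Placeholder URL; create one webhook per channel in Slack's app settings.
SlackWebhook(url="https://hooks.slack.com/services/XXX").save("slack-alerts")
```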
### Email

Airflow:

```python
from airflow.operators.email import EmailOperator

email = EmailOperator(
    task_id="send_email",
    to="team@example.com",
    subject="Pipeline Status",
    html_content="<p>Pipeline completed</p>",
)
```
Prefect:

```python
from prefect import task
from prefect_email import EmailServerCredentials, email_send_message

@task
def send_email(to: str, subject: str, body: str):
    creds = EmailServerCredentials.load("email-prod")
    email_send_message(
        email_server_credentials=creds,
        subject=subject,
        msg=body,
        email_to=to,
    )
```
## Connection → Block Migration
Every Airflow connection becomes a Prefect block:
| Airflow Connection | Prefect Block |
|---|---|
| `aws_default` | `AwsCredentials` |
| `google_cloud_default` | `GcpCredentials` |
| `azure_default` | Service-specific, e.g. `AzureBlobStorageCredentials` or `AzureContainerInstanceCredentials` |
| `postgres_default` | `SqlAlchemyConnector` |
| `snowflake_default` | `SnowflakeConnector` |
| `databricks_default` | `DatabricksCredentials` |
| `slack_default` | `SlackWebhook` |
| `smtp_default` | `EmailServerCredentials` |
Create blocks via the UI or in code, then reference them by name in your flows.
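For instance, once a block is saved, any flow can load it by that name (a sketch; the flow name and query are illustrative):

```python
from prefect import flow
from prefect_snowflake import SnowflakeConnector

@flow
def nightly_report():
    # "snowflake-prod" must match the name the block was saved under
    connector = SnowflakeConnector.load("snowflake-prod")
    return connector.fetch_all("SELECT CURRENT_TIMESTAMP")
```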