Provider Operator Mappings

Airflow provider operators map to Prefect integrations. This guide covers common providers and their Prefect equivalents.

Integration Packages

| Provider | Prefect Package | Install |
|---|---|---|
| AWS | prefect-aws | `pip install prefect-aws` |
| GCP | prefect-gcp | `pip install prefect-gcp` |
| Azure | prefect-azure | `pip install prefect-azure` |
| Databricks | prefect-databricks | `pip install prefect-databricks` |
| Snowflake | prefect-snowflake | `pip install prefect-snowflake` |
| dbt | prefect-dbt | `pip install prefect-dbt` |
| Slack | prefect-slack | `pip install prefect-slack` |
| Email (SMTP) | prefect-email | `pip install prefect-email` |
| SQL (Postgres, MySQL, etc.) | prefect-sqlalchemy | `pip install prefect-sqlalchemy` |

AWS Operators

S3 Operations

Airflow:

from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateObjectOperator,
    S3DeleteObjectsOperator,
    S3CopyObjectOperator,
)

upload = S3CreateObjectOperator(
    task_id="upload",
    s3_bucket="my-bucket",
    s3_key="data/output.json",
    data=json.dumps(data),
    aws_conn_id="aws_default"
)

Prefect:

from prefect import task
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_upload, s3_download, s3_copy

@task
def upload_to_s3(data: dict, bucket: str, key: str) -> None:
    """Serialize *data* to JSON and upload it to s3://<bucket>/<key>.

    Equivalent of Airflow's S3CreateObjectOperator. Credentials come from
    the pre-registered ``AwsCredentials`` block named "aws-prod" (the
    Prefect analogue of the ``aws_default`` connection).
    """
    import json  # local import keeps this snippet self-contained

    aws_creds = AwsCredentials.load("aws-prod")
    s3_upload(
        # s3_upload expects bytes, hence encode() after serializing
        data=json.dumps(data).encode(),
        bucket=bucket,
        key=key,
        aws_credentials=aws_creds
    )

Lambda

Airflow:

from airflow.providers.amazon.aws.operators.lambda_function import (
    LambdaInvokeFunctionOperator
)

invoke = LambdaInvokeFunctionOperator(
    task_id="invoke_lambda",
    function_name="my-function",
    payload=json.dumps({"key": "value"}),
    aws_conn_id="aws_default"
)

Prefect:

from prefect import task
from prefect_aws import AwsCredentials
from prefect_aws.lambda_function import LambdaFunction

@task
def invoke_lambda(function_name: str, payload: dict):
    """Invoke the named AWS Lambda function with *payload* and return the response."""
    aws_creds = AwsCredentials.load("aws-prod")  # pre-registered credentials block
    lambda_fn = LambdaFunction(
        function_name=function_name,
        aws_credentials=aws_creds
    )
    return lambda_fn.invoke(payload=payload)

ECS

Airflow:

from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

run_task = EcsRunTaskOperator(
    task_id="run_ecs_task",
    cluster="my-cluster",
    task_definition="my-task-def",
    aws_conn_id="aws_default"
)

Prefect:

from prefect import task
from prefect_aws import AwsCredentials
from prefect_aws.ecs import ECSTask

@task
def run_ecs_task(cluster: str, task_definition: str):
    """Run a registered ECS task definition on *cluster* and return the result.

    NOTE(review): ``ECSTask`` is an infrastructure block in prefect-aws;
    confirm it still exists in the pinned version — newer releases favor
    the ECS worker / work-pool model instead.
    """
    aws_creds = AwsCredentials.load("aws-prod")  # pre-registered credentials block
    ecs_task = ECSTask(
        aws_credentials=aws_creds,
        task_definition=task_definition,
        cluster=cluster
    )
    return ecs_task.run()

GCP Operators

BigQuery

Airflow:

from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator
)

query = BigQueryInsertJobOperator(
    task_id="run_query",
    configuration={
        "query": {
            "query": "SELECT * FROM dataset.table",
            "useLegacySql": False
        }
    },
    gcp_conn_id="google_cloud_default"
)

Prefect:

from prefect import task
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_query

@task
def run_bigquery(query: str):
    """Execute *query* in BigQuery and return the result rows."""
    gcp_creds = GcpCredentials.load("gcp-prod")  # pre-registered credentials block
    return bigquery_query(
        query=query,
        gcp_credentials=gcp_creds
    )

Cloud Storage

Airflow:

from airflow.providers.google.cloud.transfers.gcs_to_gcs import (
    GCSToGCSOperator
)

copy = GCSToGCSOperator(
    task_id="copy_files",
    source_bucket="source-bucket",
    source_object="data/*",
    destination_bucket="dest-bucket",
    gcp_conn_id="google_cloud_default"
)

Prefect:

from prefect import task
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import GcsBucket

@task
def copy_gcs_files(source_bucket: str, dest_bucket: str, prefix: str):
    """Copy every object under *prefix* from one GCS bucket to another.

    Unlike GCSToGCSOperator's server-side copy, this streams each object
    through the worker (read then write) — fine for small files; consider
    a server-side rewrite for large objects.
    """
    gcp_creds = GcpCredentials.load("gcp-prod")
    source = GcsBucket(bucket=source_bucket, gcp_credentials=gcp_creds)
    dest = GcsBucket(bucket=dest_bucket, gcp_credentials=gcp_creds)

    # Object names are preserved; only the bucket changes.
    for blob in source.list_blobs(prefix=prefix):
        data = source.read_path(blob.name)
        dest.write_path(blob.name, data)

Dataproc

Airflow:

from airflow.providers.google.cloud.operators.dataproc import (
    DataprocSubmitJobOperator
)

spark_job = DataprocSubmitJobOperator(
    task_id="spark_job",
    job={
        "reference": {"project_id": "my-project"},
        "placement": {"cluster_name": "my-cluster"},
        "pyspark_job": {"main_python_file_uri": "gs://bucket/script.py"}
    },
    gcp_conn_id="google_cloud_default"
)

Prefect:

from prefect import task
from prefect_gcp import GcpCredentials
from prefect_gcp.dataproc import DataprocCluster

@task
def run_spark_job(cluster_name: str, script_uri: str):
    """Submit a PySpark job (*script_uri*, a gs:// path) to a Dataproc cluster.

    NOTE(review): verify that the pinned prefect-gcp release actually ships
    a ``dataproc`` module with this ``DataprocCluster.submit_job`` interface —
    it is not present in all versions.
    """
    gcp_creds = GcpCredentials.load("gcp-prod")  # pre-registered credentials block
    cluster = DataprocCluster(
        cluster_name=cluster_name,
        gcp_credentials=gcp_creds
    )
    return cluster.submit_job(
        job_type="pyspark",
        main_python_file_uri=script_uri
    )

Database Operators

PostgreSQL / MySQL

Airflow:

from airflow.providers.postgres.operators.postgres import PostgresOperator

query = PostgresOperator(
    task_id="run_query",
    sql="INSERT INTO table VALUES (%s, %s)",
    parameters=[value1, value2],
    postgres_conn_id="postgres_default"
)

Prefect:

from prefect import task
from prefect_sqlalchemy import SqlAlchemyConnector

@task
def run_postgres_query(query: str, params: list = None):
    """Run *query* against Postgres; return rows for SELECTs, else None.

    NOTE(review): under SQLAlchemy 2.x, raw SQL strings passed to
    ``execute`` must be wrapped in ``sqlalchemy.text`` — confirm the
    pinned SQLAlchemy version before relying on this form.
    """
    connector = SqlAlchemyConnector.load("postgres-prod")  # pre-registered block
    with connector.get_connection() as conn:
        if params:
            result = conn.execute(query, params)
        else:
            result = conn.execute(query)
        conn.commit()
        # returns_rows is False for INSERT/UPDATE/DELETE, so those yield None
        return result.fetchall() if result.returns_rows else None

Snowflake

Airflow:

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

query = SnowflakeOperator(
    task_id="run_query",
    sql="SELECT * FROM table",
    snowflake_conn_id="snowflake_default"
)

Prefect:

from prefect import task
from prefect_snowflake import SnowflakeConnector

@task
def run_snowflake_query(query: str):
    """Execute *query* in Snowflake and return all result rows."""
    connector = SnowflakeConnector.load("snowflake-prod")  # pre-registered block
    return connector.fetch_all(query)

Databricks Operators

Airflow:

from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator
)

run_job = DatabricksRunNowOperator(
    task_id="run_job",
    job_id=12345,
    databricks_conn_id="databricks_default"
)

Prefect:

from prefect import task
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_run_now

@task
def run_databricks_job(job_id: int):
    """Trigger an existing Databricks job by id and return the run response."""
    creds = DatabricksCredentials.load("databricks-prod")  # pre-registered block
    return jobs_run_now(
        job_id=job_id,
        databricks_credentials=creds
    )

dbt Operators

Airflow:

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

dbt_run = DbtCloudRunJobOperator(
    task_id="dbt_run",
    job_id=12345,
    dbt_cloud_conn_id="dbt_cloud_default"
)

Prefect:

from prefect import task
from prefect_dbt import DbtCloudCredentials
from prefect_dbt.cloud import DbtCloudJob

@task
def run_dbt_cloud_job(job_id: int):
    """Trigger dbt Cloud job *job_id* and return the triggered run."""
    creds = DbtCloudCredentials.load("dbt-cloud-prod")  # pre-registered block
    job = DbtCloudJob(
        job_id=job_id,
        dbt_cloud_credentials=creds
    )
    return job.run()

Notification Operators

Slack

Airflow:

from airflow.providers.slack.operators.slack import SlackAPIPostOperator

notify = SlackAPIPostOperator(
    task_id="notify",
    channel="#alerts",
    text="Pipeline completed",
    slack_conn_id="slack_default"
)

Prefect:

from prefect import task
from prefect_slack import SlackWebhook

@task
def send_slack_notification(channel: str, message: str) -> None:
    """Post *message* to Slack via a pre-configured webhook block.

    NOTE: unlike Airflow's SlackAPIPostOperator, a ``SlackWebhook`` posts to
    the channel baked into its webhook URL, so the ``channel`` argument is
    NOT used by the call below — it is kept only to mirror the Airflow
    signature. To target multiple channels, register one webhook block per
    channel and load the appropriate block by name.
    """
    webhook = SlackWebhook.load("slack-alerts")
    webhook.notify(body=message)

Email

Airflow:

from airflow.operators.email import EmailOperator

email = EmailOperator(
    task_id="send_email",
    to="team@example.com",
    subject="Pipeline Status",
    html_content="<p>Pipeline completed</p>"
)

Prefect:

from prefect import task
from prefect_email import EmailServerCredentials, email_send_message

@task
def send_email(to: str, subject: str, body: str):
    """Send an email through the SMTP server configured in the credentials block."""
    creds = EmailServerCredentials.load("email-prod")  # pre-registered block
    email_send_message(
        email_server_credentials=creds,
        subject=subject,
        msg=body,
        email_to=to
    )

Connection → Block Migration

Every Airflow connection becomes a Prefect block:

| Airflow Connection | Prefect Block |
|---|---|
| aws_default | AwsCredentials |
| google_cloud_default | GcpCredentials |
| azure_default | AzureContainerInstanceCredentials |
| postgres_default | SqlAlchemyConnector |
| snowflake_default | SnowflakeConnector |
| databricks_default | DatabricksCredentials |
| slack_default | SlackWebhook |

Create blocks via UI or code, then reference by name in your flows.