Python Prefect: Modern Workflow Orchestration Guide

Prefect is a modern Python workflow orchestration platform that eliminates Airflow's boilerplate. Flows are just Python functions decorated with @flow, tasks with @task — no DAG class, no complex setup. Prefect handles retries, caching, parallelism, state tracking, and scheduling. The same code runs locally for development and deploys to Prefect Cloud or a self-hosted server for production scheduling. This guide covers flows, tasks, deployments, caching, concurrency, and a complete ETL pipeline example.

Installation and First Flow

pip install prefect
prefect server start   # local UI at http://localhost:4200
from prefect import flow, task, get_run_logger
import httpx


@task
def fetch_user(user_id: int) -> dict:
    """Download a single user record from the JSONPlaceholder demo API.

    Logs the requested id, issues a GET for ``/users/{user_id}``, lets
    ``raise_for_status()`` raise on an HTTP error status, and returns the
    decoded JSON body.
    """
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    get_run_logger().info(f"Fetching user {user_id}")
    reply = httpx.get(url)
    reply.raise_for_status()
    return reply.json()


@task
def extract_email(user: dict) -> str:
    """Return the ``"email"`` entry of a user record.

    Raises ``KeyError`` if the record has no ``"email"`` key.
    """
    address = user["email"]
    return address


@flow(name="fetch-user-email")
def user_email_flow(user_id: int) -> str:
    """Fetch the user with the given id and return that user's email."""
    return extract_email(fetch_user(user_id))


if __name__ == "__main__":
    # Executing the file as a script runs the flow once for user 1.
    print(f"Email: {user_email_flow(1)}")
# Run it
python flow.py
# View in UI: http://localhost:4200

Tasks: Retries and Caching

from prefect import task
from prefect.tasks import task_input_hash
from datetime import timedelta
import httpx


@task(
    name="fetch-data-from-api",
    description="Fetches records from the CRM API for a given date",
    retries=3,
    retry_delay_seconds=[5, 30, 120],  # one wait per retry, growing each time
    retry_jitter_factor=0.5,  # randomise the waits so retries do not line up
    timeout_seconds=60,
)
def fetch_from_api(date: str, endpoint: str) -> list[dict]:
    """Fetch the records of one CRM endpoint for one date.

    Args:
        date: Value sent as the ``date`` query parameter.
        endpoint: Path segment appended to ``https://crm.api.com/``.

    Returns:
        The decoded JSON body of the response.

    An HTTP error status raises via ``raise_for_status()``, which lets the
    retry settings on the decorator apply.
    """
    get_run_logger().info(f"Fetching {endpoint} for {date}")
    url = f"https://crm.api.com/{endpoint}"
    reply = httpx.get(url, params={"date": date}, timeout=30)
    reply.raise_for_status()
    return reply.json()


# Cached task — result reused if inputs haven't changed within cache_expiration
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    retries=2,
    retry_delay_seconds=10,
)
def fetch_exchange_rates(currency_pair: str) -> float:
    """Fetch the current exchange rate for a currency pair.

    The decorator keys the cache on the task inputs and expires entries
    after one hour, so repeated calls with the same ``currency_pair`` inside
    that window reuse the stored result.

    Args:
        currency_pair: Pair identifier appended to the rate endpoint URL.

    Returns:
        The ``"rate"`` field of the JSON response.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
        KeyError: If the response body has no ``"rate"`` field.
    """
    logger = get_run_logger()
    logger.info(f"Fetching exchange rate for {currency_pair}")
    response = httpx.get(f"https://forex.api.com/rate/{currency_pair}")
    # Fail on HTTP errors explicitly so the retry policy sees the real cause
    # instead of a KeyError/JSON error from parsing an error payload.
    response.raise_for_status()
    return response.json()["rate"]


# Async task
@task(retries=2, retry_delay_seconds=5)
async def async_db_write(records: list[dict]) -> int:
    """Insert records into the ``records`` table and return how many were sent.

    Each record is written as ``(record["id"], str(record))``; rows that
    conflict with an existing row are skipped by ``ON CONFLICT DO NOTHING``,
    so the return value is the number of records submitted, not necessarily
    the number of rows inserted.

    Args:
        records: Dicts that each contain an ``"id"`` key.

    Returns:
        ``len(records)``.
    """
    import asyncpg

    conn = await asyncpg.connect("postgresql://localhost/mydb")
    try:
        await conn.executemany(
            "INSERT INTO records (id, data) VALUES ($1, $2) ON CONFLICT DO NOTHING",
            [(r["id"], str(r)) for r in records],
        )
    finally:
        # Always release the connection, including when the insert fails and
        # the task is about to be retried.
        await conn.close()
    return len(records)

Parallel Task Execution

from prefect import flow, task
from prefect.futures import PrefectFuture
from prefect.task_runners import ConcurrentTaskRunner, ThreadPoolTaskRunner
import httpx


@task
def process_region(region: str, date: str) -> dict:
    """Fetch the report for one region and return its record count.

    Args:
        region: Region name appended to the reports endpoint URL.
        date: Value sent as the ``date`` query parameter.

    Returns:
        ``{"region": region, "records": count}`` where ``count`` is the
        ``"count"`` field of the JSON response, or 0 if that field is absent.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
    """
    logger = get_run_logger()
    logger.info(f"Processing {region} for {date}")
    response = httpx.get(f"https://api.example.com/reports/{region}", params={"date": date})
    # Without this check an error response carrying a JSON body would be
    # reported as a region with zero records.
    response.raise_for_status()
    return {"region": region, "records": response.json().get("count", 0)}


@flow(
    name="parallel-regional-report",
    task_runner=ConcurrentTaskRunner(),  # lets submitted tasks run concurrently
)
def regional_report_flow(date: str, regions: list[str]) -> dict:
    """Process every region concurrently and combine the per-region counts.

    Args:
        date: Date passed through to each ``process_region`` call.
        regions: Region names; one task is submitted per entry.

    Returns:
        A dict with the input ``date``, the list of per-region results under
        ``"regions"``, and the sum of their ``"records"`` values under
        ``"total_records"``.
    """
    # Fan out: submit() returns a future immediately instead of blocking.
    pending: list[PrefectFuture] = []
    for name in regions:
        pending.append(process_region.submit(region=name, date=date))

    # Fan in: block on each future in submission order.
    per_region = []
    for future in pending:
        per_region.append(future.result())

    grand_total = sum(entry["records"] for entry in per_region)
    return {"date": date, "regions": per_region, "total_records": grand_total}


# Run parallel flow
if __name__ == "__main__":
    # Example run over five regions for a fixed date.
    all_regions = ["north", "south", "east", "west", "central"]
    report = regional_report_flow(date="2026-06-14", regions=all_regions)
    print(f"Total records: {report['total_records']}")

Complete ETL Pipeline

from prefect import flow, task, get_run_logger
from prefect.artifacts import create_table_artifact
import pandas as pd
import httpx


@task(
    retries=3,
    retry_delay_seconds=30,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=2),
)
def extract_orders(date: str) -> list[dict]:
    """Return the orders for one date (currently 100 synthetic rows).

    Each row has ``id`` 1..100, ``amount`` equal to ``id * 9.99``, the
    category ``"electronics"`` and the given ``date``.
    """
    get_run_logger().info(f"Extracting orders for {date}")
    # Stand-in data. A real extraction would call something like
    # httpx.get("https://api.example.com/orders", params={"date": date}).
    orders = []
    for order_id in range(1, 101):
        orders.append(
            {
                "id": order_id,
                "amount": order_id * 9.99,
                "category": "electronics",
                "date": date,
            }
        )
    return orders


@task
def validate(records: list[dict]) -> list[dict]:
    """Keep only records with a positive ``amount`` and a truthy ``id``.

    A missing ``amount`` is treated as 0 and therefore rejected. When any
    record is rejected, the number dropped is logged as a warning.
    """
    logger = get_run_logger()
    kept = []
    for record in records:
        if record.get("amount", 0) > 0 and record.get("id"):
            kept.append(record)

    dropped = len(records) - len(kept)
    if dropped:
        logger.warning(f"Dropped {dropped} invalid records")
    return kept


@task
def transform(records: list[dict]) -> pd.DataFrame:
    """Build a DataFrame from the records and add two derived columns.

    ``amount_usd`` is ``amount`` rounded to two decimals (no currency
    conversion is applied), and ``month`` is ``date`` formatted as
    ``YYYY-MM``.
    """
    return pd.DataFrame(records).assign(
        amount_usd=lambda frame: frame["amount"].round(2),
        month=lambda frame: pd.to_datetime(frame["date"]).dt.strftime("%Y-%m"),
    )


@task
def load_to_warehouse(df: pd.DataFrame, table: str) -> int:
    """Log the pending load and return the number of rows in ``df``.

    The warehouse write itself is stubbed out; a real implementation would
    do something like
    ``df.to_sql(table, engine, if_exists="append", index=False)``.
    """
    row_count = len(df)
    get_run_logger().info(f"Loading {row_count} rows into {table}")
    return row_count


@task
def compute_summary(df: pd.DataFrame) -> dict:
    """Summarise the ``amount_usd`` column of the transformed orders.

    Returns a dict with the row count, the total and mean of ``amount_usd``
    as plain floats, and the per-``category`` totals as a dict.
    """
    amounts = df["amount_usd"]
    summary = {"total_records": len(df)}
    summary["total_revenue"] = float(amounts.sum())
    summary["avg_order"] = float(amounts.mean())
    summary["by_category"] = df.groupby("category")["amount_usd"].sum().to_dict()
    return summary


@flow(name="daily-orders-etl", log_prints=True)
def daily_orders_etl(date: str = "2026-06-14") -> dict:
    """Run the daily orders ETL: extract, validate, transform, load, summarise.

    Args:
        date: Day to process, passed to ``extract_orders`` and used in the
            log messages and the artifact description.

    Returns:
        The summary dict produced by ``compute_summary``.
    """
    logger = get_run_logger()
    logger.info(f"Starting ETL for {date}")

    raw = extract_orders(date)
    valid = validate(raw)
    df = transform(valid)

    # Load and summarise are independent of each other, so submit both and
    # only then wait for the results.
    load_future = load_to_warehouse.submit(df, "orders_warehouse")
    summary_future = compute_summary.submit(df)

    rows_loaded = load_future.result()
    summary = summary_future.result()

    # Publish the summary as a table artifact for the Prefect UI. This flow
    # is a plain ``def``, so the helper is called directly: ``await`` is only
    # legal inside ``async def`` and would be a SyntaxError here.
    create_table_artifact(
        key="etl-summary",
        table=[summary],
        description=f"ETL summary for {date}",
    )

    logger.info(f"ETL complete: {rows_loaded} rows loaded")
    return summary


if __name__ == "__main__":
    from datetime import date

    # Run the ETL for the current local date and show the summary.
    today = str(date.today())
    print(daily_orders_etl(date=today))

Deployments and Schedules

from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule, IntervalSchedule
from datetime import timedelta


@flow
def my_pipeline(date: str):
    """Placeholder flow: accepts ``date``, does nothing, returns ``None``."""
    return None


# Describe a scheduled deployment of the daily ETL flow.
# NOTE(review): Deployment.build_from_flow belongs to the Prefect 2.x
# deployment API — confirm it exists in the installed Prefect version.
deployment = Deployment.build_from_flow(
    name="daily-etl-production",
    flow=daily_orders_etl,
    tags=["production", "etl"],
    work_queue_name="default",
    # Every day at 06:00 in the Asia/Kolkata timezone (IST).
    schedule=CronSchedule(cron="0 6 * * *", timezone="Asia/Kolkata"),
    # Default parameter value; a run may override it.
    # NOTE(review): scheduled runs that do not override it will all process
    # this same fixed date — confirm that is intended.
    parameters={"date": "2026-06-14"},
    infra_overrides={"env": {"ENVIRONMENT": "production"}},
)

if __name__ == "__main__":
    # Register the deployment with the configured Prefect API.
    deployment.apply()
# Apply deployment
python deploy.py

# List deployments
prefect deployment ls

# Trigger a manual run
prefect deployment run daily-orders-etl/daily-etl-production --param date=2026-06-01

# Start a worker to execute flows
prefect worker start --pool default-agent-pool

Artifacts and Results

from prefect import flow, task
from prefect.artifacts import create_markdown_artifact, create_table_artifact
from prefect.results import ResultSerializer


@task
async def generate_report(records: list[dict]) -> str:
    """Publish a markdown report artifact and return a one-line summary.

    The artifact is stored under the key ``pipeline-report`` and contains a
    small table with the record count and a fixed date.

    Returns:
        ``"Processed <n> records"`` where ``<n>`` is ``len(records)``.
    """
    record_count = len(records)

    # Markdown body shown for the artifact in the Prefect UI.
    report_markdown = f"""
## Pipeline Report

| Metric | Value |
|--------|-------|
| Records | {record_count} |
| Date | 2026-06-14 |
        """

    await create_markdown_artifact(
        key="pipeline-report",
        markdown=report_markdown,
        description="Daily pipeline report",
    )
    return f"Processed {record_count} records"


@flow
def pipeline_with_artifacts():
    """Generate a report for 100 sample records and return its summary."""
    sample_records = [{"id": n} for n in range(100)]
    # NOTE(review): generate_report is declared ``async`` while this flow is
    # a plain ``def`` — confirm the installed Prefect version resolves the
    # call to its result here; otherwise make this flow async and await it.
    return generate_report(sample_records)

Prefect Cloud Integration

# Connect to Prefect Cloud
prefect cloud login --key YOUR_API_KEY

# Or set via environment variable
export PREFECT_API_KEY="your-api-key"
export PREFECT_API_URL="https://api.prefect.cloud/api/accounts/ACCOUNT_ID/workspaces/WORKSPACE_ID"

# Deploy to cloud
python deploy.py  # same code, now runs in Prefect Cloud

# View flow runs in Prefect Cloud UI
# https://app.prefect.cloud/

Frequently Asked Questions

Prefect vs Airflow — which to choose?
Choose Prefect for Python-native teams that want fast iteration, local testing, and minimal infrastructure. Flows are plain Python — no DAG class, no YAML manifests, easier local debugging. Choose Airflow for teams with existing Airflow investment or who need its vast operator ecosystem (Spark, BigQuery, DBT integrations). Prefect's local development experience is significantly better.
How do I test Prefect flows locally?
Call flow_function() directly — Prefect flows are just Python functions. Run prefect server start locally to see the UI and task state. For unit tests, call a task's underlying Python function with my_task.fn(...) to bypass orchestration, or run the flow inside a with prefect.testing.utilities.prefect_test_harness(): block to get a fresh test environment.
How does Prefect handle task failures in the middle of a flow?
An unhandled task failure marks the flow run as FAILED. With retries=N on a task, Prefect automatically retries before marking it failed. If a task succeeds on retry, the flow continues. You can also set on_failure hooks on flows to send alerts. Failed flow runs can be retried from the UI — Prefect skips already-completed tasks only when their results are available (persisted or cached results), so enable result persistence or caching on expensive tasks.