Python Airflow: DAG Authoring and Pipeline Orchestration

Apache Airflow is the industry-standard platform for authoring, scheduling, and monitoring data pipelines. A DAG (Directed Acyclic Graph) defines the tasks and their dependencies in Python. Airflow handles scheduling, retries, backfill, monitoring, and alerting. The modern TaskFlow API eliminates much of the boilerplate, making DAGs look like ordinary Python functions with decorators. This guide covers DAG patterns, sensors, XComs, dynamic tasks, and production deployment.

Setup and Core Concepts

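# Pin the Airflow version and install against the official constraints file
AIRFLOW_VERSION=2.9.0
PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
pip install "apache-airflow[postgres,redis,celery]==${AIRFLOW_VERSION}" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"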

# Initialize (or upgrade) the metadata DB; replaces the deprecated "airflow db init"
airflow db migrate

# Create admin user
airflow users create --username admin --password admin \
  --firstname Admin --lastname User --role Admin \
  --email admin@techoral.com

# Start scheduler and webserver (development)
airflow scheduler &
airflow webserver --port 8080
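
Airflow concepts: A DAG is defined in a Python file and declares tasks plus the dependencies that determine their execution order. The Scheduler parses DAG files, creates DAG runs on each schedule, and queues tasks once their dependencies are met. The executor decides where queued tasks run: with the CeleryExecutor or KubernetesExecutor, separate workers (Celery processes or Kubernetes pods) execute them, while the LocalExecutor runs them as local processes. The Webserver provides the UI. The Metadata DB (PostgreSQL in production) stores DAG, DAG-run, and task-instance state.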
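The "acyclic" requirement is what lets a scheduler derive an execution order at all. As a rough sketch (plain Python, not Airflow's scheduler, which also weighs trigger rules, pools, and concurrency limits), the standard library's `graphlib` can compute which tasks become runnable together once their upstream tasks finish. The graph below is a hypothetical dictionary mirroring the ETL example in this guide.

```python
from graphlib import CycleError, TopologicalSorter

# Toy model of scheduling order (not Airflow's scheduler): each task maps to the
# set of upstream tasks it depends on, mirroring the ETL example in this guide.
ETL_GRAPH: dict[str, set[str]] = {
    "extract_from_crm": set(),
    "validate": {"extract_from_crm"},
    "transform": {"validate"},
    "load_to_warehouse": {"transform"},
    "notify": {"load_to_warehouse"},
}


def is_acyclic(graph: dict[str, set[str]]) -> bool:
    """A valid DAG has no cycles; graphlib raises CycleError otherwise."""
    try:
        TopologicalSorter(graph).prepare()
        return True
    except CycleError:
        return False


def parallel_batches(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves: every task in a wave has all upstreams finished."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are all done
        batches.append(ready)
        sorter.done(*ready)                 # pretend every task in the wave succeeded
    return batches


print(parallel_batches(ETL_GRAPH))
# [['extract_from_crm'], ['validate'], ['transform'], ['load_to_warehouse'], ['notify']]
print(parallel_batches({"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}))
# [['a'], ['b', 'c'], ['d']]
print(is_acyclic({"a": {"b"}, "b": {"a"}}))
# False
```

In the diamond-shaped graph, `b` and `c` land in the same wave, which is exactly the situation where Airflow workers could run two tasks concurrently.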

TaskFlow API (Modern)

The TaskFlow API (Airflow 2.0+) uses Python decorators to define tasks and infers their dependencies from how task outputs are passed as arguments. A value returned from one task is pushed to XCom automatically and pulled by any downstream task that receives it, with no explicit push or pull calls.
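To make the "implicit dependencies" idea concrete, here is a toy model in plain Python. It is a simplified sketch, not Airflow's code: `ToyDag`, `Ref`, and the positional-arguments-only restriction are illustrative assumptions. Calling a decorated function only records the call; any placeholder passed as an argument becomes an upstream edge, and real values are resolved when the graph runs.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable


class Ref:
    """Placeholder returned by calling a task (loosely analogous to Airflow's XComArg)."""

    def __init__(self, task_id: str):
        self.task_id = task_id


class ToyDag:
    """Toy model (not Airflow's implementation) of TaskFlow-style graph building."""

    def __init__(self) -> None:
        self.funcs: dict[str, Callable[..., Any]] = {}
        self.args: dict[str, tuple] = {}
        self.upstream: dict[str, set[str]] = {}

    def task(self, func: Callable[..., Any]) -> Callable[..., Ref]:
        def build(*args: Any) -> Ref:
            # Calling the task does not run it: the call is recorded and an
            # upstream edge is inferred from every Ref passed as an argument.
            # Task ids are function names, so calling a task twice would
            # overwrite it here (Airflow generates a unique suffixed id instead).
            task_id = func.__name__
            self.funcs[task_id] = func
            self.args[task_id] = args
            self.upstream[task_id] = {a.task_id for a in args if isinstance(a, Ref)}
            return Ref(task_id)

        return build

    def run(self) -> dict[str, Any]:
        results: dict[str, Any] = {}  # plays the role of XCom storage
        for task_id in TopologicalSorter(self.upstream).static_order():
            resolved = [results[a.task_id] if isinstance(a, Ref) else a for a in self.args[task_id]]
            results[task_id] = self.funcs[task_id](*resolved)
        return results


dag = ToyDag()


@dag.task
def extract() -> list[int]:
    return [99, 25, 10]


@dag.task
def total(amounts: list[int]) -> int:
    return sum(amounts)


total(extract())           # wiring only; nothing has executed yet
print(dag.upstream)        # {'extract': set(), 'total': {'extract'}}
print(dag.run()["total"])  # 134
```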

# dags/etl_pipeline.py
from airflow.decorators import dag, task
from datetime import datetime, timedelta, timezone

default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,   # requires SMTP to be configured
    "email": ["data-alerts@techoral.com"],
}

@dag(
    dag_id="daily_etl_pipeline",
    schedule="0 6 * * *",      # Daily at 6am UTC
    start_date=datetime(2026, 1, 1, tzinfo=timezone.utc),  # fixed start; days_ago() is deprecated
    catchup=False,
    default_args=default_args,
    tags=["etl", "daily"],
    doc_md="""
    Daily ETL pipeline: extract from CRM → transform → load to warehouse.
    Runs at 6am UTC. On failure alerts data-alerts@techoral.com.
    """,
)
def daily_etl():

    @task()
    def extract_from_crm(**context) -> dict:
        """Pull yesterday's records from the CRM API."""
        logical_date = context["ds"]  # YYYY-MM-DD start of this run's data interval
        # records = crm_api.get_events(date=logical_date)
        records = [{"id": 1, "event": "purchase", "amount": 99.0}]
        return {"date": logical_date, "count": len(records), "records": records}

    @task()
    def validate(raw: dict) -> dict:
        """Validate extracted data."""
        records = raw["records"]
        errors = [r for r in records if r.get("amount", 0) <= 0]
        if errors:
            raise ValueError(f"Found {len(errors)} invalid records")
        return raw

    @task()
    def transform(validated: dict) -> list[dict]:
        """Clean and enrich records."""
        return [
            {
                **rec,
                "amount_usd": rec["amount"],
                "processed_at": datetime.now(timezone.utc).isoformat(),
            }
            for rec in validated["records"]
        ]

    @task()
    def load_to_warehouse(transformed: list[dict], **context) -> str:
        """Upsert records into the data warehouse."""
        # warehouse_client.upsert("events", transformed)
        print(f"Loaded {len(transformed)} records for {context['ds']}")
        return f"Loaded {len(transformed)} records"

    @task()
    def notify(load_result: str):
        """Send success notification."""
        print(f"Pipeline complete: {load_result}")

    # Dependencies defined by data flow — Airflow infers the graph
    raw = extract_from_crm()
    valid = validate(raw)
    transformed = transform(valid)
    result = load_to_warehouse(transformed)
    notify(result)

dag_instance = daily_etl()
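A point that often confuses newcomers: the run that fires at 06:00 UTC on a given day processes the previous day's interval, which is why `context["ds"]` gives "yesterday" in `extract_from_crm`. Since Airflow 2.2 this window is called the data interval; a run starts once its interval has ended, and `ds` is the date of the interval start. The helper below is a hand-rolled illustration for this one fixed daily cron (the function name is hypothetical), not Airflow's timetable code.

```python
from datetime import datetime, timedelta, timezone


def latest_completed_interval(now: datetime, hour: int = 6) -> tuple[datetime, datetime]:
    """Most recent complete daily interval [start, end) for a schedule firing at hour:00 UTC."""
    end = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if end > now:  # today's boundary has not passed yet, so the last full interval ended yesterday
        end -= timedelta(days=1)
    return end - timedelta(days=1), end


start, end = latest_completed_interval(datetime(2026, 6, 14, 6, 0, tzinfo=timezone.utc))
ds = start.strftime("%Y-%m-%d")  # what {{ ds }} / context["ds"] would hold for this run
print(start.isoformat(), end.isoformat(), ds)
# 2026-06-13T06:00:00+00:00 2026-06-14T06:00:00+00:00 2026-06-13
```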

Classic DAG with Operators

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator  # replaces the deprecated PostgresOperator
from datetime import datetime, timedelta

def extract():
    print("Extracting data...")
    return {"rows": 1000}

def transform(**context):
    ti = context["ti"]
    data = ti.xcom_pull(task_ids="extract_task")
    print(f"Transforming {data['rows']} rows")

with DAG(
    dag_id="classic_pipeline",
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:

    extract_task = PythonOperator(
        task_id="extract_task",
        python_callable=extract,
    )

    transform_task = PythonOperator(
        task_id="transform_task",
        python_callable=transform,
    )

    load_task = SQLExecuteQueryOperator(
        task_id="load_task",
        conn_id="warehouse_prod",
        sql="""
            INSERT INTO daily_stats (date, row_count)
            VALUES ('{{ ds }}', {{ ti.xcom_pull(task_ids='extract_task')['rows'] }})
            ON CONFLICT (date) DO UPDATE SET row_count = EXCLUDED.row_count
        """,
    )

    cleanup_task = BashOperator(
        task_id="cleanup_task",
        bash_command="rm -f /tmp/staging_{{ ds }}.csv",
    )

    # Set dependencies with >> operator
    extract_task >> transform_task >> load_task >> cleanup_task
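The `>>` syntax is ordinary Python operator overloading. A toy class (hypothetical, and much simpler than Airflow's `BaseOperator`, which also supports `<<` and `set_downstream()`) shows why chaining works: each `>>` records an edge and returns its right-hand operand, so the next `>>` continues from there.

```python
from __future__ import annotations


class ToyTask:
    """Toy stand-in for an operator (not Airflow's BaseOperator)."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream: set[str] = set()

    def __rshift__(self, other: ToyTask | list[ToyTask]) -> ToyTask | list[ToyTask]:
        # a >> b or a >> [b, c]: record the edge(s), then return the right-hand
        # side so that a >> b >> c continues from b.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.add(target.task_id)
        return other

    def __rrshift__(self, other: list[ToyTask]) -> ToyTask:
        # [a, b] >> c: lists have no >> operator, so Python falls back to
        # c.__rrshift__([a, b]).
        for upstream in other:
            upstream >> self
        return self


extract, transform, load, cleanup = (
    ToyTask(t) for t in ("extract_task", "transform_task", "load_task", "cleanup_task")
)
extract >> transform >> load >> cleanup
print(extract.downstream, load.downstream)  # {'transform_task'} {'cleanup_task'}
```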

Sensors: Wait for External Events

from datetime import datetime, timezone

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor

@dag(schedule="@hourly", start_date=datetime(2026, 1, 1, tzinfo=timezone.utc), catchup=False)
def file_arrival_pipeline():

    # Wait for a file to appear (polls every 30 seconds, times out after 1 hour)
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/data/incoming/{{ ds }}/report.csv",
        poke_interval=30,
        timeout=3600,
        mode="reschedule",  # release worker slot while waiting
    )

    # Wait for an S3 object
    wait_for_s3 = S3KeySensor(
        task_id="wait_for_s3",
        bucket_name="techoral-data",
        bucket_key="reports/{{ ds }}/data.parquet",
        aws_conn_id="aws_prod",
        poke_interval=60,
        timeout=7200,
        mode="reschedule",
    )

    # Wait until the export-status endpoint reports "ready"
    wait_for_api = HttpSensor(
        task_id="wait_for_api",
        http_conn_id="crm_api",
        endpoint="/api/v1/exports/{{ ds }}/status",
        response_check=lambda response: response.json()["status"] == "ready",
        poke_interval=60,
        timeout=3600,
        mode="reschedule",
    )

    @task()
    def process():
        print("Processing after file arrival")

    # process() runs only after all three sensors have succeeded
    [wait_for_file, wait_for_s3, wait_for_api] >> process()

dag_instance = file_arrival_pipeline()
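Every sensor above follows the same loop: call a check every `poke_interval` seconds until it succeeds or `timeout` seconds pass. Below is a plain-Python approximation of that loop; `wait_for` and `FakeClock` are illustrative names, not Airflow APIs. In `mode="poke"` a sensor holds its worker slot while sleeping, whereas `mode="reschedule"` ends the attempt and lets the scheduler retry later; this sketch only models the poke-style timing.

```python
import time
from typing import Callable


def wait_for(
    check: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poke `check` until it returns True and return the number of pokes.

    Raises TimeoutError once `timeout` seconds have elapsed without success.
    """
    started = clock()
    pokes = 1
    while not check():
        if clock() - started >= timeout:
            raise TimeoutError(f"condition not met within {timeout} s")
        sleep(poke_interval)  # reschedule mode would free the worker slot here instead
        pokes += 1
    return pokes


class FakeClock:
    """Deterministic clock so the example runs instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
# Pretend the file lands 90 seconds after the sensor starts.
pokes = wait_for(lambda: clock.now >= 90, poke_interval=30, timeout=3600,
                 clock=clock.time, sleep=clock.sleep)
print(pokes)  # 4 (checks at t=0, 30, 60, 90)
```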

XComs: Sharing Data Between Tasks

from datetime import datetime, timezone

from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=datetime(2026, 1, 1, tzinfo=timezone.utc), catchup=False)
def xcom_demo():

    @task()
    def get_config() -> dict:
        return {"batch_size": 1000, "source": "crm", "target": "warehouse"}

    @task()
    def extract(config: dict) -> list:
        print(f"Extracting with batch_size={config['batch_size']}")
        return list(range(config["batch_size"]))  # simulated records

    @task()
    def report(records: list, config: dict):
        print(f"Processed {len(records)} records from {config['source']}")

    # XComs flow implicitly between tasks
    config = get_config()
    records = extract(config)
    report(records, config)

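    # Manual XCom push/pull (classic style), wired into the same DAG
    from airflow.operators.python import PythonOperator

    def push_value(**context):
        context["ti"].xcom_push(key="my_key", value={"important": "data"})

    def pull_value(**context):
        val = context["ti"].xcom_pull(task_ids="push_task", key="my_key")
        print(f"Got: {val}")

    push_task = PythonOperator(task_id="push_task", python_callable=push_value)
    pull_task = PythonOperator(task_id="pull_task", python_callable=pull_value)
    push_task >> pull_task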

dag_instance = xcom_demo()
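Conceptually, XCom is a small key-value table: each value is addressed by DAG, run, task, and key, and TaskFlow return values use the key `"return_value"`. The class below is an in-memory sketch of that addressing scheme under simplifying assumptions (plain JSON serialization, whereas Airflow's own serializer accepts a few more types); `ToyXComStore` is a hypothetical name, not Airflow's storage code.

```python
import json
from typing import Any

XComKey = tuple[str, str, str, str]  # (dag_id, run_id, task_id, key)


class ToyXComStore:
    """In-memory stand-in for the XCom table (illustrative, not Airflow's schema)."""

    def __init__(self) -> None:
        self._rows: dict[XComKey, str] = {}

    def push(self, dag_id: str, run_id: str, task_id: str, value: Any,
             key: str = "return_value") -> None:
        # Values are serialized before storage, so they must be JSON-compatible here.
        self._rows[(dag_id, run_id, task_id, key)] = json.dumps(value)

    def pull(self, dag_id: str, run_id: str, task_id: str,
             key: str = "return_value") -> Any:
        raw = self._rows.get((dag_id, run_id, task_id, key))
        return None if raw is None else json.loads(raw)


store = ToyXComStore()
# TaskFlow-style return value: stored under the default key "return_value".
store.push("xcom_demo", "run_1", "get_config", {"batch_size": 1000})
# Classic explicit push with a custom key.
store.push("xcom_demo", "run_1", "push_task", {"important": "data"}, key="my_key")

print(store.pull("xcom_demo", "run_1", "push_task", key="my_key"))  # {'important': 'data'}
print(store.pull("xcom_demo", "run_2", "push_task", key="my_key"))  # None: values belong to one DAG run
```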

Dynamic Task Mapping

from datetime import datetime, timezone

from airflow.decorators import dag, task

@dag(schedule="@weekly", start_date=datetime(2026, 1, 1, tzinfo=timezone.utc), catchup=False)
def dynamic_report_pipeline():

    @task()
    def get_regions() -> list[str]:
        return ["North", "South", "East", "West", "Central"]

    @task()
    def process_region(region: str) -> dict:
        """Runs once per region; the mapped instances can run in parallel."""
        print(f"Processing region: {region}")
        # load data, run analysis, generate report
        return {"region": region, "records": 1000, "status": "ok"}

    @task()
    def aggregate(results: list[dict]) -> None:
        total = sum(r["records"] for r in results)
        print(f"All regions processed. Total records: {total}")

    regions = get_regions()
    # .expand() creates one mapped task instance per element; instances run in
    # parallel, bounded by worker slots, pools, and concurrency limits
    results = process_region.expand(region=regions)
    aggregate(results)

dag_instance = dynamic_report_pipeline()
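`.expand()` follows a map-then-reduce shape: one mapped task instance per element (each identified by a `map_index`), then a downstream task that receives all results. The stdlib sketch below imitates that shape with a thread pool; `expand` and `max_parallel` are illustrative stand-ins, and in Airflow the actual parallelism is bounded by settings such as `max_active_tis_per_dag`, pools, and available worker slots.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def expand(func: Callable[[T], R], items: list[T], max_parallel: int = 2) -> list[R]:
    """Run func once per item, at most max_parallel at a time; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        return list(pool.map(func, items))  # list position plays the role of map_index


def process_region(region: str) -> dict:
    return {"region": region, "records": 1000, "status": "ok"}


def aggregate(results: list[dict]) -> int:
    return sum(r["records"] for r in results)


regions = ["North", "South", "East", "West", "Central"]
results = expand(process_region, regions)
print([r["region"] for r in results])  # ['North', 'South', 'East', 'West', 'Central']
print(aggregate(results))              # 5000
```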

Hooks and Connections

# Configure connections in Airflow UI or via CLI:
# airflow connections add warehouse_prod \
#   --conn-type postgres \
#   --conn-host db.techoral.com \
#   --conn-login etl_user \
#   --conn-password secret \
#   --conn-port 5432 \
#   --conn-schema analytics

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook

@task()
def load_to_postgres(records: list[dict]):
    hook = PostgresHook(postgres_conn_id="warehouse_prod")
    # insert_rows opens the connection, inserts the rows, commits, and closes it
    hook.insert_rows(
        table="events",
        rows=[(r["id"], r["event"], r["amount"]) for r in records],
        target_fields=["id", "type", "amount"],
    )

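@task()
def upload_to_s3(local_path: str, **context):
    hook = S3Hook(aws_conn_id="aws_prod")
    # Jinja ({{ ds }}) is not rendered inside a task's function body, so read ds from the context
    hook.load_file(
        filename=local_path,
        key=f"reports/{context['ds']}/report.csv",
        bucket_name="techoral-data",
        replace=True,
    )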

@task()
def call_api():
    hook = HttpHook(http_conn_id="crm_api", method="POST")
    response = hook.run("/api/v1/trigger", data={"action": "export"})
    return response.json()
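Hooks only ever see a `conn_id`; the credentials come from wherever Airflow resolves that connection. Besides the UI and `airflow connections add`, Airflow can read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` (upper-cased) that holds a URI. The helper below builds such a variable; the function name and the example password are hypothetical, and percent-encoding the credentials is the part that matters.

```python
from urllib.parse import quote, unquote, urlsplit


def connection_env_var(conn_id: str, conn_type: str, host: str, login: str,
                       password: str, port: int, schema: str) -> tuple[str, str]:
    """Build an AIRFLOW_CONN_<CONN_ID> name and a URI value for a connection."""
    # Credentials and schema are percent-encoded so characters such as '@', ':'
    # or '/' in a password cannot break the URI.
    uri = (f"{conn_type}://{quote(login, safe='')}:{quote(password, safe='')}"
           f"@{host}:{port}/{quote(schema, safe='')}")
    return f"AIRFLOW_CONN_{conn_id.upper()}", uri


name, uri = connection_env_var("warehouse_prod", "postgres", "db.techoral.com",
                               "etl_user", "s3cr@t/pw", 5432, "analytics")
print(f"export {name}='{uri}'")
# export AIRFLOW_CONN_WAREHOUSE_PROD='postgres://etl_user:s3cr%40t%2Fpw@db.techoral.com:5432/analytics'

# Round-trip check: the encoded password decodes back to the original.
parts = urlsplit(uri)
print(unquote(parts.password), parts.hostname, parts.port)  # s3cr@t/pw db.techoral.com 5432
```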

Production Deployment

# docker-compose.yml: Airflow with the Celery executor
# (change the default credentials below before exposing this anywhere)
x-airflow-common: &airflow-common
  image: apache/airflow:2.9.0
  depends_on: [postgres, redis]
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins

services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: airflow
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow

  redis:
    image: redis:7

  # One-off job: create/upgrade the metadata schema and the admin user
  airflow-init:
    <<: *airflow-common
    command: >
      bash -c "airflow db migrate &&
      airflow users create --username admin --password admin
      --firstname Admin --lastname User --role Admin
      --email admin@techoral.com"

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler

  airflow-worker:
    <<: *airflow-common
    command: celery worker

# Initialize the metadata DB and admin user, then start all services
docker compose up airflow-init
docker compose up -d

# Backfill historical runs
airflow dags backfill daily_etl_pipeline --start-date 2026-01-01 --end-date 2026-06-13

# Trigger a DAG run manually
airflow dags trigger daily_etl_pipeline --conf '{"env": "prod"}'

# Test a single task without running the full DAG
airflow tasks test daily_etl_pipeline extract_from_crm 2026-06-13
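Every `AIRFLOW__...` variable in the compose file overrides one `airflow.cfg` option: the name follows `AIRFLOW__{SECTION}__{KEY}`, with double underscores as separators. This small parser (a hypothetical helper, not Airflow's config loader) illustrates that mapping, which can help when checking which settings a container actually receives.

```python
def airflow_config_overrides(environ: dict[str, str]) -> dict[str, dict[str, str]]:
    """Group AIRFLOW__{SECTION}__{KEY} variables into {section: {key: value}}.

    Illustrative parser only: Airflow also recognises _CMD and _SECRET suffixes,
    which this sketch does not handle.
    """
    prefix = "AIRFLOW__"
    overrides: dict[str, dict[str, str]] = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue  # e.g. AIRFLOW_CONN_* uses a single underscore and is not config
        section, sep, key = name[len(prefix):].partition("__")
        if not sep or not key:
            continue
        overrides.setdefault(section.lower(), {})[key.lower()] = value
    return overrides


compose_env = {
    "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
    "AIRFLOW__CELERY__BROKER_URL": "redis://redis:6379/0",
    "AIRFLOW__CORE__LOAD_EXAMPLES": "false",
    "AIRFLOW_CONN_WAREHOUSE_PROD": "postgres://etl_user:secret@db.techoral.com:5432/analytics",
}
print(airflow_config_overrides(compose_env))
# {'core': {'executor': 'CeleryExecutor', 'load_examples': 'false'}, 'celery': {'broker_url': 'redis://redis:6379/0'}}
```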

Frequently Asked Questions

Airflow vs Prefect vs Dagster — which to choose?
Airflow is the most battle-tested option, with the largest ecosystem of provider packages and managed cloud offerings (Amazon MWAA, Google Cloud Composer). Prefect and Dagster are more Python-native, with a smoother local development experience and stronger typing of task inputs and outputs. For teams starting fresh, Prefect and Dagster can require less infrastructure overhead. For teams with existing Airflow investments, Airflow remains the standard.
What is the best executor for production?
Use the CeleryExecutor with Redis as the broker for most production setups — it is battle-tested and scales horizontally by adding workers. Use KubernetesExecutor when you need task-level resource isolation or have heterogeneous resource requirements (GPU tasks alongside CPU tasks). The LocalExecutor is fine for small teams on a single machine.
How do I share large data between tasks?
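By default, XComs are stored in the metadata DB, so keep them small: a common rule of thumb is to avoid passing anything over about 1 MB (the hard limit depends on the database column type). Instead, write data to a shared store (S3, GCS, a database) and pass only the file path or table name via XCom. This is the standard pattern: tasks produce artifacts in shared storage, and downstream tasks consume them from there. Airflow also supports custom XCom backends that keep values in object storage transparently.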
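The reference-passing pattern fits in a few lines. In this sketch a temporary local directory stands in for shared storage and the function names are hypothetical; in a real deployment every worker needs access to the same store (for example an S3 bucket via `S3Hook`), since local disk is not shared between Celery workers or Kubernetes pods.

```python
import json
import tempfile
from pathlib import Path

# Rule-of-thumb threshold from the answer above; not an Airflow setting.
XCOM_SOFT_LIMIT_BYTES = 1_000_000


def produce(records: list[dict], storage_dir: Path, run_date: str) -> str:
    """Write the full dataset to shared storage and return only its location."""
    path = storage_dir / f"events_{run_date}.json"
    path.write_text(json.dumps(records))
    return str(path)  # this short string is what would travel through XCom


def consume(path: str) -> int:
    """Downstream task: load the dataset from storage using the reference."""
    return len(json.loads(Path(path).read_text()))


records = [{"id": i, "amount": 99.0} for i in range(50_000)]
with tempfile.TemporaryDirectory() as tmp:  # stands in for S3/GCS in this sketch
    ref = produce(records, Path(tmp), "2026-06-13")
    print(len(json.dumps(records)) > XCOM_SOFT_LIMIT_BYTES)  # True: too big for XCom
    print(len(ref.encode()) < 1_000)                        # True: the reference is tiny
    print(consume(ref))                                     # 50000
```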