Python Airflow: DAG Authoring and Pipeline Orchestration
Apache Airflow is a widely used open-source platform for authoring, scheduling, and monitoring data pipelines. A DAG (Directed Acyclic Graph), written in Python, defines the tasks and their dependencies. Airflow handles scheduling, retries, backfills, monitoring, and alerting. The TaskFlow API removes much of the boilerplate, so DAGs read like ordinary Python functions with decorators. This guide covers DAG patterns, sensors, XComs, dynamic tasks, hooks and connections, and production deployment.
Setup and Core Concepts
pip install "apache-airflow[postgres,redis,celery]"
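# Initialize the metadata DB ("airflow db init" is deprecated since Airflow 2.7)
airflow db migrate
# Create an admin user (use a real password outside local development)
airflow users create --username admin --password admin \
    --firstname Admin --lastname User --role Admin \
    --email admin@techoral.com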
# Start scheduler and webserver (development)
airflow scheduler &
airflow webserver --port 8080
Airflow concepts: A DAG is a Python file defining tasks and their execution order. The Scheduler triggers DAG runs based on schedules. Workers (via Celery or Kubernetes) execute the tasks. The Webserver provides the UI. The Metadata DB (PostgreSQL in production) stores DAG/task state.
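To make the idea of "execution order" concrete, here is a minimal sketch that orders a small dependency graph with Python's standard graphlib module. It does not use Airflow at all, and the task names are hypothetical. Airflow's scheduler does far more than this (state tracking, retries, pools), so treat it only as an illustration of why the graph must be acyclic.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "validate": {"extract"},
    "transform": {"validate"},
    "load": {"transform"},
    "notify": {"load"},
}


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return one valid run order; raises graphlib.CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

A cycle such as a depending on b and b depending on a has no valid order, which is why a DAG with a cycle cannot be scheduled.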
TaskFlow API (Modern)
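The TaskFlow API (Airflow 2.0+) uses Python decorators to define tasks and infers their dependencies from how return values are passed between them. Data returned from one task reaches downstream tasks via XCom automatically, with no explicit push or pull calls. The examples in this guide use the schedule argument, which requires Airflow 2.4 or later.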
# dags/etl_pipeline.py
from datetime import datetime, timedelta, timezone

from airflow.decorators import dag, task

default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["data-alerts@techoral.com"],
}


@dag(
    dag_id="daily_etl_pipeline",
    schedule="0 6 * * *",  # Daily at 6am UTC
    start_date=datetime(2026, 1, 1),  # static start date; days_ago() is deprecated
    catchup=False,
    default_args=default_args,
    tags=["etl", "daily"],
    doc_md="""
    Daily ETL pipeline: extract from CRM → transform → load to warehouse.
    Runs at 6am UTC. On failure alerts data-alerts@techoral.com.
    """,
)
def daily_etl():
    @task()
    def extract_from_crm(**context) -> dict:
        """Pull the run date's records from the CRM API."""
        execution_date = context["ds"]  # YYYY-MM-DD string
        # records = crm_api.get_events(date=execution_date)
        records = [{"id": 1, "event": "purchase", "amount": 99.0}]
        return {"date": execution_date, "count": len(records), "records": records}

    @task()
    def validate(raw: dict) -> dict:
        """Fail the run if any record has a missing or non-positive amount."""
        records = raw["records"]
        errors = [r for r in records if r.get("amount", 0) <= 0]
        if errors:
            raise ValueError(f"Found {len(errors)} invalid records")
        return raw

    @task()
    def transform(validated: dict) -> list[dict]:
        """Clean and enrich records."""
        return [
            {
                **rec,
                "amount_usd": rec["amount"],
                "processed_at": datetime.now(timezone.utc).isoformat(),
            }
            for rec in validated["records"]
        ]

    @task()
    def load_to_warehouse(transformed: list[dict], **context) -> str:
        """Upsert records into the data warehouse."""
        # warehouse_client.upsert("events", transformed)
        print(f"Loaded {len(transformed)} records for {context['ds']}")
        return f"Loaded {len(transformed)} records"

    @task()
    def notify(load_result: str):
        """Send success notification."""
        print(f"Pipeline complete: {load_result}")

    # Dependencies follow the data flow; Airflow infers the graph
    raw = extract_from_crm()
    valid = validate(raw)
    transformed = transform(valid)
    result = load_to_warehouse(transformed)
    notify(result)


dag_instance = daily_etl()
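How can calling decorated functions produce a graph rather than run the code? The toy decorator below is one way to picture it. It is not Airflow's implementation: the Ref class and the module-level edges list are hypothetical stand-ins, loosely analogous to the XComArg placeholder that a real @task call returns while the DAG file is parsed.

```python
from functools import wraps

# Recorded (upstream, downstream) pairs; a hypothetical stand-in for a DAG's edges.
edges: list[tuple[str, str]] = []


class Ref:
    """Placeholder for a task's future output."""

    def __init__(self, task_name: str):
        self.task_name = task_name


def task(fn):
    """Toy decorator: calling the function records dependencies instead of running it."""

    @wraps(fn)
    def wrapper(*args):
        for arg in args:
            if isinstance(arg, Ref):
                edges.append((arg.task_name, fn.__name__))
        return Ref(fn.__name__)

    return wrapper


@task
def extract(): ...


@task
def validate(raw): ...


@task
def load(valid): ...


# Passing one task's placeholder into another is what creates the edge.
load(validate(extract()))
print(edges)  # [('extract', 'validate'), ('validate', 'load')]
```

The function bodies never execute here, which mirrors the fact that task code runs later on a worker, not when the DAG file is parsed.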
Classic DAG with Operators
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# Note: newer postgres provider releases deprecate PostgresOperator
# in favor of SQLExecuteQueryOperator.
from airflow.providers.postgres.operators.postgres import PostgresOperator


def extract():
    print("Extracting data...")
    return {"rows": 1000}  # the return value is pushed to XCom


def transform(**context):
    ti = context["ti"]
    data = ti.xcom_pull(task_ids="extract_task")
    print(f"Transforming {data['rows']} rows")


with DAG(
    dag_id="classic_pipeline",
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:
    extract_task = PythonOperator(
        task_id="extract_task",
        python_callable=extract,
    )
    transform_task = PythonOperator(
        task_id="transform_task",
        python_callable=transform,
    )
    load_task = PostgresOperator(
        task_id="load_task",
        postgres_conn_id="warehouse_prod",
        sql="""
            INSERT INTO daily_stats (date, row_count)
            VALUES ('{{ ds }}', {{ ti.xcom_pull(task_ids='extract_task')['rows'] }})
            ON CONFLICT (date) DO UPDATE SET row_count = EXCLUDED.row_count
        """,
    )
    cleanup_task = BashOperator(
        task_id="cleanup_task",
        bash_command="rm -f /tmp/staging_{{ ds }}.csv",
    )

    # Set dependencies with the >> operator
    extract_task >> transform_task >> load_task >> cleanup_task
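The ON CONFLICT ... DO UPDATE clause in load_task is what keeps the load idempotent: a retry or a backfill for the same date overwrites the existing row instead of duplicating it. The sketch below demonstrates that behaviour with the standard-library sqlite3 module standing in for PostgreSQL. That substitution is an assumption made so the example runs anywhere; SQLite accepts this upsert syntax from version 3.24.

```python
import sqlite3

UPSERT = """
INSERT INTO daily_stats (date, row_count)
VALUES (?, ?)
ON CONFLICT (date) DO UPDATE SET row_count = excluded.row_count
"""


def load(conn: sqlite3.Connection, ds: str, rows: int) -> None:
    """Insert the day's stats, or overwrite them if the date already exists."""
    conn.execute(UPSERT, (ds, rows))
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_stats (date TEXT PRIMARY KEY, row_count INTEGER)")

load(conn, "2026-01-01", 1000)  # first attempt
load(conn, "2026-01-01", 1200)  # retry or re-run for the same logical date

stats = conn.execute("SELECT date, row_count FROM daily_stats").fetchall()
print(stats)  # [('2026-01-01', 1200)]
```

Without the conflict clause the second call would fail on the primary key, and without the key it would silently insert a duplicate row.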
Sensors: Wait for External Events
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.http.sensors.http import HttpSensor  # provider path in Airflow 2.x
from airflow.sensors.filesystem import FileSensor


@dag(schedule="@hourly", start_date=datetime(2026, 1, 1), catchup=False)
def file_arrival_pipeline():
    # Wait for a file to appear (polls every 30 seconds, times out after 1 hour)
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/data/incoming/{{ ds }}/report.csv",
        poke_interval=30,
        timeout=3600,
        mode="reschedule",  # release worker slot while waiting
    )

    # Wait for an S3 object
    wait_for_s3 = S3KeySensor(
        task_id="wait_for_s3",
        bucket_name="techoral-data",
        bucket_key="reports/{{ ds }}/data.parquet",
        aws_conn_id="aws_prod",
        poke_interval=60,
        timeout=7200,
        mode="reschedule",
    )

    # Wait for an API to report that the export is ready
    wait_for_api = HttpSensor(
        task_id="wait_for_api",
        http_conn_id="crm_api",
        endpoint="/api/v1/exports/{{ ds }}/status",
        response_check=lambda response: response.json()["status"] == "ready",
        poke_interval=60,
        timeout=3600,
        mode="reschedule",
    )

    @task()
    def process():
        print("Processing after all inputs have arrived")

    # process() runs only after all three sensors succeed
    [wait_for_file, wait_for_s3, wait_for_api] >> process()


dag_instance = file_arrival_pipeline()
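The poke_interval and timeout arguments above describe a simple polling loop. The following is a rough sketch of that loop in plain Python, not Airflow's sensor code: the poke_until function and the FakeClock helper are hypothetical, and a real sensor fails the task on timeout rather than returning False. In reschedule mode Airflow also frees the worker slot between checks instead of sleeping.

```python
import time
from typing import Callable


def poke_until(
    check: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call check() every poke_interval seconds; give up once timeout seconds have passed."""
    started = clock()
    while True:
        if check():
            return True
        if clock() - started >= timeout:
            return False
        sleep(poke_interval)


class FakeClock:
    """Simulated time so the example finishes instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


fake = FakeClock()
attempts: list[float] = []


def file_ready() -> bool:
    attempts.append(fake.time())
    return fake.time() >= 90  # the "file" shows up 90 simulated seconds in


found = poke_until(file_ready, poke_interval=30, timeout=3600,
                   clock=fake.time, sleep=fake.sleep)
print(found, attempts)  # True [0.0, 30.0, 60.0, 90.0]
```

A shorter poke_interval notices the event sooner at the cost of more checks, which is the trade-off behind the 30 and 60 second values used above.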
XComs: Sharing Data Between Tasks
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator


@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def xcom_demo():
    @task()
    def get_config() -> dict:
        return {"batch_size": 1000, "source": "crm", "target": "warehouse"}

    @task()
    def extract(config: dict) -> list:
        print(f"Extracting with batch_size={config['batch_size']}")
        return list(range(config["batch_size"]))  # simulated records

    @task()
    def report(records: list, config: dict):
        print(f"Processed {len(records)} records from {config['source']}")

    # XComs flow implicitly between tasks
    config = get_config()
    records = extract(config)
    report(records, config)

    # Manual XCom push/pull (classic style)
    def push_value(**context):
        context["ti"].xcom_push(key="my_key", value={"important": "data"})

    def pull_value(**context):
        val = context["ti"].xcom_pull(task_ids="push_task", key="my_key")
        print(f"Got: {val}")

    push_task = PythonOperator(task_id="push_task", python_callable=push_value)
    pull_task = PythonOperator(task_id="pull_task", python_callable=pull_value)
    push_task >> pull_task


dag_instance = xcom_demo()
Dynamic Task Mapping
from datetime import datetime

from airflow.decorators import dag, task


# Dynamic task mapping requires Airflow 2.3 or later
@dag(schedule="@weekly", start_date=datetime(2026, 1, 1), catchup=False)
def dynamic_report_pipeline():
    @task()
    def get_regions() -> list[str]:
        return ["North", "South", "East", "West", "Central"]

    @task()
    def process_region(region: str) -> dict:
        """Runs once per region; the mapped instances can run in parallel."""
        print(f"Processing region: {region}")
        # load data, run analysis, generate report
        return {"region": region, "records": 1000, "status": "ok"}

    @task()
    def aggregate(results: list[dict]) -> None:
        total = sum(r["records"] for r in results)
        print(f"All regions processed. Total records: {total}")

    regions = get_regions()
    # .expand() creates one task instance per element at run time
    results = process_region.expand(region=regions)
    aggregate(results)


dag_instance = dynamic_report_pipeline()
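Conceptually, .expand() is a map step followed by a reduce step, with the number of mapped instances decided only once the upstream list is known. The sketch below reproduces that shape with a thread pool from the standard library. It is an analogy rather than Airflow code: the functions reuse the names from the DAG above, but they are ordinary Python functions here.

```python
from concurrent.futures import ThreadPoolExecutor


def get_regions() -> list[str]:
    return ["North", "South", "East", "West", "Central"]


def process_region(region: str) -> dict:
    """Stand-in for the per-region work done by each mapped task instance."""
    return {"region": region, "records": 1000, "status": "ok"}


def aggregate(results: list[dict]) -> int:
    return sum(r["records"] for r in results)


regions = get_regions()  # the fan-out width is only known at this point
with ThreadPoolExecutor(max_workers=len(regions)) as pool:
    # pool.map makes one call per element and yields results in input order
    results = list(pool.map(process_region, regions))

total = aggregate(results)
print(total)  # 5000
```

In Airflow each mapped instance is a separate task instance with its own retries and logs, which a thread pool does not give you.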
Hooks and Connections
# Configure connections in the Airflow UI or via the CLI:
# airflow connections add warehouse_prod \
#     --conn-type postgres \
#     --conn-host db.techoral.com \
#     --conn-login etl_user \
#     --conn-password secret \
#     --conn-port 5432 \
#     --conn-schema analytics
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task()
def load_to_postgres(records: list[dict]):
    hook = PostgresHook(postgres_conn_id="warehouse_prod")
    conn = hook.get_conn()
    try:
        with conn.cursor() as cursor:
            cursor.executemany(
                "INSERT INTO events (id, type, amount) VALUES (%s, %s, %s)",
                [(r["id"], r["event"], r["amount"]) for r in records],
            )
        conn.commit()
    finally:
        conn.close()


@task()
def upload_to_s3(local_path: str, ds=None):
    # Airflow injects context values such as ds into matching keyword arguments.
    # Jinja's {{ ds }} is not rendered inside a Python f-string, so use the argument.
    hook = S3Hook(aws_conn_id="aws_prod")
    hook.load_file(
        filename=local_path,
        key=f"reports/{ds}/report.csv",
        bucket_name="techoral-data",
        replace=True,
    )


@task()
def call_api():
    hook = HttpHook(http_conn_id="crm_api", method="POST")
    response = hook.run("/api/v1/trigger", data={"action": "export"})
    return response.json()
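Besides the UI and CLI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection id, with the value written as a URI. The sketch below shows how the parts of such a URI line up with the CLI flags used above. The parse_conn_uri function is a hypothetical illustration built on urllib, not Airflow's own parser, which also handles URL-encoding and extra parameters.

```python
from urllib.parse import urlsplit


def parse_conn_uri(uri: str) -> dict:
    """Split a connection URI into the fields the CLI flags would set."""
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,          # --conn-type
        "host": parts.hostname,             # --conn-host
        "login": parts.username,            # --conn-login
        "password": parts.password,         # --conn-password
        "port": parts.port,                 # --conn-port
        "schema": parts.path.lstrip("/"),   # --conn-schema
    }


# Would be exported as AIRFLOW_CONN_WAREHOUSE_PROD for the warehouse_prod connection.
uri = "postgres://etl_user:secret@db.techoral.com:5432/analytics"
fields = parse_conn_uri(uri)
print(fields)
```

Environment-variable connections are convenient in containers because the credentials never have to be written to the metadata DB.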
Production Deployment
# docker-compose.yml: Airflow with Celery executor
version: "3.8"

# Shared settings so every Airflow service uses the same executor, DB, and broker
x-airflow-common: &airflow-common
  image: apache/airflow:2.9.0
  depends_on: [postgres, redis]
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
  volumes:
    - ./dags:/opt/airflow/dags
    - ./plugins:/opt/airflow/plugins

services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: airflow
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
  redis:
    image: redis:7
  # One-off service: creates or upgrades the metadata DB.
  # Create a login afterwards with "airflow users create".
  airflow-init:
    <<: *airflow-common
    command: db migrate
  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
  airflow-worker:
    <<: *airflow-common
    command: celery worker
# Initialize and start
docker-compose up airflow-init
docker-compose up -d
# Backfill historical runs
airflow dags backfill daily_etl_pipeline --start-date 2026-01-01 --end-date 2026-06-13
# Trigger a DAG run manually
airflow dags trigger daily_etl_pipeline --conf '{"env": "prod"}'
# Test a single task without running the full DAG
airflow tasks test daily_etl_pipeline extract_from_crm 2026-06-13
Frequently Asked Questions
- Airflow vs Prefect vs Dagster — which to choose?
- Airflow is the most battle-tested with the largest ecosystem and cloud managed offerings (MWAA, Cloud Composer). Prefect 2.x and Dagster are more Python-native, with better local development experience and type safety. For teams starting fresh, Prefect and Dagster require less infrastructure overhead. For teams at large companies with existing Airflow investments, Airflow remains the standard.
- What is the best executor for production?
- Use the CeleryExecutor with Redis as the broker for most production setups — it is battle-tested and scales horizontally by adding workers. Use KubernetesExecutor when you need task-level resource isolation or have heterogeneous resource requirements (GPU tasks alongside CPU tasks). The LocalExecutor is fine for small teams on a single machine.
- How do I share large data between tasks?
- By default, XComs are stored in the metadata DB, so avoid them for large datasets (more than about 1 MB). Instead, write the data to a shared store (S3, GCS, a database) and pass only the file path or table name via XCom. This is the standard pattern: tasks produce artifacts in shared storage, and downstream tasks read them from there.
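The pass-a-reference pattern can be sketched in a few lines. In the example below a temporary local directory stands in for shared storage such as S3 or GCS. That is an assumption made so the code runs anywhere; with multiple workers a local path would not be visible to every task. The function names are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def extract(staging_dir: Path, ds: str) -> str:
    """Write the records to shared storage and return only their location."""
    records = [{"id": i, "amount": 10.0} for i in range(1000)]
    path = staging_dir / f"events_{ds}.json"
    path.write_text(json.dumps(records))
    return str(path)  # this short string is all that would travel through XCom


def load(path: str) -> int:
    """Read the artifact back from shared storage and report the row count."""
    records = json.loads(Path(path).read_text())
    return len(records)


with tempfile.TemporaryDirectory() as tmp:
    handoff = extract(Path(tmp), "2026-06-13")
    payload_bytes = Path(handoff).stat().st_size
    loaded = load(handoff)

print(loaded, len(handoff) < payload_bytes)  # 1000 True
```

Including the run date in the file name keeps re-runs for the same date pointed at the same artifact, so retries overwrite rather than accumulate files.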