Python Event-Driven Architecture: CQRS and Event Sourcing
CQRS (Command Query Responsibility Segregation) separates write operations (commands that change state) from read operations (queries that return data), allowing each side to be optimised independently. Event Sourcing stores the history of domain events as the source of truth rather than the current state, enabling full audit trails, temporal queries, and event replay. Together, they power scalable, auditable systems. This guide implements both patterns in Python with a PostgreSQL event store and Kafka for event publishing.
Table of Contents
Commands and Command Bus
A command is an intent to change state. Commands are validated, dispatched to a handler, and produce domain events as a side effect. The command bus decouples the sender from the handler.
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable
from decimal import Decimal
import uuid
def _new_command_id() -> str:
    """Return a freshly generated UUID4, rendered as a string."""
    return str(uuid.uuid4())


@dataclass(frozen=True)
class Command:
    """Immutable base class for every command (a request to change state).

    Each instance receives its own ``command_id`` at construction time unless
    one is passed explicitly, so individual command objects can be told apart.
    """

    command_id: str = field(default_factory=_new_command_id)
@dataclass(frozen=True)
class PlaceOrder(Command):
    """Command: create a new order for a customer.

    Every field carries a default because dataclass fields that follow the
    defaulted ``command_id`` inherited from Command must also have defaults.
    """

    customer_id: str = ""
    # Line items exactly as supplied by the caller; not validated here.
    items: list[dict] = field(default_factory=lambda: [])
    # Free-form address mapping, stored as given.
    shipping_address: dict = field(default_factory=lambda: {})
@dataclass(frozen=True)
class CancelOrder(Command):
    """Command: cancel an existing order, optionally recording a reason."""

    # Id of the order (event stream) to cancel.
    order_id: str = ""
    # Free-text explanation; empty when none is given.
    reason: str = ""
@dataclass(frozen=True)
class ShipOrder(Command):
    """Command: request shipment of an existing order under a tracking number.

    NOTE(review): no handler for this command is registered in this section.
    """

    # Id of the order (event stream) being shipped.
    order_id: str = ""
    # Carrier tracking reference; empty string by default.
    tracking_number: str = ""
class CommandBus:
    """Route each command to the one handler registered for its exact class."""

    def __init__(self):
        # Keyed by the concrete command class. Lookups use type(command), so a
        # subclass is NOT routed to its parent's handler.
        self._handlers: dict[type, Callable] = {}

    def register(self, command_type: type, handler: Callable):
        """Bind *handler* to *command_type*, replacing any earlier binding."""
        self._handlers.update({command_type: handler})

    def dispatch(self, command: Command) -> Any:
        """Call the handler registered for ``type(command)`` and return its result.

        Raises:
            ValueError: if no (truthy) handler is registered for the command's type.
        """
        command_type = type(command)
        target = self._handlers.get(command_type)
        if target:
            return target(command)
        raise ValueError(f"No handler registered for {command_type.__name__}")
# Command handlers
class OrderCommandHandlers:
    """Write-side handlers for order commands.

    Each handler builds domain events, appends them to the event store with an
    expected version (optimistic concurrency), and then hands them to the
    publisher.
    NOTE: append and publish are two separate writes; a crash between them
    stores the events without publishing them.
    """

    def __init__(self, event_store: "EventStore", publisher: "EventPublisher"):
        self._store = event_store
        self._publisher = publisher

    def handle_place_order(self, cmd: PlaceOrder) -> str:
        """Start a new order stream with a single ``OrderPlaced`` event.

        Returns:
            The newly generated order id, which is also the event-stream id.
        """
        order_id = str(uuid.uuid4())
        events = [
            OrderPlaced(
                order_id=order_id,
                customer_id=cmd.customer_id,
                items=cmd.items,
                shipping_address=cmd.shipping_address,
            )
        ]
        # expected_version=0: the stream must not exist yet.
        self._store.append(order_id, events, expected_version=0)
        self._publisher.publish(events)
        return order_id

    def handle_cancel_order(self, cmd: CancelOrder) -> None:
        """Append an ``OrderCancelled`` event to an existing order stream.

        Raises:
            ValueError: if the order has no events (unknown id) or is already
                cancelled.
            Any exception raised by ``event_store.append``. EventStore raises
                ConcurrencyError when another writer appended first.
        """
        history = self._store.load(cmd.order_id)
        if not history:
            # Without this guard an unknown id would create a brand-new stream
            # whose very first event is OrderCancelled.
            raise ValueError(f"Order {cmd.order_id} not found")
        order = Order.replay(history)
        if order.status == "cancelled":
            raise ValueError("Order already cancelled")
        events = [OrderCancelled(order_id=cmd.order_id, reason=cmd.reason)]
        # The stream is currently at version len(history); a concurrent append
        # to the same stream makes this one conflict.
        self._store.append(cmd.order_id, events, expected_version=len(history))
        self._publisher.publish(events)
Queries and Query Bus
from dataclasses import dataclass
@dataclass(frozen=True)
class Query:
    """Immutable marker base class for read-side requests; it has no fields."""
@dataclass(frozen=True)
class GetOrder(Query):
    """Query: fetch a single order by its id."""

    # Required; there is no default.
    order_id: str
@dataclass(frozen=True)
class ListCustomerOrders(Query):
    """Query: one page of a customer's orders, optionally filtered by status."""

    customer_id: str
    # Optional status filter; None requests no filter.
    status: str | None = None
    # Page number, starting at 1.
    page: int = 1
    # Maximum rows per page.
    size: int = 20
class QueryBus:
    """Dispatch each query object to the handler registered for its exact class."""

    def __init__(self):
        # Exact-type registry: lookups use type(query), not isinstance.
        self._handlers: dict[type, Callable] = dict()

    def register(self, query_type: type, handler: Callable):
        """Associate *handler* with *query_type*; a later call for the same type wins."""
        self._handlers[query_type] = handler

    def ask(self, query: Query) -> Any:
        """Run the matching handler and return its result unchanged.

        Raises:
            ValueError: when no (truthy) handler is registered for ``type(query)``.
        """
        query_cls = type(query)
        resolved = self._handlers.get(query_cls)
        if resolved:
            return resolved(query)
        raise ValueError(f"No handler for {query_cls.__name__}")
# Query handlers read from the optimised read model (not the event store)
class OrderQueryHandlers:
    """Read-side handlers that query the denormalised ``orders_view`` table."""

    def __init__(self, read_db):
        # read_db must provide fetchone(sql, params) and fetchall(sql, params).
        self._db = read_db

    def get_order(self, query: GetOrder) -> dict | None:
        """Return the ``orders_view`` row for ``query.order_id``.

        The result is whatever ``read_db.fetchone`` returns (presumably None
        when no row matches; confirm against the read_db implementation).
        """
        return self._db.fetchone(
            "SELECT * FROM orders_view WHERE order_id = %s", [query.order_id]
        )

    def list_orders(self, query: ListCustomerOrders) -> list[dict]:
        """Return one page of a customer's orders, newest first.

        A truthy ``query.status`` restricts results to that status. Pages are
        1-based, and ``size`` is the maximum number of rows per page.

        Raises:
            ValueError: if ``page`` or ``size`` is less than 1 (these would
                otherwise produce a negative OFFSET/LIMIT that the DB rejects).
        """
        if query.page < 1:
            raise ValueError(f"page must be >= 1, got {query.page}")
        if query.size < 1:
            raise ValueError(f"size must be >= 1, got {query.size}")
        sql = "SELECT * FROM orders_view WHERE customer_id = %s"
        params: list = [query.customer_id]
        if query.status:
            sql += " AND status = %s"
            params.append(query.status)
        # LIMIT/OFFSET without ORDER BY yields rows in an unspecified order, so
        # consecutive pages could overlap or skip rows. order_id breaks ties
        # between rows with the same created_at.
        sql += " ORDER BY created_at DESC, order_id LIMIT %s OFFSET %s"
        params += [query.size, (query.page - 1) * query.size]
        return self._db.fetchall(sql, params)
Event Store
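The event store is an append-only log of domain events indexed by aggregate (stream) ID and version. Optimistic concurrency control prevents concurrent writers from corrupting a stream. Every append states the version it expects the stream to be at (expected_version). If another writer has already stored an event with the next version, the UNIQUE (stream_id, version) constraint rejects the write.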
import json
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
def _new_event_id() -> str:
    """Return a random UUID4 string to use as a default event id."""
    return str(uuid.uuid4())


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string (with +00:00 offset)."""
    return datetime.now(timezone.utc).isoformat()


@dataclass(frozen=True)
class DomainEvent:
    """Immutable base class for all domain events.

    Both fields are filled in automatically when omitted. They can also be
    passed explicitly, which lets a stored event be re-created with its original
    id and timestamp.
    """

    event_id: str = field(default_factory=_new_event_id)
    occurred_at: str = field(default_factory=_utc_now_iso)
@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    """Event: a new order was created for a customer."""

    order_id: str = ""
    customer_id: str = ""
    # Line items copied from the command; structure is not enforced here.
    items: list = field(default_factory=lambda: [])
    # Address mapping copied from the command.
    shipping_address: dict = field(default_factory=lambda: {})
@dataclass(frozen=True)
class OrderCancelled(DomainEvent):
    """Event: an order was cancelled, with an optional free-text reason."""

    # Id of the cancelled order (event stream).
    order_id: str = ""
    # Reason supplied with the cancellation; empty when none was given.
    reason: str = ""
class EventStore:
    """PostgreSQL-backed append-only event store.

    Each row is one event, identified by ``(stream_id, version)``. The UNIQUE
    constraint on that pair makes :meth:`append` fail when two writers try to
    extend the same stream from the same version (optimistic concurrency).
    """

    CREATE_TABLE = """
    CREATE TABLE IF NOT EXISTS event_store (
        id BIGSERIAL PRIMARY KEY,
        stream_id TEXT NOT NULL,
        version INT NOT NULL,
        event_type TEXT NOT NULL,
        data JSONB NOT NULL,
        occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        UNIQUE (stream_id, version)
    );
    CREATE INDEX IF NOT EXISTS idx_event_store_stream ON event_store(stream_id, version);
    """

    # Maps the stored ``event_type`` column back to the dataclass load() builds.
    EVENT_REGISTRY = {
        "OrderPlaced": OrderPlaced,
        "OrderCancelled": OrderCancelled,
    }

    # PostgreSQL SQLSTATE for unique_violation.
    _UNIQUE_VIOLATION = "23505"

    def __init__(self, conn):
        # DB-API connection: used via conn.cursor() as a context manager plus
        # commit() / rollback().
        self._conn = conn

    @classmethod
    def _is_unique_violation(cls, exc: Exception) -> bool:
        """Return True if *exc* reports a unique-constraint violation.

        Prefers the driver's SQLSTATE (``pgcode`` in psycopg2, ``sqlstate`` in
        psycopg 3). It falls back to the message text only when neither is present.
        """
        code = getattr(exc, "pgcode", None) or getattr(exc, "sqlstate", None)
        if code is not None:
            return code == cls._UNIQUE_VIOLATION
        return "unique" in str(exc).lower()

    def append(self, stream_id: str, events: list[DomainEvent], expected_version: int):
        """Append *events* to *stream_id* in a single transaction.

        Args:
            stream_id: The aggregate's stream id.
            events: Events to store; serialised with ``asdict`` + ``json.dumps``.
            expected_version: The version the caller believes the stream is at.
                Events get versions expected_version + 1, + 2, ...

        Raises:
            ConcurrencyError: a target version already exists, i.e. another
                writer appended first. The whole batch is rolled back.
        """
        try:
            with self._conn.cursor() as cur:
                for offset, event in enumerate(events, start=1):
                    version = expected_version + offset
                    try:
                        cur.execute(
                            "INSERT INTO event_store (stream_id, version, event_type, data) VALUES (%s, %s, %s, %s)",
                            [stream_id, version, type(event).__name__, json.dumps(asdict(event))],
                        )
                    except Exception as e:
                        if self._is_unique_violation(e):
                            raise ConcurrencyError(
                                f"Version conflict on stream {stream_id} v{version}"
                            ) from e
                        raise
            self._conn.commit()
        except Exception:
            # After a failed statement PostgreSQL aborts the transaction and
            # rejects every later statement until a rollback. Rolling back here
            # keeps the connection usable and discards events of this batch that
            # were inserted before the failure.
            self._conn.rollback()
            raise

    def load(self, stream_id: str, from_version: int = 0) -> list[DomainEvent]:
        """Return the events of *stream_id* with version > *from_version*, oldest first.

        Raises:
            KeyError: if a stored ``event_type`` is not in EVENT_REGISTRY.
        """
        with self._conn.cursor() as cur:
            cur.execute(
                "SELECT event_type, data FROM event_store WHERE stream_id = %s AND version > %s ORDER BY version",
                [stream_id, from_version],
            )
            rows = cur.fetchall()
        events = []
        for event_type, data in rows:
            if isinstance(data, str):
                # In case the driver hands back the JSONB column undecoded.
                data = json.loads(data)
            events.append(self.EVENT_REGISTRY[event_type](**data))
        return events
class ConcurrencyError(Exception):
    """Raised when a write targets a stream version another writer already stored."""
Projections and Read Models
Projections consume the event stream and build denormalised read models optimised for queries. Each projection subscribes to specific event types and updates its own read table.
class OrderProjection:
    """Builds orders_view table from the event stream.

    ``handle`` can safely see the same event twice. OrderPlaced inserts use
    ON CONFLICT DO NOTHING, and OrderCancelled is a plain UPDATE.
    """

    def __init__(self, write_db):
        # write_db must expose execute(sql[, params]).
        self._db = write_db

    @staticmethod
    def _order_total(items) -> Decimal:
        """Sum ``price * qty`` over *items* exactly, using Decimal.

        Each value goes through ``str`` first, so a float price such as 0.1
        becomes Decimal("0.1") rather than its binary approximation. Prices that
        arrive as numeric strings are also accepted.
        """
        return sum(
            (Decimal(str(item["price"])) * Decimal(str(item["qty"])) for item in items),
            Decimal(0),
        )

    def handle(self, event: DomainEvent):
        """Apply one event to orders_view; other event types are ignored."""
        if isinstance(event, OrderPlaced):
            self._db.execute(
                """INSERT INTO orders_view (order_id, customer_id, status, total, item_count, created_at)
                VALUES (%s, %s, 'pending', %s, %s, %s)
                ON CONFLICT (order_id) DO NOTHING""",
                [
                    event.order_id,
                    event.customer_id,
                    self._order_total(event.items),
                    len(event.items),
                    event.occurred_at,
                ],
            )
        elif isinstance(event, OrderCancelled):
            self._db.execute(
                "UPDATE orders_view SET status = 'cancelled', cancelled_at = %s WHERE order_id = %s",
                [event.occurred_at, event.order_id],
            )

    def rebuild_from_scratch(self, event_store: EventStore, stream_ids: list[str]):
        """Empty orders_view, then replay every event of *stream_ids* into it.

        NOTE(review): TRUNCATE and the replay are separate execute() calls.
        Whether they share one transaction depends on write_db, so readers may
        briefly see an empty or partial view.
        """
        self._db.execute("TRUNCATE TABLE orders_view")
        for stream_id in stream_ids:
            for event in event_store.load(stream_id):
                self.handle(event)
Transactional Outbox Pattern
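The outbox pattern solves the dual-write problem. You need to persist events to the event store AND publish them to Kafka atomically, but a database transaction cannot include a Kafka publish. Instead, write the events and matching outbox rows to Postgres in the same transaction. A background relay process then reads the unpublished outbox rows, publishes them to Kafka, and marks them as published. The relay can publish a message and crash before marking its row, so delivery is at-least-once and consumers must handle duplicates idempotently.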
class TransactionalOutbox:
    """Stores events in Postgres outbox table, relay publishes to Kafka."""

    def save_with_outbox(self, stream_id: str, events: list, conn, expected_version: int = 0):
        """Write *events* to ``event_store`` and ``outbox`` in one transaction.

        Args:
            stream_id: Stream to append to.
            events: Dataclass events; serialised with ``asdict`` + ``json.dumps``.
            conn: DB-API connection. It is committed on success and rolled back
                on any failure, so either both tables get the rows or neither does.
            expected_version: Version the stream is currently at (0 for a new
                stream). Events are stored as expected_version + 1, + 2, ...
                Previously versions always started at 1, so any append to an
                existing stream collided.
        """
        try:
            with conn.cursor() as cur:
                for offset, event in enumerate(events, start=1):
                    event_type = type(event).__name__
                    payload = json.dumps(asdict(event))
                    cur.execute(
                        "INSERT INTO event_store (stream_id, version, event_type, data) VALUES (%s, %s, %s, %s)",
                        [stream_id, expected_version + offset, event_type, payload],
                    )
                    # Outbox row in the same transaction as the event-store row.
                    cur.execute(
                        "INSERT INTO outbox (event_type, payload, published) VALUES (%s, %s, FALSE)",
                        [event_type, payload],
                    )
            conn.commit()
        except Exception:
            conn.rollback()
            raise
class OutboxRelay:
    """Polls outbox table and publishes to Kafka, marks as published."""

    def __init__(self, conn, kafka_producer):
        self._conn = conn
        # Expected to follow the confluent_kafka Producer API:
        # produce(topic, value=..., on_delivery=cb) and flush().
        self._producer = kafka_producer

    def relay_batch(self, batch_size: int = 100) -> int:
        """Publish up to *batch_size* unpublished outbox rows; mark the delivered ones.

        Rows are selected ``FOR UPDATE SKIP LOCKED``, so concurrent relays skip
        rows another transaction has locked. ``produce`` only enqueues a message.
        A row is therefore marked published only after its delivery callback
        reports success. Rows whose delivery failed stay unpublished and are
        picked up by a later batch (at-least-once delivery). If anything raises,
        the transaction is rolled back and the row locks are released.

        Returns:
            The number of rows marked as published.
        """
        delivered: list[int] = []

        def _delivery_callback(row_id):
            # One callback per message so each report maps back to its row.
            def _on_delivery(err, msg):
                if err is None:
                    delivered.append(row_id)
            return _on_delivery

        try:
            with self._conn.cursor() as cur:
                cur.execute(
                    "SELECT id, event_type, payload FROM outbox WHERE published = FALSE ORDER BY id LIMIT %s FOR UPDATE SKIP LOCKED",
                    [batch_size],
                )
                rows = cur.fetchall()
                for row_id, event_type, payload in rows:
                    if isinstance(payload, str):
                        # In case the driver returns the JSON column undecoded.
                        payload = json.loads(payload)
                    self._producer.produce(
                        "domain.events",
                        value=json.dumps({"type": event_type, **payload}).encode(),
                        on_delivery=_delivery_callback(row_id),
                    )
                # flush() waits for outstanding messages and serves their
                # delivery callbacks, so `delivered` is complete afterwards.
                self._producer.flush()
                for row_id in delivered:
                    cur.execute(
                        "UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = %s",
                        [row_id],
                    )
            self._conn.commit()
        except Exception:
            self._conn.rollback()
            raise
        return len(delivered)
Saga Orchestration
from enum import Enum
class OrderSagaState(str, Enum):
    """States of an OrderFulfillmentSaga; each value is the lowercase name."""

    STARTED = "started"
    PAYMENT_SENT = "payment_sent"
    PAYMENT_OK = "payment_ok"
    FULFILLED = "fulfilled"
    COMPENSATING = "compensating"
    FAILED = "failed"


class OrderFulfillmentSaga:
    """Orchestrates: reserve inventory → charge payment → ship order.

    The saga is an in-memory state machine. Each ``on_*`` reaction updates
    ``state`` and may queue the next outgoing command; the caller collects the
    queue with :meth:`pop_commands`. A reaction that does not apply to the
    current state (for example a redelivered message) changes nothing and
    returns False, so no step is ever queued twice.

    Paths:
        STARTED --reserved--> PAYMENT_SENT --paid--> PAYMENT_OK --shipped--> FULFILLED
        STARTED --reservation failed--> FAILED
        PAYMENT_SENT --payment failed--> COMPENSATING --released--> FAILED
    """

    def __init__(self, saga_id: str, order_id: str):
        self.saga_id = saga_id
        self.order_id = order_id
        self.state = OrderSagaState.STARTED
        # Set by start() and forwarded as "amount" with the payment charge.
        self.order_total: float | None = None
        self._started = False
        self._commands_to_dispatch: list = []

    def _command(self, service: str, command: str, **extra) -> dict:
        """Build an outgoing command message for *service* about this order."""
        return {"service": service, "command": command, "order_id": self.order_id, **extra}

    def _advance(self, allowed: tuple, new_state: OrderSagaState, command: dict | None = None) -> bool:
        """Move to *new_state* and queue *command*, but only if ``state`` is in *allowed*."""
        if self.state not in allowed:
            return False
        self.state = new_state
        if command is not None:
            self._commands_to_dispatch.append(command)
        return True

    def start(self, order_total: float):
        """Record *order_total* and queue the inventory reservation; returns self.

        Repeated calls are no-ops, so a retried start never reserves twice.
        """
        if not self._started:
            self._started = True
            self.order_total = order_total
            self._commands_to_dispatch.append(self._command("inventory", "reserve"))
        return self

    def on_inventory_reserved(self) -> bool:
        """Inventory is held: move to PAYMENT_SENT and queue the charge with its amount."""
        return self._advance(
            (OrderSagaState.STARTED,),
            OrderSagaState.PAYMENT_SENT,
            self._command("payment", "charge", amount=self.order_total),
        )

    def on_inventory_reservation_failed(self) -> bool:
        """Nothing was reserved, so nothing needs compensating: go straight to FAILED."""
        return self._advance((OrderSagaState.STARTED,), OrderSagaState.FAILED)

    def on_payment_succeeded(self) -> bool:
        """Payment captured: move to PAYMENT_OK and queue shipment."""
        return self._advance(
            (OrderSagaState.PAYMENT_SENT,),
            OrderSagaState.PAYMENT_OK,
            self._command("fulfillment", "ship"),
        )

    def on_payment_failed(self) -> bool:
        """Payment failed: start compensating by releasing the reserved inventory."""
        return self._advance(
            (OrderSagaState.PAYMENT_SENT,),
            OrderSagaState.COMPENSATING,
            self._command("inventory", "release"),
        )

    def on_inventory_released(self) -> bool:
        """Compensation finished: the saga ends in FAILED."""
        return self._advance((OrderSagaState.COMPENSATING,), OrderSagaState.FAILED)

    def on_shipped(self) -> bool:
        """Order shipped after a successful payment: the saga ends in FULFILLED."""
        return self._advance((OrderSagaState.PAYMENT_OK,), OrderSagaState.FULFILLED)

    def pop_commands(self) -> list:
        """Return the queued outgoing commands (oldest first) and clear the queue."""
        cmds, self._commands_to_dispatch = self._commands_to_dispatch, []
        return cmds
FastAPI CQRS Integration
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
# Module-level singletons shared by the route functions below.
# NOTE(review): no handler registration (command_bus.register / query_bus.register)
# is visible in this section; without it every dispatch raises ValueError.
app, command_bus, query_bus = FastAPI(), CommandBus(), QueryBus()
class PlaceOrderRequest(BaseModel):
    """JSON body for ``POST /orders``; carries the same fields as PlaceOrder."""

    customer_id: str
    # Only the outer shape (a list of objects) is validated; item keys are not checked.
    items: list[dict]
    # Arbitrary JSON object; contents are not validated here.
    shipping_address: dict
@app.post("/orders", status_code=201)
async def place_order(req: PlaceOrderRequest):
    """Create an order from the request body and respond with its new id.

    A ValueError raised while dispatching becomes HTTP 400.
    NOTE(review): command_bus.dispatch is a synchronous call, so any I/O the
    registered handler performs blocks the event loop while it runs.
    """
    command = PlaceOrder(
        customer_id=req.customer_id,
        items=req.items,
        shipping_address=req.shipping_address,
    )
    try:
        new_order_id = command_bus.dispatch(command)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return {"order_id": new_order_id}
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    """Return the read-model record for *order_id*; respond 404 when the query yields nothing falsy-free."""
    found = query_bus.ask(GetOrder(order_id=order_id))
    if found:
        return found
    raise HTTPException(status_code=404, detail="Order not found")
@app.delete("/orders/{order_id}")
async def cancel_order(order_id: str, reason: str = ""):
    """Cancel an order through the command bus.

    Responses:
        200 ``{"status": "cancelled"}`` on success.
        400 when the command is rejected with ValueError (e.g. the order is
            already cancelled).
        409 when a ConcurrencyError is raised. This is a write conflict, not a
            malformed request, so the client may reload and retry.
    """
    try:
        command_bus.dispatch(CancelOrder(order_id=order_id, reason=reason))
    except ConcurrencyError as exc:
        raise HTTPException(status_code=409, detail=str(exc)) from exc
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"status": "cancelled"}
Frequently Asked Questions
- When should I use CQRS?
- CQRS pays off when read and write workloads have very different shapes — e.g., complex writes with many business rules but simple reads, or simple writes but complex joined read models. It adds complexity (two models, eventual consistency), so avoid it for simple CRUD apps. Start with standard layered architecture and introduce CQRS only where you feel the pain.
- What are the downsides of Event Sourcing?
- Event Sourcing is complex: event schema evolution is hard (old events must still be deserializable), rebuilding large streams is slow, querying event history is non-trivial, and debugging requires replaying events. Use it when you genuinely need audit trails, temporal queries, or the ability to replay state — not as a default architecture choice.
- How do I handle event schema changes?
- Use upcasters — functions that transform old event versions into the current schema on load. Maintain a version field in each event and register an upcaster for each version change. Never mutate stored events — the immutable log is the source of truth.