Python Domain-Driven Design: Aggregates and Repository Pattern
Domain-Driven Design (DDD) is an approach to software architecture that centres the codebase on the business domain rather than on database tables or HTTP verbs. The key building blocks are Entities (objects with identity), Value Objects (immutable objects defined by attributes), Aggregates (clusters of entities with a single root), Domain Events (facts that happened), and Repositories (persistence abstractions). In Python, combining dataclasses or Pydantic with SQLAlchemy and dependency injection produces clean, testable domain models.
Table of Contents
Value Objects
Value objects have no identity — they are defined entirely by their attributes. Two Money instances with the same amount and currency are equal and interchangeable. They are immutable: changing a money value produces a new instance. Use frozen dataclasses or Pydantic models with frozen=True.
from __future__ import annotations
from dataclasses import dataclass
from decimal import Decimal, ROUND_HALF_UP
from typing import ClassVar
@dataclass(frozen=True)
class Money:
amount: Decimal
currency: str
ZERO: ClassVar[Money]
def __post_init__(self):
if self.amount < Decimal("0"):
raise ValueError(f"Money amount cannot be negative: {self.amount}")
if self.currency not in {"USD", "EUR", "GBP", "INR"}:
raise ValueError(f"Unsupported currency: {self.currency}")
# Round to 2 decimal places
object.__setattr__(self, "amount", self.amount.quantize(Decimal("0.01"), ROUND_HALF_UP))
def add(self, other: Money) -> Money:
if self.currency != other.currency:
raise ValueError(f"Cannot add {self.currency} and {other.currency}")
return Money(self.amount + other.amount, self.currency)
def subtract(self, other: Money) -> Money:
return Money(self.amount - other.amount, self.currency)
def multiply(self, factor: int | float | Decimal) -> Money:
return Money(self.amount * Decimal(str(factor)), self.currency)
def __str__(self) -> str:
return f"{self.currency} {self.amount:.2f}"
Money.ZERO = Money(Decimal("0"), "USD")
@dataclass(frozen=True)
class Address:
street: str
city: str
state: str
postal_code: str
country: str
def __post_init__(self):
if not self.postal_code.strip():
raise ValueError("Postal code cannot be empty")
@dataclass(frozen=True)
class EmailAddress:
value: str
def __post_init__(self):
import re
if not re.match(r"[^@]+@[^@]+\.[^@]+", self.value):
raise ValueError(f"Invalid email: {self.value}")
# Value objects compare by value, not identity
m1 = Money(Decimal("99.99"), "USD")
m2 = Money(Decimal("99.99"), "USD")
assert m1 == m2 # True — equal by value
assert m1 is not m2 # different objects
Entities and Aggregates
An Aggregate is a cluster of domain objects (entities + value objects) treated as a single unit for persistence and consistency. The Aggregate Root is the only object through which the outside world interacts with the cluster — it enforces all invariants and business rules.
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from decimal import Decimal
from enum import Enum
from datetime import datetime, timezone
class OrderStatus(str, Enum):
PENDING = "pending"
CONFIRMED = "confirmed"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
@dataclass
class OrderLine:
"""Entity within the Order aggregate."""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
product_id: str = ""
product_name: str = ""
quantity: int = 0
unit_price: Money = field(default_factory=lambda: Money(Decimal("0"), "USD"))
@property
def total(self) -> Money:
return self.unit_price.multiply(self.quantity)
@dataclass
class Order:
"""Aggregate Root — enforces all Order invariants."""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
customer_id: str = ""
status: OrderStatus = OrderStatus.PENDING
lines: list[OrderLine] = field(default_factory=list)
shipping_address: Address | None = None
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
_events: list = field(default_factory=list, repr=False)
@property
def total(self) -> Money:
if not self.lines:
return Money(Decimal("0"), "USD")
result = self.lines[0].total
for line in self.lines[1:]:
result = result.add(line.total)
return result
def add_line(self, product_id: str, name: str, qty: int, price: Money) -> OrderLine:
if self.status != OrderStatus.PENDING:
raise ValueError(f"Cannot modify order in status {self.status}")
if qty <= 0:
raise ValueError("Quantity must be positive")
line = OrderLine(product_id=product_id, product_name=name, quantity=qty, unit_price=price)
self.lines.append(line)
self._events.append(OrderLineAdded(order_id=self.id, line_id=line.id))
return line
def confirm(self, shipping_address: Address) -> None:
if self.status != OrderStatus.PENDING:
raise ValueError(f"Cannot confirm order in status {self.status}")
if not self.lines:
raise ValueError("Cannot confirm empty order")
self.shipping_address = shipping_address
self.status = OrderStatus.CONFIRMED
self.updated_at = datetime.now(timezone.utc)
self._events.append(OrderConfirmed(order_id=self.id, total=str(self.total)))
def cancel(self, reason: str = "") -> None:
if self.status in {OrderStatus.SHIPPED, OrderStatus.DELIVERED}:
raise ValueError(f"Cannot cancel order in status {self.status}")
self.status = OrderStatus.CANCELLED
self.updated_at = datetime.now(timezone.utc)
self._events.append(OrderCancelled(order_id=self.id, reason=reason))
def pop_events(self) -> list:
events, self._events = self._events, []
return events
Domain Events
from dataclasses import dataclass, field
from datetime import datetime, timezone
import uuid
@dataclass(frozen=True)
class DomainEvent:
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
occurred_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
@dataclass(frozen=True)
class OrderLineAdded(DomainEvent):
order_id: str = ""
line_id: str = ""
@dataclass(frozen=True)
class OrderConfirmed(DomainEvent):
order_id: str = ""
total: str = ""
@dataclass(frozen=True)
class OrderCancelled(DomainEvent):
order_id: str = ""
reason: str = ""
# Event dispatcher — publish events after transaction commits
class EventDispatcher:
def __init__(self):
self._handlers: dict[type, list] = {}
def subscribe(self, event_type: type, handler):
self._handlers.setdefault(event_type, []).append(handler)
def dispatch(self, events: list):
for event in events:
for handler in self._handlers.get(type(event), []):
handler(event)
dispatcher = EventDispatcher()
dispatcher.subscribe(OrderConfirmed, lambda e: print(f"Send confirmation email for {e.order_id}"))
dispatcher.subscribe(OrderCancelled, lambda e: print(f"Process refund for {e.order_id}"))
Repository Pattern
A repository provides a collection-like interface for persisting and retrieving aggregates. It abstracts the database away from the domain, allowing the domain model to be tested without a database.
from abc import ABC, abstractmethod
from sqlalchemy.orm import Session
class OrderRepository(ABC):
@abstractmethod
def get(self, order_id: str) -> Order | None: ...
@abstractmethod
def save(self, order: Order) -> None: ...
@abstractmethod
def list_by_customer(self, customer_id: str) -> list[Order]: ...
class SqlAlchemyOrderRepository(OrderRepository):
def __init__(self, session: Session):
self._session = session
def get(self, order_id: str) -> Order | None:
from app.models import OrderModel
row = self._session.get(OrderModel, order_id)
return self._to_domain(row) if row else None
def save(self, order: Order) -> None:
from app.models import OrderModel
existing = self._session.get(OrderModel, order.id)
if existing:
self._update_model(existing, order)
else:
model = self._to_model(order)
self._session.add(model)
def list_by_customer(self, customer_id: str) -> list[Order]:
from app.models import OrderModel
rows = self._session.query(OrderModel).filter_by(customer_id=customer_id).all()
return [self._to_domain(r) for r in rows]
def _to_domain(self, row) -> Order:
# Map ORM model → domain aggregate
return Order(id=row.id, customer_id=row.customer_id, status=row.status, ...)
def _to_model(self, order: Order):
from app.models import OrderModel
return OrderModel(id=order.id, customer_id=order.customer_id, status=order.status.value, ...)
def _update_model(self, model, order: Order):
model.status = order.status.value
model.updated_at = order.updated_at
# In-memory repository for tests
class InMemoryOrderRepository(OrderRepository):
def __init__(self):
self._store: dict[str, Order] = {}
def get(self, order_id: str) -> Order | None:
return self._store.get(order_id)
def save(self, order: Order) -> None:
self._store[order.id] = order
def list_by_customer(self, customer_id: str) -> list[Order]:
return [o for o in self._store.values() if o.customer_id == customer_id]
Unit of Work
from sqlalchemy.orm import Session
from contextlib import contextmanager
class UnitOfWork:
def __init__(self, session_factory):
self._session_factory = session_factory
self.orders: OrderRepository | None = None
self._session: Session | None = None
def __enter__(self):
self._session = self._session_factory()
self.orders = SqlAlchemyOrderRepository(self._session)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type:
self._session.rollback()
self._session.close()
def commit(self, aggregate=None):
self._session.commit()
if aggregate:
events = aggregate.pop_events()
dispatcher.dispatch(events)
def rollback(self):
self._session.rollback()
Application Services
from decimal import Decimal
class OrderService:
def __init__(self, uow: UnitOfWork):
self._uow = uow
def place_order(self, customer_id: str, items: list[dict]) -> str:
with self._uow:
order = Order(customer_id=customer_id)
for item in items:
order.add_line(
product_id=item["product_id"],
name=item["name"],
qty=item["quantity"],
price=Money(Decimal(str(item["unit_price"])), "USD"),
)
self._uow.orders.save(order)
self._uow.commit(order)
return order.id
def confirm_order(self, order_id: str, address_data: dict) -> None:
with self._uow:
order = self._uow.orders.get(order_id)
if not order:
raise ValueError(f"Order {order_id} not found")
address = Address(**address_data)
order.confirm(address)
self._uow.orders.save(order)
self._uow.commit(order)
def cancel_order(self, order_id: str, reason: str = "") -> None:
with self._uow:
order = self._uow.orders.get(order_id)
if not order:
raise ValueError(f"Order {order_id} not found")
order.cancel(reason)
self._uow.orders.save(order)
self._uow.commit(order)
FastAPI Integration
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
app = FastAPI()
class PlaceOrderRequest(BaseModel):
customer_id: str
items: list[dict]
class ConfirmOrderRequest(BaseModel):
street: str
city: str
state: str
postal_code: str
country: str
def get_order_service() -> OrderService:
from app.database import SessionLocal
uow = UnitOfWork(SessionLocal)
return OrderService(uow)
@app.post("/orders", status_code=201)
def place_order(
req: PlaceOrderRequest,
service: OrderService = Depends(get_order_service),
):
order_id = service.place_order(req.customer_id, req.items)
return {"order_id": order_id}
@app.post("/orders/{order_id}/confirm")
def confirm_order(
order_id: str,
req: ConfirmOrderRequest,
service: OrderService = Depends(get_order_service),
):
try:
service.confirm_order(order_id, req.dict())
return {"status": "confirmed"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
Frequently Asked Questions
- Is DDD overkill for small projects?
- Yes. Apply DDD selectively to the core domain — the part of the system that gives your business a competitive advantage. Use simple CRUD with SQLAlchemy for peripheral concerns (user settings, logs, configuration). The value of DDD appears when the domain has complex rules, many states, and rapid change — not for simple data management.
- How do I test domain logic without a database?
- This is DDD's greatest benefit — domain classes are plain Python objects with no ORM imports. Test the Order aggregate by constructing it directly:
order = Order(...); order.confirm(address); assert order.status == OrderStatus.CONFIRMED. UseInMemoryOrderRepositoryin service tests. Only integration tests touch the database. - What is the difference between a Domain Service and an Application Service?
- A Domain Service contains domain logic that doesn't naturally belong to a single aggregate (e.g., a pricing engine that combines product catalog + customer tier + promotions). An Application Service orchestrates use cases — it coordinates repositories, domain services, and external services, but contains no business logic itself.