Python Domain-Driven Design: Aggregates and Repository Pattern

Domain-Driven Design (DDD) is an approach to software architecture that centres the codebase on the business domain rather than on database tables or HTTP verbs. The key building blocks are Entities (objects with identity), Value Objects (immutable objects defined by attributes), Aggregates (clusters of entities with a single root), Domain Events (facts that happened), and Repositories (persistence abstractions). In Python, combining dataclasses or Pydantic with SQLAlchemy and dependency injection produces clean, testable domain models.

Value Objects

Value objects have no identity — they are defined entirely by their attributes. Two Money instances with the same amount and currency are equal and interchangeable. They are immutable: changing a money value produces a new instance. Use frozen dataclasses or Pydantic models with frozen=True.

from __future__ import annotations
from dataclasses import dataclass
from decimal import Decimal, ROUND_HALF_UP
from typing import ClassVar


@dataclass(frozen=True)
class Money:
    amount: Decimal
    currency: str

    ZERO: ClassVar[Money]

    def __post_init__(self):
        if self.amount < Decimal("0"):
            raise ValueError(f"Money amount cannot be negative: {self.amount}")
        if self.currency not in {"USD", "EUR", "GBP", "INR"}:
            raise ValueError(f"Unsupported currency: {self.currency}")
        # Round to 2 decimal places
        object.__setattr__(self, "amount", self.amount.quantize(Decimal("0.01"), ROUND_HALF_UP))

    def add(self, other: Money) -> Money:
        if self.currency != other.currency:
            raise ValueError(f"Cannot add {self.currency} and {other.currency}")
        return Money(self.amount + other.amount, self.currency)

    def subtract(self, other: Money) -> Money:
        return Money(self.amount - other.amount, self.currency)

    def multiply(self, factor: int | float | Decimal) -> Money:
        return Money(self.amount * Decimal(str(factor)), self.currency)

    def __str__(self) -> str:
        return f"{self.currency} {self.amount:.2f}"


Money.ZERO = Money(Decimal("0"), "USD")


@dataclass(frozen=True)
class Address:
    street: str
    city: str
    state: str
    postal_code: str
    country: str

    def __post_init__(self):
        if not self.postal_code.strip():
            raise ValueError("Postal code cannot be empty")


@dataclass(frozen=True)
class EmailAddress:
    value: str

    def __post_init__(self):
        import re
        if not re.match(r"[^@]+@[^@]+\.[^@]+", self.value):
            raise ValueError(f"Invalid email: {self.value}")


# Value objects compare by value, not identity
m1 = Money(Decimal("99.99"), "USD")
m2 = Money(Decimal("99.99"), "USD")
assert m1 == m2   # True — equal by value
assert m1 is not m2  # different objects

Entities and Aggregates

An Aggregate is a cluster of domain objects (entities + value objects) treated as a single unit for persistence and consistency. The Aggregate Root is the only object through which the outside world interacts with the cluster — it enforces all invariants and business rules.

from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from decimal import Decimal
from enum import Enum
from datetime import datetime, timezone


class OrderStatus(str, Enum):
    PENDING   = "pending"
    CONFIRMED = "confirmed"
    SHIPPED   = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"


@dataclass
class OrderLine:
    """Entity within the Order aggregate."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    product_id: str = ""
    product_name: str = ""
    quantity: int = 0
    unit_price: Money = field(default_factory=lambda: Money(Decimal("0"), "USD"))

    @property
    def total(self) -> Money:
        return self.unit_price.multiply(self.quantity)


@dataclass
class Order:
    """Aggregate Root — enforces all Order invariants."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    customer_id: str = ""
    status: OrderStatus = OrderStatus.PENDING
    lines: list[OrderLine] = field(default_factory=list)
    shipping_address: Address | None = None
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    _events: list = field(default_factory=list, repr=False)

    @property
    def total(self) -> Money:
        if not self.lines:
            return Money(Decimal("0"), "USD")
        result = self.lines[0].total
        for line in self.lines[1:]:
            result = result.add(line.total)
        return result

    def add_line(self, product_id: str, name: str, qty: int, price: Money) -> OrderLine:
        if self.status != OrderStatus.PENDING:
            raise ValueError(f"Cannot modify order in status {self.status}")
        if qty <= 0:
            raise ValueError("Quantity must be positive")
        line = OrderLine(product_id=product_id, product_name=name, quantity=qty, unit_price=price)
        self.lines.append(line)
        self._events.append(OrderLineAdded(order_id=self.id, line_id=line.id))
        return line

    def confirm(self, shipping_address: Address) -> None:
        if self.status != OrderStatus.PENDING:
            raise ValueError(f"Cannot confirm order in status {self.status}")
        if not self.lines:
            raise ValueError("Cannot confirm empty order")
        self.shipping_address = shipping_address
        self.status = OrderStatus.CONFIRMED
        self.updated_at = datetime.now(timezone.utc)
        self._events.append(OrderConfirmed(order_id=self.id, total=str(self.total)))

    def cancel(self, reason: str = "") -> None:
        if self.status in {OrderStatus.SHIPPED, OrderStatus.DELIVERED}:
            raise ValueError(f"Cannot cancel order in status {self.status}")
        self.status = OrderStatus.CANCELLED
        self.updated_at = datetime.now(timezone.utc)
        self._events.append(OrderCancelled(order_id=self.id, reason=reason))

    def pop_events(self) -> list:
        events, self._events = self._events, []
        return events

Domain Events

from dataclasses import dataclass, field
from datetime import datetime, timezone
import uuid


@dataclass(frozen=True)
class DomainEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    occurred_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


@dataclass(frozen=True)
class OrderLineAdded(DomainEvent):
    order_id: str = ""
    line_id: str = ""


@dataclass(frozen=True)
class OrderConfirmed(DomainEvent):
    order_id: str = ""
    total: str = ""


@dataclass(frozen=True)
class OrderCancelled(DomainEvent):
    order_id: str = ""
    reason: str = ""


# Event dispatcher — publish events after transaction commits
class EventDispatcher:
    def __init__(self):
        self._handlers: dict[type, list] = {}

    def subscribe(self, event_type: type, handler):
        self._handlers.setdefault(event_type, []).append(handler)

    def dispatch(self, events: list):
        for event in events:
            for handler in self._handlers.get(type(event), []):
                handler(event)


dispatcher = EventDispatcher()
dispatcher.subscribe(OrderConfirmed, lambda e: print(f"Send confirmation email for {e.order_id}"))
dispatcher.subscribe(OrderCancelled, lambda e: print(f"Process refund for {e.order_id}"))

Repository Pattern

A repository provides a collection-like interface for persisting and retrieving aggregates. It abstracts the database away from the domain, allowing the domain model to be tested without a database.

from abc import ABC, abstractmethod
from sqlalchemy.orm import Session


class OrderRepository(ABC):
    @abstractmethod
    def get(self, order_id: str) -> Order | None: ...

    @abstractmethod
    def save(self, order: Order) -> None: ...

    @abstractmethod
    def list_by_customer(self, customer_id: str) -> list[Order]: ...


class SqlAlchemyOrderRepository(OrderRepository):
    def __init__(self, session: Session):
        self._session = session

    def get(self, order_id: str) -> Order | None:
        from app.models import OrderModel
        row = self._session.get(OrderModel, order_id)
        return self._to_domain(row) if row else None

    def save(self, order: Order) -> None:
        from app.models import OrderModel
        existing = self._session.get(OrderModel, order.id)
        if existing:
            self._update_model(existing, order)
        else:
            model = self._to_model(order)
            self._session.add(model)

    def list_by_customer(self, customer_id: str) -> list[Order]:
        from app.models import OrderModel
        rows = self._session.query(OrderModel).filter_by(customer_id=customer_id).all()
        return [self._to_domain(r) for r in rows]

    def _to_domain(self, row) -> Order:
        # Map ORM model → domain aggregate
        return Order(id=row.id, customer_id=row.customer_id, status=row.status, ...)

    def _to_model(self, order: Order):
        from app.models import OrderModel
        return OrderModel(id=order.id, customer_id=order.customer_id, status=order.status.value, ...)

    def _update_model(self, model, order: Order):
        model.status = order.status.value
        model.updated_at = order.updated_at


# In-memory repository for tests
class InMemoryOrderRepository(OrderRepository):
    def __init__(self):
        self._store: dict[str, Order] = {}

    def get(self, order_id: str) -> Order | None:
        return self._store.get(order_id)

    def save(self, order: Order) -> None:
        self._store[order.id] = order

    def list_by_customer(self, customer_id: str) -> list[Order]:
        return [o for o in self._store.values() if o.customer_id == customer_id]

Unit of Work

from sqlalchemy.orm import Session
from contextlib import contextmanager


class UnitOfWork:
    def __init__(self, session_factory):
        self._session_factory = session_factory
        self.orders: OrderRepository | None = None
        self._session: Session | None = None

    def __enter__(self):
        self._session = self._session_factory()
        self.orders = SqlAlchemyOrderRepository(self._session)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            self._session.rollback()
        self._session.close()

    def commit(self, aggregate=None):
        self._session.commit()
        if aggregate:
            events = aggregate.pop_events()
            dispatcher.dispatch(events)

    def rollback(self):
        self._session.rollback()

Application Services

from decimal import Decimal


class OrderService:
    def __init__(self, uow: UnitOfWork):
        self._uow = uow

    def place_order(self, customer_id: str, items: list[dict]) -> str:
        with self._uow:
            order = Order(customer_id=customer_id)
            for item in items:
                order.add_line(
                    product_id=item["product_id"],
                    name=item["name"],
                    qty=item["quantity"],
                    price=Money(Decimal(str(item["unit_price"])), "USD"),
                )
            self._uow.orders.save(order)
            self._uow.commit(order)
            return order.id

    def confirm_order(self, order_id: str, address_data: dict) -> None:
        with self._uow:
            order = self._uow.orders.get(order_id)
            if not order:
                raise ValueError(f"Order {order_id} not found")
            address = Address(**address_data)
            order.confirm(address)
            self._uow.orders.save(order)
            self._uow.commit(order)

    def cancel_order(self, order_id: str, reason: str = "") -> None:
        with self._uow:
            order = self._uow.orders.get(order_id)
            if not order:
                raise ValueError(f"Order {order_id} not found")
            order.cancel(reason)
            self._uow.orders.save(order)
            self._uow.commit(order)

FastAPI Integration

from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel

app = FastAPI()


class PlaceOrderRequest(BaseModel):
    customer_id: str
    items: list[dict]


class ConfirmOrderRequest(BaseModel):
    street: str
    city: str
    state: str
    postal_code: str
    country: str


def get_order_service() -> OrderService:
    from app.database import SessionLocal
    uow = UnitOfWork(SessionLocal)
    return OrderService(uow)


@app.post("/orders", status_code=201)
def place_order(
    req: PlaceOrderRequest,
    service: OrderService = Depends(get_order_service),
):
    order_id = service.place_order(req.customer_id, req.items)
    return {"order_id": order_id}


@app.post("/orders/{order_id}/confirm")
def confirm_order(
    order_id: str,
    req: ConfirmOrderRequest,
    service: OrderService = Depends(get_order_service),
):
    try:
        service.confirm_order(order_id, req.dict())
        return {"status": "confirmed"}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

Frequently Asked Questions

Is DDD overkill for small projects?
Yes. Apply DDD selectively to the core domain — the part of the system that gives your business a competitive advantage. Use simple CRUD with SQLAlchemy for peripheral concerns (user settings, logs, configuration). The value of DDD appears when the domain has complex rules, many states, and rapid change — not for simple data management.
How do I test domain logic without a database?
This is DDD's greatest benefit — domain classes are plain Python objects with no ORM imports. Test the Order aggregate by constructing it directly: order = Order(...); order.confirm(address); assert order.status == OrderStatus.CONFIRMED. Use InMemoryOrderRepository in service tests. Only integration tests touch the database.
What is the difference between a Domain Service and an Application Service?
A Domain Service contains domain logic that doesn't naturally belong to a single aggregate (e.g., a pricing engine that combines product catalog + customer tier + promotions). An Application Service orchestrates use cases — it coordinates repositories, domain services, and external services, but contains no business logic itself.