Python Beanie ODM: Async MongoDB with Pydantic Models

Beanie is the modern async MongoDB ODM for Python — built on top of Motor (async MongoDB driver) and Pydantic v2. Document models are Pydantic models, giving you automatic validation, serialization, and type hints for all MongoDB operations. Beanie handles the schema, indexes, and query builder, while Motor handles the async I/O. This guide covers document modeling, CRUD, aggregation, relationships, and production FastAPI integration.

Installation and Initialization

pip install beanie motor
import asyncio
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient


async def init_db():
    """Connect to the local MongoDB server and initialise Beanie.

    Opens a Motor client against ``mongodb://localhost:27017`` and registers
    the ``User``, ``Product`` and ``Order`` document classes on the ``myapp``
    database.
    """
    mongo = AsyncIOMotorClient("mongodb://localhost:27017")
    models = [User, Product, Order]  # register all document classes
    await init_beanie(database=mongo.myapp, document_models=models)


# Run initialization at startup. asyncio.run() starts an event loop, drives
# init_db() to completion and then closes the loop.
asyncio.run(init_db())

Document Models

Beanie documents inherit from Document, which in turn inherits from Pydantic's BaseModel, so all Pydantic validation applies. The inner Settings class configures the collection name, indexes, and time-series settings.

import uuid
from datetime import datetime, timedelta, timezone
from typing import Annotated, Optional

from beanie import (
    Document,
    Indexed,
    Insert,
    Link,
    Replace,
    Save,
    Update,
    after_event,
    before_event,
)
from pydantic import BaseModel, EmailStr, Field, field_validator


class User(Document):
    """Application user, stored in the ``users`` collection.

    ``id`` is a UUID4 string generated on the client instead of a MongoDB
    ObjectId. ``email`` is lower-cased during validation and backed by a
    unique index, so an address can only be registered once regardless of the
    letter case it was typed in.
    """

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    email: Annotated[str, Indexed(unique=True)]   # unique index
    name: str
    role: str = "user"
    is_active: bool = True
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    @field_validator("email")
    @classmethod
    def lowercase_email(cls, v: str) -> str:
        """Normalise the address to lower case before it is stored."""
        return v.lower()

    # Registered for Save and Replace as well as Update so the timestamp is
    # refreshed whichever write path is used: save(), replace() or update().
    @before_event(Update, Save, Replace)
    def set_updated_at(self):
        """Stamp ``updated_at`` with the current UTC time before a write."""
        self.updated_at = datetime.now(timezone.utc)

    class Settings:
        name = "users"
        # The unique index on ``email`` is declared on the field itself via
        # Indexed(unique=True). Do not add a plain ("email", 1) entry here:
        # a second definition of the same key can clash with, or replace,
        # the unique one.
        indexes = [
            [("role", 1), ("is_active", 1)],  # compound index
        ]


class Product(Document):
    # Catalogue item stored in the "products" collection.
    sku: Annotated[str, Indexed(unique=True)]  # unique index on the SKU
    name: str
    description: str = ""
    price: float
    category: Annotated[str, Indexed()]  # non-unique index for category filters
    tags: list[str] = Field(default_factory=list)  # fresh list per instance
    stock: int = 0
    is_available: bool = True
    # Creation timestamp, timezone-aware UTC, filled in when the model is built.
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Settings:
        name = "products"  # collection name


class OrderLine(BaseModel):
    """One line item embedded inside an ``Order``.

    This is a plain Pydantic model rather than a ``Document``: order lines
    live inline in the parent order's ``lines`` list, not in a collection of
    their own, so they need neither an ``id`` nor registration with
    ``init_beanie``.
    """

    product_id: str
    product_name: str
    quantity: int
    unit_price: float

    @property
    def total(self) -> float:
        """Line total: ``quantity * unit_price``."""
        return self.quantity * self.unit_price


class Order(Document):
    # Customer order stored in the "orders" collection.
    order_number: Annotated[str, Indexed(unique=True)]  # unique index
    customer_id: Annotated[str, Indexed()]  # non-unique index
    lines: list[OrderLine] = Field(default_factory=list)  # embedded line items
    status: str = "pending"
    # Stored order total; nothing in this class recomputes it from ``lines``.
    total: float = 0.0
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Settings:
        name = "orders"  # collection name
        indexes = [
            [("customer_id", 1), ("status", 1)],
            [("created_at", -1)],   # descending for recent orders
        ]

CRUD Operations

from beanie import PydanticObjectId


async def crud_examples():
    """Walk through create, read, update, upsert and delete calls on ``User``."""
    # CREATE: one document
    alice = User(email="alice@example.com", name="Alice", role="admin")
    await alice.insert()
    print(f"Created user: {alice.id}")

    # CREATE: several documents in a single call
    batch = []
    for i in range(10):
        batch.append(User(email=f"user{i}@example.com", name=f"User {i}"))
    await User.insert_many(batch)

    # READ: look the document up by its primary key
    found = await User.get(alice.id)

    # UPDATE: change attributes on the loaded object, then persist it
    found.name = "Alice Johnson"
    found.role = "superadmin"
    await found.save()

    # PARTIAL UPDATE: push a $set to the server without loading the document
    alice_query = User.find_one(User.email == "alice@example.com")
    await alice_query.update({"$set": {"is_active": False}})

    # UPSERT: update the match, or insert ``on_insert`` when nothing matches
    newcomer_query = User.find_one(User.email == "new@example.com")
    await newcomer_query.upsert(
        {"$set": {"name": "New User", "role": "user"}},
        on_insert=User(email="new@example.com", name="New User"),
    )

    # DELETE: the loaded document
    await found.delete()

    # DELETE: everything matching a filter
    inactive_users = User.find(User.is_active == False)
    await inactive_users.delete()

Querying and Filtering

from beanie.operators import In, GTE, LTE, RegEx


async def query_examples():
    """Demonstrate the common read patterns: filters, projection, sorting,
    pagination, counting, text search and raw MongoDB filter dicts.

    The results are bound to local names for illustration only; the function
    returns ``None``.
    """
    # All users
    all_users = await User.find_all().to_list()

    # Filter with Beanie operators
    # (``== True`` is intentional: the comparison builds a query expression)
    admins = await User.find(
        User.role == "admin",
        User.is_active == True,
    ).to_list()

    # Complex filter with operators
    premium = await Product.find(
        Product.price >= 100,
        Product.is_available == True,
        In(Product.category, ["electronics", "software"]),
    ).to_list()

    # Projection — return only specific fields.
    # ``project()`` takes a Pydantic model class, not a MongoDB projection
    # dict; the model's fields decide which document fields are fetched.
    class UserContact(BaseModel):
        email: str
        name: str

    emails = await User.find(
        User.is_active == True
    ).project(UserContact).to_list()

    # Sort and paginate
    recent_orders = await Order.find(
        Order.customer_id == "cust-42",
    ).sort(-Order.created_at).skip(0).limit(20).to_list()

    # Count
    active_count = await User.find(User.is_active == True).count()

    # Exists check
    exists = await User.find_one(User.email == "alice@example.com") is not None

    # Text search (requires text index)
    results = await Product.find({"$text": {"$search": "keyboard gaming"}}).to_list()

    # Full MongoDB filter dict.
    # ``created_at`` is stored as a date, so it must be compared against a
    # datetime: MongoDB never matches a string bound against date values.
    since = datetime(2026, 1, 1, tzinfo=timezone.utc)
    filtered = await Order.find(
        {"created_at": {"$gte": since}, "status": {"$in": ["shipped", "delivered"]}}
    ).to_list()

Aggregations

async def aggregation_examples():
    """Run three reporting pipelines against the orders collection.

    Returns:
        A tuple ``(top_customers, product_stats, daily_stats)`` holding the
        result documents of each pipeline as lists.
    """
    # Top 10 customers by total spend on delivered orders
    pipeline = [
        {"$match": {"status": "delivered"}},
        {"$group": {
            "_id": "$customer_id",
            "order_count": {"$sum": 1},
            "total_spent": {"$sum": "$total"},
            "avg_order": {"$avg": "$total"},
        }},
        {"$sort": {"total_spent": -1}},
        {"$limit": 10},
    ]
    top_customers = await Order.aggregate(pipeline).to_list()

    # Product performance with unwind
    product_pipeline = [
        {"$unwind": "$lines"},
        {"$group": {
            "_id": "$lines.product_id",
            "product_name": {"$first": "$lines.product_name"},
            "units_sold": {"$sum": "$lines.quantity"},
            "revenue": {"$sum": {"$multiply": ["$lines.quantity", "$lines.unit_price"]}},
        }},
        {"$sort": {"revenue": -1}},
    ]
    product_stats = await Order.aggregate(product_pipeline).to_list()

    # Daily order count for last 30 days.
    # The cutoff is a real datetime computed at call time: ``created_at`` is
    # stored as a date, and a string bound would never match date values.
    cutoff = datetime.now(timezone.utc) - timedelta(days=30)
    daily_pipeline = [
        {"$match": {"created_at": {"$gte": cutoff}}},
        {"$group": {
            "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
            "count": {"$sum": 1},
            "revenue": {"$sum": "$total"},
        }},
        {"$sort": {"_id": 1}},
    ]
    daily_stats = await Order.aggregate(daily_pipeline).to_list()
    return top_customers, product_stats, daily_stats

Relations and Linked Documents

from beanie import Link, BackLink


class Author(Document):
    # Article author stored in the "authors" collection.
    name: str
    email: str

    class Settings:
        name = "authors"  # collection name


class Article(Document):
    """Article stored in the ``articles`` collection, linked to its ``Author``."""

    title: str
    content: str
    author: Link[Author]   # foreign reference (stored as ObjectId)
    # default_factory gives every instance its own list and matches how
    # Product.tags is declared.
    tags: list[str] = Field(default_factory=list)

    class Settings:
        name = "articles"


async def relations_example():
    """Insert an author and a linked article, then load the link eagerly and lazily."""
    writer = Author(name="Avinash", email="avi@techoral.com")
    await writer.insert()

    # Passing the Author object is enough — Beanie stores the reference
    post = Article(title="Python DDD Guide", content="...", author=writer)
    await post.insert()

    # Eager: resolve the linked document as part of the fetch
    eager = await Article.get(post.id, fetch_links=True)
    print(eager.author.name)   # "Avinash" — fully loaded

    # Lazy: without fetch_links the author is an unresolved Link,
    # so fetch it on demand before reading its fields
    deferred = await Article.get(post.id)
    await deferred.fetch_link(Article.author)
    print(deferred.author.name)

FastAPI Integration

from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from beanie import init_beanie
import os


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the MongoDB connection for the lifetime of the application.

    Reads ``MONGO_URL`` and ``MONGO_DB`` from the environment (defaulting to
    ``mongodb://localhost:27017`` and ``myapp``), initialises Beanie with the
    document models, yields while the app is serving, and closes the client
    afterwards.
    """
    client = AsyncIOMotorClient(os.environ.get("MONGO_URL", "mongodb://localhost:27017"))
    try:
        await init_beanie(
            database=client[os.environ.get("MONGO_DB", "myapp")],
            document_models=[User, Product, Order],
        )
        yield
    finally:
        # Close the client even when initialisation fails or an exception is
        # thrown into the context manager, so connections are never leaked.
        client.close()


# Create the application and hand it the lifespan context manager defined
# above, which sets up and tears down the MongoDB connection.
app = FastAPI(lifespan=lifespan)


class CreateUserRequest(BaseModel):
    # Request body accepted by POST /users.
    email: str
    name: str


@app.post("/users", status_code=201)
async def create_user(req: CreateUserRequest):
    """Register a new user.

    Returns the new user's ``id`` and ``email``; responds with 409 when the
    email address is already registered.
    """
    # Build the document first so the duplicate check uses the same
    # normalised (lower-cased) address that will actually be stored.
    # Checking the raw request value would miss "Alice@Example.com" when
    # "alice@example.com" already exists.
    user = User(email=req.email, name=req.name)
    existing = await User.find_one(User.email == user.email)
    if existing:
        raise HTTPException(status_code=409, detail="Email already registered")
    # NOTE: a concurrent request can still slip in between the check and the
    # insert; the unique index on ``email`` is the final guard in that case.
    await user.insert()
    return {"id": user.id, "email": user.email}


@app.get("/users/{user_id}")
async def get_user(user_id: str):
    """Return one user by id, without ``updated_at``; 404 if it does not exist."""
    user = await User.get(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    # model_dump() is the Pydantic v2 replacement for the deprecated dict().
    return user.model_dump(exclude={"updated_at"})


@app.get("/products")
async def list_products(category: str | None = None, min_price: float = 0, page: int = 1):
    """List available products, newest first, 20 per page.

    Args:
        category: When given, only products in this category are returned.
        min_price: When greater than 0, only products priced at or above it.
        page: 1-based page number; values below 1 are treated as 1.

    Returns:
        A dict with the total number of matches, the effective page number
        and the serialised products on that page.
    """
    page_size = 20
    # A page below 1 would produce a negative skip, which MongoDB rejects.
    page = max(page, 1)

    query = Product.find(Product.is_available == True)
    if category:
        query = query.find(Product.category == category)
    if min_price > 0:
        query = query.find(Product.price >= min_price)

    total = await query.count()
    items = await (
        query.sort(-Product.created_at)
        .skip((page - 1) * page_size)
        .limit(page_size)
        .to_list()
    )
    # model_dump() replaces the deprecated dict(); mode="json" renders the
    # ObjectId ``id`` and the datetimes as JSON-friendly strings.
    return {
        "total": total,
        "page": page,
        "items": [p.model_dump(mode="json") for p in items],
    }

Frequently Asked Questions

Beanie vs MongoEngine vs Motor — which to use?
Use Beanie for new async Python projects — Pydantic v2 integration, type hints, and native async are first-class. Use MongoEngine for sync Django projects with legacy code. Use Motor directly when you need maximum control and don't want an ODM abstraction.
How do I handle schema migrations in MongoDB?
MongoDB is schema-flexible — adding new fields with defaults requires no migration. For breaking changes (renaming or removing fields), use Beanie's built-in migrations: write iterative or free-fall migration classes and run them with the beanie migrate command. Alternatively, store a schema-version field on each document and upgrade old documents lazily on first read. For bulk migrations, write a one-off script using aggregation with $merge.
How does Beanie handle ObjectId vs custom string IDs?
By default, Beanie uses MongoDB's ObjectId as PydanticObjectId for the id field. To use custom string IDs, define id: str = Field(default_factory=lambda: str(uuid.uuid4())) in your document class — Beanie uses whatever you define as id.