Python Beanie ODM: Async MongoDB with Pydantic Models
Beanie is the modern async MongoDB ODM for Python — built on top of Motor (async MongoDB driver) and Pydantic v2. Document models are Pydantic models, giving you automatic validation, serialization, and type hints for all MongoDB operations. Beanie handles the schema, indexes, and query builder, while Motor handles the async I/O. This guide covers document modeling, CRUD, aggregation, relationships, and production FastAPI integration.
Table of Contents
Installation and Initialization
pip install beanie motor
import asyncio
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
async def init_db():
client = AsyncIOMotorClient("mongodb://localhost:27017")
await init_beanie(
database=client.myapp,
document_models=[User, Product, Order], # register all document classes
)
# Run initialization at startup
asyncio.run(init_db())
Document Models
Beanie documents inherit from Document which inherits from BaseModel — all Pydantic validation applies. The inner Settings class configures collection name, indexes, and time-series settings.
from beanie import Document, Indexed, Link, before_event, after_event, Insert, Update
from pydantic import Field, EmailStr, field_validator
from typing import Optional, Annotated
from datetime import datetime, timezone
import uuid
class User(Document):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
email: Annotated[str, Indexed(unique=True)] # unique index
name: str
role: str = "user"
is_active: bool = True
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
@field_validator("email")
@classmethod
def lowercase_email(cls, v: str) -> str:
return v.lower()
@before_event(Update)
def set_updated_at(self):
self.updated_at = datetime.now(timezone.utc)
class Settings:
name = "users"
indexes = [
[("email", 1)], # single field index
[("role", 1), ("is_active", 1)], # compound index
]
class Product(Document):
sku: Annotated[str, Indexed(unique=True)]
name: str
description: str = ""
price: float
category: Annotated[str, Indexed()]
tags: list[str] = Field(default_factory=list)
stock: int = 0
is_available: bool = True
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class Settings:
name = "products"
class OrderLine(Document):
product_id: str
product_name: str
quantity: int
unit_price: float
@property
def total(self) -> float:
return self.quantity * self.unit_price
class Order(Document):
order_number: Annotated[str, Indexed(unique=True)]
customer_id: Annotated[str, Indexed()]
lines: list[OrderLine] = Field(default_factory=list)
status: str = "pending"
total: float = 0.0
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class Settings:
name = "orders"
indexes = [
[("customer_id", 1), ("status", 1)],
[("created_at", -1)], # descending for recent orders
]
CRUD Operations
from beanie import PydanticObjectId
async def crud_examples():
# CREATE
user = User(email="alice@example.com", name="Alice", role="admin")
await user.insert()
print(f"Created user: {user.id}")
# Or: insert multiple
users = [User(email=f"user{i}@example.com", name=f"User {i}") for i in range(10)]
await User.insert_many(users)
# READ by ID
found = await User.get(user.id)
# UPDATE — modify attributes and save
found.name = "Alice Johnson"
found.role = "superadmin"
await found.save()
# Partial update (without fetching)
await User.find_one(User.email == "alice@example.com").update(
{"$set": {"is_active": False}}
)
# UPSERT
await User.find_one(User.email == "new@example.com").upsert(
{"$set": {"name": "New User", "role": "user"}},
on_insert=User(email="new@example.com", name="New User"),
)
# DELETE
await found.delete()
# Delete by filter
await User.find(User.is_active == False).delete()
Querying and Filtering
from beanie.operators import In, GTE, LTE, RegEx
async def query_examples():
# All users
all_users = await User.find_all().to_list()
# Filter with Beanie operators
admins = await User.find(
User.role == "admin",
User.is_active == True,
).to_list()
# Complex filter with operators
premium = await Product.find(
Product.price >= 100,
Product.is_available == True,
In(Product.category, ["electronics", "software"]),
).to_list()
# Projection — return only specific fields
emails = await User.find(
User.is_active == True
).project({"email": 1, "name": 1}).to_list()
# Sort and paginate
recent_orders = await Order.find(
Order.customer_id == "cust-42",
).sort(-Order.created_at).skip(0).limit(20).to_list()
# Count
active_count = await User.find(User.is_active == True).count()
# Exists check
exists = await User.find_one(User.email == "alice@example.com") is not None
# Text search (requires text index)
results = await Product.find({"$text": {"$search": "keyboard gaming"}}).to_list()
# Full MongoDB filter dict
filtered = await Order.find(
{"created_at": {"$gte": "2026-01-01"}, "status": {"$in": ["shipped", "delivered"]}}
).to_list()
Aggregations
async def aggregation_examples():
# Revenue by category
pipeline = [
{"$match": {"status": "delivered"}},
{"$group": {
"_id": "$customer_id",
"order_count": {"$sum": 1},
"total_spent": {"$sum": "$total"},
"avg_order": {"$avg": "$total"},
}},
{"$sort": {"total_spent": -1}},
{"$limit": 10},
]
top_customers = await Order.aggregate(pipeline).to_list()
# Product performance with unwind
product_pipeline = [
{"$unwind": "$lines"},
{"$group": {
"_id": "$lines.product_id",
"product_name": {"$first": "$lines.product_name"},
"units_sold": {"$sum": "$lines.quantity"},
"revenue": {"$sum": {"$multiply": ["$lines.quantity", "$lines.unit_price"]}},
}},
{"$sort": {"revenue": -1}},
]
product_stats = await Order.aggregate(product_pipeline).to_list()
# Daily order count for last 30 days
daily_pipeline = [
{"$match": {"created_at": {"$gte": "2026-05-15"}}},
{"$group": {
"_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
"count": {"$sum": 1},
"revenue": {"$sum": "$total"},
}},
{"$sort": {"_id": 1}},
]
daily_stats = await Order.aggregate(daily_pipeline).to_list()
return top_customers, product_stats, daily_stats
Relations and Linked Documents
from beanie import Link, BackLink
class Author(Document):
name: str
email: str
class Settings:
name = "authors"
class Article(Document):
title: str
content: str
author: Link[Author] # foreign reference (stored as ObjectId)
tags: list[str] = []
class Settings:
name = "articles"
async def relations_example():
author = Author(name="Avinash", email="avi@techoral.com")
await author.insert()
article = Article(
title="Python DDD Guide",
content="...",
author=author, # pass the object — Beanie stores the reference
)
await article.insert()
# Fetch with linked document resolved
loaded = await Article.get(article.id, fetch_links=True)
print(loaded.author.name) # "Avinash" — fully loaded
# Without fetch_links, author is a Link (not resolved)
lazy = await Article.get(article.id)
await lazy.fetch_link(Article.author) # resolve on demand
print(lazy.author.name)
FastAPI Integration
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from beanie import init_beanie
import os
@asynccontextmanager
async def lifespan(app: FastAPI):
client = AsyncIOMotorClient(os.environ.get("MONGO_URL", "mongodb://localhost:27017"))
await init_beanie(
database=client[os.environ.get("MONGO_DB", "myapp")],
document_models=[User, Product, Order],
)
yield
client.close()
app = FastAPI(lifespan=lifespan)
class CreateUserRequest(BaseModel):
email: str
name: str
@app.post("/users", status_code=201)
async def create_user(req: CreateUserRequest):
existing = await User.find_one(User.email == req.email)
if existing:
raise HTTPException(status_code=409, detail="Email already registered")
user = User(email=req.email, name=req.name)
await user.insert()
return {"id": user.id, "email": user.email}
@app.get("/users/{user_id}")
async def get_user(user_id: str):
user = await User.get(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user.dict(exclude={"updated_at"})
@app.get("/products")
async def list_products(category: str | None = None, min_price: float = 0, page: int = 1):
query = Product.find(Product.is_available == True)
if category:
query = query.find(Product.category == category)
if min_price > 0:
query = query.find(Product.price >= min_price)
total = await query.count()
items = await query.sort(-Product.created_at).skip((page-1)*20).limit(20).to_list()
return {"total": total, "page": page, "items": [p.dict() for p in items]}
Frequently Asked Questions
- Beanie vs MongoEngine vs Motor — which to use?
- Use Beanie for new async Python projects — Pydantic v2 integration, type hints, and native async are first-class. Use MongoEngine for sync Django projects with legacy code. Use Motor directly when you need maximum control and don't want an ODM abstraction.
- How do I handle schema migrations in MongoDB?
- MongoDB is schema-flexible — adding new fields with defaults requires no migration. For breaking changes (renaming or removing fields), use Beanie's lazy migration: add a
revision_idfield and write update scripts that migrate documents on first read. For bulk migrations, write a one-off script using aggregation with$merge. - How does Beanie handle ObjectId vs custom string IDs?
- By default, Beanie uses MongoDB's
ObjectIdasPydanticObjectIdfor theidfield. To use custom string IDs, defineid: str = Field(default_factory=lambda: str(uuid.uuid4()))in your document class — Beanie uses whatever you define asid.