Python MongoDB with Motor: Async Database Operations
Motor is the official async Python driver for MongoDB, built on top of PyMongo and asyncio. It enables non-blocking database operations in FastAPI, Starlette, and other async Python frameworks. This guide covers async CRUD operations, aggregation pipelines, indexing strategies, multi-document transactions, change streams for real-time events, and integrating Motor with FastAPI and Pydantic models.
Table of Contents
Setup and Connection
pip install motor "pydantic[email]" fastapi uvicorn
import motor.motor_asyncio
import os
from contextlib import asynccontextmanager
# Connection settings are read from the environment, with local fallbacks.
MONGO_URL = os.getenv("MONGO_URL", "mongodb://localhost:27017")
DB_NAME = os.getenv("DB_NAME", "myapp")

# One client is created for the whole application (never one per request).
_client_options = {
    "maxPoolSize": 10,
    "minPoolSize": 1,
    "serverSelectionTimeoutMS": 5000,
}
client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_URL, **_client_options)
db = client[DB_NAME]

# Handles to the collections used by the functions below.
users_col, posts_col, orders_col = db["users"], db["posts"], db["orders"]
async def ping_db():
    """Send a ``ping`` command to the ``admin`` database and report success.

    The confirmation message is printed only after the command returns; any
    error raised by the command propagates to the caller.
    """
    admin_db = client.admin
    await admin_db.command("ping")
    print("MongoDB connected")
Async CRUD Operations
import asyncio
from bson import ObjectId
from datetime import datetime, timezone
# INSERT
async def create_user(username: str, email: str) -> dict:
    """Insert a new, active user and return it in the shape the API exposes.

    Args:
        username: Name to store on the user document.
        email: E-mail address to store on the user document.

    Returns:
        The stored fields plus the generated identifier as a string under
        ``"id"``. This is the same shape ``get_user`` and ``list_users``
        return and the one ``UserResponse`` requires; the raw ``"_id"`` key
        is not included.
    """
    doc = {
        "username": username,
        "email": email,
        "created_at": datetime.now(timezone.utc),
        "is_active": True,
        "roles": ["user"],
    }
    result = await users_col.insert_one(doc)
    # insert_one adds an ObjectId "_id" to `doc` in place. Drop it and expose
    # the id as a string under "id" so UserResponse(**doc) validates.
    doc.pop("_id", None)
    doc["id"] = str(result.inserted_id)
    return doc
# READ ONE
async def get_user(user_id: str) -> dict | None:
    """Look up a single user by the string form of its ObjectId.

    Returns the matching document with ``_id`` replaced by a string ``id``
    key, or the falsy lookup result (``None``) when nothing matches.
    """
    query = {"_id": ObjectId(user_id)}
    found = await users_col.find_one(query)
    if not found:
        return found
    found["id"] = str(found.pop("_id"))
    return found
# READ MANY with pagination
async def list_users(page: int = 1, page_size: int = 20) -> list[dict]:
    """Return one page of active users, newest first.

    Args:
        page: 1-based page number.
        page_size: Maximum number of users per page.

    Returns:
        The users on the requested page, each with ``_id`` replaced by a
        string ``id`` key.

    Raises:
        ValueError: If ``page`` or ``page_size`` is smaller than 1.
    """
    # Validate before building the query: a page below 1 yields a negative
    # skip, and a page size of 0 would be sent as limit(0), which MongoDB
    # treats as "no limit" and would return the whole collection.
    if page < 1:
        raise ValueError("page must be >= 1")
    if page_size < 1:
        raise ValueError("page_size must be >= 1")

    skip = (page - 1) * page_size
    cursor = (
        users_col.find({"is_active": True})
        .sort("created_at", -1)
        .skip(skip)
        .limit(page_size)
    )
    users = []
    async for doc in cursor:
        doc["id"] = str(doc.pop("_id"))
        users.append(doc)
    return users
# UPDATE
async def update_user(user_id: str, updates: dict) -> dict | None:
    """Apply ``updates`` to one user and return the document after the change.

    Args:
        user_id: String form of the user's ObjectId.
        updates: Field/value pairs to ``$set``. The dict is not modified.

    Returns:
        The updated document with ``_id`` replaced by a string ``id`` key,
        or ``None`` when no user matches ``user_id``.
    """
    # Build a fresh $set payload rather than writing "updated_at" into the
    # caller's dict, which would leak the timestamp back to the caller.
    changes = {**updates, "updated_at": datetime.now(timezone.utc)}
    result = await users_col.find_one_and_update(
        {"_id": ObjectId(user_id)},
        {"$set": changes},
        return_document=True,  # return the post-update document
    )
    if result:
        result["id"] = str(result.pop("_id"))
    return result
# DELETE (soft delete)
async def deactivate_user(user_id: str) -> bool:
    """Soft-delete a user: clear ``is_active`` and stamp ``deactivated_at``.

    Returns ``True`` when the update reports at least one modified document,
    ``False`` otherwise.
    """
    selector = {"_id": ObjectId(user_id)}
    change = {
        "$set": {
            "is_active": False,
            "deactivated_at": datetime.now(timezone.utc),
        }
    }
    outcome = await users_col.update_one(selector, change)
    return outcome.modified_count > 0
# UPSERT
async def upsert_session(session_id: str, user_id: str, data: dict) -> None:
    """Create or refresh the ``sessions`` document keyed by ``session_id``.

    The fields in ``data`` are written together with ``user_id`` and a fresh
    ``updated_at`` timestamp; the latter two win over same-named keys in
    ``data``. ``data`` itself is left untouched.
    """
    sessions = db["sessions"]
    fields = dict(data)
    fields["user_id"] = user_id
    fields["updated_at"] = datetime.now(timezone.utc)
    await sessions.update_one(
        {"session_id": session_id},
        {"$set": fields},
        upsert=True,
    )
Aggregation Pipelines
MongoDB aggregation pipelines are the right tool for analytics, joins, and complex transformations. Motor runs them asynchronously with the same pipeline syntax as PyMongo.
async def user_post_stats() -> list[dict]:
    """Return the ten authors with the most published posts.

    Published posts are grouped by ``author_id``, joined against ``users``,
    and reduced to username, e-mail, post count and latest post date. Each
    row's ``_id`` (the author id) is returned as a string.
    """
    only_published = {"$match": {"published": True}}
    per_author = {
        "$group": {
            "_id": "$author_id",
            "post_count": {"$sum": 1},
            "latest_post": {"$max": "$created_at"},
        }
    }
    join_author = {
        "$lookup": {
            "from": "users",
            "localField": "_id",
            "foreignField": "_id",
            "as": "author",
        }
    }
    # $lookup yields an array; $unwind turns it into a single embedded author.
    flatten_author = {"$unwind": "$author"}
    shape_row = {
        "$project": {
            "username": "$author.username",
            "email": "$author.email",
            "post_count": 1,
            "latest_post": 1,
        }
    }
    pipeline = [
        only_published,
        per_author,
        join_author,
        flatten_author,
        shape_row,
        {"$sort": {"post_count": -1}},
        {"$limit": 10},
    ]
    return [
        {**row, "_id": str(row["_id"])}
        async for row in posts_col.aggregate(pipeline)
    ]
async def revenue_by_month(year: int) -> list[dict]:
    """Aggregate completed orders of ``year`` into per-month revenue rows.

    Each row carries ``_id.month``, total ``revenue``, the number of
    ``orders`` and the ``avg_order`` value, sorted by month ascending.
    """
    # NOTE(review): the filter relies on orders carrying a separate `year`
    # field next to `created_at` — confirm against the orders schema.
    completed_in_year = {"status": "completed", "year": year}
    monthly_totals = {
        "_id": {"month": {"$month": "$created_at"}},
        "revenue": {"$sum": "$total"},
        "orders": {"$sum": 1},
        "avg_order": {"$avg": "$total"},
    }
    stages = [
        {"$match": completed_in_year},
        {"$group": monthly_totals},
        {"$sort": {"_id.month": 1}},
    ]
    rows = []
    async for row in orders_col.aggregate(stages):
        rows.append(row)
    return rows
Indexes and Performance
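Always create indexes explicitly — MongoDB only indexes `_id` automatically and will not infer other indexes from your queries. Create indexes at application startup, not in migrations: `create_index` is a no-op when an identical index already exists, so repeating it on every start is safe.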
import asyncio
from pymongo import IndexModel, ASCENDING, DESCENDING, TEXT
async def create_indexes():
    """Create the indexes used by the users, posts and sessions collections."""
    sessions_col = db["sessions"]

    # users: e-mail addresses must be unique.
    await users_col.create_index("email", unique=True)

    # users: compound index for the "active users, newest first" listing,
    # plus unique usernames, created in a single call.
    active_by_recency = IndexModel(
        [("is_active", ASCENDING), ("created_at", DESCENDING)]
    )
    unique_username = IndexModel([("username", ASCENDING)], unique=True)
    await users_col.create_indexes([active_by_recency, unique_username])

    # posts: text index over title and content for search.
    text_keys = [(field, TEXT) for field in ("title", "content")]
    await posts_col.create_index(text_keys)

    # sessions: TTL index — with expireAfterSeconds=0 a document is removed
    # once its expires_at time has passed.
    await sessions_col.create_index("expires_at", expireAfterSeconds=0)

    # users: partial index that covers active users only.
    # NOTE(review): this shares its key pattern with the unique e-mail index
    # above — confirm the target server version accepts both side by side.
    await users_col.create_index(
        "email",
        name="active_users_email",
        partialFilterExpression={"is_active": True},
    )
# Text search using the text index
async def search_posts(query: str, limit: int = 20) -> list[dict]:
    """Full-text search over posts, best match first.

    Each returned document carries a ``score`` field holding its text-match
    score, which is also the sort key. At most ``limit`` documents are
    requested.
    """
    text_score = {"$meta": "textScore"}
    cursor = posts_col.find(
        {"$text": {"$search": query}},
        {"score": text_score},
    )
    cursor = cursor.sort([("score", text_score)]).limit(limit)
    hits = []
    async for post in cursor:
        hits.append(post)
    return hits
Multi-Document Transactions
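MongoDB supports ACID transactions across multiple documents and collections since version 4.0 (replica sets) and 4.2 (sharded clusters). Transactions are not available on a standalone server, so run at least a single-node replica set when trying this locally. Use sessions for transactional operations.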
async def transfer_credits(from_user_id: str, to_user_id: str, amount: int) -> bool:
    """Move ``amount`` credits between two users inside one transaction.

    Args:
        from_user_id: String ObjectId of the user being debited.
        to_user_id: String ObjectId of the user being credited.
        amount: Number of credits to move; must be positive.

    Returns:
        ``True`` when the transfer was committed. ``False`` when it was
        aborted because the sender lacks the credits (or does not exist) or
        because the recipient does not exist.

    Raises:
        ValueError: If ``amount`` is not positive.
    """
    # A zero or negative amount would pass the "$gte" balance check and move
    # credits in the opposite direction, so reject it before any I/O.
    if amount <= 0:
        raise ValueError("amount must be a positive number of credits")

    async with await client.start_session() as session:
        async with session.start_transaction():
            # Debit: the balance check is part of the filter, so the update
            # only matches when the sender can afford the transfer.
            debit = await users_col.update_one(
                {"_id": ObjectId(from_user_id), "credits": {"$gte": amount}},
                {"$inc": {"credits": -amount}},
                session=session,
            )
            if debit.modified_count == 0:
                await session.abort_transaction()
                return False  # insufficient credits

            # Credit
            credit = await users_col.update_one(
                {"_id": ObjectId(to_user_id)},
                {"$inc": {"credits": amount}},
                session=session,
            )
            if credit.matched_count == 0:
                # No such recipient: roll the debit back instead of letting
                # the credits disappear.
                await session.abort_transaction()
                return False

            # Record transaction
            await db["transactions"].insert_one(
                {
                    "from": from_user_id,
                    "to": to_user_id,
                    "amount": amount,
                    "ts": datetime.now(timezone.utc),
                },
                session=session,
            )
        # Leaving the start_transaction() block without an exception commits.
        return True
Change Streams
Change streams let you react to database changes in real time — perfect for invalidating caches, syncing search indexes, or pushing WebSocket updates.
import asyncio
async def watch_orders():
    """Stream new completed orders — useful for triggering fulfillment.

    Watches update events on the orders collection whose updated fields set
    ``status`` to ``"completed"``. For each such event the full order is
    printed and passed to ``trigger_fulfillment``. Runs until the stream ends
    or an error propagates.
    """
    pipeline = [
        {"$match": {
            "operationType": "update",
            "updateDescription.updatedFields.status": "completed",
        }}
    ]
    async with orders_col.watch(pipeline, full_document="updateLookup") as stream:
        async for change in stream:
            # With "updateLookup" the document is fetched after the update,
            # so it can be missing if the order was deleted in between.
            # Skip those events instead of crashing the whole watcher.
            order = change.get("fullDocument")
            if order is None:
                continue
            print(f"Order {order['_id']} completed — amount: {order['total']}")
            await trigger_fulfillment(order)
# Run as a background task
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so a task nobody else holds may be garbage-collected mid-run.
_background_tasks = set()


async def main():
    """Start the order watcher in the background, then continue startup."""
    watcher = asyncio.create_task(watch_orders())
    _background_tasks.add(watcher)
    # Drop the reference once the task finishes so the set does not grow.
    watcher.add_done_callback(_background_tasks.discard)
    # ... rest of application startup
FastAPI + Pydantic Integration
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr, Field
from bson import ObjectId
from typing import Optional
class PyObjectId(str):
    """String type whose validator only accepts valid MongoDB ObjectIds."""

    @classmethod
    def __get_validators__(cls):
        # Presumably Pydantic's custom-type hook: yields the validators to run.
        yield cls.validate

    @classmethod
    def validate(cls, v, _info=None):
        """Return ``v`` as a canonical ObjectId string.

        Args:
            v: Candidate value; anything ``ObjectId.is_valid`` accepts.
            _info: Ignored. Optional so the validator also works when called
                with a second, context argument — NOTE(review): confirm
                against the installed Pydantic version.

        Raises:
            ValueError: If ``v`` is not a valid ObjectId.
        """
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId")
        # Round-trip through ObjectId: is_valid() also accepts ObjectId
        # instances and 12-byte values, and str() on raw bytes would return
        # their repr ("b'...'") rather than the hex id.
        return str(ObjectId(v))
class UserCreate(BaseModel):
    # Request body for creating a user.

    # Required; between 3 and 50 characters long.
    username: str = Field(
        ...,
        min_length=3,
        max_length=50,
    )
    # Required; validated as an e-mail address.
    email: EmailStr
class UserResponse(BaseModel):
    # Shape of a user as returned by the API endpoints.

    # Identifier as a plain string rather than a BSON ObjectId.
    id: str
    username: str
    email: str
    # Whether the account is currently active.
    is_active: bool
    # Timestamp stored when the user document was created.
    created_at: datetime
app = FastAPI()


# NOTE(review): FastAPI documents `on_event` as deprecated in favour of a
# `lifespan` handler — consider migrating once the minimum version is known.
@app.on_event("startup")
async def startup():
    """Ensure the MongoDB indexes exist when the application starts."""
    await create_indexes()
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user_endpoint(data: UserCreate):
    """Create a user, answering 409 when the e-mail or username is taken.

    Raises:
        HTTPException: 409 if the e-mail is already registered, or if the
            insert violates a unique index.
    """
    # Local import keeps this snippet self-contained.
    from pymongo.errors import DuplicateKeyError

    # Fast path: give a precise message for the common duplicate-e-mail case.
    if await users_col.find_one({"email": data.email}):
        raise HTTPException(status_code=409, detail="Email already registered")

    try:
        user = await create_user(data.username, data.email)
    except DuplicateKeyError as exc:
        # The check above cannot see a concurrent insert and never looks at
        # the username, so the unique indexes are the real guard. Report
        # their violation as a conflict rather than a server error.
        raise HTTPException(
            status_code=409,
            detail="Username or email already registered",
        ) from exc
    return UserResponse(**user)
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user_endpoint(user_id: str):
    """Return one user by id.

    Answers 400 when ``user_id`` is not a valid ObjectId and 404 when no
    user is found for it.
    """
    id_is_wellformed = ObjectId.is_valid(user_id)
    if not id_is_wellformed:
        raise HTTPException(status_code=400, detail="Invalid user ID")

    found = await get_user(user_id)
    if found:
        return UserResponse(**found)
    raise HTTPException(status_code=404, detail="User not found")
Frequently Asked Questions
- Motor vs PyMongo vs Beanie — which should I use?
- Use Motor for raw async MongoDB access with full control. Use Beanie (built on Motor) when you want an ODM with Pydantic models, automatic schema validation, and migration support. Use PyMongo for sync scripts, data pipelines, or when async isn't needed.
- How do I handle ObjectId serialization in FastAPI?
- FastAPI's JSON encoder doesn't know about BSON ObjectId. Either convert to string in your response models (as shown above), use a custom JSON encoder, or use Beanie which handles this automatically.
- When should I NOT use transactions?
- Avoid transactions for high-throughput single-document operations — they add latency and limit horizontal scalability. Design your document structure so most operations are single-document, using embedded documents instead of joins where possible.