Python MongoDB with Motor: Async Database Operations
Motor is the official async Python driver for MongoDB, built on top of PyMongo and designed for use with asyncio. It provides non-blocking database I/O that fits naturally into FastAPI, Starlette and any other async Python framework. Unlike PyMongo's synchronous API, Motor never blocks the event loop — every query, insert, and aggregation is awaitable, so a single process can serve thousands of concurrent requests. Internally Motor delegates PyMongo's blocking calls to a thread pool, so you never have to manage threads yourself.
Table of Contents
Installation and Connection Setup
Install Motor with pip. For production use, also install dnspython to enable SRV connection strings used by MongoDB Atlas. Motor requires Python 3.7+ and asyncio — it integrates with the running event loop without requiring any extra configuration.
pip install motor dnspython
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.server_api import ServerApi
# Local development
# NOTE: the three `client = ...` assignments below are alternatives shown one
# after another; each rebinds `client`, so only the last one stays in effect.
client = AsyncIOMotorClient("mongodb://localhost:27017")
# MongoDB Atlas (SRV connection string)
# NOTE(review): user/password are inlined as placeholders — presumably a real
# deployment loads the URI from configuration; confirm.
MONGO_URI = "mongodb+srv://user:password@cluster0.abc.mongodb.net/?retryWrites=true&w=majority"
client = AsyncIOMotorClient(MONGO_URI, server_api=ServerApi('1'))
# Connection with explicit options (the *MS values are in milliseconds)
client = AsyncIOMotorClient(
MONGO_URI,
maxPoolSize=50, # max connections in pool
minPoolSize=5, # keep alive minimum connections
serverSelectionTimeoutMS=5000,
connectTimeoutMS=10000,
socketTimeoutMS=30000,
)
# Database and collection handles referenced by the functions in this file.
db = client["myapp"]
users = db["users"]
orders = db["orders"]
async def ping():
    """Send a ``ping`` command to the admin database and report success."""
    admin_db = client.admin
    await admin_db.command('ping')
    print("Connected to MongoDB")


# Run the connectivity check once on a fresh event loop.
asyncio.run(ping())
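Tip: Create a single AsyncIOMotorClient at application startup and reuse it everywhere — never create a new AsyncIOMotorClient per request.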
CRUD Operations
Motor's API mirrors PyMongo exactly, but every method that touches the database returns a coroutine. You use await for single-document operations and async for to iterate over cursors. The document model is standard Python dicts, and MongoDB's _id field is automatically handled as a BSON ObjectId.
from bson import ObjectId
from datetime import datetime, timezone
# INSERT
async def create_user(name: str, email: str) -> str:
    """Insert a new active user and return its generated id as a string."""
    now = datetime.now(timezone.utc)
    outcome = await users.insert_one(
        {"name": name, "email": email, "created_at": now, "active": True}
    )
    return str(outcome.inserted_id)
# INSERT MANY
async def bulk_insert(records: list[dict]):
    """Insert many user documents in a single unordered batch.

    Args:
        records: Documents to insert. May be empty.

    Returns:
        The number of inserted documents (``0`` for an empty batch).
    """
    # insert_many rejects an empty document list, so an empty batch is
    # answered locally instead of raising.
    if not records:
        return 0
    # ordered=False: the batch is sent as an unordered bulk insert.
    result = await users.insert_many(records, ordered=False)
    return len(result.inserted_ids)
# FIND ONE
async def get_user(user_id: str) -> dict | None:
    """Look up one user by the string form of its ObjectId; ``None`` if absent."""
    query = {"_id": ObjectId(user_id)}
    document = await users.find_one(query)
    return document
# FIND MANY with cursor
async def list_active_users(limit: int = 100) -> list[dict]:
    """Return up to ``limit`` active users, newest ``created_at`` first."""
    newest_first = users.find({"active": True}).sort("created_at", -1)
    page = newest_first.limit(limit)
    return await page.to_list(length=limit)
# Async iteration (preferred for large result sets)
async def stream_users():
    """Async generator that yields each active user document in turn."""
    active_cursor = users.find({"active": True})
    async for doc in active_cursor:
        yield doc
# UPDATE
async def update_email(user_id: str, new_email: str) -> bool:
    """Set a user's email and ``updated_at``; True when exactly one doc changed."""
    selector = {"_id": ObjectId(user_id)}
    changes = {
        "$set": {"email": new_email, "updated_at": datetime.now(timezone.utc)}
    }
    outcome = await users.update_one(selector, changes)
    return outcome.modified_count == 1
# UPSERT
async def upsert_user(email: str, data: dict):
    """Create or update the user identified by ``email``.

    Args:
        email: Value matched against the ``email`` field.
        data: Fields to ``$set`` on the matched or newly inserted document.
            If it supplies ``created_at`` that value is used as-is; otherwise
            ``created_at`` is stamped with the current UTC time on insert only.
    """
    update: dict = {}
    # Some server versions reject an empty $set, so only send it when there is
    # something to set.
    if data:
        update["$set"] = data
    # MongoDB refuses an update where two operators touch the same path, so
    # $setOnInsert is only added when the caller did not provide created_at.
    if "created_at" not in data:
        update["$setOnInsert"] = {"created_at": datetime.now(timezone.utc)}
    await users.update_one({"email": email}, update, upsert=True)
# DELETE
async def delete_user(user_id: str) -> bool:
    """Delete the user with this id; True when exactly one document was removed."""
    outcome = await users.delete_one({"_id": ObjectId(user_id)})
    removed = outcome.deleted_count
    return removed == 1
# COUNT
async def count_active() -> int:
    """Count the users whose ``active`` flag is true."""
    active_filter = {"active": True}
    total = await users.count_documents(active_filter)
    return total
Aggregation Pipelines
The aggregation pipeline is MongoDB's most powerful query feature. Motor supports the full pipeline syntax including $lookup, $group, $facet, $bucket and $graphLookup. Pipelines run server-side and are far more efficient than fetching documents and processing in Python. Use to_list() for small result sets or iterate with async for for large ones.
async def order_stats_by_user():
    """Return the ten users with the highest completed-order totals.

    Each result carries the user's name together with the sum, count and
    two-decimal average of that user's completed order amounts.
    """
    only_completed = {"$match": {"status": "completed"}}
    per_user_totals = {
        "$group": {
            "_id": "$user_id",
            "total": {"$sum": "$amount"},
            "count": {"$sum": 1},
            "avg": {"$avg": "$amount"},
        }
    }
    biggest_first = {"$sort": {"total": -1}}
    top_ten = {"$limit": 10}
    # The join runs after $limit, so only the surviving groups are looked up.
    attach_user = {
        "$lookup": {
            "from": "users",
            "localField": "_id",
            "foreignField": "_id",
            "as": "user",
        }
    }
    flatten_user = {"$unwind": "$user"}
    shape_output = {
        "$project": {
            "user_name": "$user.name",
            "total": 1,
            "count": 1,
            "avg": {"$round": ["$avg", 2]},
        }
    }
    stages = [
        only_completed,
        per_user_totals,
        biggest_first,
        top_ten,
        attach_user,
        flatten_user,
        shape_output,
    ]
    ranked = orders.aggregate(stages)
    return await ranked.to_list(length=10)
async def monthly_revenue(since: datetime | None = None):
    """Group completed-order revenue by calendar month.

    Args:
        since: Only orders with ``created_at`` on or after this instant are
            counted. Defaults to 2026-01-01 00:00 UTC.

    Returns:
        A list of ``{"_id": {"year", "month"}, "revenue", "orders"}`` documents
        sorted chronologically.
    """
    if since is None:
        # Timezone-aware, matching the aware UTC timestamps written elsewhere
        # in this file.
        since = datetime(2026, 1, 1, tzinfo=timezone.utc)
    pipeline = [
        {"$match": {"status": "completed", "created_at": {"$gte": since}}},
        {"$group": {
            "_id": {
                "year": {"$year": "$created_at"},
                "month": {"$month": "$created_at"},
            },
            "revenue": {"$sum": "$amount"},
            "orders": {"$sum": 1},
        }},
        {"$sort": {"_id.year": 1, "_id.month": 1}},
    ]
    # length=None: collect every group produced by the pipeline.
    return await orders.aggregate(pipeline).to_list(length=None)
Tip: Pass allowDiskUse=True to aggregate() when your pipeline groups large datasets that exceed MongoDB's 100 MB in-memory limit.
Indexes and Performance
Indexes are critical for MongoDB query performance. Motor provides async methods to create, list and drop indexes programmatically. Always create indexes at application startup (idempotent — safe to call repeatedly) and use compound indexes to cover your most frequent query patterns. The explain() method helps diagnose slow queries.
async def create_indexes():
    """Ensure every index the application relies on exists."""
    # Unique single-field index on email.
    await users.create_index("email", unique=True)

    # Compound index for "filter by active, sort by created_at descending".
    active_then_newest = [("active", 1), ("created_at", -1)]
    await users.create_index(active_then_newest)

    # Text index spanning name and bio for full-text search.
    text_fields = [("name", "text"), ("bio", "text")]
    await users.create_index(text_fields)

    # TTL index: with expireAfterSeconds=0 each session document expires at
    # the timestamp stored in its own expires_at field.
    sessions = db["sessions"]
    await sessions.create_index("expires_at", expireAfterSeconds=0)

    # Partial index on last_login restricted to documents with active == True.
    only_active = {"active": True}
    await users.create_index("last_login", partialFilterExpression=only_active)
async def explain_query():
    """Print the winning query plan for a lookup by email."""
    lookup = users.find({"email": "alice@example.com"})
    plan_report = await lookup.explain()
    # IXSCAN in the output means an index was used; COLLSCAN means a full scan.
    print(plan_report["queryPlanner"]["winningPlan"])
async def list_indexes():
    """Print every index definition on the users collection."""
    index_cursor = users.list_indexes()
    async for spec in index_cursor:
        print(spec)
Multi-Document Transactions
MongoDB supports ACID multi-document transactions on replica sets (MongoDB Atlas always has a replica set). Motor exposes transactions through a session context manager. Transactions are essential when you need to update multiple collections atomically — for example, deducting inventory and creating an order record together.
async def place_order(user_id: str, product_id: str, qty: int, price: float):
    """Deduct inventory and create the order record in a single transaction.

    Args:
        user_id: String form of the buyer's ObjectId.
        product_id: String form of the product's ObjectId.
        qty: Number of units to order; must be positive.
        price: Unit price; the stored amount is ``qty * price``.

    Raises:
        ValueError: If ``qty`` is not positive, or if no inventory document
            for the product has at least ``qty`` units in stock.
    """
    # A zero or negative qty would satisfy the "stock >= qty" filter and, via
    # $inc, leave stock unchanged or even increase it — reject it before any
    # database work.
    if qty <= 0:
        raise ValueError("Quantity must be positive")

    async with await client.start_session() as session:
        async with session.start_transaction():
            # Check and deduct inventory in one atomic step; the filter only
            # matches when enough stock is available.
            result = await db["inventory"].find_one_and_update(
                {"_id": ObjectId(product_id), "stock": {"$gte": qty}},
                {"$inc": {"stock": -qty}},
                session=session,
                return_document=True,  # True is ReturnDocument.AFTER
            )
            if not result:
                # Raising inside the transaction block aborts it.
                raise ValueError("Insufficient stock")

            # Create order record
            order = {
                "user_id": ObjectId(user_id),
                "product_id": ObjectId(product_id),
                "qty": qty,
                "amount": qty * price,
                "status": "pending",
                "created_at": datetime.now(timezone.utc),
            }
            await orders.insert_one(order, session=session)
            # Transaction commits automatically when context exits without exception
async def transfer_funds(from_id: str, to_id: str, amount: float):
    """Atomically move ``amount`` from one account balance to another.

    Args:
        from_id: String form of the source account's ObjectId.
        to_id: String form of the destination account's ObjectId.
        amount: Amount to transfer; must be greater than zero.

    Raises:
        ValueError: If ``amount`` is not positive, if the source account is
            missing or has insufficient balance, or if the destination account
            does not exist. Raising inside the transaction aborts it, so no
            partial transfer is persisted.
    """
    # "not amount > 0" also rejects NaN. A negative amount would otherwise
    # pass the balance check and move money in the opposite direction.
    if not amount > 0:
        raise ValueError("Transfer amount must be positive")

    async with await client.start_session() as session:
        async with session.start_transaction():
            debit = await db["accounts"].update_one(
                {"_id": ObjectId(from_id), "balance": {"$gte": amount}},
                {"$inc": {"balance": -amount}},
                session=session,
            )
            if debit.modified_count == 0:
                raise ValueError("Insufficient funds")

            credit = await db["accounts"].update_one(
                {"_id": ObjectId(to_id)},
                {"$inc": {"balance": amount}},
                session=session,
            )
            # Without this check a missing destination would commit the debit
            # alone and the money would vanish.
            if credit.matched_count == 0:
                raise ValueError("Destination account not found")
FastAPI Integration
The standard pattern for FastAPI + Motor is to create the client as a module-level singleton, expose it through a dependency function, and use lifespan events to control startup and shutdown. MongoDB's ObjectId is not JSON-serializable, so the example below converts _id to a string before returning it through a Pydantic response model; a custom PyObjectId type is a common alternative that handles the conversion inside the model.
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel, Field
from bson import ObjectId
from typing import Annotated
# database.py
# Module-level Motor client shared by the whole app. It starts as None and is
# assigned by the lifespan handler during application startup.
motor_client: AsyncIOMotorClient | None = None
def get_db():
    """Return the ``myapp`` database from the shared Motor client.

    Raises:
        RuntimeError: If the shared client has not been created yet.
    """
    # Fail with a clear message instead of "'NoneType' object is not
    # subscriptable" when called before the client exists.
    if motor_client is None:
        raise RuntimeError("MongoDB client is not initialised")
    return motor_client["myapp"]
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the shared Motor client for the app's lifetime and close it after.

    The client is created and pinged before the application starts serving;
    it is closed again when the context exits for any reason.
    """
    global motor_client
    motor_client = AsyncIOMotorClient(MONGO_URI)
    try:
        # Fail fast at startup if the server cannot be reached.
        await motor_client.admin.command("ping")
        print("MongoDB connected")
        yield
    finally:
        # Runs on normal shutdown, after a failed ping, and when an exception
        # is thrown into the generator at the yield, so the client never leaks.
        motor_client.close()
        print("MongoDB disconnected")


app = FastAPI(lifespan=lifespan)
# Pydantic model
class UserCreate(BaseModel):
    # Request body accepted by POST /users.
    name: str
    email: str
class UserResponse(BaseModel):
    # Response shape for a stored user; `id` is filled from the document's
    # "_id" key through the field alias.
    id: str = Field(alias="_id")
    name: str
    email: str

    # Allow population by field name ("id") as well as by alias ("_id").
    model_config = dict(populate_by_name=True)
# Route
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user_endpoint(body: UserCreate, db=Depends(get_db)):
    # Store the submitted user with a creation timestamp, then read the saved
    # document back and return it with its _id rendered as a string.
    users_coll = db["users"]
    payload = {**body.model_dump(), "created_at": datetime.now(timezone.utc)}
    inserted = await users_coll.insert_one(payload)
    stored = await users_coll.find_one({"_id": inserted.inserted_id})
    stored["_id"] = str(stored["_id"])
    return stored
@app.get("/users/{user_id}")
async def get_user_endpoint(user_id: str, db=Depends(get_db)):
    """Return the user with the given id, with ``_id`` rendered as a string.

    Responds with 400 when ``user_id`` is not a valid ObjectId string and with
    404 when no such user exists.
    """
    # ObjectId() raises on malformed input, which would otherwise surface as
    # an HTTP 500; validate first and answer with a client error instead.
    if not ObjectId.is_valid(user_id):
        raise HTTPException(400, "Invalid user id")
    user = await db["users"].find_one({"_id": ObjectId(user_id)})
    if not user:
        raise HTTPException(404, "User not found")
    user["_id"] = str(user["_id"])
    return user
Error Handling and Retries
Motor raises PyMongo exceptions. The most common are DuplicateKeyError for unique constraint violations, ServerSelectionTimeoutError when the cluster is unreachable, and OperationFailure for query errors. Wrapping database calls with retry logic using tenacity handles transient network issues common in cloud environments.
from pymongo.errors import (
DuplicateKeyError, ServerSelectionTimeoutError, OperationFailure
)
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
@retry(
    retry=retry_if_exception_type(ServerSelectionTimeoutError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
)
async def robust_insert(collection, document: dict):
    """Insert ``document``, retrying when no server can be selected.

    Args:
        collection: Collection object exposing an awaitable ``insert_one``.
        document: The document to insert.

    Returns:
        The result of ``collection.insert_one``.

    Raises:
        ValueError: On a duplicate-key violation; the message names the
            offending field when the server reports it.
        RuntimeError: On any other ``OperationFailure``.
    """
    try:
        return await collection.insert_one(document)
    # DuplicateKeyError must be handled before the broader OperationFailure.
    except DuplicateKeyError as e:
        # details may be None and keyPattern may be absent; fall back to a
        # placeholder instead of failing with IndexError/AttributeError while
        # handling the original error.
        key_pattern = (e.details or {}).get("keyPattern") or {}
        field = next(iter(key_pattern), "unknown field")
        raise ValueError(f"Duplicate value for {field}") from e
    except OperationFailure as e:
        raise RuntimeError(f"MongoDB operation failed: {e.details}") from e
# Graceful handling in FastAPI
from fastapi import Request
from fastapi.responses import JSONResponse
@app.exception_handler(DuplicateKeyError)
async def duplicate_key_handler(request: Request, exc: DuplicateKeyError):
    """Answer any DuplicateKeyError raised by a route with an HTTP 409."""
    conflict_body = {"detail": "Resource already exists"}
    return JSONResponse(content=conflict_body, status_code=409)
Production Best Practices
Running Motor in production requires attention to connection pool sizing, document validation, and monitoring. MongoDB's server-side schema validation (JSON Schema) prevents invalid documents from ever reaching the database. Setting maxPoolSize to match your concurrency level avoids connection exhaustion, and enabling retryWrites in the connection string automatically handles transient write failures on replica set elections.
async def setup_collection_validation():
    """Attach a JSON Schema validator to the ``users`` collection via collMod."""
    field_rules = {
        "name": {"bsonType": "string", "minLength": 1},
        "email": {"bsonType": "string", "pattern": r"^[^@]+@[^@]+\.[^@]+$"},
        "active": {"bsonType": "bool"},
    }
    user_schema = {
        "bsonType": "object",
        "required": ["name", "email", "created_at"],
        "properties": field_rules,
    }
    coll_mod = {
        "collMod": "users",
        "validator": {"$jsonSchema": user_schema},
        # moderate: inserts are validated; updates are validated only for
        # documents that already satisfy the schema.
        "validationLevel": "moderate",
    }
    await db.command(coll_mod)
# Projection — never fetch fields you don't need
async def get_user_email_only(user_id: str):
    """Fetch just the ``email`` field of one user (``_id`` is excluded)."""
    selector = {"_id": ObjectId(user_id)}
    email_only = {"email": 1, "_id": 0}
    return await users.find_one(selector, projection=email_only)
# Bulk operations for high-throughput writes
from pymongo import InsertOne, UpdateOne, DeleteOne
async def bulk_upsert(records: list[dict]):
    """Upsert many user records, keyed by email, in one unordered bulk write.

    Args:
        records: Documents to upsert; each must contain an ``"email"`` key.
            May be empty. A record that supplies ``created_at`` keeps that
            value; otherwise ``created_at`` is stamped on insert only.

    Returns:
        ``{"upserted": <count>, "modified": <count>}``.
    """
    # bulk_write rejects an empty operation list, so answer locally.
    if not records:
        return {"upserted": 0, "modified": 0}

    # One timestamp for the whole batch.
    now = datetime.now(timezone.utc)
    operations = [
        UpdateOne(
            {"email": r["email"]},
            # MongoDB refuses an update where $set and $setOnInsert touch the
            # same path, so $setOnInsert is dropped when the record already
            # carries created_at.
            {"$set": r}
            if "created_at" in r
            else {"$set": r, "$setOnInsert": {"created_at": now}},
            upsert=True,
        )
        for r in records
    ]
    result = await users.bulk_write(operations, ordered=False)
    return {"upserted": result.upserted_count, "modified": result.modified_count}