Python MongoDB with Motor: Async Database Operations

Motor is the official async Python driver for MongoDB, built on top of PyMongo and asyncio. It enables non-blocking database operations in FastAPI, Starlette, and other async Python frameworks. This guide covers async CRUD operations, aggregation pipelines, indexing strategies, multi-document transactions, change streams for real-time events, and integrating Motor with FastAPI and Pydantic models.

Setup and Connection

pip install motor "pydantic[email]" fastapi uvicorn
import motor.motor_asyncio
import os
from contextlib import asynccontextmanager

# Connection settings, overridable through the environment.
MONGO_URL = os.getenv("MONGO_URL", "mongodb://localhost:27017")
DB_NAME = os.getenv("DB_NAME", "myapp")

# A single client shared by the whole application (never one per request):
# it owns the connection pool configured below.
client = motor.motor_asyncio.AsyncIOMotorClient(
    MONGO_URL,
    minPoolSize=1,
    maxPoolSize=10,
    serverSelectionTimeoutMS=5000,  # fail after 5 s if no server is reachable
)
db = client[DB_NAME]

# Handles to the collections used in the rest of this guide.
users_col, posts_col, orders_col = (db[name] for name in ("users", "posts", "orders"))

async def ping_db():
    """Send a ``ping`` command to confirm the server is reachable.

    Prints a confirmation on success; any driver error propagates unchanged.
    """
    admin_db = client.admin
    await admin_db.command("ping")
    print("MongoDB connected")

Async CRUD Operations

import asyncio
from bson import ObjectId
from datetime import datetime, timezone

# INSERT
async def create_user(username: str, email: str) -> dict:
    """Insert a new active user with the default ``["user"]`` role.

    Args:
        username: Display/login name to store.
        email: Email address to store.

    Returns:
        The inserted document. Its ObjectId is exposed as a string under
        ``id`` (the same key ``get_user``/``list_users`` use and that
        ``UserResponse`` expects). ``_id`` is kept as the same string for
        callers that already read it.
    """
    doc = {
        "username": username,
        "email": email,
        "created_at": datetime.now(timezone.utc),
        "is_active": True,
        "roles": ["user"],
    }
    result = await users_col.insert_one(doc)
    # insert_one stores the generated ObjectId in doc["_id"]; expose it as a
    # JSON-friendly string under both keys.
    doc["_id"] = str(result.inserted_id)
    doc["id"] = doc["_id"]
    return doc

# READ ONE
async def get_user(user_id: str) -> dict | None:
    """Fetch one user by its hex ObjectId string.

    Returns the document with ``_id`` replaced by a string ``id``, or None
    when no user matches. ``ObjectId(user_id)`` raises for a malformed id.
    """
    found = await users_col.find_one({"_id": ObjectId(user_id)})
    if not found:
        return found
    found["id"] = str(found.pop("_id"))
    return found

# READ MANY with pagination
async def list_users(page: int = 1, page_size: int = 20) -> list[dict]:
    """Return one page of active users, newest first.

    Args:
        page: 1-based page number.
        page_size: Maximum number of users on the page.

    Returns:
        Up to ``page_size`` user documents, each with ``_id`` replaced by a
        string ``id``.

    Raises:
        ValueError: If ``page`` or ``page_size`` is less than 1.
    """
    if page < 1:
        raise ValueError("page must be >= 1")
    # MongoDB treats limit(0) as "no limit" (the whole collection) and a
    # negative limit as a single-batch limit, so reject both explicitly.
    if page_size < 1:
        raise ValueError("page_size must be >= 1")
    skip = (page - 1) * page_size
    # The server applies sort before skip/limit regardless of call order;
    # chaining them in that order just makes the intent readable.
    cursor = (
        users_col.find({"is_active": True})
        .sort("created_at", -1)
        .skip(skip)
        .limit(page_size)
    )
    users = []
    async for doc in cursor:
        doc["id"] = str(doc.pop("_id"))
        users.append(doc)
    return users

# UPDATE
async def update_user(user_id: str, updates: dict) -> dict | None:
    updates["updated_at"] = datetime.now(timezone.utc)
    result = await users_col.find_one_and_update(
        {"_id": ObjectId(user_id)},
        {"$set": updates},
        return_document=True,
    )
    if result:
        result["id"] = str(result.pop("_id"))
    return result

# DELETE (soft delete)
async def deactivate_user(user_id: str) -> bool:
    """Soft-delete a user by clearing ``is_active`` and stamping ``deactivated_at``.

    Returns True when the server reports that a document was modified.
    """
    match_user = {"_id": ObjectId(user_id)}
    stamp = datetime.now(timezone.utc)
    outcome = await users_col.update_one(
        match_user,
        {"$set": {"is_active": False, "deactivated_at": stamp}},
    )
    return bool(outcome.modified_count)

# UPSERT
async def upsert_session(session_id: str, user_id: str, data: dict) -> None:
    """Create or update the session document identified by ``session_id``.

    ``data`` is merged into the document, then ``user_id`` and a fresh UTC
    ``updated_at`` are set (overriding same-named keys in ``data``).
    """
    fields = {**data}
    fields["user_id"] = user_id
    fields["updated_at"] = datetime.now(timezone.utc)
    sessions = db["sessions"]
    await sessions.update_one({"session_id": session_id}, {"$set": fields}, upsert=True)

Aggregation Pipelines

MongoDB aggregation pipelines are the right tool for analytics, joins, and complex transformations. Motor runs them asynchronously with the same pipeline syntax as PyMongo.

async def user_post_stats() -> list[dict]:
    """Return the ten authors with the most published posts.

    Each row carries the author id as a string ``_id`` plus ``username``,
    ``email``, ``post_count`` and ``latest_post``, sorted by ``post_count``
    descending. Authors with no matching user document are dropped by
    ``$unwind``.
    """
    published_only = {"$match": {"published": True}}
    per_author = {"$group": {
        "_id": "$author_id",
        "post_count": {"$sum": 1},
        "latest_post": {"$max": "$created_at"},
    }}
    join_author = {"$lookup": {
        "from": "users",
        "localField": "_id",
        "foreignField": "_id",
        "as": "author",
    }}
    flatten_author = {"$unwind": "$author"}
    shape_row = {"$project": {
        "username": "$author.username",
        "email": "$author.email",
        "post_count": 1,
        "latest_post": 1,
    }}
    top_ten = [{"$sort": {"post_count": -1}}, {"$limit": 10}]
    pipeline = [published_only, per_author, join_author, flatten_author, shape_row, *top_ten]
    return [
        {**row, "_id": str(row["_id"])}
        async for row in posts_col.aggregate(pipeline)
    ]

async def revenue_by_month(year: int) -> list[dict]:
    """Aggregate completed-order revenue per calendar month of ``year``.

    Orders are selected by a UTC ``created_at`` range, the same field the
    ``$month`` grouping reads. The filter and the grouping therefore cannot
    disagree, and orders need no separately maintained ``year`` field. An
    index on ``created_at`` could serve this range.

    Args:
        year: Calendar year (UTC) to report on.

    Returns:
        One dict per month that has orders, sorted by month, shaped
        ``{"_id": {"month": m}, "revenue": ..., "orders": ..., "avg_order": ...}``.
    """
    start = datetime(year, 1, 1, tzinfo=timezone.utc)
    end = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
    pipeline = [
        {"$match": {
            "status": "completed",
            "created_at": {"$gte": start, "$lt": end},
        }},
        {"$group": {
            # $month extracts the month in UTC unless a timezone is given.
            "_id": {"month": {"$month": "$created_at"}},
            "revenue": {"$sum": "$total"},
            "orders": {"$sum": 1},
            "avg_order": {"$avg": "$total"},
        }},
        {"$sort": {"_id.month": 1}},
    ]
    return [doc async for doc in orders_col.aggregate(pipeline)]

Indexes and Performance

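Always create indexes explicitly — MongoDB indexes only `_id` on its own and never adds secondary indexes based on your query patterns. Create indexes at application startup, not in migrations: `create_index` is a no-op when an identical index already exists, so calling it on every boot is cheap. (On very large collections the first build can take a while, so add new indexes before deploying code that depends on them.)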

import asyncio
from pymongo import IndexModel, ASCENDING, DESCENDING, TEXT

async def create_indexes():
    """Create the indexes the queries in this module rely on.

    ``create_index``/``create_indexes`` do nothing when an identical index
    already exists, so this is safe to run on every application startup.
    """
    # Unique index on email; it also serves every equality lookup on email.
    await users_col.create_index("email", unique=True)

    # Compound index matching list_users' filter (is_active) + sort (created_at desc),
    # plus unique usernames.
    await users_col.create_indexes([
        IndexModel([("is_active", ASCENDING), ("created_at", DESCENDING)]),
        IndexModel([("username", ASCENDING)], unique=True),
    ])

    # Text index for search (a collection can have at most one text index).
    await posts_col.create_index([("title", TEXT), ("content", TEXT)])

    # TTL index: MongoDB's background TTL task (which runs roughly once a
    # minute) deletes a session after its ``expires_at`` date has passed.
    # Documents without a date-typed ``expires_at`` are never expired.
    await db["sessions"].create_index(
        "expires_at",
        expireAfterSeconds=0,
    )

    # Partial index — only index active users. Queries must include
    # {"is_active": True} for the planner to use it. It is keyed on ``roles``,
    # not ``email``. A second ``email`` index would duplicate the unique index
    # above, which already serves those queries. It would add write cost for
    # nothing, and older MongoDB releases reject a same-key index with
    # different options.
    await users_col.create_index(
        "roles",
        partialFilterExpression={"is_active": True},
        name="active_users_roles",
    )

# Text search using the text index
async def search_posts(query: str, limit: int = 20) -> list[dict]:
    """Full-text search over posts, best matches first.

    Each returned document includes its relevance as ``score``; at most
    ``limit`` documents are returned.
    """
    text_filter = {"$text": {"$search": query}}
    with_score = {"score": {"$meta": "textScore"}}
    cursor = posts_col.find(text_filter, with_score)
    cursor = cursor.sort([("score", {"$meta": "textScore"})])
    cursor = cursor.limit(limit)
    return [hit async for hit in cursor]

Multi-Document Transactions

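MongoDB supports ACID transactions across multiple documents and collections since version 4.0 (replica sets) and 4.2 (sharded clusters). Transactions are not available on a standalone `mongod`, so for local development run a single-node replica set. Use a session for transactional operations, and pass that same session (`session=session`) to every operation that belongs to the transaction.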

async def transfer_credits(from_user_id: str, to_user_id: str, amount: int) -> bool:
    """Atomically move ``amount`` credits from one user to another.

    Debits the sender (only if they have enough credits), credits the
    recipient and records the transfer, all inside one transaction.

    Args:
        from_user_id: Hex ObjectId string of the sender.
        to_user_id: Hex ObjectId string of the recipient.
        amount: Credits to move; must not be negative.

    Returns:
        True if the transfer committed. False if the sender is unknown or has
        too few credits, if ``amount`` is 0, or if the recipient does not exist.
        In the False cases the transaction is aborted and nothing changes.

    Raises:
        ValueError: If ``amount`` is negative.
    """
    # A negative amount would invert the $inc operations and pull credits out
    # of the recipient's account into the sender's.
    if amount < 0:
        raise ValueError("amount must not be negative")

    async with await client.start_session() as session:
        async with session.start_transaction():
            # Debit: the $gte guard makes the balance check and the decrement atomic.
            debit = await users_col.update_one(
                {"_id": ObjectId(from_user_id), "credits": {"$gte": amount}},
                {"$inc": {"credits": -amount}},
                session=session,
            )
            if debit.modified_count == 0:
                await session.abort_transaction()
                return False  # unknown sender or insufficient credits

            # Credit
            credit = await users_col.update_one(
                {"_id": ObjectId(to_user_id)},
                {"$inc": {"credits": amount}},
                session=session,
            )
            if credit.matched_count == 0:
                # Unknown recipient: committing would make the debited credits vanish.
                await session.abort_transaction()
                return False

            # Record transaction
            await db["transactions"].insert_one({
                "from": from_user_id,
                "to": to_user_id,
                "amount": amount,
                "ts": datetime.now(timezone.utc),
            }, session=session)

            # Leaving the start_transaction() block without an exception commits.
            return True

Change Streams

Change streams let you react to database changes in real time — for example to invalidate caches, sync search indexes, or push WebSocket updates. Like transactions, they require a replica set or sharded cluster.

import asyncio

async def watch_orders():
    """Stream orders whose ``status`` is updated to "completed" and hand them to fulfillment.

    Only ``update`` events are matched; an order inserted already completed is
    not reported. Runs until cancelled or until the stream raises.
    """
    pipeline = [
        {"$match": {
            "operationType": "update",
            "updateDescription.updatedFields.status": "completed",
        }}
    ]
    async with orders_col.watch(pipeline, full_document="updateLookup") as stream:
        async for change in stream:
            # With updateLookup the current document is fetched after the update,
            # so it is null if the order was deleted in the meantime.
            order = change.get("fullDocument")
            if order is None:
                continue
            print(f"Order {order['_id']} completed — amount: {order['total']}")
            # NOTE(review): trigger_fulfillment is not defined in this snippet —
            # supply your own coroutine.
            await trigger_fulfillment(order)

# Strong references to fire-and-forget tasks. The event loop keeps only weak
# references to tasks, so an unreferenced task can be garbage-collected
# mid-execution.
_background_tasks: set[asyncio.Task] = set()


# Run as a background task
async def main():
    """Start the order watcher in the background, then continue app startup."""
    task = asyncio.create_task(watch_orders())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    # ... rest of application startup

FastAPI + Pydantic Integration

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr, Field
from bson import ObjectId
from typing import Optional

class PyObjectId(str):
    """String type that only accepts valid BSON ObjectIds.

    Uses the Pydantic v1-style ``__get_validators__`` hook; the validated
    value is returned as a plain ``str``.
    """

    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if ObjectId.is_valid(v):
            return str(v)
        raise ValueError("Invalid ObjectId")

class UserCreate(BaseModel):
    """Request body for creating a user."""

    username: str = Field(..., min_length=3, max_length=50)  # required, 3-50 characters
    email: EmailStr  # validated email address (needs the pydantic[email] extra)

class UserResponse(BaseModel):
    """Public representation of a user returned by the API.

    ``id`` carries the document's ObjectId as a string, because the JSON
    encoder cannot serialize a BSON ObjectId directly.
    """

    id: str
    username: str
    email: str
    is_active: bool
    created_at: datetime

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup work before serving requests and release resources afterwards.

    Replaces the deprecated ``@app.on_event("startup")`` hook and also closes
    the shared Motor client (and its connection pool) on shutdown.
    """
    await startup()
    yield
    # Motor's close() is synchronous.
    client.close()


app = FastAPI(lifespan=lifespan)


async def startup():
    """Create the MongoDB indexes before the app starts serving."""
    await create_indexes()

@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user_endpoint(data: UserCreate):
    """Register a new user.

    Responds 409 if the email (or username) is already registered. The
    response exposes the new document's ObjectId as a string ``id``.
    """
    from pymongo.errors import DuplicateKeyError

    # Friendly fast path. The unique indexes are the real guarantee, because two
    # concurrent requests can both pass this check before either inserts.
    existing = await users_col.find_one({"email": data.email})
    if existing:
        raise HTTPException(status_code=409, detail="Email already registered")
    try:
        user = await create_user(data.username, data.email)
    except DuplicateKeyError as err:
        # keyPattern names the unique index that rejected the insert (email or username).
        key_pattern = (err.details or {}).get("keyPattern") or {"email": 1}
        field = next(iter(key_pattern))
        raise HTTPException(
            status_code=409, detail=f"{field.capitalize()} already registered"
        ) from err
    # UserResponse expects "id"; derive it from "_id" if create_user did not set it.
    fields = {key: value for key, value in user.items() if key != "_id"}
    if "id" not in fields:
        fields["id"] = str(user["_id"])
    return UserResponse(**fields)

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user_endpoint(user_id: str):
    """Return one user by id: 400 for a malformed id, 404 if no such user."""
    well_formed = ObjectId.is_valid(user_id)
    if not well_formed:
        raise HTTPException(status_code=400, detail="Invalid user ID")
    found = await get_user(user_id)
    if found:
        return UserResponse(**found)
    raise HTTPException(status_code=404, detail="User not found")

Frequently Asked Questions

Motor vs PyMongo vs Beanie — which should I use?
Use Motor for raw async MongoDB access with full control. Use Beanie (built on Motor) when you want an ODM with Pydantic models, automatic schema validation, and migration support. Use PyMongo for sync scripts, data pipelines, or when async isn't needed.
How do I handle ObjectId serialization in FastAPI?
FastAPI's JSON encoder doesn't know about BSON ObjectId. Either convert to string in your response models (as shown above), use a custom JSON encoder, or use Beanie which handles this automatically.
When should I NOT use transactions?
Avoid transactions on high-throughput paths: they add latency and limit horizontal scalability. Single-document writes are already atomic in MongoDB, so they never need a transaction. Design your document structure so most operations touch a single document, using embedded documents instead of joins where possible.