Python MongoDB with Motor: Async Database Operations

Motor is MongoDB's official asynchronous Python driver. It is built on top of PyMongo and designed for asyncio, so it fits naturally into FastAPI, Starlette and other async Python frameworks. Unlike PyMongo's synchronous API, Motor does not block the event loop: every query, insert and aggregation is awaited, and while one operation waits on the network the loop keeps serving other requests. Internally, Motor runs PyMongo's blocking I/O on a thread pool it manages itself, so application code can keep many database operations in flight without creating or coordinating threads.
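Because each Motor call returns an awaitable, independent queries can run concurrently with asyncio.gather instead of one after another. The sketch below assumes users and orders are Motor collections and that order documents carry a user_id field; load_user_summary is a hypothetical helper name.

```python
import asyncio
from typing import Any


async def load_user_summary(users: Any, orders: Any, user_id: Any) -> dict:
    """Fetch a user and their order count concurrently.

    `users` and `orders` are assumed to be Motor collections. Both calls
    return awaitables, so asyncio.gather starts the second query without
    waiting for the first one to finish.
    """
    user, order_count = await asyncio.gather(
        users.find_one({"_id": user_id}),
        orders.count_documents({"user_id": user_id}),
    )
    return {"user": user, "order_count": order_count}
```

For many lookups against the same collection, a single find({"_id": {"$in": ids}}) is usually cheaper than one find_one per id; gather pays off when the queries are genuinely independent.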

Installation and Connection Setup

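Install Motor with pip. Connecting with a mongodb+srv:// string, the format MongoDB Atlas provides, requires dnspython for the SRV DNS lookup; recent PyMongo releases already depend on it, so installing it explicitly is a safeguard. Motor uses the running asyncio event loop without any extra configuration. The examples in this article use annotations such as dict | None, which require Python 3.10 or later.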

pip install motor dnspython

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.server_api import ServerApi

# Local development
client = AsyncIOMotorClient("mongodb://localhost:27017")

# MongoDB Atlas (SRV connection string)
MONGO_URI = "mongodb+srv://user:password@cluster0.abc.mongodb.net/?retryWrites=true&w=majority"
client = AsyncIOMotorClient(MONGO_URI, server_api=ServerApi('1'))

# Connection with explicit options
client = AsyncIOMotorClient(
    MONGO_URI,
    maxPoolSize=50,          # max connections in pool
    minPoolSize=5,           # keep alive minimum connections
    serverSelectionTimeoutMS=5000,
    connectTimeoutMS=10000,
    socketTimeoutMS=30000,
)

db = client["myapp"]
users = db["users"]
orders = db["orders"]

async def ping():
    await client.admin.command('ping')
    print("Connected to MongoDB")

asyncio.run(ping())
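Connection pooling: each AsyncIOMotorClient keeps its own connection pool (one per server) that all coroutines share. Create the client once at application startup and reuse it; never create a new AsyncIOMotorClient per request, since every new client opens its own connections and background monitoring.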
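One way to create that single client while keeping credentials out of source code is to read the connection settings from the environment once at startup. In this sketch the variable names MONGO_URI, MONGO_MAX_POOL_SIZE and MONGO_MIN_POOL_SIZE, and the helper mongo_client_kwargs, are hypothetical conventions; Motor does not read them itself.

```python
import os
from typing import Any, Mapping


def mongo_client_kwargs(env: Mapping[str, str] = os.environ) -> tuple[str, dict[str, Any]]:
    """Build the URI and pool options for AsyncIOMotorClient from environment variables.

    The variable names are illustrative conventions, not something Motor reads.
    """
    uri = env.get("MONGO_URI", "mongodb://localhost:27017")
    options = {
        "maxPoolSize": int(env.get("MONGO_MAX_POOL_SIZE", "50")),
        "minPoolSize": int(env.get("MONGO_MIN_POOL_SIZE", "5")),
        "serverSelectionTimeoutMS": 5000,
    }
    return uri, options
```

At startup you would call uri, options = mongo_client_kwargs() and then AsyncIOMotorClient(uri, **options) once, sharing that client across the application.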

CRUD Operations

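Motor's API mirrors PyMongo's almost method for method, but every method that sends a command to the server returns an awaitable. Use await for single-document operations and for cursor.to_list(), and async for to iterate over cursors; find() and aggregate() return a cursor immediately and only contact the server when you iterate it. Documents are plain Python dicts, and if a document has no _id field the driver generates a BSON ObjectId for it (insert_one also adds that _id to the dict you passed in).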

from bson import ObjectId
from datetime import datetime, timezone

# INSERT
async def create_user(name: str, email: str) -> str:
    doc = {
        "name": name,
        "email": email,
        "created_at": datetime.now(timezone.utc),
        "active": True,
    }
    result = await users.insert_one(doc)
    return str(result.inserted_id)

# INSERT MANY
async def bulk_insert(records: list[dict]):
    result = await users.insert_many(records, ordered=False)
    return len(result.inserted_ids)

# FIND ONE
async def get_user(user_id: str) -> dict | None:
    return await users.find_one({"_id": ObjectId(user_id)})

# FIND MANY with cursor
async def list_active_users(limit: int = 100) -> list[dict]:
    cursor = users.find({"active": True}).sort("created_at", -1).limit(limit)
    return await cursor.to_list(length=limit)

# Async iteration (preferred for large result sets)
async def stream_users():
    async for user in users.find({"active": True}):
        yield user  # stream_users is itself an async generator; consume it with "async for"

# UPDATE
async def update_email(user_id: str, new_email: str) -> bool:
    result = await users.update_one(
        {"_id": ObjectId(user_id)},
        {"$set": {"email": new_email, "updated_at": datetime.now(timezone.utc)}}
    )
    return result.modified_count == 1

# UPSERT
async def upsert_user(email: str, data: dict):
    await users.update_one(
        {"email": email},
        {"$set": data, "$setOnInsert": {"created_at": datetime.now(timezone.utc)}},
        upsert=True
    )

# DELETE
async def delete_user(user_id: str) -> bool:
    result = await users.delete_one({"_id": ObjectId(user_id)})
    return result.deleted_count == 1

# COUNT
async def count_active() -> int:
    return await users.count_documents({"active": True})
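When a large result set has to be processed in chunks (for example to feed bulk_write or an external API), the async for pattern from stream_users can be wrapped in a batching helper. This is a sketch: iter_batches is a hypothetical name, and it accepts any async iterable, including a Motor cursor.

```python
from typing import Any, AsyncIterable, AsyncIterator


async def iter_batches(cursor: AsyncIterable[Any], batch_size: int = 500) -> AsyncIterator[list]:
    """Group documents from an async cursor into lists of at most batch_size.

    `cursor` can be a Motor cursor such as users.find({"active": True});
    any object supporting `async for` works. The helper holds one batch at a time.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch: list = []
    async for document in cursor:
        batch.append(document)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # emit the final, possibly shorter, batch
        yield batch
```

For example, async for batch in iter_batches(users.find({"active": True}), 500) processes 500 documents at a time. This is separate from the cursor's own batch_size() setting, which controls how many documents the driver fetches per round trip to the server.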

Aggregation Pipelines

The aggregation pipeline is MongoDB's most powerful query feature. Motor passes pipelines to the server unchanged, so every stage the server supports, including $lookup, $group, $facet, $bucket and $graphLookup, is available. Pipelines run server-side, which is usually far more efficient than fetching documents and processing them in Python. aggregate() returns a cursor: call to_list() for small result sets or iterate with async for for large ones.

async def order_stats_by_user():
    """Aggregate total order value per user with join."""
    pipeline = [
        {"$match": {"status": "completed"}},
        {"$group": {
            "_id": "$user_id",
            "total": {"$sum": "$amount"},
            "count": {"$sum": 1},
            "avg": {"$avg": "$amount"},
        }},
        {"$sort": {"total": -1}},
        {"$limit": 10},
        {"$lookup": {
            "from": "users",
            "localField": "_id",
            "foreignField": "_id",
            "as": "user",
        }},
        {"$unwind": "$user"},
        {"$project": {
            "user_name": "$user.name",
            "total": 1,
            "count": 1,
            "avg": {"$round": ["$avg", 2]},
        }},
    ]
    return await orders.aggregate(pipeline).to_list(length=10)

async def monthly_revenue():
    """Group revenue by month using date operators."""
    pipeline = [
        {"$match": {"status": "completed", "created_at": {"$gte": datetime(2026, 1, 1, tzinfo=timezone.utc)}}},
        {"$group": {
            "_id": {
                "year": {"$year": "$created_at"},
                "month": {"$month": "$created_at"},
            },
            "revenue": {"$sum": "$amount"},
            "orders": {"$sum": 1},
        }},
        {"$sort": {"_id.year": 1, "_id.month": 1}},
    ]
    return await orders.aggregate(pipeline).to_list(length=None)
Tip: each aggregation stage is limited to 100 MB of memory. Pass allowDiskUse=True to aggregate() when a $group or $sort over a large dataset would exceed that limit; on MongoDB 6.0 and later, stages spill to disk by default.
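The monthly_revenue pipeline buckets dates in UTC. If months should follow a local calendar, $year and $month also accept a document with date and timezone fields. The builder below is a sketch that assumes the same orders schema (status, amount, created_at); monthly_revenue_pipeline is a hypothetical name, and tz is an Olson timezone name such as "Europe/Berlin".

```python
from datetime import datetime, timezone


def monthly_revenue_pipeline(start: datetime, tz: str = "UTC") -> list[dict]:
    """Build a group-by-month pipeline whose month boundaries follow `tz`.

    Without a timezone, $year/$month bucket by UTC, which can put orders placed
    near midnight at a month boundary into the neighbouring month.
    """
    if start.tzinfo is None:
        # PyMongo treats naive datetimes as UTC; make that explicit
        start = start.replace(tzinfo=timezone.utc)
    date_parts = {"date": "$created_at", "timezone": tz}
    return [
        {"$match": {"status": "completed", "created_at": {"$gte": start}}},
        {"$group": {
            "_id": {"year": {"$year": date_parts}, "month": {"$month": date_parts}},
            "revenue": {"$sum": "$amount"},
            "orders": {"$sum": 1},
        }},
        {"$sort": {"_id.year": 1, "_id.month": 1}},
    ]
```

It would be run as await orders.aggregate(monthly_revenue_pipeline(start, "Europe/Berlin"), allowDiskUse=True).to_list(length=None).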

Indexes and Performance

Indexes are critical for MongoDB query performance. Motor provides async methods to create, list and drop indexes programmatically. Creating indexes at application startup is a common pattern because create_index is idempotent: calling it again with the same keys and options is a no-op, although a call with the same keys but different options fails. Use compound indexes to cover your most frequent query patterns, and use explain() to see how a slow query is executed.

async def create_indexes():
    """Create all required indexes at startup."""
    # Single field index
    await users.create_index("email", unique=True)

    # Compound index — covers queries that filter by active + sort by created_at
    await users.create_index([("active", 1), ("created_at", -1)])

    # Text index for full-text search
    await users.create_index([("name", "text"), ("bio", "text")])

    # TTL index: each session is deleted once the time stored in expires_at has
    # passed (MongoDB's TTL monitor runs about once a minute, so removal can lag)
    await db["sessions"].create_index(
        "expires_at",
        expireAfterSeconds=0  # expire at the expires_at timestamp itself
    )

    # Partial index — only index active users (smaller, faster)
    await users.create_index(
        "last_login",
        partialFilterExpression={"active": True}
    )

async def explain_query():
    """Use explain to check if indexes are being used."""
    explanation = await users.find({"email": "alice@example.com"}).explain()
    winning_plan = explanation["queryPlanner"]["winningPlan"]
    print(winning_plan)  # Look for IXSCAN (index scan) vs COLLSCAN (full scan)

async def list_indexes():
    async for index in users.list_indexes():
        print(index)
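Reading explanation["queryPlanner"]["winningPlan"] directly only shows the top stage, and the plan nests its inputs (inputStage, inputStages and, on some newer server versions, a queryPlan wrapper). A small recursive walker makes the IXSCAN vs COLLSCAN check easier to automate, for example in a test suite; plan_stages and uses_collection_scan are hypothetical helpers, not part of Motor.

```python
from typing import Any


def plan_stages(plan: Any) -> set[str]:
    """Collect every "stage" name found anywhere in an explain() plan."""
    stages: set[str] = set()
    if isinstance(plan, dict):
        stage = plan.get("stage")
        if isinstance(stage, str):
            stages.add(stage)
        # recurse into inputStage, inputStages, queryPlan, etc.
        for value in plan.values():
            stages |= plan_stages(value)
    elif isinstance(plan, list):
        for item in plan:
            stages |= plan_stages(item)
    return stages


def uses_collection_scan(plan: Any) -> bool:
    """True if any part of the plan is a COLLSCAN (no index used there)."""
    return "COLLSCAN" in plan_stages(plan)
```

With the unique email index in place, passing the winningPlan from the explain_query example to uses_collection_scan should return False.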

Multi-Document Transactions

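MongoDB supports ACID multi-document transactions on replica sets (MongoDB 4.0+) and sharded clusters (4.2+), but not on standalone servers; MongoDB Atlas clusters are always deployed as one of the former. Motor exposes transactions through sessions: start a session, run the operations inside session.start_transaction(), and pass session= to each one. Transactions are essential when several documents or collections must change atomically, for example deducting inventory and creating an order record together.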

from pymongo import ReturnDocument

async def place_order(user_id: str, product_id: str, qty: int, price: float):
    """Deduct inventory and create order in a single transaction."""
    async with await client.start_session() as session:
        async with session.start_transaction():
            # Check and deduct inventory
            result = await db["inventory"].find_one_and_update(
                {"_id": ObjectId(product_id), "stock": {"$gte": qty}},
                {"$inc": {"stock": -qty}},
                session=session,
                return_document=ReturnDocument.AFTER,  # return the updated document
            )
            if not result:
                raise ValueError("Insufficient stock")

            # Create order record
            order = {
                "user_id": ObjectId(user_id),
                "product_id": ObjectId(product_id),
                "qty": qty,
                "amount": qty * price,
                "status": "pending",
                "created_at": datetime.now(timezone.utc),
            }
            await orders.insert_one(order, session=session)
            # Leaving the block normally commits; an exception (such as the ValueError above) aborts

async def transfer_funds(from_id: str, to_id: str, amount: float):
    """Atomic balance transfer between two accounts."""
    async with await client.start_session() as session:
        async with session.start_transaction():
            debit = await db["accounts"].update_one(
                {"_id": ObjectId(from_id), "balance": {"$gte": amount}},
                {"$inc": {"balance": -amount}},
                session=session,
            )
            if debit.modified_count == 0:
                raise ValueError("Insufficient funds")
            await db["accounts"].update_one(
                {"_id": ObjectId(to_id)},
                {"$inc": {"balance": amount}},
                session=session,
            )
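The start_transaction() context manager commits or aborts but does not retry. Motor sessions also offer with_transaction(), which takes a coroutine function that receives the session, retries it on TransientTransactionError, and retries the commit on UnknownTransactionCommitResult. The sketch below reworks transfer_funds that way; it assumes client is an AsyncIOMotorClient, accounts is a Motor collection such as db["accounts"], and the ids are already ObjectId values. transfer_funds_with_retry is a hypothetical name.

```python
from typing import Any


async def transfer_funds_with_retry(client: Any, accounts: Any,
                                    from_id: Any, to_id: Any, amount: float) -> None:
    """Atomic transfer run through session.with_transaction (retries transient errors)."""

    async def callback(session: Any) -> None:
        # May run more than once, so it must only perform transactional writes
        debit = await accounts.update_one(
            {"_id": from_id, "balance": {"$gte": amount}},
            {"$inc": {"balance": -amount}},
            session=session,
        )
        if debit.modified_count == 0:
            raise ValueError("Insufficient funds")  # aborts; not a transient error
        await accounts.update_one(
            {"_id": to_id},
            {"$inc": {"balance": amount}},
            session=session,
        )

    async with await client.start_session() as session:
        await session.with_transaction(callback)
```

Because the callback can run more than once, it should contain no side effects outside the transaction, such as sending emails.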

FastAPI Integration

The standard pattern for FastAPI + Motor is to create the client as a module-level singleton, expose the database through a dependency function, and use a lifespan handler to connect at startup and close the client at shutdown. Because ObjectId is not JSON-serializable, _id has to become a string before a document is returned, either by hand as in the routes below or with a custom PyObjectId type on the Pydantic model.

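from contextlib import asynccontextmanager
from datetime import datetime, timezone
from fastapi import FastAPI, Depends, HTTPException
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel, Field
from bson import ObjectId
# MONGO_URI is the connection string from the setup section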

# database.py
motor_client: AsyncIOMotorClient | None = None

def get_db():
    return motor_client["myapp"]

@asynccontextmanager
async def lifespan(app: FastAPI):
    global motor_client
    motor_client = AsyncIOMotorClient(MONGO_URI)
    await motor_client.admin.command("ping")
    print("MongoDB connected")
    yield
    motor_client.close()
    print("MongoDB disconnected")

app = FastAPI(lifespan=lifespan)

# Pydantic model
class UserCreate(BaseModel):
    name: str
    email: str

class UserResponse(BaseModel):
    id: str = Field(alias="_id")
    name: str
    email: str

    model_config = {"populate_by_name": True}

# Route
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user_endpoint(body: UserCreate, db=Depends(get_db)):
    doc = body.model_dump()
    doc["created_at"] = datetime.now(timezone.utc)
    result = await db["users"].insert_one(doc)
    created = await db["users"].find_one({"_id": result.inserted_id})
    created["_id"] = str(created["_id"])
    return created

@app.get("/users/{user_id}")
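async def get_user_endpoint(user_id: str, db=Depends(get_db)):
    # ObjectId() raises InvalidId on malformed strings; answer 404 instead of a 500
    if not ObjectId.is_valid(user_id):
        raise HTTPException(404, "User not found")
    user = await db["users"].find_one({"_id": ObjectId(user_id)})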
    if not user:
        raise HTTPException(404, "User not found")
    user["_id"] = str(user["_id"])
    return user
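Instead of converting _id by hand in every route, the response model can do it. A common Pydantic v2 approach is an Annotated string type with BeforeValidator(str), which turns an incoming ObjectId into its hex string. This is a sketch assuming Pydantic v2; PyObjectId, UserOut and to_user_out are illustrative names.

```python
from typing import Annotated, Any

from pydantic import BaseModel, BeforeValidator, ConfigDict, Field

# Coerce whatever arrives (e.g. a bson.ObjectId from find_one) to its string form
PyObjectId = Annotated[str, BeforeValidator(str)]


class UserOut(BaseModel):
    """Response model that accepts a raw MongoDB document with an ObjectId _id."""

    model_config = ConfigDict(populate_by_name=True)

    id: PyObjectId = Field(alias="_id")
    name: str
    email: str


def to_user_out(document: dict[str, Any]) -> UserOut:
    # No manual str(doc["_id"]) needed: the validator converts it
    return UserOut.model_validate(document)
```

With response_model=UserOut, a route should be able to return the document from find_one unchanged, because FastAPI validates the return value against the model.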

Error Handling and Retries

Motor raises PyMongo's exceptions. The most common are DuplicateKeyError for unique-index violations, ServerSelectionTimeoutError when no suitable server is reachable within serverSelectionTimeoutMS, and OperationFailure for errors the server reports. DuplicateKeyError is a subclass of OperationFailure, so catch it first. Wrapping database calls in retry logic, here with the third-party tenacity library, helps with the transient network issues common in cloud environments.

from pymongo.errors import (
    DuplicateKeyError, ServerSelectionTimeoutError, OperationFailure
)
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    retry=retry_if_exception_type(ServerSelectionTimeoutError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
)
async def robust_insert(collection, document: dict):
    try:
        return await collection.insert_one(document)
    except DuplicateKeyError as e:
        # Extract which field caused the duplicate (guard against missing details)
        key_pattern = (e.details or {}).get("keyPattern") or {}
        field = next(iter(key_pattern), "unknown field")
        raise ValueError(f"Duplicate value for {field}") from e
    except OperationFailure as e:
        raise RuntimeError(f"MongoDB operation failed: {e.details}") from e

# Graceful handling in FastAPI
from fastapi import Request
from fastapi.responses import JSONResponse

@app.exception_handler(DuplicateKeyError)
async def duplicate_key_handler(request: Request, exc: DuplicateKeyError):
    return JSONResponse(status_code=409, content={"detail": "Resource already exists"})
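If you would rather not depend on tenacity, the same policy (retry only selected exceptions, three attempts, exponential backoff capped at 10 seconds) can be written with asyncio alone. This is a sketch with a hypothetical retry_async helper, not a tenacity replacement; it adds a little random jitter so many clients do not retry in lockstep.

```python
import asyncio
import random
from typing import Any, Awaitable, Callable, Tuple, Type


async def retry_async(
    operation: Callable[[], Awaitable[Any]],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 10.0,
) -> Any:
    """Await operation(), retrying on `retry_on` with exponential backoff plus jitter."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(delay + random.uniform(0, delay / 10))
```

Pass a zero-argument function rather than an already-created awaitable, since each attempt needs a fresh operation: await retry_async(lambda: users.insert_one(doc), (ServerSelectionTimeoutError,)).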

Production Best Practices

Running Motor in production requires attention to connection pool sizing, document validation and monitoring. MongoDB's server-side schema validation ($jsonSchema) rejects writes that violate the schema (with the default validationAction of "error"), so invalid documents are stopped at the database. maxPoolSize caps the connections a client opens to each server (the default is 100); size it to your expected concurrency, since operations beyond the cap wait for a free connection. retryWrites, enabled by default in current drivers, automatically retries a write once after a transient failure such as a replica set election.

async def setup_collection_validation():
    """Add server-side JSON Schema validation to a collection."""
    validator = {
        "$jsonSchema": {
            "bsonType": "object",
            "required": ["name", "email", "created_at"],
            "properties": {
                "name": {"bsonType": "string", "minLength": 1},
                "email": {"bsonType": "string", "pattern": r"^[^@]+@[^@]+\.[^@]+$"},
                "active": {"bsonType": "bool"},
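                "created_at": {"bsonType": "date"},
            }
        }
    }
    # collMod needs an existing collection; for a new one, pass validator= to create_collection()
    await db.command({
        "collMod": "users",
        "validator": validator,
        "validationLevel": "moderate",  # checks inserts, and updates to docs that already pass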
    })

# Projection — never fetch fields you don't need
async def get_user_email_only(user_id: str):
    return await users.find_one(
        {"_id": ObjectId(user_id)},
        projection={"email": 1, "_id": 0}  # only return email field
    )

# Bulk operations for high-throughput writes
from pymongo import InsertOne, UpdateOne, DeleteOne

async def bulk_upsert(records: list[dict]):
    operations = [
        UpdateOne(
            {"email": r["email"]},
            {"$set": r, "$setOnInsert": {"created_at": datetime.now(timezone.utc)}},
            upsert=True,
        )
        for r in records
    ]
    result = await users.bulk_write(operations, ordered=False)
    return {"upserted": result.upserted_count, "modified": result.modified_count}
Monitoring: log slow commands from the application with PyMongo's command monitoring (event listeners registered on the Motor client). On the server side, set the slow query threshold in your Atlas cluster to 100 ms and review the Atlas Performance Advisor weekly for index recommendations.