Python PostgreSQL with asyncpg: High-Performance Async Driver

asyncpg is one of the fastest PostgreSQL drivers for Python — its published benchmarks show it roughly 3–5x faster than psycopg2 for I/O-bound workloads. It implements the PostgreSQL binary protocol directly, deliberately does not follow (and so avoids the overhead of) the DB-API 2.0 interface, and integrates natively with asyncio. This guide covers connection pools, prepared statements, bulk COPY operations, LISTEN/NOTIFY for real-time events, transactions, and FastAPI integration.

Setup and Connection Pool

pip install asyncpg fastapi uvicorn
import os
from collections.abc import AsyncIterator

import asyncpg

DATABASE_URL = os.environ["DATABASE_URL"]
# e.g., postgresql://user:password@localhost/mydb
# Read at import time, so a missing variable fails fast with KeyError.

# Global connection pool — create once at startup.
# Starts as None; create_pool() returns a pool but does not assign it here,
# so startup code must store the result. close_pool() and get_db() read it.
pool: asyncpg.Pool | None = None

async def create_pool() -> asyncpg.Pool:
    """Build the application's asyncpg connection pool for DATABASE_URL.

    Keeps between 5 and 20 connections, recycles connections that sit
    inactive for 300 seconds, and uses a 60-second default command timeout.
    The pool is returned, not stored; the caller decides where it lives.
    """
    pool_options = dict(
        min_size=5,
        max_size=20,
        max_inactive_connection_lifetime=300,
        command_timeout=60,
    )
    # Custom type codecs (e.g. for JSONB) would be registered here via init=.
    return await asyncpg.create_pool(DATABASE_URL, **pool_options)

async def close_pool():
    """Gracefully close the module-level ``pool`` if one has been set."""
    if not pool:
        return
    await pool.close()

# Dependency for FastAPI
async def get_db() -> AsyncIterator[asyncpg.Connection]:
    """FastAPI dependency that lends one pooled connection per request.

    This is an async generator, so its return type is an async iterator of
    connections rather than a bare ``Connection``.

    Yields:
        A connection acquired from the module-level ``pool``; it is released
        back to the pool when the ``async with`` block exits.

    Raises:
        RuntimeError: if ``pool`` has not been set yet (otherwise this would
            surface as an opaque ``AttributeError`` on ``None``).
    """
    if pool is None:
        raise RuntimeError(
            "Database pool is not initialized; assign the result of create_pool() to `pool` at startup"
        )
    async with pool.acquire() as conn:
        yield conn
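Pool sizing: size pools from the database side. PostgreSQL's default max_connections is 100, and by default 3 of those are reserved for superusers (superuser_reserved_connections), so budget max_size ≈ (connections available to your app − headroom for migrations, admin tools and other clients) / number of app instances. With 5 app instances on a default server, keep each pool's max_size somewhat below 20 (e.g. 15–18) rather than exactly 20, which would leave no room for anything else. For a single instance, a common starting point is a small multiple of the database server's CPU cores (e.g. max_size = 2 × cores); measure before going higher.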

Queries and Type Mapping

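asyncpg returns rows as Record objects — read-only and dict-like, accessible by column name or index. Common PostgreSQL types map to Python types automatically: TEXT → str, INTEGER → int, NUMERIC → Decimal, TIMESTAMPTZ → datetime (timezone-aware). JSON and JSONB are the exception: by default they are sent and returned as JSON strings (str) unless you register a codec with set_type_codec().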

import asyncpg
from datetime import datetime

async def get_user(conn: asyncpg.Connection, user_id: int) -> dict | None:
    """Fetch a single user by primary key.

    Returns a plain dict with id, username, email and created_at, or None
    when no row matches ``user_id``.
    """
    sql = "SELECT id, username, email, created_at FROM users WHERE id = $1"
    record = await conn.fetchrow(sql, user_id)
    if record is None:
        return None
    return dict(record)

async def list_users(conn: asyncpg.Connection, active_only: bool = True) -> list[dict]:
    """Return up to 100 users, newest first, as plain dicts.

    When ``active_only`` is true (the default), only rows with
    ``is_active = TRUE`` are included.
    """
    where_clause = " WHERE is_active = TRUE" if active_only else ""
    sql = "SELECT id, username, email FROM users" + where_clause + " ORDER BY created_at DESC LIMIT 100"
    records = await conn.fetch(sql)
    return list(map(dict, records))

async def create_user(conn: asyncpg.Connection, username: str, email: str) -> dict:
    """Insert a new user stamped with the database server's NOW().

    Returns the stored row (id, username, email, created_at) as a dict.
    """
    insert_sql = """
        INSERT INTO users (username, email, created_at)
        VALUES ($1, $2, NOW())
        RETURNING id, username, email, created_at
        """
    inserted = await conn.fetchrow(insert_sql, username, email)
    return dict(inserted)

async def count_posts(conn: asyncpg.Connection) -> int:
    """Return how many posts are currently published."""
    published_total = await conn.fetchval(
        "SELECT COUNT(*) FROM posts WHERE published = TRUE"
    )
    return published_total

# executemany — execute the same statement for many rows
async def bulk_insert_tags(conn: asyncpg.Connection, tags: list[tuple]) -> None:
    """Insert many ``(name, slug)`` pairs with a single statement.

    ``executemany`` runs the same INSERT once per tuple in ``tags``; rows
    that conflict with existing ones are skipped via ON CONFLICT DO NOTHING.
    """
    insert_tag = "INSERT INTO tags (name, slug) VALUES ($1, $2) ON CONFLICT DO NOTHING"
    await conn.executemany(insert_tag, tags)

Prepared Statements

asyncpg automatically caches prepared statements per connection. For hot-path queries executed thousands of times per second, explicit prepared statements give maximum performance — PostgreSQL skips re-parsing and re-planning on every call.

async def setup_prepared_statements(conn: asyncpg.Connection):
    """Prepare the user-lookup and session-lookup statements on ``conn``.

    Returns a ``(get_user_stmt, get_session_stmt)`` tuple. Prepared
    statements belong to ``conn`` and must be executed on that connection.
    """
    user_sql = "SELECT id, username, email, roles FROM users WHERE id = $1 AND is_active = TRUE"
    session_sql = "SELECT user_id, expires_at FROM sessions WHERE token = $1"
    prepared = []
    for sql in (user_sql, session_sql):
        prepared.append(await conn.prepare(sql))
    return tuple(prepared)

# Use the prepared statement
async def fast_auth(conn, token: str) -> dict | None:
    """Look up a session by token and return it if it has not expired.

    Returns a dict with ``user_id`` and ``expires_at``, or None when the
    token is unknown, the session has expired, or it has no expiry set.

    Uses ``conn.fetchrow`` instead of calling ``conn.prepare`` per request:
    asyncpg's per-connection statement cache prepares this query once and
    reuses it afterwards, whereas an explicit prepare on every call costs an
    extra round trip each time.
    """
    from datetime import datetime, timezone

    row = await conn.fetchrow(
        "SELECT user_id, expires_at FROM sessions WHERE token = $1",
        token,
    )
    if row is None:
        return None
    expires_at = row["expires_at"]
    if expires_at is None:
        # NOTE(review): NULL expiry treated as invalid (fail closed) — confirm
        # whether NULL is meant to mean "never expires".
        return None
    # TIMESTAMPTZ decodes to an aware datetime, which cannot be compared with
    # a naive datetime.now(); pick "now" with matching awareness.
    if expires_at.tzinfo is not None:
        now = datetime.now(timezone.utc)
    else:
        now = datetime.now()
    if expires_at < now:
        return None
    return dict(row)

# Explicit prepared statements are bound to the connection that created them
# (they are not shared across pool connections), so prepare and execute them
# inside the same acquire() block.
async def fetch_products_by_categories(pool: asyncpg.Pool) -> tuple[list, list]:
    """Run one prepared statement twice with different parameters.

    ``async with`` must live inside a coroutine; at module level it is a
    SyntaxError.

    Returns:
        ``(electronics, clothing)`` — product records for each category,
        ordered by price.
    """
    async with pool.acquire() as conn:
        stmt = await conn.prepare("SELECT * FROM products WHERE category = $1 ORDER BY price")
        electronics = await stmt.fetch("electronics")
        clothing = await stmt.fetch("clothing")
    return electronics, clothing

Transactions and Savepoints

async def transfer_funds(
    conn: asyncpg.Connection,
    from_account: int,
    to_account: int,
    amount: float,
) -> bool:
    """Atomically move ``amount`` from ``from_account`` to ``to_account``.

    Both account rows are locked with SELECT ... FOR UPDATE inside a single
    transaction, then debited/credited, and the transfer is logged.

    Returns:
        True if the transfer was applied; False if either account does not
        exist or the source balance is insufficient (nothing is written).

    Raises:
        ValueError: if ``amount`` is not positive — a negative amount would
            silently move money in the opposite direction.

    NOTE(review): money is typed as ``float``; if ``balance`` is a NUMERIC
    column, passing ``decimal.Decimal`` avoids binary rounding errors.
    """
    if amount <= 0:
        raise ValueError("amount must be positive")

    async with conn.transaction():
        # Lock both rows in a fixed (id) order so two concurrent transfers in
        # opposite directions cannot deadlock waiting on each other's lock.
        rows = await conn.fetch(
            "SELECT id, balance FROM accounts WHERE id = ANY($1::bigint[]) "
            "ORDER BY id FOR UPDATE",
            [from_account, to_account],
        )
        balances = {row["id"]: row["balance"] for row in rows}
        balance = balances.get(from_account)
        # A missing destination would make the credit UPDATE match no rows,
        # i.e. the debited money would simply disappear.
        if balance is None or to_account not in balances:
            return False
        if balance < amount:
            # Nothing has been written yet: leaving the block normally commits
            # an empty transaction and releases the row locks.
            return False

        await conn.execute(
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
            amount, from_account,
        )
        await conn.execute(
            "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
            amount, to_account,
        )
        await conn.execute(
            "INSERT INTO transfers (from_id, to_id, amount) VALUES ($1, $2, $3)",
            from_account, to_account, amount,
        )
        return True

# Savepoints for partial rollback within a transaction
async def process_batch_with_savepoints(conn, items: list) -> dict:
    """Insert every item inside one outer transaction, isolating failures.

    Each insert runs in a nested ``conn.transaction()`` (a savepoint), so a
    failing item rolls back only its own savepoint while the outer
    transaction carries on. Returns ``{"success": ok_count, "failed": bad_count}``.

    NOTE(review): any ``Exception`` is counted as a failure, not only
    database errors — consider narrowing.
    """
    tally = {"success": 0, "failed": 0}
    insert_sql = "INSERT INTO processed (data) VALUES ($1)"
    async with conn.transaction():
        for entry in items:
            try:
                async with conn.transaction():  # nested => SAVEPOINT
                    await conn.execute(insert_sql, entry)
                    tally["success"] += 1
            except Exception:
                # Only this savepoint was rolled back; keep going.
                tally["failed"] += 1
    return tally

Bulk COPY Operations

asyncpg's COPY support is the fastest way to bulk-load data into PostgreSQL — typically 10x faster than individual INSERTs or even executemany.

import csv
import io

async def bulk_load_users(conn: asyncpg.Connection, csv_path: str) -> int:
    """Load users from CSV using PostgreSQL COPY — extremely fast.

    The file is expected to start with a header row, followed by rows whose
    first three columns are username, email, password_hash. Extra columns
    are ignored and blank lines are skipped.

    Returns:
        The number of rows COPY reports as inserted.

    NOTE(review): the file is read with blocking I/O on the event loop; for
    very large files consider ``asyncio.to_thread`` or streaming records.
    """
    # newline="" is required by the csv module to handle quoted newlines;
    # an explicit encoding avoids depending on the OS locale.
    with open(csv_path, encoding="utf-8", newline="") as f:
        reader = csv.reader(f)
        # Default avoids StopIteration on an empty file, which a coroutine
        # would turn into a confusing RuntimeError.
        next(reader, None)  # skip header
        rows = [(r[0], r[1], r[2]) for r in reader if r]  # username, email, password_hash

    result = await conn.copy_records_to_table(
        "users",
        records=rows,
        columns=["username", "email", "password_hash"],
    )
    # Result format: "COPY N" where N is number of rows inserted
    return int(result.split()[-1])

async def export_users_csv(conn: asyncpg.Connection) -> str:
    """Export active users as CSV text (with a header row) using COPY TO.

    asyncpg writes raw ``bytes`` chunks to a file-like ``output``, so the
    buffer must be binary — a ``StringIO`` would raise ``TypeError`` on the
    first write. The bytes are decoded to ``str`` at the end.
    """
    buffer = io.BytesIO()
    await conn.copy_from_query(
        "SELECT id, username, email, created_at FROM users WHERE is_active = TRUE",
        output=buffer,
        format="csv",
        header=True,
    )
    # asyncpg connections use UTF-8 as the client encoding by default.
    return buffer.getvalue().decode("utf-8")

LISTEN/NOTIFY for Real-Time Events

PostgreSQL's LISTEN/NOTIFY is a lightweight pub/sub system built into the database. asyncpg supports it natively, enabling real-time notifications without Redis for simple use cases.

import asyncio
import json

async def listen_for_events(channel: str, callback):
    """Dedicated connection for LISTEN — never pool a listener connection.

    Opens its own connection, registers ``callback`` on ``channel`` and then
    waits until this coroutine is cancelled. The connection is always
    closed, even if registering the listener fails; the listener is removed
    before closing when registration succeeded.
    """
    conn = await asyncpg.connect(DATABASE_URL)
    try:
        await conn.add_listener(channel, callback)
        print(f"Listening on channel: {channel}")
        try:
            await asyncio.Future()  # run until cancelled
        finally:
            await conn.remove_listener(channel, callback)
    finally:
        await conn.close()

# Strong references to in-flight handler tasks: the event loop keeps only
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_background_tasks: set[asyncio.Task] = set()


def on_order_event(conn, pid, channel, payload):
    """asyncpg listener callback for order notifications.

    Decodes the JSON ``payload`` and schedules ``process_order(data)`` as a
    background task, keeping a reference until the task completes.
    """
    data = json.loads(payload)
    print(f"New order event: {data}")
    task = asyncio.create_task(process_order(data))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

# Trigger from PostgreSQL or from Python
async def notify_order(conn: asyncpg.Connection, order_id: int, status: str):
    """Publish an order-status change on the ``orders`` channel.

    Uses ``pg_notify()`` with bind parameters instead of formatting the JSON
    into a ``NOTIFY`` statement: a single quote anywhere in ``status`` would
    otherwise break the SQL or allow SQL injection.
    """
    payload = json.dumps({"order_id": order_id, "status": status})
    await conn.execute("SELECT pg_notify($1, $2)", "orders", payload)

# Trigger from SQL:
# CREATE OR REPLACE FUNCTION notify_order_change() RETURNS TRIGGER AS $$
# BEGIN
#   PERFORM pg_notify('orders', row_to_json(NEW)::text);
#   RETURN NEW;
# END;
# $$ LANGUAGE plpgsql;

async def main():
    """Run the ``orders`` listener until cancelled.

    The listener is awaited rather than started with a fire-and-forget
    ``create_task``: if ``main`` returned immediately, ``asyncio.run`` would
    cancel the still-pending listener task before it did any work.
    """
    await listen_for_events("orders", on_order_event)

FastAPI Integration

from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from contextlib import asynccontextmanager
import asyncpg

# Shared pool for the FastAPI app: assigned by lifespan() at startup and read
# by the get_conn() dependency. None until lifespan() has run.
pool: asyncpg.Pool | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared pool on startup and close it on shutdown.

    ``try``/``finally`` guarantees the pool is closed even when the
    application leaves the lifespan with an exception; ``pool`` is reset to
    None afterwards so late use fails clearly instead of on a closed pool.
    """
    global pool
    pool = await asyncpg.create_pool(DATABASE_URL, min_size=5, max_size=20)
    try:
        yield
    finally:
        await pool.close()
        pool = None

# lifespan() creates the DB pool before serving requests and closes it on shutdown.
app = FastAPI(lifespan=lifespan)

async def get_conn():
    """FastAPI dependency yielding one pooled connection per request.

    The connection is released back to ``pool`` when the request finishes.

    Raises:
        RuntimeError: if used before the app lifespan has created ``pool``
            (otherwise this would be an opaque ``AttributeError`` on None).
    """
    if pool is None:
        raise RuntimeError("Database pool is not initialized (app lifespan has not run)")
    async with pool.acquire() as conn:
        yield conn

class UserCreate(BaseModel):
    # Request body for POST /users (validated by Pydantic).
    # NOTE(review): email is a plain str here — no format validation.
    username: str
    email: str

class UserResponse(BaseModel):
    # Response shape for the /users endpoints; built from a DB record via
    # UserResponse(**dict(row)), so field names must match the SELECT columns.
    id: int
    username: str
    email: str

@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(data: UserCreate, conn=Depends(get_conn)):
    # Insert the user and echo it back. A unique-constraint clash on username
    # or email is reported as HTTP 409 rather than an unhandled 500.
    # (Comments, not a docstring: a docstring would change the OpenAPI text.)
    insert_sql = "INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id, username, email"
    try:
        created = await conn.fetchrow(insert_sql, data.username, data.email)
    except asyncpg.UniqueViolationError:
        raise HTTPException(status_code=409, detail="Username or email already exists")
    return UserResponse(**dict(created))

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, conn=Depends(get_conn)):
    # Look up one user by id; an unknown id becomes HTTP 404.
    found = await conn.fetchrow("SELECT id, username, email FROM users WHERE id = $1", user_id)
    if found is None:
        raise HTTPException(status_code=404, detail="User not found")
    return UserResponse(**dict(found))

Frequently Asked Questions

asyncpg vs psycopg3 vs SQLAlchemy+asyncpg — which to choose?
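Use asyncpg directly for maximum performance and full control over SQL. Use psycopg 3 (installed as psycopg[binary] to get the prebuilt C extension) if you prefer DB-API 2.0 compatibility, want sync and async APIs from one driver, or want row factories that build Pydantic models or dataclasses directly. Use SQLAlchemy 2.0 with the asyncpg backend when you want an ORM, migrations via Alembic, and portability between databases.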
Does asyncpg support JSON/JSONB natively?
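Not out of the box. By default asyncpg sends and returns JSON/JSONB values as strings, so you pass json.dumps(...) in and get a str back. To work with Python dicts directly, register a codec on each connection — e.g. await conn.set_type_codec("jsonb", encoder=json.dumps, decoder=json.loads, schema="pg_catalog") — typically from the pool's init= callback so every pooled connection gets it.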
How do I handle connection pool exhaustion?
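asyncpg has no dedicated pool-exhaustion exception. By default pool.acquire() waits indefinitely for a free connection; pass a timeout, e.g. pool.acquire(timeout=5), and it raises asyncio.TimeoutError (the built-in TimeoutError on Python 3.11+) when no connection frees up in time. Catch that to return a 503 response. Monitor pool usage with pool.get_size() and pool.get_idle_size().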