Python PostgreSQL with asyncpg: High-Performance Async Driver
asyncpg is the fastest PostgreSQL driver for Python — benchmarks show it 3–5x faster than psycopg2 for I/O-bound workloads. It implements the PostgreSQL binary protocol directly, avoids the overhead of DB-API 2.0, and integrates natively with asyncio. This guide covers connection pools, prepared statements, bulk COPY operations, LISTEN/NOTIFY for real-time events, transactions, and FastAPI integration.
Table of Contents
Setup and Connection Pool
pip install asyncpg fastapi uvicorn
import os
from collections.abc import AsyncIterator

import asyncpg
# Connection string is read from the environment; a missing DATABASE_URL
# variable raises KeyError as soon as this module is imported.
DATABASE_URL = os.environ["DATABASE_URL"]
# e.g., postgresql://user:password@localhost/mydb
# Global connection pool — create once at startup
# Starts as None; close_pool() and get_db() read this module-level name.
pool: asyncpg.Pool | None = None
async def create_pool() -> asyncpg.Pool:
    """Create and return a new asyncpg connection pool for ``DATABASE_URL``.

    The pool is configured with 5 to 20 connections, a 300-second limit on
    idle connection lifetime and a 60-second default command timeout.
    The caller is responsible for storing the returned pool.
    """
    pool_options = {
        "min_size": 5,
        "max_size": 20,
        "max_inactive_connection_lifetime": 300,
        "command_timeout": 60,
        # Custom type codecs registered here
    }
    new_pool = await asyncpg.create_pool(DATABASE_URL, **pool_options)
    return new_pool
async def close_pool():
    """Close the module-level pool; do nothing when no pool is set."""
    if not pool:
        return  # nothing was opened, so there is nothing to close
    await pool.close()
# Dependency for FastAPI
async def get_db() -> AsyncIterator[asyncpg.Connection]:
    """Yield one pooled connection for the duration of a request.

    This is an async generator (FastAPI dependency): the connection is
    acquired before the ``yield`` and handed back to the pool afterwards.

    Raises:
        RuntimeError: if the module-level ``pool`` has not been created yet.
    """
    # Fail with an explicit message instead of an AttributeError on None.
    if pool is None:
        raise RuntimeError(
            "Connection pool is not initialised; create it at application startup"
        )
    async with pool.acquire() as conn:
        yield conn
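Pool sizing: a common rule of thumb is max_size = 2 × CPU cores for compute-heavy apps, or max_size = (total DB connections limit) / (number of app instances). PostgreSQL's default max_connections is 100, so if you run 5 app instances, each pool should have max_size of at most 20 — in practice slightly less, to leave headroom for superuser-reserved connections, migrations, and admin sessions.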
Queries and Type Mapping
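asyncpg returns rows as Record objects — tuple-like with dict-style access, so values can be read by column name or by index. PostgreSQL types map to Python types automatically: TEXT→str, INTEGER→int, TIMESTAMPTZ→datetime (timezone-aware). JSON and JSONB values are returned as str by default; register a codec with conn.set_type_codec() if you want them decoded to dict.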
import asyncpg
from datetime import datetime
async def get_user(conn: asyncpg.Connection, user_id: int) -> dict | None:
    """Fetch one user by primary key.

    Returns the row as a plain dict, or None when no row matches.
    """
    lookup_sql = "SELECT id, username, email, created_at FROM users WHERE id = $1"
    record = await conn.fetchrow(lookup_sql, user_id)
    if not record:
        return None
    return dict(record)
async def list_users(conn: asyncpg.Connection, active_only: bool = True) -> list[dict]:
    """Return up to 100 users, newest first, as a list of dicts.

    When ``active_only`` is true the query is restricted to rows with
    ``is_active = TRUE``.
    """
    fragments = ["SELECT id, username, email FROM users"]
    if active_only:
        fragments.append(" WHERE is_active = TRUE")
    fragments.append(" ORDER BY created_at DESC LIMIT 100")
    records = await conn.fetch("".join(fragments))
    return list(map(dict, records))
async def create_user(conn: asyncpg.Connection, username: str, email: str) -> dict:
    """Insert a user row and return the inserted row as a dict.

    The statement uses RETURNING, so the dict carries id, username, email
    and created_at.
    """
    # The statement text is kept flush-left so the string sent to the server
    # is exactly the one this block had before.
    insert_sql = """
INSERT INTO users (username, email, created_at)
VALUES ($1, $2, NOW())
RETURNING id, username, email, created_at
"""
    inserted = await conn.fetchrow(insert_sql, username, email)
    return dict(inserted)
async def count_posts(conn: asyncpg.Connection) -> int:
    """Return the number of posts whose ``published`` flag is TRUE."""
    published_total = await conn.fetchval(
        "SELECT COUNT(*) FROM posts WHERE published = TRUE"
    )
    return published_total
# executemany — execute the same statement for many rows
async def bulk_insert_tags(conn: asyncpg.Connection, tags: list[tuple]) -> None:
    """Insert many (name, slug) pairs into ``tags`` with one executemany call.

    Rows that hit a conflict are skipped (ON CONFLICT DO NOTHING).
    """
    insert_sql = "INSERT INTO tags (name, slug) VALUES ($1, $2) ON CONFLICT DO NOTHING"
    await conn.executemany(insert_sql, tags)
Prepared Statements
asyncpg automatically caches prepared statements per connection. For hot-path queries executed thousands of times per second, explicit prepared statements give maximum performance — PostgreSQL skips re-parsing and re-planning on every call.
async def setup_prepared_statements(conn: asyncpg.Connection):
    """Prepare the hot-path lookup statements on ``conn``.

    Returns a 2-tuple ``(user_statement, session_statement)``, prepared in
    that order.
    """
    statement_sql = (
        "SELECT id, username, email, roles FROM users WHERE id = $1 AND is_active = TRUE",
        "SELECT user_id, expires_at FROM sessions WHERE token = $1",
    )
    prepared = []
    for sql in statement_sql:
        prepared.append(await conn.prepare(sql))
    return tuple(prepared)
# Use the prepared statement
async def fast_auth(conn, token: str) -> dict | None:
    """Look up a session by token and return it if it has not expired.

    Args:
        conn: connection on which the statements are prepared.
        token: session token to look up.

    Returns:
        The session row as a dict, or None when the token is unknown or the
        session's ``expires_at`` is in the past.
    """
    from datetime import timezone  # local import: only this block needs it

    _, session_stmt = await setup_prepared_statements(conn)
    row = await session_stmt.fetchrow(token)
    if not row:
        return None

    expires_at = row["expires_at"]
    # Comparing an aware datetime with a naive one raises TypeError, so pick
    # a "now" that matches the awareness of the stored value.
    if expires_at.utcoffset() is not None:
        now = datetime.now(timezone.utc)
    else:
        now = datetime.now()
    if expires_at < now:
        return None
    return dict(row)
# Reusing one explicitly prepared statement for several calls. The statement
# is prepared on the acquired connection (conn.prepare), so it has to be
# prepared again after acquiring a different connection from the pool.
# NOTE: snippet only — `async with` must run inside a coroutine.
async with pool.acquire() as conn:
    stmt = await conn.prepare("SELECT * FROM products WHERE category = $1 ORDER BY price")
    electronics = await stmt.fetch("electronics")
    clothing = await stmt.fetch("clothing")
Transactions and Savepoints
async def transfer_funds(
    conn: asyncpg.Connection,
    from_account: int,
    to_account: int,
    amount: float,
) -> bool:
    """Move ``amount`` from one account to another inside one transaction.

    Args:
        conn: connection used for every statement of the transfer.
        from_account: id of the account to debit.
        to_account: id of the account to credit.
        amount: amount to move; must be greater than zero.

    Returns:
        True when the transfer was recorded; False when the source account
        does not exist or its balance is lower than ``amount``.

    Raises:
        ValueError: if ``amount`` is not a positive number.
        LookupError: if the destination account does not exist; the
            exception leaves the transaction block, so the debit is undone.
    """
    # `not amount > 0` also rejects NaN; a negative amount would otherwise
    # move money in the opposite direction.
    if not amount > 0:
        raise ValueError("amount must be positive")

    async with conn.transaction():
        # FOR UPDATE locks the source row until the transaction ends.
        balance = await conn.fetchval(
            "SELECT balance FROM accounts WHERE id = $1 FOR UPDATE",
            from_account,
        )
        # fetchval yields None when no row matched (unknown account).
        if balance is None or balance < amount:
            # Nothing has been modified yet, so leaving the block here
            # changes no data.
            return False

        await conn.execute(
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
            amount, from_account,
        )
        credit_status = await conn.execute(
            "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
            amount, to_account,
        )
        # A status of "UPDATE 0" means no destination row was credited;
        # raising makes the transaction block roll the debit back.
        if credit_status == "UPDATE 0":
            raise LookupError(f"destination account {to_account} does not exist")
        await conn.execute(
            "INSERT INTO transfers (from_id, to_id, amount) VALUES ($1, $2, $3)",
            from_account, to_account, amount,
        )
        return True
# Savepoints for partial rollback within a transaction
async def process_batch_with_savepoints(conn, items: list) -> dict:
    """Insert each item under its own savepoint inside one outer transaction.

    A failing item only undoes its own nested transaction; the outer
    transaction carries on with the next item.

    Returns a dict with the keys "success" and "failed" holding the
    respective counts.
    """
    succeeded = 0
    failed = 0
    async with conn.transaction():
        for payload in items:
            try:
                # Nested transaction() inside an open transaction: savepoint.
                async with conn.transaction():
                    await conn.execute(
                        "INSERT INTO processed (data) VALUES ($1)",
                        payload,
                    )
                succeeded += 1
            except Exception:
                # Deliberate best-effort: count the failure and keep going.
                failed += 1
    return {"success": succeeded, "failed": failed}
Bulk COPY Operations
asyncpg's COPY support is the fastest way to bulk-load data into PostgreSQL — typically 10x faster than individual INSERTs or even executemany.
import csv
import io
async def bulk_load_users(conn: asyncpg.Connection, csv_path: str) -> int:
    """Load users from CSV using PostgreSQL COPY — extremely fast.

    The first line of the file is treated as a header and skipped. The
    first three columns of every remaining row are loaded as username,
    email and password_hash; blank lines are ignored.

    Args:
        conn: connection used for the COPY.
        csv_path: path of the CSV file to read.

    Returns:
        Number of rows copied; 0 when the file has no data rows.

    Raises:
        IndexError: if a non-blank row has fewer than three columns.
    """
    # newline="" is what the csv module requires so that quoted fields
    # containing line breaks are parsed correctly.
    with open(csv_path, newline="") as f:
        reader = csv.reader(f)
        # Default of None: an empty file must not raise StopIteration,
        # which surfaces as RuntimeError inside a coroutine.
        next(reader, None)  # skip header
        rows = [(r[0], r[1], r[2]) for r in reader if r]  # username, email, password_hash

    if not rows:
        return 0  # nothing to copy, skip the round trip

    result = await conn.copy_records_to_table(
        "users",
        records=rows,
        columns=["username", "email", "password_hash"],
    )
    # Result format: "COPY N" where N is number of rows inserted
    return int(result.split()[-1])
async def export_users_csv(conn: asyncpg.Connection) -> str:
    """Export users table as CSV using COPY TO.

    Only rows with ``is_active = TRUE`` are exported, with a header line.

    Returns:
        The CSV document as text.
    """
    # copy_from_query hands the sink bytes, which a text buffer rejects with
    # TypeError — collect into a binary buffer and decode once at the end.
    buffer = io.BytesIO()
    await conn.copy_from_query(
        "SELECT id, username, email, created_at FROM users WHERE is_active = TRUE",
        output=buffer,
        format="csv",
        header=True,
    )
    return buffer.getvalue().decode("utf-8")
LISTEN/NOTIFY for Real-Time Events
PostgreSQL's LISTEN/NOTIFY is a lightweight pub/sub system built into the database. asyncpg supports it natively, enabling real-time notifications without Redis for simple use cases.
import asyncio
import json
async def listen_for_events(channel: str, callback):
    """Dedicated connection for LISTEN — never pool a listener connection.

    Opens its own connection, registers ``callback`` for ``channel`` and
    then waits until the coroutine is cancelled.

    Args:
        channel: notification channel to listen on.
        callback: listener passed to ``add_listener``.
    """
    conn = await asyncpg.connect(DATABASE_URL)
    # The try starts right after connect so the connection is closed even
    # when registering the listener fails.
    try:
        await conn.add_listener(channel, callback)
        print(f"Listening on channel: {channel}")
        await asyncio.Future()  # run forever
    finally:
        # Unregistering needs a live connection; skip it if the connection
        # has already gone away.
        if not conn.is_closed():
            await conn.remove_listener(channel, callback)
        await conn.close()
# Strong references to in-flight order tasks: the event loop itself only
# keeps weak references, so an unreferenced task may be garbage-collected
# before it finishes.
_order_tasks: set = set()


def on_order_event(conn, pid, channel, payload):
    """Listener callback: decode the JSON payload and process it in a task.

    Args:
        conn: connection that received the notification (unused).
        pid: passed through by the listener machinery (unused).
        channel: channel the notification arrived on (unused).
        payload: JSON text carried by the notification.
    """
    data = json.loads(payload)
    print(f"New order event: {data}")
    task = asyncio.create_task(process_order(data))
    _order_tasks.add(task)
    # Drop the reference once the task is done so the set does not grow.
    task.add_done_callback(_order_tasks.discard)
# Trigger from PostgreSQL or from Python
async def notify_order(conn: asyncpg.Connection, order_id: int, status: str):
    """Publish an order event on the ``orders`` channel.

    Args:
        conn: connection used to send the notification.
        order_id: id placed in the payload under "order_id".
        status: status placed in the payload under "status".
    """
    payload = json.dumps({"order_id": order_id, "status": status})
    # pg_notify() takes the channel and payload as ordinary query arguments,
    # so a quote inside the payload can neither break nor inject into the
    # statement (which string-building a NOTIFY command would allow).
    await conn.execute("SELECT pg_notify($1, $2)", "orders", payload)
# Trigger from SQL:
# CREATE OR REPLACE FUNCTION notify_order_change() RETURNS TRIGGER AS $$
# BEGIN
# PERFORM pg_notify('orders', row_to_json(NEW)::text);
# RETURN NEW;
# END;
# $$ LANGUAGE plpgsql;
async def main():
    """Run the ``orders`` listener until this coroutine is cancelled."""
    listener = asyncio.create_task(listen_for_events("orders", on_order_event))
    # Await the task: returning right after create_task would end main()
    # and the listener would be cancelled when the event loop shuts down.
    await listener
FastAPI Integration
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from contextlib import asynccontextmanager
import asyncpg
# Module-level pool shared by the request handlers; set by lifespan().
pool: asyncpg.Pool | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the connection pool at startup and close it at shutdown.

    Args:
        app: the FastAPI application this lifespan is attached to (unused).
    """
    global pool
    pool = await asyncpg.create_pool(DATABASE_URL, min_size=5, max_size=20)
    try:
        yield
    finally:
        # Close the pool even when an exception is raised at the yield.
        await pool.close()


app = FastAPI(lifespan=lifespan)
async def get_conn():
    """Dependency: lend one pooled connection to the caller.

    The connection is taken from the module-level pool before the yield and
    released back to it afterwards.
    """
    lease = pool.acquire()
    conn = await lease.__aenter__()
    try:
        yield conn
    except BaseException as exc:
        # Hand the in-flight exception to the context manager; re-raise
        # unless it asks for suppression.
        if not await lease.__aexit__(type(exc), exc, exc.__traceback__):
            raise
    else:
        await lease.__aexit__(None, None, None)
# Request body accepted by the POST /users endpoint.
class UserCreate(BaseModel):
    # Both fields are required strings.
    username: str
    email: str
# Response shape used as response_model by the /users endpoints.
class UserResponse(BaseModel):
    # Mirrors the id, username and email columns the endpoints select.
    id: int
    username: str
    email: str
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(data: UserCreate, conn=Depends(get_conn)):
    """Create a user and return it with status 201.

    Responds with 409 when the insert raises a unique-constraint violation.
    """
    insert_sql = (
        "INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id, username, email"
    )
    try:
        inserted = await conn.fetchrow(insert_sql, data.username, data.email)
    except asyncpg.UniqueViolationError:
        raise HTTPException(status_code=409, detail="Username or email already exists")
    fields = dict(inserted)
    return UserResponse(**fields)
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, conn=Depends(get_conn)):
    """Return the user with the given id, or respond with 404 if none exists."""
    lookup_sql = "SELECT id, username, email FROM users WHERE id = $1"
    found = await conn.fetchrow(lookup_sql, user_id)
    if found:
        return UserResponse(**dict(found))
    raise HTTPException(status_code=404, detail="User not found")
Frequently Asked Questions
- asyncpg vs psycopg3 vs SQLAlchemy+asyncpg — which to choose?
- Use asyncpg directly for maximum performance and control over SQL. Use psycopg3 if you prefer DB-API 2.0 compatibility or use Pydantic with `psycopg[binary]`. Use SQLAlchemy 2.0 with the asyncpg backend when you want an ORM, migrations via Alembic, and portability between databases.
- Does asyncpg support JSON/JSONB natively?
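- Partially. asyncpg can read and write JSON/JSONB columns, but by default it passes the values as str in both directions, so you call json.dumps() on arguments and json.loads() on results yourself. To work with Python dicts directly, register a custom codec on each connection with `conn.set_type_codec("jsonb", encoder=json.dumps, decoder=json.loads, schema="pg_catalog")` — typically from the pool's `init` callback.
- How do I handle connection pool exhaustion?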
- By default `pool.acquire()` waits until a connection becomes free. Set a timeout — `pool.acquire(timeout=5)` — and asyncpg raises `asyncio.TimeoutError` when all connections are still in use after the timeout expires; handle the exception to return a 503 response. Monitor pool usage with `pool.get_size()` and `pool.get_idle_size()`.