Python PostgreSQL with asyncpg: High-Performance Async Driver

asyncpg is the fastest PostgreSQL driver for Python, written in Cython and using the binary PostgreSQL protocol directly without psycopg2's intermediary layer. It benchmarks at 3–5x the throughput of psycopg2 for async workloads and integrates natively with Python's asyncio event loop. Its connection pooling, prepared statement caching, and support for PostgreSQL-specific features like NOTIFY/LISTEN, COPY and composite types make it the go-to choice for high-performance async applications.

Installation and First Connection

asyncpg requires no C extension compilation on modern platforms — binary wheels are available for all major Python versions on Linux, macOS and Windows. It connects directly to PostgreSQL using the native binary protocol, which avoids the overhead of text encoding/decoding that psycopg2 incurs for every query result.

pip install asyncpg
import asyncio
import asyncpg

DSN = "postgresql://user:password@localhost:5432/mydb"

async def basic_usage():
    # Single connection (for scripts or tests)
    conn = await asyncpg.connect(DSN)
    try:
        version = await conn.fetchval("SELECT version()")
        print(version)

        # fetch — returns list of Record objects
        rows = await conn.fetch("SELECT id, name FROM users LIMIT 5")
        for row in rows:
            print(dict(row))

        # fetchrow — returns single Record or None
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE email = $1", "alice@example.com"
        )

        # fetchval — returns single scalar value
        count = await conn.fetchval("SELECT count(*) FROM users")
        print(f"Total users: {count}")

        # execute — DDL or DML (returns status string)
        status = await conn.execute(
            "UPDATE users SET active = $1 WHERE id = $2", True, 42
        )
        print(status)  # e.g. "UPDATE 1"
    finally:
        await conn.close()

asyncio.run(basic_usage())
Record objects: asyncpg returns asyncpg.Record objects, not dicts. They support both index and attribute access: row[0], row["name"], and dict(row). They are read-only and more memory-efficient than dicts.

Connection Pool Management

Creating a new connection for every request is expensive — each TCP handshake and PostgreSQL authentication round-trip adds 5–50ms of latency. asyncpg's connection pool maintains a reusable set of authenticated connections and hands them out to concurrent coroutines without blocking. The pool automatically handles connection health checks, reconnects on failure, and respects PostgreSQL's max_connections setting.

import asyncpg
from asyncpg.pool import Pool

pool: Pool | None = None

async def create_pool() -> Pool:
    return await asyncpg.create_pool(
        dsn=DSN,
        min_size=5,         # keep these connections warm at all times
        max_size=20,        # never exceed this
        max_inactive_connection_lifetime=300,  # recycle idle connections every 5 min
        command_timeout=30, # cancel queries running longer than 30s
        statement_cache_size=1000,  # cache prepared statements per connection
        init=_init_connection,  # called after each connection is created
    )

async def _init_connection(conn):
    """Set connection-level settings for every new connection."""
    await conn.set_type_codec(
        "jsonb",
        encoder=lambda v: v,  # pass Python dict directly
        decoder=lambda v: v,
        schema="pg_catalog",
        format="text",
    )
    await conn.execute("SET TIME ZONE 'UTC'")
    await conn.execute("SET search_path TO myschema, public")

async def get_user(user_id: int):
    async with pool.acquire() as conn:
        return await conn.fetchrow(
            "SELECT id, name, email FROM users WHERE id = $1", user_id
        )

# Context manager automatically returns connection to pool
async def with_timeout(user_id: int):
    async with pool.acquire(timeout=5.0) as conn:  # wait max 5s for a free connection
        return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)

Executing Queries

asyncpg uses positional placeholders ($1, $2, …) rather than %s or named parameters. This maps directly to the PostgreSQL wire protocol and avoids any string interpolation, making SQL injection impossible by design. asyncpg also infers the parameter types from the query and sends them as binary, which eliminates type conversion overhead on both client and server.

from datetime import date

async def complex_queries(pool):
    async with pool.acquire() as conn:
        # Array parameters — pass Python lists directly
        ids = [1, 2, 3, 4, 5]
        rows = await conn.fetch(
            "SELECT * FROM users WHERE id = ANY($1::int[])", ids
        )

        # Insert with RETURNING
        new_user = await conn.fetchrow(
            """INSERT INTO users (name, email, created_at)
               VALUES ($1, $2, NOW())
               RETURNING id, name, email, created_at""",
            "Bob", "bob@example.com"
        )

        # executemany — run one statement with many parameter sets
        await conn.executemany(
            "INSERT INTO tags (user_id, tag) VALUES ($1, $2)",
            [(1, "python"), (1, "asyncio"), (2, "fastapi")]
        )

        # JSON/JSONB columns — pass Python dicts directly
        await conn.execute(
            "UPDATE users SET preferences = $1 WHERE id = $2",
            {"theme": "dark", "lang": "en"},
            42
        )

        # Date range query
        rows = await conn.fetch(
            "SELECT * FROM events WHERE event_date BETWEEN $1 AND $2",
            date(2026, 1, 1), date(2026, 12, 31)
        )

        # Iterate large results without loading all into memory
        async with conn.transaction():
            async for record in conn.cursor(
                "SELECT * FROM large_table WHERE active = $1", True
            ):
                process(record)

Transactions and Savepoints

asyncpg transactions are explicit context managers. Nesting transactions creates savepoints automatically — if the inner block raises, only the savepoint rolls back, not the outer transaction. This is PostgreSQL's native savepoint mechanism, which asyncpg exposes without any additional API.

async def transfer_with_savepoint(pool, from_id: int, to_id: int, amount: float):
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Outer transaction
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                amount, from_id
            )

            try:
                async with conn.transaction():  # inner = savepoint
                    await conn.execute(
                        "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                        amount, to_id
                    )
                    # Simulate a validation failure
                    balance = await conn.fetchval(
                        "SELECT balance FROM accounts WHERE id = $1", to_id
                    )
                    if balance > 1_000_000:
                        raise ValueError("Balance limit exceeded")
            except ValueError:
                # Only inner savepoint rolled back; outer transaction still active
                await conn.execute(
                    "INSERT INTO failed_transfers (from_id, to_id, amount) VALUES ($1,$2,$3)",
                    from_id, to_id, amount
                )
                raise

async def read_committed_isolation(pool):
    async with pool.acquire() as conn:
        async with conn.transaction(isolation="serializable"):
            # All reads within this block see a consistent snapshot
            count = await conn.fetchval("SELECT count(*) FROM orders WHERE status='pending'")
            if count > 1000:
                await conn.execute("UPDATE queue_config SET paused = true")

Prepared Statements

asyncpg automatically prepares and caches frequently executed statements. You can also explicitly prepare statements to reuse across many executions — PostgreSQL compiles the query plan once and re-executes it with different parameters, saving planning time on complex queries. The prepared statement is tied to a single connection, so use it within the same acquire() block.

async def use_prepared_statements(pool):
    async with pool.acquire() as conn:
        # Explicitly prepare a statement — useful for tight loops
        stmt = await conn.prepare(
            "SELECT id, name, email FROM users WHERE active = $1 AND created_at > $2"
        )

        # Execute many times — plan is cached
        from datetime import datetime, timedelta
        cutoff = datetime.utcnow() - timedelta(days=30)
        recent_active = await stmt.fetch(True, cutoff)

        # Get type info about parameters
        print(stmt.get_parameters())  # [, ]
        print(stmt.get_attributes())  # column names and types

        # Explicit caching across the pool using statement_cache_size
        # asyncpg maintains a per-connection LRU cache of N prepared statements
        # Queries run via fetch/fetchrow/execute are auto-prepared on first use

COPY for Bulk Data Loading

PostgreSQL's COPY command is the fastest way to load large datasets — 10–100x faster than individual INSERT statements. asyncpg exposes COPY as an async streaming interface: you push data row by row and asyncpg streams it to PostgreSQL over the binary protocol without building an in-memory buffer of the entire dataset.

import csv
import io

async def bulk_load_csv(pool, csv_path: str, table: str):
    """Load a CSV file into PostgreSQL using COPY — extremely fast."""
    async with pool.acquire() as conn:
        with open(csv_path, "r") as f:
            reader = csv.reader(f)
            header = next(reader)  # skip header row
            await conn.copy_records_to_table(
                table,
                records=reader,
                columns=header,
            )

async def copy_from_query(pool):
    """Export query results to a CSV-format string."""
    async with pool.acquire() as conn:
        buf = io.StringIO()
        await conn.copy_from_query(
            "SELECT id, name, email FROM users WHERE active = TRUE",
            output=buf,
            format="csv",
            header=True,
        )
        buf.seek(0)
        return buf.read()

async def stream_large_dataset(pool, target_table: str):
    """Stream rows from a generator into PostgreSQL COPY."""
    async def generate_rows():
        for i in range(1_000_000):
            yield (i, f"user_{i}", f"user{i}@example.com")

    async with pool.acquire() as conn:
        await conn.copy_records_to_table(
            target_table,
            records=generate_rows(),
            columns=["id", "name", "email"],
        )

LISTEN/NOTIFY for Real-Time Events

PostgreSQL's LISTEN/NOTIFY mechanism provides a lightweight pub/sub system within the database. asyncpg can listen on channels and receive notifications asynchronously, making it useful for cache invalidation, cross-process signalling, or triggering background jobs without an external message broker.

import asyncio
import asyncpg

async def listen_for_events():
    """Subscribe to a PostgreSQL notification channel."""
    conn = await asyncpg.connect(DSN)

    def on_notification(connection, pid, channel, payload):
        print(f"Received on '{channel}' from PID {pid}: {payload}")

    await conn.add_listener("order_updates", on_notification)
    print("Listening for order_updates...")

    # Keep listening until cancelled
    try:
        await asyncio.sleep(3600)  # or use an asyncio.Event to signal shutdown
    finally:
        await conn.remove_listener("order_updates", on_notification)
        await conn.close()

async def send_notification(pool, channel: str, payload: str):
    """Notify all listeners on a channel."""
    async with pool.acquire() as conn:
        await conn.execute(f"NOTIFY {channel}, $1", payload)

# Trigger-based notifications — define in PostgreSQL:
# CREATE OR REPLACE FUNCTION notify_order_change() RETURNS trigger AS $$
# BEGIN
#   PERFORM pg_notify('order_updates', row_to_json(NEW)::text);
#   RETURN NEW;
# END;
# $$ LANGUAGE plpgsql;

FastAPI Integration Pattern

The standard pattern is to create the pool during FastAPI's lifespan startup event and close it on shutdown. Inject the pool via a dependency function so that individual route handlers acquire connections only for the duration of their database work, returning them to the pool immediately after.

from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
import asyncpg

_pool: asyncpg.Pool | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global _pool
    _pool = await asyncpg.create_pool(DSN, min_size=5, max_size=20)
    yield
    await _pool.close()

app = FastAPI(lifespan=lifespan)

async def get_pool() -> asyncpg.Pool:
    return _pool

@app.get("/users/{user_id}")
async def get_user(user_id: int, pool: asyncpg.Pool = Depends(get_pool)):
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT id, name, email FROM users WHERE id = $1", user_id
        )
    if not row:
        raise HTTPException(404, "User not found")
    return dict(row)

@app.post("/users")
async def create_user(
    name: str, email: str, pool: asyncpg.Pool = Depends(get_pool)
):
    async with pool.acquire() as conn:
        async with conn.transaction():
            existing = await conn.fetchval(
                "SELECT id FROM users WHERE email = $1", email
            )
            if existing:
                raise HTTPException(409, "Email already registered")
            user_id = await conn.fetchval(
                "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
                name, email
            )
    return {"id": user_id, "name": name, "email": email}
Performance tip: asyncpg with a pool of 10 connections can sustain over 50,000 simple SELECT queries per second on modern hardware. For I/O-bound FastAPI endpoints hitting the database, this is rarely the bottleneck — network latency and application logic dominate.