SQLAlchemy ORM: Database Access in Python (2026)

Published June 6, 2026 • 13 min read

SQLAlchemy 2.0 brought a sweeping modernization: a fully typed declarative API, first-class async support, and a cleaner query interface built on select() statements, with the Query object kept only as a legacy API. If you're still writing SQLAlchemy 1.x code or working with raw SQL, this guide walks through the main changes and shows how to use the modern API correctly, using a User/Post/Tag model as the running example.

Installation and Engine Setup

# Quote the extras so shells such as zsh do not treat the brackets as a glob
pip install "sqlalchemy[asyncio]" asyncpg psycopg2-binary alembic

# database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

# ── Sync engine (scripts, background workers) ──
SYNC_URL = "postgresql+psycopg2://user:pass@localhost/blogdb"
sync_engine = create_engine(
    SYNC_URL,
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,   # reconnect on stale connections
    echo=False,
)
SyncSession = sessionmaker(bind=sync_engine, expire_on_commit=False)

# ── Async engine (FastAPI, async web frameworks) ──
ASYNC_URL = "postgresql+asyncpg://user:pass@localhost/blogdb"
async_engine = create_async_engine(
    ASYNC_URL,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    echo=False,
)
# async_sessionmaker is the 2.0 factory for AsyncSession objects
AsyncSessionLocal = async_sessionmaker(bind=async_engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass
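
Note: expire_on_commit=False keeps already-loaded attributes available after a commit. With the default (True), every attribute is expired at commit and reloaded on the next access. That costs an extra query in sync code and raises an error under AsyncSession, where implicit lazy loading is not allowed. Turning it off is almost always what you want in web applications.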

Declarative Models with Type Annotations

SQLAlchemy 2.0 uses Python type annotations via Mapped[T] to declare columns. The result is IDE-friendly, mypy-compatible model code:

# models.py
from datetime import datetime
from typing import List, Optional
from sqlalchemy import String, Text, ForeignKey, DateTime, Table, Column
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from .database import Base

class User(Base):
    __tablename__ = "users"

    id         : Mapped[int]           = mapped_column(primary_key=True)
    username   : Mapped[str]           = mapped_column(String(50), unique=True, index=True)
    email      : Mapped[str]           = mapped_column(String(255), unique=True)
    hashed_pw  : Mapped[str]           = mapped_column(String(255))
    is_active  : Mapped[bool]          = mapped_column(default=True)
    created_at : Mapped[datetime]      = mapped_column(DateTime(timezone=True),
                                                        server_default=func.now())
    # relationships
    posts      : Mapped[List["Post"]]  = relationship(back_populates="author",
                                                       cascade="all, delete-orphan")

    def __repr__(self) -> str:
        return f"User(id={self.id}, username={self.username!r})"

class Post(Base):
    __tablename__ = "posts"

    id         : Mapped[int]               = mapped_column(primary_key=True)
    title      : Mapped[str]               = mapped_column(String(255))
    body       : Mapped[str]               = mapped_column(Text)
    published  : Mapped[bool]              = mapped_column(default=False)
    author_id  : Mapped[int]               = mapped_column(ForeignKey("users.id"))
    created_at : Mapped[datetime]          = mapped_column(DateTime(timezone=True),
                                                            server_default=func.now())
    updated_at : Mapped[Optional[datetime]]= mapped_column(DateTime(timezone=True),
                                                            onupdate=func.now())
    author     : Mapped["User"]            = relationship(back_populates="posts")
    tags       : Mapped[List["Tag"]]       = relationship(secondary="post_tags",
                                                           back_populates="posts")

One-to-Many and Many-to-Many Relationships

The association table for the Post ↔ Tag many-to-many is a plain Table object (no mapped class needed for a pure junction table):

# Association table — no ORM class needed
post_tags = Table(
    "post_tags", Base.metadata,
    Column("post_id", ForeignKey("posts.id"), primary_key=True),
    Column("tag_id",  ForeignKey("tags.id"),  primary_key=True),
)

class Tag(Base):
    __tablename__ = "tags"

    id    : Mapped[int]           = mapped_column(primary_key=True)
    name  : Mapped[str]           = mapped_column(String(50), unique=True)
    posts : Mapped[List["Post"]]  = relationship(secondary="post_tags",
                                                  back_populates="tags")
Pro Tip: If the association table needs extra columns (e.g., a created_at or role field), promote it to a full mapped class and use association_proxy for convenience access from the parent models.

Working with the Sync Session

In SQLAlchemy 2.0, queries use select() statements instead of the legacy session.query():

# sync_queries.py
from sqlalchemy import select, update, delete
from sqlalchemy.orm import joinedload
from .database import SyncSession
from .models import User, Post

def create_user(username: str, email: str, hashed_pw: str) -> User:
    with SyncSession() as session:
        user = User(username=username, email=email, hashed_pw=hashed_pw)
        session.add(user)
        session.commit()
        session.refresh(user)
        return user

def get_user_with_posts(user_id: int) -> User | None:
    with SyncSession() as session:
        stmt = (
            select(User)
            .where(User.id == user_id)
            .options(joinedload(User.posts))
        )
        return session.scalars(stmt).unique().first()

def publish_posts_by_author(author_id: int) -> int:
    """Bulk update — returns row count."""
    with SyncSession() as session:
        result = session.execute(
            update(Post)
            .where(Post.author_id == author_id, Post.published == False)
            .values(published=True)
        )
        session.commit()
        return result.rowcount

def delete_draft_posts(author_id: int) -> int:
    with SyncSession() as session:
        result = session.execute(
            delete(Post).where(Post.author_id == author_id, Post.published == False)
        )
        session.commit()
        return result.rowcount

Async Session with AsyncSession

The async session API mirrors the sync session but requires await. Use it in FastAPI or any asyncio-based web framework:

# async_queries.py
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from .database import AsyncSessionLocal
from .models import User, Post

async def get_users_paginated(offset: int = 0, limit: int = 20) -> list[User]:
    async with AsyncSessionLocal() as session:
        stmt = (
            select(User)
            .where(User.is_active.is_(True))
            .order_by(User.id)   # stable ordering keeps pages from overlapping
            .offset(offset)
            .limit(limit)
        )
        result = await session.scalars(stmt)
        return list(result.all())

async def get_post_with_author_and_tags(post_id: int) -> Post | None:
    async with AsyncSessionLocal() as session:
        stmt = (
            select(Post)
            .where(Post.id == post_id)
            .options(
                selectinload(Post.author),   # separate IN query — safe for async
                selectinload(Post.tags),
            )
        )
        result = await session.scalars(stmt)
        return result.unique().first()

async def create_post(author_id: int, title: str, body: str) -> Post:
    async with AsyncSessionLocal() as session:
        post = Post(author_id=author_id, title=title, body=body)
        session.add(post)
        await session.commit()
        await session.refresh(post)
        return post
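
Note: Under AsyncSession, implicit lazy loading raises an error, so eagerly load every relationship you plan to touch. selectinload is a good default for collections: it issues one extra SELECT ... WHERE ... IN (id1, id2, ...) query per relationship instead of multiplying rows through a JOIN. joinedload also works in async code and is often the better fit for many-to-one relationships such as Post.author.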

Alembic Migrations

Alembic is the standard migration tool for SQLAlchemy. Initialize it once per project:

# Initialize Alembic in the project root
alembic init alembic

# Edit alembic.ini: set sqlalchemy.url
# sqlalchemy.url = postgresql+psycopg2://user:pass@localhost/blogdb

# Edit alembic/env.py so that target_metadata points at Base.metadata

# alembic/env.py (key section)
from alembic import context
from sqlalchemy import engine_from_config, pool

from app.database import Base
from app import models  # noqa: F401  (imported so every model registers on Base.metadata)

config = context.config   # the parsed alembic.ini
target_metadata = Base.metadata

def run_migrations_online() -> None:
    # NullPool: a migration run needs one short-lived connection, not a pool
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()

# Autogenerate a migration from model changes
alembic revision --autogenerate -m "add tags table"

# Review the generated file in alembic/versions/
# Then apply it
alembic upgrade head

# Roll back one migration
alembic downgrade -1

# Show current revision
alembic current
Pro Tip: Always review autogenerated migrations before running them. Alembic cannot detect table or column renames — it will generate a drop + add, which destroys data. Rename operations must be written manually using op.rename_table() and op.alter_column().

Loading Strategies: joinedload vs. selectinload vs. lazy

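| Strategy | SQL generated | Use when |
|---|---|---|
| lazy="select" (default) | Separate SELECT on first access | Touching the relation on a small number of objects (sync code only) |
| joinedload | LEFT OUTER JOIN in the same SELECT | Many-to-one relations, or loading one object with its relations |
| selectinload | Separate SELECT ... WHERE ... IN (...) | Loading collections; a good default in async code |
| subqueryload | Second SELECT that embeds the original query as a subquery | Legacy; prefer selectinload in 2.0 |
| noload | Nothing; the attribute is left empty | Rarely; it hides missing data instead of reporting it |
| raiseload | Nothing; raises on access | Catching accidental lazy loads, for example in tests |

from sqlalchemy import select
from sqlalchemy.orm import joinedload, raiseload, selectinload
from .models import Post, User

# Sync: joinedload for a single object's relations
stmt = select(User).options(joinedload(User.posts)).where(User.id == 1)

# Async or collections: selectinload, chained to reach Post.tags as well
stmt = select(User).options(selectinload(User.posts).selectinload(Post.tags))

# Block lazy loading explicitly: raiseload raises InvalidRequestError on access,
# whereas noload silently leaves the attribute empty
stmt = select(Post).options(raiseload(Post.tags))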

Connection Pool Configuration

For production PostgreSQL, tune the pool to match your workload and database's max_connections:

sync_engine = create_engine(
    DATABASE_URL,
    pool_size=5,          # persistent connections kept open
    max_overflow=10,      # extra connections allowed beyond pool_size
    pool_timeout=30,      # seconds to wait for a connection before raising
    pool_recycle=1800,    # recycle connections after 30 min (avoids stale TCP)
    pool_pre_ping=True,   # test connection health before use
)

Note: Each process creates its own pool, so the maximum number of connections is (pool_size + max_overflow) × the number of processes. With 4 Gunicorn workers and the settings above, that is 4 × 15 = 60 connections, which leaves headroom under max_connections=100 for migrations and admin tools.
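
The same arithmetic as a small helper, which can be handy when worker counts change. The function names and the reserved parameter (connections set aside for migrations and admin tools) are assumptions for illustration.

```python
def max_pool_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections opened by all worker processes combined."""
    return workers * (pool_size + max_overflow)


def fits_database_limit(
    workers: int,
    pool_size: int,
    max_overflow: int,
    db_max_connections: int,
    reserved: int = 10,  # hypothetical headroom for migrations and admin tools
) -> bool:
    """True if every pool can fill up while still leaving the reserved headroom."""
    needed = max_pool_connections(workers, pool_size, max_overflow)
    return needed + reserved <= db_max_connections


total = max_pool_connections(workers=4, pool_size=5, max_overflow=10)
print(total)
print(fits_database_limit(4, 5, 10, db_max_connections=100))
print(fits_database_limit(8, 5, 10, db_max_connections=100))
```

Doubling the workers to 8 pushes the worst case to 120 connections, past a limit of 100, which is the situation the note warns about.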

Bulk Inserts

For inserting thousands of rows, avoid the ORM's per-object overhead:

from sqlalchemy import insert

# Method 1: session.bulk_insert_mappings (legacy but fast)
def bulk_create_posts_legacy(session, post_dicts: list[dict]) -> None:
    session.bulk_insert_mappings(Post, post_dicts)
    session.commit()

# Method 2: insert() with a list of dicts, sent as one batched INSERT (SQLAlchemy 2.0 preferred)
def bulk_create_posts(session, post_dicts: list[dict]) -> None:
    session.execute(insert(Post), post_dicts)
    session.commit()

# Method 3: PostgreSQL COPY (fastest for very large datasets)
import csv
import io

def copy_posts_from_csv(session, rows: list[dict]) -> None:
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=['title', 'body', 'author_id', 'published'])
    writer.writerows(rows)
    buf.seek(0)
    conn = session.connection().connection   # raw psycopg2 connection
    with conn.cursor() as cur:
        cur.copy_expert("COPY posts (title,body,author_id,published) FROM STDIN CSV", buf)
    session.commit()

| Method | Rows/sec (approx., 10k-row insert) | Notes |
|---|---|---|
| ORM session.add() loop | ~500 | Convenient; slow at scale |
| bulk_insert_mappings | ~5,000 | Skips ORM events, faster |
| Core INSERT executemany | ~8,000 | Preferred in 2.0 |
| PostgreSQL COPY | ~50,000+ | Fastest; PostgreSQL-only |

Frequently Asked Questions

What is the difference between Session and scoped_session?
A plain Session is not thread-safe — you should create one per request or transaction. scoped_session wraps a session factory with a thread-local registry, so the same session is reused within a thread (useful in Flask with WSGI). In modern async frameworks, skip scoped_session entirely and rely on dependency injection to scope AsyncSession to the request.
How do I use SQLAlchemy with FastAPI?
Create a dependency that yields an AsyncSession and closes it after the request. Use Depends(get_db) in route handlers. Never share an AsyncSession across requests — each request gets its own session from the dependency.
When should I use the ORM vs. Core?
Use the ORM for typical CRUD and domain logic — it provides unit-of-work tracking, relationships, and a clean Python interface. Use Core (raw select(), insert(), etc.) for bulk operations, complex analytical queries, or when you need maximum performance with minimal overhead. Both are available in the same application.
How do I handle database errors and rollbacks?
Used as a context manager, a session is closed when the with Session() as session block exits, and closing it rolls back any transaction that was not committed. To commit on success and roll back on error automatically, wrap the work in with session.begin(). For explicit control, call session.rollback() inside an except block. Never silently swallow IntegrityError: roll back first, then decide whether to retry or re-raise.
How do I migrate from SQLAlchemy 1.4 to 2.0?
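Set the environment variable SQLALCHEMY_WARN_20=1 while still on 1.4 to surface 2.0 deprecation warnings. The main changes are: replace session.query(Model) with select(Model), executed through session.scalars() or session.execute(); replace query.first() and query.all() with session.scalars(stmt).first() and .all(); and, optionally, adopt Mapped[...] annotations with mapped_column() for typed models. session.query() still works in 2.0 as a legacy API, so the migration can be done incrementally. The official migration guide covers every case.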