SQLAlchemy ORM: Database Access in Python (2026)
Published June 6, 2026 • 13 min read
SQLAlchemy 2.0 brought a sweeping modernization: a fully typed declarative API, first-class async support, and a cleaner query interface that drops the legacy Query object in favor of select() statements. If you're still writing SQLAlchemy 1.x code or working with raw SQL, this guide shows you everything that changed and how to work with the modern API correctly — using a User/Post/Tag model as the running example.
Installation and Engine Setup
pip install sqlalchemy[asyncio] asyncpg psycopg2-binary alembic
# database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, DeclarativeBase
# ── Sync engine (scripts, background workers) ──
SYNC_URL = "postgresql+psycopg2://user:pass@localhost/blogdb"
sync_engine = create_engine(
SYNC_URL,
pool_size=5,
max_overflow=10,
pool_pre_ping=True, # reconnect on stale connections
echo=False,
)
SyncSession = sessionmaker(bind=sync_engine, expire_on_commit=False)
# ── Async engine (FastAPI, async web frameworks) ──
ASYNC_URL = "postgresql+asyncpg://user:pass@localhost/blogdb"
async_engine = create_async_engine(
ASYNC_URL,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
echo=False,
)
AsyncSessionLocal = sessionmaker(
bind=async_engine, class_=AsyncSession, expire_on_commit=False
)
class Base(DeclarativeBase):
pass
expire_on_commit=False prevents SQLAlchemy from expiring all attributes after a commit, which would trigger lazy loads when you access the object afterward. This is almost always what you want in web applications.Declarative Models with Type Annotations
SQLAlchemy 2.0 uses Python type annotations via Mapped[T] to declare columns. The result is IDE-friendly, mypy-compatible model code:
# models.py
from datetime import datetime
from typing import List, Optional
from sqlalchemy import String, Text, ForeignKey, DateTime, Table, Column
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from .database import Base
class User(Base):
__tablename__ = "users"
id : Mapped[int] = mapped_column(primary_key=True)
username : Mapped[str] = mapped_column(String(50), unique=True, index=True)
email : Mapped[str] = mapped_column(String(255), unique=True)
hashed_pw : Mapped[str] = mapped_column(String(255))
is_active : Mapped[bool] = mapped_column(default=True)
created_at : Mapped[datetime] = mapped_column(DateTime(timezone=True),
server_default=func.now())
# relationships
posts : Mapped[List["Post"]] = relationship(back_populates="author",
cascade="all, delete-orphan")
def __repr__(self) -> str:
return f"User(id={self.id}, username={self.username!r})"
class Post(Base):
__tablename__ = "posts"
id : Mapped[int] = mapped_column(primary_key=True)
title : Mapped[str] = mapped_column(String(255))
body : Mapped[str] = mapped_column(Text)
published : Mapped[bool] = mapped_column(default=False)
author_id : Mapped[int] = mapped_column(ForeignKey("users.id"))
created_at : Mapped[datetime] = mapped_column(DateTime(timezone=True),
server_default=func.now())
updated_at : Mapped[Optional[datetime]]= mapped_column(DateTime(timezone=True),
onupdate=func.now())
author : Mapped["User"] = relationship(back_populates="posts")
tags : Mapped[List["Tag"]] = relationship(secondary="post_tags",
back_populates="posts")
One-to-Many and Many-to-Many Relationships
The association table for the Post ↔ Tag many-to-many is a plain Table object (no mapped class needed for a pure junction table):
# Association table — no ORM class needed
post_tags = Table(
"post_tags", Base.metadata,
Column("post_id", ForeignKey("posts.id"), primary_key=True),
Column("tag_id", ForeignKey("tags.id"), primary_key=True),
)
class Tag(Base):
__tablename__ = "tags"
id : Mapped[int] = mapped_column(primary_key=True)
name : Mapped[str] = mapped_column(String(50), unique=True)
posts : Mapped[List["Post"]] = relationship(secondary="post_tags",
back_populates="tags")
created_at or role field), promote it to a full mapped class and use association_proxy for convenience access from the parent models.Working with the Sync Session
In SQLAlchemy 2.0, queries use select() statements instead of the legacy session.query():
# sync_queries.py
from sqlalchemy import select, update, delete
from sqlalchemy.orm import joinedload
from .database import SyncSession
from .models import User, Post
def create_user(username: str, email: str, hashed_pw: str) -> User:
with SyncSession() as session:
user = User(username=username, email=email, hashed_pw=hashed_pw)
session.add(user)
session.commit()
session.refresh(user)
return user
def get_user_with_posts(user_id: int) -> User | None:
with SyncSession() as session:
stmt = (
select(User)
.where(User.id == user_id)
.options(joinedload(User.posts))
)
return session.scalars(stmt).unique().first()
def publish_posts_by_author(author_id: int) -> int:
"""Bulk update — returns row count."""
with SyncSession() as session:
result = session.execute(
update(Post)
.where(Post.author_id == author_id, Post.published == False)
.values(published=True)
)
session.commit()
return result.rowcount
def delete_draft_posts(author_id: int) -> int:
with SyncSession() as session:
result = session.execute(
delete(Post).where(Post.author_id == author_id, Post.published == False)
)
session.commit()
return result.rowcount
Async Session with AsyncSession
The async session API mirrors the sync session but requires await. Use it in FastAPI or any asyncio-based web framework:
# async_queries.py
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from .database import AsyncSessionLocal
from .models import User, Post
async def get_users_paginated(offset: int = 0, limit: int = 20) -> list[User]:
async with AsyncSessionLocal() as session:
stmt = select(User).where(User.is_active == True).offset(offset).limit(limit)
result = await session.scalars(stmt)
return result.all()
async def get_post_with_author_and_tags(post_id: int) -> Post | None:
async with AsyncSessionLocal() as session:
stmt = (
select(Post)
.where(Post.id == post_id)
.options(
selectinload(Post.author), # separate IN query — safe for async
selectinload(Post.tags),
)
)
result = await session.scalars(stmt)
return result.unique().first()
async def create_post(author_id: int, title: str, body: str) -> Post:
async with AsyncSessionLocal() as session:
post = Post(author_id=author_id, title=title, body=body)
session.add(post)
await session.commit()
await session.refresh(post)
return post
selectinload instead of joinedload for eager loading. joinedload works in async but produces complex SQL that can be harder to optimize. selectinload fires separate IN (id1, id2, ...) queries, which are fast and cache-friendly.Alembic Migrations
Alembic is the standard migration tool for SQLAlchemy. Initialize it once per project:
# Initialize Alembic in the project root
alembic init alembic
# Edit alembic.ini: set sqlalchemy.url
# sqlalchemy.url = postgresql+psycopg2://user:pass@localhost/blogdb
# Edit alembic/env.py to import your Base
# from app.database import Base
# target_metadata = Base.metadata
# alembic/env.py — key section
from app.database import Base
from app import models # noqa: ensure all models are imported before autogenerate
target_metadata = Base.metadata
def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
# Autogenerate a migration from model changes
alembic revision --autogenerate -m "add tags table"
# Review the generated file in alembic/versions/
# Then apply it
alembic upgrade head
# Roll back one migration
alembic downgrade -1
# Show current revision
alembic current
op.rename_table() and op.alter_column().Loading Strategies: joinedload vs. selectinload vs. lazy
| Strategy | SQL Generated | Use When |
|---|---|---|
lazy="select" (default) | Separate SELECT per access | Accessing the relation on a small number of objects |
joinedload | LEFT OUTER JOIN | Sync code, loading one object with its relations |
selectinload | Separate IN(...) SELECT | Loading collections; required in async code |
subqueryload | Correlated subquery | Legacy; prefer selectinload in 2.0 |
noload | Nothing | Explicitly block accidental lazy loads |
from sqlalchemy.orm import joinedload, selectinload, noload
# Sync — joinedload for a single object's relations
stmt = select(User).options(joinedload(User.posts)).where(User.id == 1)
# Async or collections — selectinload
stmt = select(User).options(selectinload(User.posts).selectinload(Post.tags))
# Explicitly block lazy loading (raises error on access — good for testing)
stmt = select(Post).options(noload(Post.tags))
Connection Pool Configuration
For production PostgreSQL, tune the pool to match your workload and database's max_connections:
sync_engine = create_engine(
DATABASE_URL,
pool_size=5, # persistent connections kept open
max_overflow=10, # extra connections allowed beyond pool_size
pool_timeout=30, # seconds to wait for a connection before raising
pool_recycle=1800, # recycle connections after 30 min (avoids stale TCP)
pool_pre_ping=True, # test connection health before use
)
pool_size + max_overflow per process. If you run 4 Gunicorn workers, each creates its own pool — plan accordingly. A database with max_connections=100 and 4 workers × 15 connections = 60 connections total, leaving headroom for migrations and admin tools.Bulk Inserts
For inserting thousands of rows, avoid the ORM's per-object overhead:
from sqlalchemy import insert
# Method 1: session.bulk_insert_mappings (legacy but fast)
def bulk_create_posts_legacy(session, post_dicts: list[dict]) -> None:
session.bulk_insert_mappings(Post, post_dicts)
session.commit()
# Method 2: Core INSERT with executemany (SQLAlchemy 2.0 preferred)
def bulk_create_posts(session, post_dicts: list[dict]) -> None:
session.execute(insert(Post), post_dicts)
session.commit()
# Method 3: PostgreSQL COPY (fastest for very large datasets)
import csv, io
from sqlalchemy import text
def copy_posts_from_csv(session, rows: list[dict]) -> None:
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=['title', 'body', 'author_id', 'published'])
writer.writerows(rows)
buf.seek(0)
conn = session.connection().connection # raw psycopg2 connection
with conn.cursor() as cur:
cur.copy_expert("COPY posts (title,body,author_id,published) FROM STDIN CSV", buf)
session.commit()
| Method | 10k rows / sec (approx) | Notes |
|---|---|---|
ORM session.add() loop | ~500 | Convenient; slow at scale |
bulk_insert_mappings | ~5,000 | Skips ORM events, faster |
Core INSERT executemany | ~8,000 | Preferred in 2.0 |
| PostgreSQL COPY | ~50,000+ | Fastest; PostgreSQL-only |
Frequently Asked Questions
- What is the difference between
Sessionandscoped_session? - A plain
Sessionis not thread-safe — you should create one per request or transaction.scoped_sessionwraps a session factory with a thread-local registry, so the same session is reused within a thread (useful in Flask with WSGI). In modern async frameworks, skipscoped_sessionentirely and rely on dependency injection to scopeAsyncSessionto the request. - How do I use SQLAlchemy with FastAPI?
- Create a dependency that yields an
AsyncSessionand closes it after the request. UseDepends(get_db)in route handlers. Never share anAsyncSessionacross requests — each request gets its own session from the dependency. - When should I use the ORM vs. Core?
- Use the ORM for typical CRUD and domain logic — it provides unit-of-work tracking, relationships, and a clean Python interface. Use Core (raw
select(),insert(), etc.) for bulk operations, complex analytical queries, or when you need maximum performance with minimal overhead. Both are available in the same application. - How do I handle database errors and rollbacks?
- SQLAlchemy sessions act as context managers — when an exception is raised inside a
with Session() as sessionblock, the session is automatically rolled back on exit. For explicit control, callsession.rollback()inside anexceptblock. Never silently swallowIntegrityError— always roll back first, then decide whether to retry or re-raise. - How do I migrate from SQLAlchemy 1.4 to 2.0?
- Enable
SQLALCHEMY_WARN_20=1in 1.4 to surface deprecation warnings. The biggest breaking changes are: replacesession.query(Model)withselect(Model); replace.first()/.all()on query objects withsession.scalars(stmt).first(); updaterelationship()to useMapped[...]type annotations. The official migration guide covers every case.