Python Alembic: Database Migrations with SQLAlchemy

Alembic is the database migration tool for SQLAlchemy. It tracks schema changes in version-controlled migration files, so teams can evolve a schema safely across development, staging, and production without losing data. Its autogenerate feature compares your SQLAlchemy models against the live database and drafts a migration script, which leaves hand-written migrations for the cases it cannot detect. This guide covers setup through production-safe zero-downtime migrations.

Setup and Configuration

Alembic is initialized with alembic init followed by a directory name, which creates the migration directory (alembic/ in this guide) and an alembic.ini file. The most important file is env.py: it controls how Alembic connects to the database and discovers your SQLAlchemy models. alembic.ini sets the migration directory location, the database URL, and logging.

pip install alembic sqlalchemy psycopg2-binary

# Initialize Alembic in your project
alembic init alembic

# Directory structure created:
# alembic/
#   env.py            ← Core configuration
#   script.py.mako    ← Migration file template
#   versions/         ← Individual migration files
# alembic.ini         ← Main config (database URL)

# alembic.ini
[alembic]
script_location = alembic
# Database URL: placeholder credentials; override from an environment variable in production
sqlalchemy.url = postgresql+psycopg2://user:password@localhost/techoral_db

# Logging
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

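[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

# Format for the "generic" formatter declared above (as in Alembic's default template)
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

# alembic/env.py: use your models' metadata and read the URL from the environment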
import os
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig

# Import your SQLAlchemy Base so autogenerate can detect changes
from app.models import Base   # adjust import to your project

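config = context.config

# Override the alembic.ini URL when DATABASE_URL is set in the environment.
# set_main_option() goes through ConfigParser interpolation, so a literal "%"
# (for example in a URL-encoded password) must be escaped as "%%".
db_url = os.environ.get("DATABASE_URL")
if db_url:
    config.set_main_option("sqlalchemy.url", db_url.replace("%", "%%"))

# Set target metadata for autogenerate
target_metadata = Base.metadata

# Configure logging from alembic.ini when a config file is in use
if config.config_file_name is not None:
    fileConfig(config.config_file_name)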

def run_migrations_offline():
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
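
A detail that is easy to miss when the URL comes from an environment variable: Alembic stores main options in a ConfigParser, which treats "%" as the start of an interpolation. The sketch below reproduces that behaviour with the standard library's configparser alone, so it runs without Alembic installed. The URL and the helper name escape_percent are made up for illustration.

```python
import configparser


def escape_percent(url: str) -> str:
    """Double every '%' so ConfigParser treats it as a literal character."""
    return url.replace("%", "%%")


# Hypothetical URL whose password "p@ss" is URL-encoded as "p%40ss"
raw_url = "postgresql+psycopg2://user:p%40ss@localhost/techoral_db"

parser = configparser.ConfigParser()
parser.add_section("alembic")

# Unescaped: ConfigParser.set() rejects the stray "%"
try:
    parser.set("alembic", "sqlalchemy.url", raw_url)
    unescaped_error = None
except ValueError as exc:
    unescaped_error = exc

# Escaped: stored with "%%", read back as the original URL
parser.set("alembic", "sqlalchemy.url", escape_percent(raw_url))
round_tripped = parser.get("alembic", "sqlalchemy.url")
```

If your URLs never contain a percent sign the escaping is a no-op, so it is cheap to apply unconditionally before calling config.set_main_option().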

SQLAlchemy Models and Base

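Alembic's autogenerate works by comparing your SQLAlchemy model definitions against the live database schema. All models must share one Base (and therefore one MetaData object), and every module that defines models must have been imported by the time env.py reads Base.metadata. A model that is never imported is invisible to autogenerate, which will then propose dropping its table. Mixins for common fields (timestamps, soft delete) keep models DRY.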

# app/models.py
from datetime import datetime, timezone
from sqlalchemy import Column, String, Integer, Boolean, Text, ForeignKey, DateTime, Index
from sqlalchemy.orm import DeclarativeBase, relationship, Mapped, mapped_column
from sqlalchemy.dialects.postgresql import UUID
import uuid

class Base(DeclarativeBase):
    pass

# Mixin for common timestamp fields
class TimestampMixin:
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
        nullable=False,
    )

class User(TimestampMixin, Base):
    __tablename__ = "users"

    id: Mapped[str] = mapped_column(
        UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid.uuid4())
    )
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    password_hash: Mapped[str | None] = mapped_column(String(255), nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    role: Mapped[str] = mapped_column(String(50), default="viewer", nullable=False)

    posts: Mapped[list["Post"]] = relationship("Post", back_populates="author")

    __table_args__ = (
        Index("ix_users_email_active", "email", "is_active"),
    )

class Post(TimestampMixin, Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    title: Mapped[str] = mapped_column(String(255), nullable=False)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    published: Mapped[bool] = mapped_column(Boolean, default=False)
    author_id: Mapped[str] = mapped_column(
        UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False
    )

    author: Mapped["User"] = relationship("User", back_populates="posts")

    __table_args__ = (
        Index("ix_posts_author_published", "author_id", "published"),
    )

Autogenerate Migrations

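Alembic's autogenerate inspects your SQLAlchemy models and the live database, then writes a Python migration script of op.* calls describing the difference; it does not emit raw SQL. It detects most schema changes: added and dropped tables and columns, nullability changes, indexes, unique constraints, and foreign keys. Column type changes are compared by default since Alembic 1.12 (older versions need compare_type=True in env.py), and server default changes are compared only with compare_server_default=True. Some changes always need manual work: stored procedures, CHECK constraints, and table or column renames, which autogenerate sees as a drop plus an add. Always review the generated file before applying it.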

# Generate a migration by comparing models to database
alembic revision --autogenerate -m "add users and posts tables"

# Review the generated migration file before applying
cat alembic/versions/xxxx_add_users_and_posts_tables.py

# Apply pending migrations
alembic upgrade head

# Apply a relative or specific revision
alembic upgrade +2          # 2 steps forward
alembic upgrade abc123      # to specific revision

# Rollback
alembic downgrade -1        # 1 step back
alembic downgrade base      # back to empty schema

# Check current state
alembic current             # current revision
alembic history             # all revisions
alembic history --verbose   # with details
alembic show head           # show latest revision

# Generate SQL without applying (for DBA review)
alembic upgrade head --sql > migration.sql
# Generated migration file: alembic/versions/001_add_users_and_posts.py
"""add users and posts tables

Revision ID: 001abc123def
Revises:
Create Date: 2026-06-16 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

revision = "001abc123def"
down_revision = None  # first migration
branch_labels = None
depends_on = None

def upgrade():
    op.create_table(
        "users",
        sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False),
        sa.Column("email", sa.String(255), nullable=False),
        sa.Column("name", sa.String(100), nullable=False),
        sa.Column("password_hash", sa.String(255), nullable=True),
        # The models use Python-side default=..., so no server_default is emitted
        sa.Column("is_active", sa.Boolean(), nullable=False),
        sa.Column("role", sa.String(50), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
        sa.PrimaryKeyConstraint("id"),
    )
    # unique=True together with index=True on the model yields one unique index
    op.create_index("ix_users_email", "users", ["email"], unique=True)
    op.create_index("ix_users_email_active", "users", ["email", "is_active"])
    # ... posts table follows

def downgrade():
    # ... drop the posts table first (it references users)
    op.drop_index("ix_users_email_active", table_name="users")
    op.drop_index("ix_users_email", table_name="users")
    op.drop_table("users")
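
To see why autogenerate reports a column rename as a drop plus an add, it helps to look at what a name-based comparison can and cannot know. The function below is a toy illustration written for this guide, not Alembic's implementation: it compares two sets of column names, and diff_columns is a hypothetical helper.

```python
def diff_columns(db_columns: set[str], model_columns: set[str]) -> list[tuple[str, str]]:
    """Return (operation, column) pairs that turn db_columns into model_columns."""
    ops = [("drop_column", name) for name in sorted(db_columns - model_columns)]
    ops += [("add_column", name) for name in sorted(model_columns - db_columns)]
    return ops


# The database still has "username"; the model now calls the column "name"
database = {"id", "email", "username"}
model = {"id", "email", "name"}

operations = diff_columns(database, model)
# operations == [("drop_column", "username"), ("add_column", "name")]
```

Nothing in the two sets says that "username" and "name" hold the same data, so a comparison like this can only propose dropping one and adding the other. Applied as generated, that would discard the column's contents, which is why renames are written by hand.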

Manual Migration Scripts

Autogenerate doesn't handle every case. Column renames, adding non-nullable columns to populated tables, CHECK constraints, and data backfills all need hand-written steps. Alembic provides op.execute() for raw SQL and a full set of schema operations for DDL changes.

# Manual migration: rename column + add non-nullable column with default
# (illustrative: assumes a users table whose name column is still called "username")
"""rename username to name, add bio column

Revision ID: 002def456ghi
Revises: 001abc123def
"""
from alembic import op
import sqlalchemy as sa

# Alembic reads these module-level identifiers, not the docstring
revision = "002def456ghi"
down_revision = "001abc123def"
branch_labels = None
depends_on = None

def upgrade():
    # Rename column (autogenerate would see this as drop+add)
    op.alter_column("users", "username", new_column_name="name")

    # Column that should end up non-nullable: add it as nullable, backfill, then tighten
    op.add_column("users", sa.Column("bio", sa.Text(), nullable=True))
    # Populate bio for existing rows
    op.execute("UPDATE users SET bio = '' WHERE bio IS NULL")
    # Now make it non-nullable (optional)
    # op.alter_column("users", "bio", nullable=False)

    # Add check constraint (fails if existing rows violate it)
    op.create_check_constraint(
        "ck_users_role_valid",
        "users",
        "role IN ('admin', 'editor', 'viewer')",
    )

    # Add a partial index without blocking writes. PostgreSQL rejects
    # CREATE INDEX CONCURRENTLY inside a transaction, so run it in an
    # autocommit block; note that this commits the steps above first.
    with op.get_context().autocommit_block():
        op.execute("""
            CREATE INDEX CONCURRENTLY ix_users_active_email
            ON users (email)
            WHERE is_active = true
        """)

def downgrade():
    with op.get_context().autocommit_block():
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_users_active_email")
    op.drop_constraint("ck_users_role_valid", "users", type_="check")
    op.drop_column("users", "bio")
    op.alter_column("users", "name", new_column_name="username")
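
The reason a non-nullable column cannot simply be added to an existing table is that the database has no value to put in the rows already there. The sketch below demonstrates this with sqlite3 from the standard library, so it runs without a PostgreSQL server. One difference to keep in mind: SQLite rejects the statement regardless of table contents, while PostgreSQL rejects it only when the table already has rows. The table and data are made up for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.execute("INSERT INTO users (name) VALUES ('Ada')")

# NOT NULL without a default: there is no value for the existing row
try:
    conn.execute("ALTER TABLE users ADD COLUMN bio TEXT NOT NULL")
    failed = False
except sqlite3.OperationalError:
    failed = True

# NOT NULL with a default: the existing row is filled in with ''
conn.execute("ALTER TABLE users ADD COLUMN bio TEXT NOT NULL DEFAULT ''")
bio = conn.execute("SELECT bio FROM users WHERE name = 'Ada'").fetchone()[0]
```

In an Alembic migration the same choice appears as either passing server_default to sa.Column in op.add_column(), or adding the column as nullable, backfilling it, and tightening it afterwards.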

Data Migrations

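Data migrations transform existing data during deployment: splitting one column into two, normalizing values, computing derived fields, or moving data between tables. Alembic supports mixing DDL and DML in the same migration. On large tables, process rows in batches so that no single statement touches millions of rows at once. Batching shortens lock times only when each batch is committed separately, so a very large backfill is often better run as a separate script outside the migration's transaction.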

# Data migration: split full_name into first_name + last_name
"""split full_name into first_name and last_name

Revision ID: 003ghi789jkl
Revises: 002def456ghi
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, text

# Alembic reads these module-level identifiers, not the docstring
revision = "003ghi789jkl"
down_revision = "002def456ghi"
branch_labels = None
depends_on = None

def upgrade():
    # Step 1: Add new columns (nullable initially)
    # (assumes each half of full_name fits in 50 characters)
    op.add_column("users", sa.Column("first_name", sa.String(50), nullable=True))
    op.add_column("users", sa.Column("last_name", sa.String(50), nullable=True))

    # Step 2: Migrate data in batches to keep memory use bounded.
    # All batches still run inside this migration's transaction, so the
    # row locks they take are held until the migration commits.
    connection = op.get_bind()
    users_table = table("users",
        column("id", sa.String),
        column("full_name", sa.String),
        column("first_name", sa.String),
        column("last_name", sa.String),
    )

    # Keyset pagination in batches of 1000: order by id and resume after the
    # last id seen. LIMIT/OFFSET without ORDER BY can skip or repeat rows
    # while those rows are being updated.
    batch_size = 1000
    last_id = None
    migrated = 0
    while True:
        query = (
            sa.select(users_table.c.id, users_table.c.full_name)
            .order_by(users_table.c.id)
            .limit(batch_size)
        )
        if last_id is not None:
            query = query.where(users_table.c.id > last_id)
        rows = connection.execute(query).fetchall()
        if not rows:
            break
        for row in rows:
            parts = (row.full_name or "").split(" ", 1)
            connection.execute(
                users_table.update()
                .where(users_table.c.id == row.id)
                .values(
                    first_name=parts[0],
                    last_name=parts[1] if len(parts) > 1 else "",
                )
            )
        last_id = rows[-1].id
        migrated += len(rows)
        print(f"Migrated {migrated} rows...")

    # Step 3: Make non-nullable after data is populated
    op.alter_column("users", "first_name", nullable=False, server_default="")
    op.alter_column("users", "last_name", nullable=False, server_default="")

    # Step 4: Remove old column (keep if rollback might be needed)
    # op.drop_column("users", "full_name")

def downgrade():
    # upgrade() keeps full_name (Step 4 is commented out), so only refresh it
    # from the split columns. If you enabled Step 4, re-create the column first:
    # op.add_column("users", sa.Column("full_name", sa.String(200), nullable=True))
    connection = op.get_bind()
    connection.execute(text(
        "UPDATE users SET full_name = TRIM(first_name || ' ' || last_name)"
    ))
    op.drop_column("users", "first_name")
    op.drop_column("users", "last_name")
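
The batching idea can be tried end to end without a PostgreSQL server. The sketch below uses sqlite3 from the standard library rather than Alembic: it pages through the table in id order (keyset pagination) and commits after every batch, which is the variant that actually releases locks between batches. The helpers split_name and backfill, the integer id, and the sample rows are assumptions made for the demonstration.

```python
import sqlite3


def split_name(full_name):
    """Split on the first space; a missing name or surname becomes ''."""
    parts = (full_name or "").split(" ", 1)
    return parts[0], parts[1] if len(parts) > 1 else ""


def backfill(conn, batch_size=2):
    """Fill first_name/last_name in id order, committing after each batch."""
    last_id, total = 0, 0
    while True:
        rows = conn.execute(
            "SELECT id, full_name FROM users WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, batch_size),
        ).fetchall()
        if not rows:
            return total
        conn.executemany(
            "UPDATE users SET first_name = ?, last_name = ? WHERE id = ?",
            [(*split_name(name), row_id) for row_id, name in rows],
        )
        conn.commit()  # end the transaction before fetching the next batch
        last_id = rows[-1][0]
        total += len(rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, full_name TEXT,"
    " first_name TEXT, last_name TEXT)"
)
conn.executemany(
    "INSERT INTO users (full_name) VALUES (?)",
    [("Ada Lovelace",), ("Plato",), (None,), ("Mary Ann Evans",), ("Alan Turing",)],
)
conn.commit()

migrated = backfill(conn)
result = conn.execute(
    "SELECT first_name, last_name FROM users ORDER BY id"
).fetchall()
```

Because each batch resumes from the last id it saw, rows are visited exactly once even though earlier rows were modified in the meantime, and an interrupted run can be restarted safely.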

Async SQLAlchemy Support

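Alembic's migration runner is synchronous. If your application only has an async driver installed (asyncpg or aiosqlite, as is common with FastAPI), env.py can create an async engine and hand Alembic a synchronous view of the connection through run_sync(), which is available since SQLAlchemy 1.4. The alternative is to keep a sync driver such as psycopg2 around just for migrations.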

# alembic/env.py — async version
import asyncio
import os
from alembic import context
from sqlalchemy.ext.asyncio import create_async_engine
from app.models import Base

config = context.config
target_metadata = Base.metadata

DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost/db")

def run_migrations_offline():
    context.configure(url=DATABASE_URL, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()

def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_migrations_online():
    engine = create_async_engine(DATABASE_URL)
    async with engine.begin() as conn:
        await conn.run_sync(do_run_migrations)
    await engine.dispose()

if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.run(run_migrations_online())

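
# app/main.py: run migrations on app startup (in lifespan)
import asyncio
from contextlib import asynccontextmanager

from alembic import command
from alembic.config import Config
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Run pending migrations before the app accepts requests.
    # The async env.py calls asyncio.run(), which raises RuntimeError inside
    # an already running event loop, so run the blocking upgrade in a worker thread.
    alembic_cfg = Config("alembic.ini")
    await asyncio.to_thread(command.upgrade, alembic_cfg, "head")
    yield

app = FastAPI(lifespan=lifespan)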
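
One pitfall when combining the two snippets above: the async env.py ends with asyncio.run(), and asyncio.run() refuses to start while an event loop is already running, which is exactly the situation inside a lifespan handler. The sketch below reproduces the problem and one workaround with the standard library only (Python 3.9 or later for asyncio.to_thread). apply_migrations and upgrade_blocking are hypothetical stand-ins for the async env.py and for command.upgrade().

```python
import asyncio


async def apply_migrations() -> str:
    """Stand-in for the async work done in env.py."""
    await asyncio.sleep(0)
    return "head"


def upgrade_blocking() -> str:
    """Stand-in for command.upgrade(): a sync call that uses asyncio.run()."""
    coro = apply_migrations()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def startup() -> tuple[str, str]:
    # Direct call from async code: a loop is already running in this thread
    try:
        upgrade_blocking()
        direct = "ok"
    except RuntimeError:
        direct = "RuntimeError"
    # Worker thread: no loop is running there, so asyncio.run() succeeds
    threaded = await asyncio.to_thread(upgrade_blocking)
    return direct, threaded


outcome = asyncio.run(startup())
```

Running migrations from a separate deploy step, before any application process starts, sidesteps the problem entirely and also avoids several workers racing to apply the same revision.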

Zero-Downtime Production Migrations

Some schema changes take locks that block reads or writes for as long as they run, which on a large table can mean minutes of downtime. The expand-contract pattern (also called parallel change) avoids this by splitting a breaking change into compatible steps: add the new column or table first (expand), deploy code that writes to both old and new, backfill existing rows, switch reads to the new column, then remove the old one in a later deployment (contract). This works for column renames, type changes, and table restructuring.
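
Before the contract step it is worth confirming that the old and new columns really agree. The sketch below walks through expand, dual write, and backfill against an in-memory SQLite database from the standard library and counts mismatched rows before and after the backfill. It is an illustration under simplified assumptions, not an Alembic migration; create_user and mismatched_rows are hypothetical helpers.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("Ada",), ("Alan",)])

# Expand: add the new column next to the old one
conn.execute("ALTER TABLE users ADD COLUMN display_name TEXT")


def create_user(conn, name):
    """Dual write: application code fills both columns during the transition."""
    conn.execute(
        "INSERT INTO users (name, display_name) VALUES (?, ?)", (name, name)
    )


def mismatched_rows(conn):
    """Count rows whose columns differ ('IS NOT' is NULL-safe in SQLite)."""
    return conn.execute(
        "SELECT COUNT(*) FROM users WHERE display_name IS NOT name"
    ).fetchone()[0]


create_user(conn, "Grace")
before_backfill = mismatched_rows(conn)  # the two rows that predate the expand

# Backfill rows written before the dual-write deployment
conn.execute("UPDATE users SET display_name = name WHERE display_name IS NULL")
after_backfill = mismatched_rows(conn)  # should be 0 before contracting
```

On PostgreSQL the equivalent NULL-safe comparison is IS DISTINCT FROM. A count of zero, sustained while the dual-write code is live, is a reasonable signal that dropping the old column will not lose data.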


# Zero-downtime column rename: expand, dual-write, contract

# Migration 1 (EXPAND): add new column (old column kept)
def upgrade():
    op.add_column("users", sa.Column("display_name", sa.String(100), nullable=True))
    # Don't remove 'name' yet: old code still reads/writes it
    # Backfill in a single statement; on a large table, split this into batches
    op.execute("UPDATE users SET display_name = name WHERE display_name IS NULL")

# Deploy code that writes to BOTH 'name' AND 'display_name'
# Monitor for a few hours/days, then deploy code that uses only 'display_name'

# Migration 2 (CONTRACT): remove old column (in next deployment)
def upgrade():
    op.alter_column("users", "display_name", nullable=False, server_default="")
    op.drop_column("users", "name")

# Adding an index without blocking writes (PostgreSQL CONCURRENTLY).
# CONCURRENTLY cannot run inside a transaction, so use an autocommit block.
def upgrade():
    with op.get_context().autocommit_block():
        op.execute("CREATE INDEX CONCURRENTLY ix_posts_created_at ON posts (created_at)")

def downgrade():
    with op.get_context().autocommit_block():
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_posts_created_at")

# CI/CD integration: run migrations before starting the new app version
# Dockerfile entrypoint / deploy script:
#!/bin/bash
set -e
echo "Running database migrations..."
alembic upgrade head
echo "Migrations complete. Starting app..."
exec uvicorn app.main:app --host 0.0.0.0 --port 8000

Never use alembic downgrade base in production: it runs every downgrade() in the history and removes everything the migrations created. To undo a bad change, write a corrective forward migration that reverts it without data loss. Always test migrations against a production-sized copy of the data before deploying, and use alembic upgrade head --sql to generate the SQL for DBA review of sensitive migrations.