Python Alembic: Database Migrations with SQLAlchemy

Alembic is the migration tool for SQLAlchemy — it tracks schema changes, generates migration scripts, and applies them in order across environments. Whether you're adding a column, renaming a table, or seeding reference data, Alembic gives you a reproducible, version-controlled migration history. This guide covers setup, autogenerate, data migrations, async support, and integrating migrations into CI/CD pipelines.

Setup and Initialization

pip install alembic sqlalchemy psycopg2-binary

# Initialize Alembic in your project
alembic init alembic

# Project structure after init:
# alembic/
#   env.py          — migration environment configuration
#   script.py.mako  — migration file template
#   versions/       — generated migration files
# alembic.ini       — Alembic configuration

Edit alembic.ini to point at your database, then configure env.py to import your models so autogenerate can detect schema changes:

# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import os

# Import your models so autogenerate can compare against them
from myapp.models import Base  # your DeclarativeBase

config = context.config

# Read DATABASE_URL from environment (don't hardcode credentials).
# set_main_option() hands the value to ConfigParser, which treats "%" as the
# start of an interpolation token.  A literal "%" (for example from a
# URL-encoded password such as "p%40ss") must therefore be doubled, otherwise
# the call raises; the value read back later contains the single "%" again.
config.set_main_option(
    "sqlalchemy.url",
    os.environ["DATABASE_URL"].replace("%", "%%"),
)

# Set up Python logging from the ini file when one is in use
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Metadata of the application's models, passed to context.configure() below
target_metadata = Base.metadata

def run_migrations_offline() -> None:
    """Run migrations in offline mode.

    The migration context is configured from the ``sqlalchemy.url`` option
    alone; no engine or connection is created in this function.
    ``literal_binds=True`` is passed through to ``context.configure()``.
    """
    context.configure(
        url=config.get_main_option("sqlalchemy.url"),
        target_metadata=target_metadata,
        literal_binds=True,
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online() -> None:
    """Run migrations in online mode, over a live database connection.

    An engine is built from the ``sqlalchemy.``-prefixed options of the main
    ini section, a connection is opened, and the migrations run inside
    ``context.begin_transaction()`` on that connection.
    """
    ini_section = config.get_section(config.config_ini_section)
    # NullPool: connections are not pooled for this one-off engine
    engine = engine_from_config(
        ini_section,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with engine.connect() as db_connection:
        context.configure(
            connection=db_connection,
            target_metadata=target_metadata,
        )
        with context.begin_transaction():
            context.run_migrations()

# Module entry point: pick the runner that matches how Alembic was invoked
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()

Autogenerate Migrations

Alembic's autogenerate compares your SQLAlchemy models against the current database schema and generates the migration script. It detects added/removed tables, added/removed columns, type changes, and index changes.

# myapp/models.py
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, DateTime, Boolean, Integer, ForeignKey
from datetime import datetime

class Base(DeclarativeBase):
    """Declarative base that the models in this module inherit from."""

def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Replaces ``datetime.utcnow()``, which is deprecated since Python 3.12.
    The tzinfo is stripped so the value stays naive, exactly like the value
    ``utcnow()`` produced, and keeps matching the timezone-less ``DateTime``
    column it is used for.
    """
    from datetime import timezone  # local import: only this helper needs it

    return datetime.now(timezone.utc).replace(tzinfo=None)


class User(Base):
    """Row in the ``users`` table."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Both username and email carry a UNIQUE constraint and are NOT NULL
    username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    # The callable is invoked per INSERT, so each row gets its own timestamp
    created_at: Mapped[datetime] = mapped_column(DateTime, default=_utcnow)

class Post(Base):
    """Row in the ``posts`` table; ``author_id`` references ``users.id``."""

    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(
        String(200),
        nullable=False,
    )
    # No explicit column type is passed here; only the annotation is given
    content: Mapped[str] = mapped_column(nullable=False)
    author_id: Mapped[int] = mapped_column(
        ForeignKey("users.id"),
        nullable=False,
    )
    published: Mapped[bool] = mapped_column(
        Boolean,
        default=False,
    )
# Generate migration from current model state vs database
alembic revision --autogenerate -m "create users and posts tables"

# Review the generated file in alembic/versions/
# Always review before applying — autogenerate is not perfect

# Apply all pending migrations
alembic upgrade head

# Roll back one migration
alembic downgrade -1

# Show migration history
alembic history --verbose

# Show current revision
alembic current

Writing Migrations Manually

Some changes cannot be autogenerated: renaming columns/tables, changing column types on databases without ALTER COLUMN support, or adding partial indexes. Write these manually.

# alembic/versions/abc123_add_profile_fields.py
"""add profile fields to users

Revision ID: abc123
Revises: prev_rev_id
Create Date: 2026-06-13
"""
from alembic import op
import sqlalchemy as sa

revision = "abc123"
down_revision = "prev_rev_id"
branch_labels = None
depends_on = None

def upgrade() -> None:
    """Add profile columns and an email index to ``users``; rename ``is_active``."""
    # New columns are nullable, so existing rows need no value
    # (safe for large tables)
    new_columns = (
        ("bio", sa.Text()),
        ("avatar_url", sa.String(500)),
    )
    for column_name, column_type in new_columns:
        op.add_column("users", sa.Column(column_name, column_type, nullable=True))

    # Unique index on the lower-cased email expression
    op.create_index(
        "ix_users_email_lower",
        "users",
        [sa.text("lower(email)")],
        unique=True,
    )

    # Rename column (PostgreSQL supports this natively)
    op.alter_column(
        "users",
        "is_active",
        new_column_name="active",
    )

def downgrade() -> None:
    """Undo ``upgrade()``: restore the column name, drop the index and columns."""
    op.alter_column(
        "users",
        "active",
        new_column_name="is_active",
    )
    op.drop_index("ix_users_email_lower", table_name="users")

    # Dropped in the reverse of the order they were added
    for column_name in ("avatar_url", "bio"):
        op.drop_column("users", column_name)

Data Migrations

Data migrations transform existing data alongside schema changes. Use Alembic's op.get_bind() to obtain the connection the migration is running on and execute SQL through it, or use op.bulk_insert for seeding.

# alembic/versions/def456_split_name_column.py
"""split full_name into first_name and last_name"""
from alembic import op
import sqlalchemy as sa

def upgrade() -> None:
    """Replace ``users.full_name`` with ``first_name`` and ``last_name``.

    Steps: add the new columns as nullable, backfill them from
    ``full_name``, tighten them to NOT NULL, then drop ``full_name``.
    The backfill SQL uses PostgreSQL string functions.
    """
    # 1. Add new columns (nullable until the backfill has run)
    # NOTE(review): String(100) assumes each name part fits in 100
    # characters — confirm against the existing full_name data.
    op.add_column("users", sa.Column("first_name", sa.String(100)))
    op.add_column("users", sa.Column("last_name", sa.String(100)))

    # 2. Migrate data
    #    - Every row is updated, including rows whose full_name is NULL
    #      (they get empty strings), so no NULLs are left behind to violate
    #      the NOT NULL constraint applied in step 3.
    #    - last_name receives everything after the first space, so a name
    #      with more than two words ("Mary Ann Smith") is not truncated.
    conn = op.get_bind()
    conn.execute(sa.text("""
        UPDATE users
        SET first_name = split_part(coalesce(full_name, ''), ' ', 1),
            last_name  = CASE
                WHEN position(' ' in coalesce(full_name, '')) > 0
                THEN substr(full_name, position(' ' in full_name) + 1)
                ELSE ''
            END
    """))

    # 3. Make columns NOT NULL after backfill
    op.alter_column("users", "first_name", nullable=False, server_default="")
    op.alter_column("users", "last_name",  nullable=False, server_default="")

    # 4. Drop old column
    op.drop_column("users", "full_name")

def downgrade() -> None:
    """Recreate ``users.full_name`` from the split columns, then drop them."""
    op.add_column("users", sa.Column("full_name", sa.String(255)))

    # Join the two parts with a space; trim() removes the padding that is
    # left over when one of the parts is an empty string
    rebuild_full_name = sa.text("""
        UPDATE users SET full_name = trim(first_name || ' ' || last_name)
    """)
    op.get_bind().execute(rebuild_full_name)

    for column_name in ("last_name", "first_name"):
        op.drop_column("users", column_name)

# Seed reference data
def seed_roles() -> None:
    """Insert the reference roles admin, editor and viewer into ``roles``."""
    # Ad-hoc table construct declaring only the two columns being inserted
    roles = sa.table(
        "roles",
        sa.column("id", sa.Integer),
        sa.column("name", sa.String),
    )

    # Ids are assigned 1, 2, 3 in the order the names are listed
    role_names = ("admin", "editor", "viewer")
    rows = [
        {"id": role_id, "name": role_name}
        for role_id, role_name in enumerate(role_names, start=1)
    ]
    op.bulk_insert(roles, rows)

Async SQLAlchemy Support

Alembic runs synchronously, but you can configure it to work with async SQLAlchemy engines using the run_sync pattern.

# alembic/env.py — async version
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncConnection
from alembic import context

# Connection string comes from the environment; a missing variable raises KeyError
DATABASE_URL = os.environ["DATABASE_URL"]
# Use async driver — e.g., postgresql+asyncpg://
# Only the literal substring "postgresql://" is rewritten; a URL written any
# other way (for example one that already names a driver) is left unchanged.
ASYNC_URL = DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://")

def do_run_migrations(connection):
    """Configure the migration context on ``connection`` and run migrations.

    Synchronous callback; ``run_async_migrations()`` passes it to
    ``run_sync()``.
    """
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
    )

    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations():
    """Run the migrations through an async engine created from ``ASYNC_URL``.

    The engine is disposed in a ``finally`` block so that it is released even
    when connecting or a migration raises; the exception still propagates to
    the caller.
    """
    engine = create_async_engine(ASYNC_URL, echo=True)
    try:
        async with engine.connect() as connection:
            # run_sync wraps the sync Alembic migration in the async context
            await connection.run_sync(do_run_migrations)
    finally:
        await engine.dispose()

def run_migrations_online():
    """Synchronous entry point: drive ``run_async_migrations()`` to completion."""
    migration_coro = run_async_migrations()
    asyncio.run(migration_coro)

Migration Branching

When multiple developers create migrations from the same parent revision, the revision history forks into branches and ends up with more than one head. You must merge them before applying to production.

# Two developers create migrations simultaneously:
# dev1: revision abc123 — adds 'bio' column
# dev2: revision def456 — adds 'avatar_url' column

# Alembic detects multiple heads
alembic heads
# abc123 (head)
# def456 (head)

# Create a merge migration that depends on both
alembic merge -m "merge bio and avatar migrations" abc123 def456
# Creates: ghi789_merge_bio_and_avatar_migrations.py
# down_revision = ("abc123", "def456")

# Now there's a single head again
alembic upgrade head

CI/CD Integration

# .github/workflows/deploy.yml
- name: Run database migrations
  env:
    DATABASE_URL: ${{ secrets.PROD_DATABASE_URL }}
  run: |
    # Apply pending migrations first: `alembic check` aborts with
    # "Target database is not up to date" while revisions are unapplied
    alembic upgrade head
    # Fail the job if the models contain schema changes that no migration
    # file covers yet (autogenerate would still produce operations)
    alembic check
# Run migrations programmatically at app startup (useful for Kubernetes)
from alembic.config import Config
from alembic import command

def run_migrations():
    """Upgrade the database named by ``DATABASE_URL`` to the latest revision.

    Loads ``alembic.ini`` (resolved relative to the current working
    directory), overrides ``sqlalchemy.url`` from the environment and runs
    the equivalent of ``alembic upgrade head``.

    Raises:
        KeyError: if ``DATABASE_URL`` is not set.
    """
    alembic_cfg = Config("alembic.ini")
    # set_main_option() goes through ConfigParser interpolation, so a literal
    # "%" (e.g. from a URL-encoded password) has to be doubled to be accepted
    database_url = os.environ["DATABASE_URL"].replace("%", "%%")
    alembic_cfg.set_main_option("sqlalchemy.url", database_url)
    command.upgrade(alembic_cfg, "head")

# FastAPI lifespan
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    """Apply database migrations before the application starts serving.

    ``run_migrations()`` is blocking, so it is executed in a worker thread
    and awaited.  This keeps the event loop responsive, and it also lets an
    env.py that drives its migrations with ``asyncio.run()`` work:
    ``asyncio.run()`` raises RuntimeError when called from a thread whose
    event loop is already running, which is the case inside this coroutine.
    """
    import asyncio  # local import: only needed for the executor hand-off

    loop = asyncio.get_running_loop()
    # run migrations before accepting traffic
    await loop.run_in_executor(None, run_migrations)
    yield

app = FastAPI(lifespan=lifespan)
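alembic check: Added in Alembic 1.9, alembic check exits with a non-zero status when autogenerate would still produce operations — that is, when your models contain schema changes that no migration file covers. It needs the database to be at the latest revision already and reports an error otherwise. Use it in CI to catch forgotten migration files.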

Frequently Asked Questions

Can I use Alembic without autogenerate?
Yes. Run alembic revision -m "my migration" (without --autogenerate) to create an empty migration file with just the revision metadata. Write your upgrade() and downgrade() functions manually.
How do I handle long-running migrations on large tables?
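Add nullable columns without defaults first (instant on PostgreSQL), then backfill in batches, then add the NOT NULL constraint. Avoid adding a NOT NULL column with a default in a single step on a large table whenever the database has to rewrite the table to do it (PostgreSQL before version 11, or any volatile default) — the rewrite touches every row and keeps the table locked until it finishes.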
What does Alembic use to track migration state?
Alembic creates an alembic_version table in your database with a single row containing the current revision ID. It compares this against the migration history chain to determine which migrations are pending.