Python Context Managers: with Statement and contextlib

Context managers are Python's mechanism for deterministic resource management — they guarantee that cleanup code runs even when exceptions occur. The with statement is familiar from file handling, but context managers extend to database connections, locks, timers, temporary directories, mocked objects, and any operation with a setup/teardown lifecycle. The contextlib module provides utilities that turn ordinary generator functions into context managers, eliminating boilerplate class definitions.

The Context Manager Protocol

The with statement calls __enter__() on entry and __exit__(exc_type, exc_val, exc_tb) on exit — whether the block completed normally or raised an exception. If __exit__ returns a truthy value, the exception is suppressed; returning False or None lets it propagate. This protocol is simple but powerful: it separates the "what to do" from the "how to clean up."

import time


class Timer:
    """Context manager that measures elapsed time.

    On entry the current ``time.perf_counter()`` reading is stored in
    ``self.start``. On exit the difference is stored in ``self.elapsed``
    (seconds, float) and printed as ``Elapsed: <seconds>s``.

    The manager never suppresses exceptions: if the block raises, the
    elapsed time is still recorded and printed, then the exception
    propagates.
    """

    def __enter__(self):
        self.start = time.perf_counter()
        return self  # value bound to 'as' variable

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Take the reading first so printing is not part of the measurement.
        self.elapsed = time.perf_counter() - self.start
        print(f"Elapsed: {self.elapsed:.3f}s")
        return False  # don't suppress exceptions

# Time a CPU-bound computation; Timer.__exit__ prints the elapsed time.
with Timer() as t:
    total = sum(range(10_000_000))
# Prints something like "Elapsed: 0.412s" (the exact figure depends on the machine)
print(t.elapsed)  # 't' is the Timer itself, so the measurement is still available after the block

# Exception handling in __exit__
class SuppressErrors:
    """Swallow the given exception types and report each one on stdout.

    Construct it with any number of exception classes. When the ``with``
    block raises an instance of one of them (or of a subclass), the message
    ``Suppressed: <exception>`` is printed and execution continues after the
    block. Any other exception propagates unchanged.
    """

    def __init__(self, *exceptions):
        # Kept as a tuple so it can be handed straight to issubclass().
        self.exceptions = exceptions

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing was raised: nothing to suppress.
        if not exc_type:
            return False
        # Raised, but not one of the types we were asked to swallow.
        if not issubclass(exc_type, self.exceptions):
            return False
        print(f"Suppressed: {exc_val}")
        return True  # a truthy return tells 'with' to discard the exception

with SuppressErrors(FileNotFoundError, PermissionError):
    open("/nonexistent/file.txt")  # raises FileNotFoundError; __exit__ prints "Suppressed: ..." and swallows it
print("Continued after suppressed error")

Class-Based Context Managers

Class-based context managers are the most flexible approach when you need to store state, support multiple exit paths, or expose methods for use inside the with block. They are the right choice for managing database connections, network sockets, and any resource that needs introspection or configuration during the block.

import sqlite3
import threading

class DatabaseConnection:
    """Context manager for SQLite with automatic transaction management.

    Entering opens a connection to ``db_path`` (rows are returned as
    ``sqlite3.Row``) and binds that connection to the ``as`` target.

    On exit:
      * block succeeded and ``auto_commit`` is true  -> ``commit()``
      * block raised, or ``auto_commit`` is false    -> ``rollback()``
      * in every case the connection is closed, even if the commit or
        rollback itself raises.

    With ``auto_commit=False`` the caller is responsible for calling
    ``conn.commit()`` inside the block; anything left uncommitted is
    rolled back. Exceptions from the block are never suppressed.
    """

    def __init__(self, db_path: str, auto_commit: bool = True):
        self.db_path = db_path
        self.auto_commit = auto_commit
        self.conn = None  # populated by __enter__

    def __enter__(self) -> sqlite3.Connection:
        self.conn = sqlite3.connect(self.db_path)
        self.conn.row_factory = sqlite3.Row
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is None and self.auto_commit:
                self.conn.commit()
            else:
                self.conn.rollback()
        finally:
            # close() must run even when commit()/rollback() fails,
            # otherwise the connection (and its file handle) leaks.
            self.conn.close()
        return False

# Usage: ":memory:" opens a private in-memory database that is discarded when
# the connection closes, so this shows the flow rather than real persistence.
with DatabaseConnection(":memory:") as conn:
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
    # No exception and auto_commit defaults to True, so __exit__ commits and then closes

# Lock context manager for thread safety
class ReadWriteLock:
    """Context managers for readers-writer lock pattern.

    ``read()`` and ``write()`` each return a new context-manager object
    bound to this lock. Readers share the lock: only the first reader in
    acquires it and only the last reader out releases it. A writer
    acquires it directly and therefore holds it alone.

    NOTE(review): nothing here queues a waiting writer ahead of newly
    arriving readers, so a steady stream of readers can keep a writer
    waiting — confirm that is acceptable for the intended workload.
    """

    def __init__(self):
        # Held either by one writer or by the group of active readers.
        self._lock = threading.Lock()
        # Number of readers currently inside a read() block.
        self._readers = 0
        # Serialises updates to _readers.
        self._read_lock = threading.Lock()

    def read(self):
        """Return a context manager that holds the lock in shared (read) mode."""
        return self._ReadContext(self)

    def write(self):
        """Return a context manager that holds the lock exclusively."""
        return self._WriteContext(self)

    class _ReadContext:
        def __init__(self, parent):
            self.parent = parent

        def __enter__(self):
            # Returns None, so 'with lock.read() as x' binds x to None.
            with self.parent._read_lock:
                self.parent._readers += 1
                if self.parent._readers == 1:
                    # First reader in takes the lock for the whole reader group.
                    # If a writer holds it, this blocks while _read_lock is still
                    # held, so later readers wait behind this one.
                    self.parent._lock.acquire()

        def __exit__(self, *args):
            with self.parent._read_lock:
                self.parent._readers -= 1
                if self.parent._readers == 0:
                    # Last reader out releases the lock. This may be a different
                    # thread from the one that acquired it, which a plain
                    # threading.Lock permits.
                    self.parent._lock.release()

    class _WriteContext:
        def __init__(self, parent):
            self.parent = parent

        def __enter__(self):
            # Blocks until no writer and no reader group holds the lock.
            self.parent._lock.acquire()

        def __exit__(self, *args):
            self.parent._lock.release()

contextlib.contextmanager

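The @contextmanager decorator transforms a generator function into a context manager. Everything before yield runs as __enter__, everything after runs as __exit__, and the generator must yield exactly once. This eliminates the boilerplate of writing a class with two dunder methods. For most custom context managers, this is the preferred approach — it's concise and readable. Wrap the yield in try/finally so the cleanup still runs when the with block raises: an exception from the block is re-raised at the yield, and without try/finally the code after it would be skipped.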

from contextlib import contextmanager
import os
import tempfile
import shutil

@contextmanager
def temporary_directory():
    """Yield the path of a freshly created scratch directory.

    The directory is made with ``tempfile.mkdtemp()`` before the ``with``
    body runs and is removed, contents included, when the body finishes,
    whether it finished normally or by raising.
    """
    scratch = tempfile.mkdtemp()
    try:
        yield scratch
    finally:
        # Best-effort removal: ignore_errors=True keeps cleanup from raising.
        shutil.rmtree(scratch, ignore_errors=True)

with temporary_directory() as tmpdir:
    with open(os.path.join(tmpdir, "test.txt"), "w") as f:
        f.write("temporary content")
    # tmpdir and everything in it are removed once this outer with block exits

@contextmanager
def change_directory(path: str):
    """Run the ``with`` body with *path* as the current working directory.

    Yields *path* unchanged. The directory that was current on entry is
    restored on exit, including when the body raises or when changing into
    *path* itself fails.
    """
    previous_cwd = os.getcwd()
    try:
        os.chdir(path)
        yield path
    finally:
        # Always go back, no matter how the block ended.
        os.chdir(previous_cwd)

@contextmanager
def managed_transaction(conn):
    """Database transaction with automatic commit/rollback.

    Yields *conn* itself. If the ``with`` body completes, ``conn.commit()``
    is called. If the body (or the commit) raises anything at all, the
    transaction is rolled back and the exception is re-raised.

    ``BaseException`` is caught rather than ``Exception`` so that
    ``KeyboardInterrupt`` and ``SystemExit`` also trigger the rollback
    instead of leaving the transaction open.

    Args:
        conn: any object exposing ``commit()`` and ``rollback()``.
    """
    try:
        yield conn
        conn.commit()
    except BaseException:
        conn.rollback()
        raise  # re-raise after rollback

@contextmanager
def timer(label: str = ""):
    """Print how long the ``with`` body took, prefixed by *label*.

    The line ``<label>: <seconds>s`` (four decimal places) is printed when
    the block ends, including when it ends with an exception. Nothing is
    bound by ``as``.
    """
    from time import perf_counter

    began = perf_counter()
    try:
        yield
    finally:
        elapsed = perf_counter() - began
        print(f"{label}: {elapsed:.4f}s")

# Multiply [[5,6],[7,8]] by [[1,2],[3,4]]: zip(*...) yields the columns of the
# right-hand matrix, so each entry is a row-by-column dot product.
with timer("matrix multiply"):
    result = [[sum(a*b for a,b in zip(row, col))
               for col in zip(*[[1,2],[3,4]])]
              for row in [[5,6],[7,8]]]
# result == [[23, 34], [31, 46]]; prints a line like "matrix multiply: 0.0000s"

contextlib.ExitStack

ExitStack allows dynamically composing a variable number of context managers. It's the solution when you don't know until runtime how many resources you need to manage, or when you want to conditionally apply context managers. All registered contexts are cleaned up in LIFO order when the ExitStack exits.

from contextlib import ExitStack

# Open a variable number of files
def process_files(paths: list[str]):
    """Print the full contents of every file in *paths*, in order.

    All files are opened first (text mode, default encoding) and registered
    on one ``ExitStack``; only then is each one read and printed. If any
    open fails, the files opened so far are closed and nothing is printed.
    """
    with ExitStack() as open_files:
        handles = []
        for path in paths:
            handles.append(open_files.enter_context(open(path)))
        for handle in handles:
            print(handle.read())
    # Leaving the ExitStack closed every handle.

# Conditional context managers
def connect(use_ssl: bool, use_compression: bool):
    """Open an in-memory database connection plus optional extra contexts.

    The database connection is always entered. ``ssl_context()`` and
    ``compression_context()`` are entered only when their flag is true.
    Everything entered is exited in reverse order when the function's
    ``with`` block ends.

    NOTE(review): ``ssl_context`` and ``compression_context`` are not defined
    in this file — presumably context-manager factories supplied elsewhere.
    """
    with ExitStack() as resources:
        conn = resources.enter_context(DatabaseConnection(":memory:"))
        if use_ssl:
            resources.enter_context(ssl_context())
        if use_compression:
            resources.enter_context(compression_context())
        # Work with conn here

# Register cleanup callbacks (not context managers)
def risky_operation():
    """Acquire two resources and guarantee both are released.

    Each release function is registered on the ``ExitStack`` right after
    its resource is acquired, so a failure while acquiring the second
    resource still releases the first.

    NOTE(review): the acquire/release helpers are not defined in this file —
    presumably plain functions provided elsewhere.
    """
    with ExitStack() as cleanups:
        first = acquire_resource()
        cleanups.callback(release_resource, first)
        # From here on release_resource(first) runs even if something raises.

        second = acquire_another()
        cleanups.callback(release_another, second)
        # Callbacks fire last-registered-first: release_another, then release_resource.

# Transfer ownership out of ExitStack
def create_managed_resource():
    """Open ``data.txt`` and hand its cleanup over to the caller.

    Returns:
        A ``(file, stack)`` pair. The file is still open; closing the
        returned ``ExitStack`` (``stack.close()`` or using it in a ``with``)
        closes the file.
    """
    with ExitStack() as guard:
        handle = guard.enter_context(open("data.txt"))
        # pop_all() moves the registered cleanup to a new stack, leaving
        # 'guard' empty so leaving this with block closes nothing.
        detached = guard.pop_all()
    return handle, detached  # caller must close the returned stack

contextlib.suppress and nullcontext

contextlib.suppress(*exceptions) is a clean way to selectively ignore specific exceptions without try/except boilerplate. contextlib.nullcontext() is a no-op context manager useful for optional context management — it lets you write a single code path that works with or without a context manager.

from contextlib import suppress, nullcontext

# suppress: cleaner than try/except/pass
with suppress(FileNotFoundError):
    os.remove("temp.txt")

# NOTE(review): 'config' is not defined in this snippet — presumably a nested
# mapping supplied by the reader. If the lookup is suppressed, 'value' is never
# assigned, so bind a default first if it is read afterwards. Subscripting a
# non-subscriptable object (e.g. None) raises TypeError, which is NOT among the
# exceptions suppressed here.
with suppress(KeyError, AttributeError):
    value = config["missing"]["key"]["deep"]

# The os.remove example above, written out the long way:
try:
    os.remove("temp.txt")
except FileNotFoundError:
    pass

# nullcontext: optional context managers
def process(data, lock=None):
    """Return a list holding each element of *data* multiplied by two.

    When *lock* is given it is held (via ``with``) for the duration of the
    computation; when it is ``None`` a no-op ``nullcontext()`` stands in so
    the same code path serves both cases.
    """
    guard = nullcontext() if lock is None else lock
    with guard:
        doubled = []
        for item in data:
            doubled.append(item * 2)
        return doubled

# Without lock (single-threaded): nullcontext() is used internally
result = process([1, 2, 3])  # [2, 4, 6]

# With lock (multi-threaded): the lock is held while the list is built
import threading
result = process([1, 2, 3], lock=threading.Lock())  # same result, [2, 4, 6]

# nullcontext as a no-op wrapper in tests
def run_with_profiler(profiler=None):
    """Run ``expensive_computation()`` inside *profiler* when one is supplied.

    Args:
        profiler: a context manager to wrap the call in, or ``None`` to run
            the computation bare (a ``nullcontext()`` is used instead).

    Returns:
        Whatever ``expensive_computation()`` returns.
    """
    # Identity check rather than truthiness: a profiler object that happens
    # to be falsy (e.g. it defines __len__ or __bool__) must still be entered.
    ctx = profiler if profiler is not None else nullcontext()
    with ctx:
        return expensive_computation()

Async Context Managers

Async context managers implement __aenter__ and __aexit__ (note the a prefix) and are used with async with. They are essential for async database connections, HTTP sessions (aiohttp), and any async resource that needs cleanup. The @asynccontextmanager decorator from contextlib creates them from async generator functions.

import asyncio
from contextlib import asynccontextmanager
import aiohttp

@asynccontextmanager
async def http_session():
    """Yield a new ``aiohttp.ClientSession`` and close it when the block ends."""
    client = aiohttp.ClientSession()
    try:
        yield client
    finally:
        # Awaited on both normal exit and exceptions.
        await client.close()

@asynccontextmanager
async def async_timer(label: str):
    """Print how long an ``async with`` body took, prefixed by *label*.

    The line ``<label>: <seconds>s`` (three decimal places) is printed when
    the block ends, including when it ends with an exception. Nothing is
    bound by ``as``.
    """
    from time import perf_counter

    began = perf_counter()
    try:
        yield
    finally:
        elapsed = perf_counter() - began
        print(f"{label}: {elapsed:.3f}s")

async def fetch_data(urls: list[str]) -> list[dict]:
    """Fetch every URL in *urls* and return the decoded JSON bodies.

    One ``session.get`` is issued per URL and all of them are awaited
    together with ``asyncio.gather``, which returns results in input order.
    The bodies are then decoded one after another. The whole operation runs
    inside a managed HTTP session and is timed under the label ``fetch``.
    """
    async with http_session() as session, async_timer("fetch"):
        pending = [session.get(url) for url in urls]
        responses = await asyncio.gather(*pending)
        payloads = []
        for response in responses:
            payloads.append(await response.json())
        return payloads

# Class-based async context manager
class AsyncDatabasePool:
    """Skeleton async context manager around a database connection pool.

    Intended use is ``async with AsyncDatabasePool(dsn) as db: await
    db.fetch(...)``. Pool creation in ``__aenter__`` is left as a
    placeholder, so ``self.pool`` stays ``None`` until that is filled in.

    Args:
        dsn: connection string stored for the pool.
        min_size: stored lower bound for the pool size.
        max_size: stored upper bound for the pool size.
    """

    def __init__(self, dsn: str, min_size: int = 2, max_size: int = 10):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None  # no pool exists until __aenter__ creates one

    async def __aenter__(self):
        # Placeholder: a real implementation creates the pool here, e.g.
        # self.pool = await asyncpg.create_pool(self.dsn, ...)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.pool is not None:
            await self.pool.close()
        return False  # never suppress exceptions from the block

    async def fetch(self, query: str, *args):
        """Run *query* on a connection borrowed from the pool.

        Raises:
            RuntimeError: if no pool has been created yet.
        """
        if self.pool is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise RuntimeError(
                "AsyncDatabasePool has no pool; create it in __aenter__ "
                "and call fetch() inside 'async with'"
            )
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

Real-World Patterns

Context managers shine in production code: managing database transactions, distributed locks, feature flags, and test fixtures. These patterns ensure resources are always released, transactions are always committed or rolled back, and test state is always restored — regardless of exceptions or early returns.

import redis
import time
from contextlib import contextmanager

@contextmanager
def redis_lock(client: redis.Redis, key: str, timeout: int = 10):
    """Distributed lock using Redis SET NX EX.

    The lock is the key ``lock:<key>``, set only if absent (``nx``) with an
    expiry of *timeout* seconds (``ex``). Acquisition is a single attempt;
    there is no waiting or retrying.

    The key's value is a random token unique to this holder, and release
    deletes the key only if it still holds that token. Without this check,
    a holder whose lock had already expired would delete a lock that
    another worker acquired in the meantime.

    Args:
        client: Redis client used for both acquire and release.
        key: logical lock name; the stored key is ``lock:<key>``.
        timeout: seconds after which Redis expires the lock on its own.

    Raises:
        RuntimeError: if the lock is already held.
    """
    import secrets  # local import keeps this snippet self-contained

    # Compare-and-delete must be atomic, so it runs as one server-side script.
    release_script = (
        "if redis.call('get', KEYS[1]) == ARGV[1] then "
        "return redis.call('del', KEYS[1]) "
        "else return 0 end"
    )

    lock_key = f"lock:{key}"
    token = secrets.token_hex(16)
    acquired = client.set(lock_key, token, nx=True, ex=timeout)
    if not acquired:
        raise RuntimeError(f"Could not acquire lock: {lock_key}")
    try:
        yield
    finally:
        client.eval(release_script, 1, lock_key, token)

# Use in concurrent workers
# NOTE(review): redis.Redis() is created with no arguments — presumably it
# targets a default local server; confirm for your deployment.
r = redis.Redis()
# Raises RuntimeError straight away if the lock is already held (no waiting).
# timeout=30 is passed as the key's expiry in seconds.
with redis_lock(r, "process-payments", timeout=30):
    # Only one worker processes payments at a time
    pass

# Retry context manager
class retry:
    """Retry helper: a real retry loop as a decorator, single-shot under ``with``.

    A ``with`` body runs at most once and a ``@contextmanager`` generator may
    yield only once, so a context manager cannot re-execute its block. The
    retry loop is therefore provided by the decorator form::

        @retry(max_attempts=3, delay=0.5, exceptions=(ConnectionError,))
        def connect():
            ...

    The wrapped function is called up to *max_attempts* times. After a
    failed attempt that matches *exceptions*, the wrapper sleeps
    ``delay * 2 ** attempt`` seconds (exponential backoff) and tries again.
    The last failure is re-raised unchanged; non-matching exceptions
    propagate immediately.

    ``with retry(...) as attempt:`` is still accepted: the body runs once,
    ``attempt`` is ``0``, and any exception propagates unchanged.

    The name is lower-case, like ``contextlib.suppress``, because it is
    called like a function.

    Args:
        max_attempts: total number of calls allowed (at least 1).
        delay: base sleep in seconds before the second attempt.
        exceptions: exception class or tuple of classes that trigger a retry.

    Raises:
        ValueError: if *max_attempts* is less than 1.
    """

    def __init__(self, max_attempts: int = 3, delay: float = 1.0, exceptions=(Exception,)):
        if max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")
        self.max_attempts = max_attempts
        self.delay = delay
        self.exceptions = exceptions

    def __enter__(self):
        return 0  # the only attempt a with block can have

    def __exit__(self, exc_type, exc_val, exc_tb):
        return False  # never swallow: the block cannot be run again

    def __call__(self, func):
        """Return *func* wrapped in the retry loop."""
        import functools  # local import keeps this snippet self-contained

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(self.max_attempts):
                try:
                    return func(*args, **kwargs)
                except self.exceptions:
                    if attempt == self.max_attempts - 1:
                        raise  # out of attempts: surface the last failure
                    time.sleep(self.delay * (2 ** attempt))  # exponential backoff

        return wrapper

# NOTE(review): a with-statement body executes at most once, so this form
# cannot re-run the block after a failure. 'attempt' is 0 here, so this
# prints "Attempt 1".
with retry(max_attempts=3, delay=0.5, exceptions=(ConnectionError,)) as attempt:
    print(f"Attempt {attempt + 1}")
    # connect_to_service()