Python Context Managers: with Statement and contextlib
Context managers are Python's mechanism for deterministic resource management — they guarantee that cleanup code runs even when exceptions occur. The with statement is familiar from file handling, but context managers extend to database connections, locks, timers, temporary directories, mocked objects, and any operation with a setup/teardown lifecycle. The contextlib module provides utilities that turn ordinary generator functions into context managers, eliminating boilerplate class definitions.
Table of Contents
The Context Manager Protocol
The with statement calls __enter__() on entry and __exit__(exc_type, exc_val, exc_tb) on exit — whether the block completed normally or raised an exception. If __exit__ returns a truthy value, the exception is suppressed; returning False or None lets it propagate. This protocol is simple but powerful: it separates the "what to do" from the "how to clean up."
class Timer:
"""Context manager that measures elapsed time."""
import time
def __enter__(self):
self.start = __import__('time').perf_counter()
return self # value bound to 'as' variable
def __exit__(self, exc_type, exc_val, exc_tb):
self.elapsed = __import__('time').perf_counter() - self.start
print(f"Elapsed: {self.elapsed:.3f}s")
return False # don't suppress exceptions
with Timer() as t:
total = sum(range(10_000_000))
# Prints: "Elapsed: 0.412s"
print(t.elapsed) # also accessible after the block
# Exception handling in __exit__
class SuppressErrors:
def __init__(self, *exceptions):
self.exceptions = exceptions
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type and issubclass(exc_type, self.exceptions):
print(f"Suppressed: {exc_val}")
return True # suppress the exception
return False
with SuppressErrors(FileNotFoundError, PermissionError):
open("/nonexistent/file.txt") # suppressed silently
print("Continued after suppressed error")
Class-Based Context Managers
Class-based context managers are the most flexible approach when you need to store state, support multiple exit paths, or expose methods for use inside the with block. They are the right choice for managing database connections, network sockets, and any resource that needs introspection or configuration during the block.
import sqlite3
import threading
class DatabaseConnection:
"""Context manager for SQLite with automatic transaction management."""
def __init__(self, db_path: str, auto_commit: bool = True):
self.db_path = db_path
self.auto_commit = auto_commit
self.conn = None
def __enter__(self) -> sqlite3.Connection:
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None and self.auto_commit:
self.conn.commit()
else:
self.conn.rollback()
self.conn.close()
return False
# Usage
with DatabaseConnection(":memory:") as conn:
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
# Auto-committed on exit
# Lock context manager for thread safety
class ReadWriteLock:
"""Context managers for readers-writer lock pattern."""
def __init__(self):
self._lock = threading.Lock()
self._readers = 0
self._read_lock = threading.Lock()
def read(self):
return self._ReadContext(self)
def write(self):
return self._WriteContext(self)
class _ReadContext:
def __init__(self, parent):
self.parent = parent
def __enter__(self):
with self.parent._read_lock:
self.parent._readers += 1
if self.parent._readers == 1:
self.parent._lock.acquire()
def __exit__(self, *args):
with self.parent._read_lock:
self.parent._readers -= 1
if self.parent._readers == 0:
self.parent._lock.release()
class _WriteContext:
def __init__(self, parent):
self.parent = parent
def __enter__(self):
self.parent._lock.acquire()
def __exit__(self, *args):
self.parent._lock.release()
contextlib.contextmanager
The @contextmanager decorator transforms a generator function into a context manager. Everything before yield is __enter__, everything after is __exit__. This eliminates the boilerplate of writing a class with two dunder methods. For most custom context managers, this is the preferred approach — it's concise, readable, and supports exception handling via try/finally.
from contextlib import contextmanager
import os
import tempfile
import shutil
@contextmanager
def temporary_directory():
"""Create a temp dir, yield it, then clean up."""
tmpdir = tempfile.mkdtemp()
try:
yield tmpdir
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
with temporary_directory() as tmpdir:
with open(os.path.join(tmpdir, "test.txt"), "w") as f:
f.write("temporary content")
# tmpdir is automatically deleted after the with block
@contextmanager
def change_directory(path: str):
"""Temporarily change working directory."""
original = os.getcwd()
try:
os.chdir(path)
yield path
finally:
os.chdir(original)
@contextmanager
def managed_transaction(conn):
"""Database transaction with automatic commit/rollback."""
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise # re-raise after rollback
@contextmanager
def timer(label: str = ""):
"""Measure and print execution time."""
import time
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
print(f"{label}: {elapsed:.4f}s")
with timer("matrix multiply"):
result = [[sum(a*b for a,b in zip(row, col))
for col in zip(*[[1,2],[3,4]])]
for row in [[5,6],[7,8]]]
contextlib.ExitStack
ExitStack allows dynamically composing a variable number of context managers. It's the solution when you don't know at compile time how many resources you need to manage, or when you want to conditionally apply context managers. All registered contexts are cleaned up in LIFO order when the ExitStack exits.
from contextlib import ExitStack
# Open a variable number of files
def process_files(paths: list[str]):
with ExitStack() as stack:
files = [stack.enter_context(open(p)) for p in paths]
for f in files:
print(f.read())
# All files closed here
# Conditional context managers
def connect(use_ssl: bool, use_compression: bool):
with ExitStack() as stack:
conn = stack.enter_context(DatabaseConnection(":memory:"))
if use_ssl:
stack.enter_context(ssl_context())
if use_compression:
stack.enter_context(compression_context())
# Work with conn here
# Register cleanup callbacks (not context managers)
def risky_operation():
with ExitStack() as stack:
resource = acquire_resource()
stack.callback(release_resource, resource)
# Even if an exception occurs, release_resource is called
another = acquire_another()
stack.callback(release_another, another)
# Cleanups run in reverse registration order
# Transfer ownership out of ExitStack
def create_managed_resource():
with ExitStack() as stack:
resource = stack.enter_context(open("data.txt"))
# Transfer cleanup responsibility to caller
new_stack = stack.pop_all()
return resource, new_stack # caller must close new_stack
contextlib.suppress and nullcontext
contextlib.suppress(*exceptions) is a clean way to selectively ignore specific exceptions without try/except boilerplate. contextlib.nullcontext() is a no-op context manager useful for optional context management — it lets you write a single code path that works with or without a context manager.
from contextlib import suppress, nullcontext
# suppress: cleaner than try/except/pass
with suppress(FileNotFoundError):
os.remove("temp.txt")
with suppress(KeyError, AttributeError):
value = config["missing"]["key"]["deep"]
# Equivalent but more verbose:
try:
os.remove("temp.txt")
except FileNotFoundError:
pass
# nullcontext: optional context managers
def process(data, lock=None):
ctx = lock if lock is not None else nullcontext()
with ctx:
return [x * 2 for x in data]
# Without lock (single-threaded)
result = process([1, 2, 3])
# With lock (multi-threaded)
import threading
result = process([1, 2, 3], lock=threading.Lock())
# nullcontext as a no-op wrapper in tests
def run_with_profiler(profiler=None):
ctx = profiler if profiler else nullcontext()
with ctx:
return expensive_computation()
Async Context Managers
Async context managers implement __aenter__ and __aexit__ (note the a prefix) and are used with async with. They are essential for async database connections, HTTP sessions (aiohttp), and any async resource that needs cleanup. The @asynccontextmanager decorator from contextlib creates them from async generator functions.
import asyncio
from contextlib import asynccontextmanager
import aiohttp
@asynccontextmanager
async def http_session():
"""Managed aiohttp session."""
session = aiohttp.ClientSession()
try:
yield session
finally:
await session.close()
@asynccontextmanager
async def async_timer(label: str):
"""Async timing context manager."""
import time
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
print(f"{label}: {elapsed:.3f}s")
async def fetch_data(urls: list[str]) -> list[dict]:
async with http_session() as session:
async with async_timer("fetch"):
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [await r.json() for r in responses]
# Class-based async context manager
class AsyncDatabasePool:
def __init__(self, dsn: str, min_size: int = 2, max_size: int = 10):
self.dsn = dsn
self.min_size = min_size
self.max_size = max_size
self.pool = None
async def __aenter__(self):
# self.pool = await asyncpg.create_pool(self.dsn, ...)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.pool:
await self.pool.close()
return False
async def fetch(self, query: str, *args):
async with self.pool.acquire() as conn:
return await conn.fetch(query, *args)
Real-World Patterns
Context managers shine in production code: managing database transactions, distributed locks, feature flags, and test fixtures. These patterns ensure resources are always released, transactions are always committed or rolled back, and test state is always restored — regardless of exceptions or early returns.
import redis
import time
from contextlib import contextmanager
@contextmanager
def redis_lock(client: redis.Redis, key: str, timeout: int = 10):
"""Distributed lock using Redis SET NX EX."""
lock_key = f"lock:{key}"
acquired = client.set(lock_key, "1", nx=True, ex=timeout)
if not acquired:
raise RuntimeError(f"Could not acquire lock: {lock_key}")
try:
yield
finally:
client.delete(lock_key)
# Use in concurrent workers
r = redis.Redis()
with redis_lock(r, "process-payments", timeout=30):
# Only one worker processes payments at a time
pass
# Retry context manager
@contextmanager
def retry(max_attempts: int = 3, delay: float = 1.0, exceptions=(Exception,)):
"""Retry the body up to max_attempts times."""
last_exc = None
for attempt in range(max_attempts):
try:
yield attempt
return # success
except exceptions as e:
last_exc = e
if attempt < max_attempts - 1:
time.sleep(delay * (2 ** attempt)) # exponential backoff
raise last_exc
with retry(max_attempts=3, delay=0.5, exceptions=(ConnectionError,)) as attempt:
print(f"Attempt {attempt + 1}")
# connect_to_service()