This guide covers the most frequently asked Python interview questions in 2026 — from language internals and OOP patterns to async programming, testing, and production system design.
In Python, everything is an object. Variables are names (references) pointing to objects on the heap — not memory slots holding values directly.
a = [1, 2, 3]
b = a # b points to THE SAME list object
b.append(4)
print(a) # [1, 2, 3, 4] — mutation visible through a
a = [1, 2, 3]
b = a[:] # b is a SHALLOW COPY — different list object
b.append(4)
print(a) # [1, 2, 3] — a unchanged
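The slice copy above is shallow: it duplicates only the outer list. The sketch below shows what that means for nested data, using the standard `copy` module (variable names are illustrative).

```python
import copy

original = [[1, 2], [3, 4]]
shallow = original[:]            # new outer list, but the SAME inner lists
deep = copy.deepcopy(original)   # new outer list and new inner lists

shallow[0].append(99)            # mutates an inner list shared with `original`

print(original[0])                    # [1, 2, 99]
print(deep[0])                        # [1, 2]
print(shallow is original)            # False
print(shallow[0] is original[0])      # True
```

Use `copy.deepcopy` only when nested objects really must be independent; it is noticeably slower than a shallow copy.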
Reference counting: CPython's primary memory-management mechanism. Each object has an ob_refcnt field; when it drops to 0, the object is deallocated immediately. Fast and deterministic for most cases.
Cyclic garbage collector: handles reference cycles (gc module). Runs periodically to detect and free groups of objects that only reference each other.
# Reference count:
import sys
a = []
sys.getrefcount(a) # 2 (one for 'a', one for getrefcount's argument)
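# Small integers (-5 to 256) and interned strings are cached/singletons (a CPython implementation detail):
x = 256; y = 256; x is y # True (same cached object)
x = 257; y = 257; x is y # False when each statement is entered separately in the REPL;
# may be True inside one script or function, where the compiler reuses the constant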
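To see why reference counting alone is not enough, here is a small sketch of a reference cycle that only the cyclic collector can free. The `Node` class is hypothetical, and automatic collection is switched off briefly so the effect is visible.

```python
import gc
import weakref

class Node:
    """Hypothetical object that can point at a partner node."""
    def __init__(self, name):
        self.name = name
        self.partner = None

a = Node("a")
b = Node("b")
a.partner = b
b.partner = a               # a <-> b now form a reference cycle

probe = weakref.ref(a)      # lets us observe `a` without keeping it alive

gc.disable()                # keep automatic collection out of the demonstration
del a, b                    # names are gone, but each node still references the other
alive_before = probe() is not None   # True: refcounts never reached 0

unreachable = gc.collect()           # run the cyclic collector explicitly
alive_after = probe() is not None    # False: the cycle has been freed
gc.enable()

print(alive_before, alive_after)     # True False
```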
The Global Interpreter Lock (GIL) is a mutex in CPython that allows only one thread to execute Python bytecode at a time, even on multi-core hardware.
Why it exists: CPython's memory management (reference counting) is not thread-safe. The GIL avoids data races on reference counts without per-object locking overhead.
# Threading does NOT parallelise CPU-bound Python code:
import threading
def count(n):
    while n > 0:
        n -= 1
# Two threads on a 2-core machine: bytecode still runs one thread at a time due to the GIL
t1 = threading.Thread(target=count, args=(50_000_000,))
t2 = threading.Thread(target=count, args=(50_000_000,))
t1.start(); t2.start(); t1.join(); t2.join()
# No faster than calling count() twice sequentially, and often slower
# (thread-switch overhead + GIL contention)
# GIL IS released for:
# - I/O operations (socket, file read/write)
# - C extension code that explicitly releases it (numpy, pandas, etc.)
# - time.sleep()
# Solutions for CPU-bound parallelism:
# 1. multiprocessing (separate processes, each has own GIL)
# 2. concurrent.futures.ProcessPoolExecutor
# 3. C extensions / Cython that release GIL
# 4. Python 3.13+: optional free-threaded build (no GIL); experimental in 3.13, officially supported but still opt-in from 3.14 (PEP 779)
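As a counterpart to the CPU-bound example, the sketch below shows threads overlapping while they block. `time.sleep` stands in for real I/O, and the delay and thread count are arbitrary.

```python
import threading
import time

def blocking_io(delay):
    # time.sleep releases the GIL, just like a blocking socket or file read
    time.sleep(delay)

start = time.perf_counter()
threads = [threading.Thread(target=blocking_io, args=(0.2,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# Four 0.2 s waits overlap, so the total is close to 0.2 s rather than 0.8 s
print(f"elapsed: {elapsed:.2f}s")
```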
I/O-bound tasks: use threading or asyncio, because the GIL is released during I/O. CPU-bound tasks: use multiprocessing or ProcessPoolExecutor.
| Immutable | Mutable |
|---|---|
| int, float, complex, bool | list, dict, set, bytearray |
| str, bytes, tuple, frozenset | Custom class instances (by default) |
# Immutable: reassignment creates a new object
s = "hello"
s += " world" # new str object; old "hello" unchanged
# Mutable: in-place modification
lst = [1, 2, 3]
lst.append(4) # same list object, new element
# Default mutable argument (classic gotcha):
def add_item(item, lst=[]):  # lst created ONCE at function definition
    lst.append(item)
    return lst
add_item(1) # [1]
add_item(2) # [1, 2] ← unexpected!
# Fix: use None sentinel
def add_item(item, lst=None):
    if lst is None:
        lst = []
    lst.append(item)
    return lst
# Immutable as dict keys (hashability):
d = {(1,2): "point"} # tuple as key — OK (immutable, hashable)
d = {[1,2]: "point"} # TypeError: list as key — unhashable (mutable)
# List comprehension: builds entire list in memory
squares = [x**2 for x in range(10) if x % 2 == 0]
# [0, 4, 16, 36, 64]
# Dict comprehension:
word_lengths = {word: len(word) for word in ["hello", "world"]}
# {'hello': 5, 'world': 5}
# Set comprehension (unique values):
unique_mods = {x % 5 for x in range(20)}
# {0, 1, 2, 3, 4}
# Generator expression (lazy — one item at a time, no intermediate list):
total = sum(x**2 for x in range(1_000_000)) # memory-efficient
# Parentheses, not brackets
# When to use generator vs comprehension:
# Use generator: large/infinite sequences, only need to iterate once,
# memory is a concern (streaming processing)
# Use list: need to access elements multiple times, need len(),
# need to pass to code that expects a list
# Nested comprehension:
matrix = [[1,2,3],[4,5,6],[7,8,9]]
flat = [x for row in matrix for x in row]
# [1, 2, 3, 4, 5, 6, 7, 8, 9]
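The memory difference between a list comprehension and a generator expression can be checked directly. This is a rough sketch: `sys.getsizeof` reports only the container itself, and the exact byte counts vary by Python version.

```python
import sys

n = 100_000
squares_list = [x * x for x in range(n)]   # all values materialised up front
squares_gen = (x * x for x in range(n))    # values produced on demand

list_size = sys.getsizeof(squares_list)    # grows with n (pointer array only)
gen_size = sys.getsizeof(squares_gen)      # small and independent of n

total_from_list = sum(squares_list)
total_from_gen = sum(squares_gen)          # consumes the generator

leftover = list(squares_gen)               # [] because a generator can be iterated only once
print(list_size > gen_size, total_from_list == total_from_gen, leftover)
```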
What is the difference between == and is? (Easy)
# == : value equality (calls __eq__)
# is : identity equality (same object in memory, same id())
a = [1, 2, 3]
b = [1, 2, 3]
a == b # True (same values)
a is b # False (different objects)
c = a
a is c # True (same object)
# Use 'is' ONLY for:
# - Singleton comparisons: x is None, x is True, x is False
# - Sentinel objects
# Never use 'is' for strings/ints outside singletons — CPython caches
# some small integers and interned strings, but this is an implementation
# detail and must not be relied upon:
x = "hello world"
y = "hello world"
x == y # True (always)
x is y # Maybe True (CPython may intern) — DON'T rely on this
How does dict work internally? (Medium)
Python's dict is a hash table. Since Python 3.7, dicts are also insertion-ordered (guaranteed, not just an implementation detail).
# Hash table mechanics:
# 1. hash(key) → integer hash
# 2. hash % table_size → slot index
# 3. Collision resolution: open addressing with pseudo-random probing
# Requirements for dict keys:
# - Must be hashable: implement __hash__ and __eq__
# - __hash__ must be consistent with __eq__
# Compact dict layout (Python 3.6+ CPython optimisation):
# Separate indices array (small, dense) + entries array (insertion-ordered)
# Reduces memory ~20-25% vs old dict
# Performance:
# Lookup: O(1) average, O(n) worst (many collisions — rare)
# Insert: O(1) amortised (resize doubles capacity at ~2/3 load factor)
# Delete: O(1)
# dict vs defaultdict vs Counter:
from collections import defaultdict, Counter
d = defaultdict(list)
d["key"].append(1) # no KeyError — default is empty list
c = Counter("mississippi")
# Counter({'i': 4, 's': 4, 'p': 2, 'm': 1})
c.most_common(2) # [('i', 4), ('s', 4)]
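The key requirements listed above (hashable, with `__hash__` consistent with `__eq__`) can be illustrated with a small custom key type. `GridPoint` is a hypothetical class used only for this sketch.

```python
class GridPoint:
    """Hypothetical value object usable as a dict key."""
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __eq__(self, other):
        if not isinstance(other, GridPoint):
            return NotImplemented
        return (self.x, self.y) == (other.x, other.y)

    def __hash__(self):
        # Equal objects must produce equal hashes, so hash the same fields __eq__ compares
        return hash((self.x, self.y))

visited = {GridPoint(1, 2): "start"}

# A different instance with equal fields finds the same entry
found = visited[GridPoint(1, 2)]
print(found)                          # start
print(GridPoint(2, 1) in visited)     # False
```

Mutating `x` or `y` after the object has been used as a key would change its hash and make the entry unreachable, which is why dict keys should be treated as immutable.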
Explain *args and **kwargs, and argument unpacking. (Easy)
# *args: variable positional arguments → tuple
def greet(*names):
for name in names:
print(f"Hello, {name}")
greet("Alice", "Bob", "Carol")
# **kwargs: variable keyword arguments → dict
def configure(**settings):
for key, val in settings.items():
print(f"{key} = {val}")
configure(debug=True, timeout=30)
# Combined:
def func(required, *args, keyword_only, **kwargs):
pass # keyword_only must be passed as keyword (after *)
# Argument unpacking at call site:
import math
coords = (3, 4)
distance = math.hypot(*coords) # same as math.hypot(3, 4)
config = {"debug": True, "timeout": 30}
configure(**config)
# Positional-only parameters (Python 3.8+, / separator):
def pos_only(a, b, /, c):
pass # a and b cannot be passed as keywords
How do slicing and __getitem__ work? (Easy)
# Slice syntax: seq[start:stop:step]
lst = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
lst[2:7] # [2, 3, 4, 5, 6] (stop is exclusive)
lst[::2] # [0, 2, 4, 6, 8] (every other)
lst[::-1] # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] (reverse)
lst[-3:] # [7, 8, 9] (last 3)
lst[:5] # [0, 1, 2, 3, 4] (first 5)
# Slice object:
s = slice(2, 7, 1)
lst[s] # [2, 3, 4, 5, 6]
# Custom __getitem__:
class Matrix:
    def __init__(self, data):
        self._data = data  # list of rows
    def __getitem__(self, key):
        if isinstance(key, tuple):
            row, col = key
            return self._data[row][col]
        return self._data[key]
m = Matrix([[1, 2], [3, 4], [5, 6]])
m[0, 1] # 2: row 0, col 1 (key is the tuple (0, 1))
m[0:2] # [[1, 2], [3, 4]] (key is slice(0, 2))
# Python's sort uses Timsort — O(n log n) worst case, O(n) best case (nearly sorted)
# Timsort is STABLE: equal elements preserve their original relative order
# list.sort() — in-place, returns None
numbers = [3, 1, 4, 1, 5]
numbers.sort() # [1, 1, 3, 4, 5]
numbers.sort(reverse=True) # [5, 4, 3, 1, 1]
# sorted() — returns new list, works on any iterable
sorted("hello") # ['e', 'h', 'l', 'l', 'o']
# key function:
words = ["banana", "apple", "kiwi", "cherry"]
sorted(words, key=len) # by length
sorted(words, key=str.lower) # case-insensitive
sorted(words, key=lambda w: (len(w), w))# primary: length, secondary: alpha
# Sort by attribute:
from operator import attrgetter, itemgetter
people.sort(key=attrgetter("age"))
rows.sort(key=itemgetter(1)) # sort list of tuples by index 1
# Stability example:
students = [("Alice", "B"), ("Bob", "A"), ("Carol", "B")]
students.sort(key=lambda s: s[1])
# [('Bob', 'A'), ('Alice', 'B'), ('Carol', 'B')]
# Alice before Carol maintained (original order, both grade B)
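Stability also makes it possible to sort on several keys with different directions by sorting in passes, least significant key first. The sketch below uses made-up student records (name, grade, score).

```python
from operator import itemgetter

# Hypothetical records: (name, grade, score)
students = [
    ("Alice", "B", 82),
    ("Bob", "A", 91),
    ("Carol", "B", 95),
    ("Dave", "A", 88),
]

# Goal: grade ascending, and within each grade score descending.
# Pass 1: secondary key (score, descending)
by_score = sorted(students, key=itemgetter(2), reverse=True)
# Pass 2: primary key (grade, ascending); stability keeps the score order inside each grade
ranked = sorted(by_score, key=itemgetter(1))

print(ranked)
# [('Bob', 'A', 91), ('Dave', 'A', 88), ('Carol', 'B', 95), ('Alice', 'B', 82)]
```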
What are namedtuple, dataclass, and when do you use each? (Medium)
# namedtuple: immutable, memory-efficient, tuple-compatible
from collections import namedtuple
Point = namedtuple("Point", ["x", "y"])
p = Point(3, 4)
p.x # 3
p[0] # 3 (index access still works)
p._asdict() # {'x': 3, 'y': 4} (a plain dict since Python 3.8; OrderedDict before)
# dataclass (Python 3.7+): mutable by default, supports defaults, methods
from dataclasses import dataclass, field
@dataclass
class Order:
id: int
items: list = field(default_factory=list)
total: float = 0.0
def add_item(self, price: float):
self.items.append(price)
self.total += price
o = Order(id=1)
o.add_item(9.99)
# @dataclass(frozen=True) → immutable dataclass (hashable)
# @dataclass(order=True) → auto-generates __lt__, __le__, etc.
# Choose:
# namedtuple: simple value object, needs tuple compatibility, memory-critical
# dataclass: domain model with logic/defaults/mutability
# TypedDict: type annotations on dicts (for JSON-like structures)
# Pydantic: validation, serialisation, API request/response models
What are the with statement and context managers? (Medium)
# Context manager: guarantees setup and teardown (even on exception)
# Implements __enter__ and __exit__
# Classic use: file handling
with open("data.txt", "r") as f:
content = f.read()
# File automatically closed after block (even if exception raised)
# Custom context manager (class):
import time  # module-level import so both __enter__ and __exit__ can use it
class Timer:
    def __enter__(self):
        self.start = time.perf_counter()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = time.perf_counter() - self.start
        return False  # don't suppress exceptions
with Timer() as t:
    expensive_operation()
print(f"Took {t.elapsed:.3f}s")
# Context manager via contextlib.contextmanager:
from contextlib import contextmanager
@contextmanager
def managed_resource():
resource = acquire_resource()
try:
yield resource # code inside 'with' block runs here
finally:
release_resource(resource) # always runs
# Multiple context managers:
with open("in.txt") as f_in, open("out.txt", "w") as f_out:
f_out.write(f_in.read())
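The teardown guarantee is easiest to see by recording the order of events when the `with` body raises. In this sketch `tracked` is a hypothetical context manager and the "resource" is just a string.

```python
from contextlib import contextmanager

events = []

@contextmanager
def tracked(name):
    events.append(f"acquire {name}")
    try:
        yield name                          # body of the with block runs here
    finally:
        events.append(f"release {name}")    # runs even if the body raises

try:
    with tracked("db") as resource:
        events.append(f"use {resource}")
        raise RuntimeError("boom")
except RuntimeError:
    events.append("handled")

print(events)
# ['acquire db', 'use db', 'release db', 'handled']
```

The release step runs before the exception reaches the outer `except`, which is the ordering you want for locks, files, and connections.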
# Exception hierarchy:
# BaseException
# SystemExit, KeyboardInterrupt, GeneratorExit ← don't catch with 'except Exception'
# Exception
# ValueError, TypeError, KeyError, IOError, ...
# Best practices:
# 1. Catch specific exceptions, not bare 'except:'
# 2. Use 'except Exception' only for unexpected errors (log + re-raise)
try:
    value = int(user_input)
except ValueError as e:
    print(f"Not a number: {e}")
except Exception:
    logger.exception("Unexpected error")  # logs full traceback
    raise  # re-raise original exception
# 3. else block: runs only if no exception raised
try:
    result = risky_operation()
except NetworkError:
    handle_network_error()
else:
    process(result)  # only if no exception
finally:
    cleanup()  # always runs
# 4. Custom exceptions for domain errors:
class InsufficientFundsError(ValueError):
def __init__(self, balance, amount):
super().__init__(f"Cannot withdraw {amount}, balance is {balance}")
self.balance = balance
self.amount = amount
# 5. Exception chaining:
try:
db.save(record)
except DatabaseError as e:
raise ServiceError("Failed to save order") from e # preserves original cause
# 6. ExceptionGroup (Python 3.11+):
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(task1())
tg.create_task(task2())
except* ValueError as eg: # 'except*' handles ExceptionGroup
for exc in eg.exceptions: handle(exc)
# MRO: determines the order Python searches classes for a method
# Python uses C3 Linearisation algorithm
class A:
def greet(self): print("A")
class B(A):
def greet(self): print("B")
class C(A):
def greet(self): print("C")
class D(B, C): # Multiple inheritance
pass
D.__mro__
# (<class '__main__.D'>, <class '__main__.B'>, <class '__main__.C'>, <class '__main__.A'>, <class 'object'>)
# D → B → C → A → object
D().greet() # prints "B" (first in MRO after D that has greet)
# super() follows MRO — not just "parent class":
class B(A):
def greet(self):
super().greet() # calls C.greet (next in MRO when called via D)
print("B")
class C(A):
def greet(self):
super().greet() # calls A.greet
print("C")
class D(B, C):
def greet(self):
super().greet() # calls B.greet (chain: B→C→A all called)
print("D")
D().greet() # prints: A C B D (cooperative multiple inheritance)
# A decorator is a callable that wraps another callable
# @decorator is syntactic sugar for: func = decorator(func)
import functools
def timer(func):
@functools.wraps(func) # preserves __name__, __doc__ of wrapped function
def wrapper(*args, **kwargs):
import time
start = time.perf_counter()
result = func(*args, **kwargs)
print(f"{func.__name__} took {time.perf_counter()-start:.3f}s")
return result
return wrapper
@timer
def slow_function(n):
"""Does something slow."""
return sum(range(n))
# Decorator with arguments (factory pattern):
def retry(max_attempts=3, exceptions=(Exception,)):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
if attempt == max_attempts - 1:
raise
print(f"Attempt {attempt+1} failed: {e}")
return wrapper
return decorator
@retry(max_attempts=5, exceptions=(NetworkError,))
def fetch_data(url):
return requests.get(url).json()
# Class-based decorator:
class Cached:
def __init__(self, func):
self.func = func
self.cache = {}
def __call__(self, *args):
if args not in self.cache:
self.cache[args] = self.func(*args)
return self.cache[args]
@Cached
def fibonacci(n): ...
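The `retry` factory shown earlier depends on `requests` and a `NetworkError` type that are not defined here. The self-contained sketch below restates it and exercises it with a hypothetical `flaky_fetch` that fails twice before succeeding, using the built-in `ConnectionError` as a stand-in.

```python
import functools

def retry(max_attempts=3, exceptions=(Exception,)):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts - 1:
                        raise          # out of attempts: propagate the last error
        return wrapper
    return decorator

calls = {"count": 0}

@retry(max_attempts=3, exceptions=(ConnectionError,))
def flaky_fetch():
    """Fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "payload"

result = flaky_fetch()
print(result, calls["count"])     # payload 3
print(flaky_fetch.__name__)       # flaky_fetch (preserved by functools.wraps)
```

A production version would usually add a delay or backoff between attempts; that is left out to keep the sketch short.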
How do yield and send() work? (Medium)
# Generator: a function that yields values lazily (one at a time)
# Preserves local state between yields — not re-executed from start
def fibonacci():
a, b = 0, 1
while True:
yield a # suspends here, returns a to caller
a, b = b, a+b # resumes here on next()
gen = fibonacci()
[next(gen) for _ in range(8)] # [0, 1, 1, 2, 3, 5, 8, 13]
# Generator pipeline (memory-efficient ETL):
def read_lines(filename):
with open(filename) as f:
for line in f:
yield line.strip()
def filter_empty(lines):
return (line for line in lines if line)
def parse_csv(lines):
for line in lines:
yield line.split(",")
# Composable pipeline — file never fully loaded in memory:
rows = parse_csv(filter_empty(read_lines("huge.csv")))
# send() — pass values INTO a generator (coroutine pattern):
def running_average():
total = count = 0
average = None
while True:
value = yield average # yield sends average out; receives next value via send()
total += value
count += 1
average = total / count
avg = running_average()
next(avg) # prime the generator (advance to first yield)
avg.send(10) # 10.0
avg.send(20) # 15.0
avg.send(30) # 20.0
What is the difference between classmethod, staticmethod, and property? (Medium)
# @classmethod: receives class (cls) as first arg; can access/modify class state
# @staticmethod: receives no implicit first arg — utility function in class namespace
# @property: getter/setter/deleter for managed attributes
class Circle:
_instances = 0
def __init__(self, radius):
self.radius = radius
Circle._instances += 1
@classmethod
def from_diameter(cls, diameter):
return cls(diameter / 2) # alternative constructor
@classmethod
def instance_count(cls):
return cls._instances
@staticmethod
def validate_radius(radius):
if radius <= 0:
raise ValueError("Radius must be positive")
@property
def area(self):
import math
return math.pi * self.radius ** 2
@property
def diameter(self):
return self.radius * 2
@diameter.setter
def diameter(self, value):
self.radius = value / 2
c = Circle.from_diameter(10) # cls method as factory
c.diameter = 20 # calls setter → radius becomes 10
print(c.area) # calls getter — no () needed
What are descriptors, and how do __get__/__set__ work? (Hard)
# A descriptor is any object that implements __get__, __set__, or __delete__
# property, classmethod, staticmethod, functions — all implemented as descriptors
class Validated:
"""Descriptor that validates values on assignment."""
def __set_name__(self, owner, name):
self.name = name
self.private_name = f"_{name}"
def __get__(self, obj, objtype=None):
if obj is None:
return self # class-level access → return descriptor itself
return getattr(obj, self.private_name, None)
def __set__(self, obj, value):
self.validate(value)
setattr(obj, self.private_name, value)
def validate(self, value):
pass # override in subclasses
class PositiveNumber(Validated):
def validate(self, value):
if not isinstance(value, (int, float)) or value <= 0:
raise ValueError(f"{self.name} must be a positive number, got {value!r}")
class Product:
price = PositiveNumber() # class-level descriptor
stock = PositiveNumber()
def __init__(self, price, stock):
self.price = price # triggers PositiveNumber.__set__
self.stock = stock
p = Product(9.99, 100)
p.price = -5 # raises ValueError: price must be a positive number
What is __slots__ and when should you use it? (Medium)
# By default, each instance has a __dict__ (dict) for its attributes → flexible but memory-heavy
# __slots__: declares fixed set of instance attributes → stores in C struct, not dict
class Point:
__slots__ = ("x", "y") # only x and y allowed as instance attributes
def __init__(self, x, y):
self.x = x
self.y = y
p = Point(3, 4)
p.z = 5 # AttributeError: 'Point' object has no attribute 'z'
p.__dict__ # AttributeError: no __dict__ with __slots__
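# Memory savings (illustrative figures; exact sizes depend on the CPython version):
# Regular instance: object header + per-instance __dict__ and its keys (~232 bytes)
# __slots__ instance: fixed C-level slots, no __dict__ (~56 bytes for 2 slots)
# Roughly 3-4x memory reduction, which is critical for millions of small objects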
# When to use:
# ✅ Value objects created in very large numbers (Point, Particle, Row)
# ✅ When you want to prevent accidental attribute creation
# ❌ When you need dynamic attributes or pickling/copying without extra work
# ❌ Inheritance needs care: if any class in the hierarchy omits __slots__, instances get a __dict__ again
import sys
class WithDict:
def __init__(self): self.x = 1; self.y = 2
class WithSlots:
__slots__ = ("x","y")
def __init__(self): self.x = 1; self.y = 2
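# sys.getsizeof() does not include the separate __dict__, so add it for a fair comparison:
obj = WithDict()
sys.getsizeof(obj) + sys.getsizeof(obj.__dict__) # instance + its attribute dict
sys.getsizeof(WithSlots()) # instance only; there is no __dict__ to add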
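A rough way to compare the two layouts is to add the size of the instance `__dict__` to the size of the instance itself. The class names below are illustrative, and the numbers printed will differ between Python versions, so treat this as a sketch rather than a benchmark.

```python
import sys

class PlainPoint:
    def __init__(self, x, y):
        self.x = x
        self.y = y

class SlottedPoint:
    __slots__ = ("x", "y")
    def __init__(self, x, y):
        self.x = x
        self.y = y

plain = PlainPoint(1, 2)
slotted = SlottedPoint(1, 2)

# getsizeof() is shallow, so include the attribute dict for the plain instance
plain_total = sys.getsizeof(plain) + sys.getsizeof(plain.__dict__)
slotted_total = sys.getsizeof(slotted)
print(plain_total, slotted_total)

# __slots__ also blocks attributes that were not declared
try:
    slotted.z = 3
    rejected = False
except AttributeError:
    rejected = True
print(rejected)   # True
```

For a realistic measurement across millions of objects, `tracemalloc` gives a better picture than per-object `getsizeof` calls.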
# In Python, classes are objects too. A metaclass is "the class of a class."
# type is the default metaclass: type(int) → <class 'type'>
# Metaclass use case: auto-register subclasses (plugin pattern)
class PluginMeta(type):
registry = {}
def __new__(mcs, name, bases, namespace):
cls = super().__new__(mcs, name, bases, namespace)
if bases: # don't register base class itself
mcs.registry[name] = cls
return cls
class Plugin(metaclass=PluginMeta):
pass
class CSVPlugin(Plugin): pass
class JSONPlugin(Plugin): pass
PluginMeta.registry
# {'CSVPlugin': <class '__main__.CSVPlugin'>, 'JSONPlugin': <class '__main__.JSONPlugin'>}
# Another use: enforce interface (ABCMeta is a metaclass):
from abc import ABC, abstractmethod
class Shape(ABC):
@abstractmethod
def area(self) -> float: ...
@abstractmethod
def perimeter(self) -> float: ...
class Circle(Shape):
def __init__(self, r): self.r = r
def area(self): return 3.14159 * self.r**2
def perimeter(self): return 2 * 3.14159 * self.r
Circle(5) # works
Shape() # TypeError: Can't instantiate abstract class
# Note: metaclasses are rarely needed. Use class decorators or __init_subclass__
# for most registry/enforcement patterns instead.
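The note above recommends `__init_subclass__` over a metaclass for registries. A minimal sketch of the same plugin registry written that way, reusing the class names from the metaclass example:

```python
class Plugin:
    registry = {}

    def __init_subclass__(cls, **kwargs):
        # Called automatically each time a subclass of Plugin is defined
        super().__init_subclass__(**kwargs)
        Plugin.registry[cls.__name__] = cls

class CSVPlugin(Plugin):
    pass

class JSONPlugin(Plugin):
    pass

print(sorted(Plugin.registry))   # ['CSVPlugin', 'JSONPlugin']
```

The base class is not registered because `__init_subclass__` only runs for subclasses, so no `if bases:` guard is needed.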
# __repr__ vs __str__:
# __repr__: unambiguous, for developers (repr(obj), REPL output, debugging)
# __str__: human-readable, for end users (str(obj), print(obj))
# If __str__ not defined, falls back to __repr__
class Money:
    def __init__(self, amount, currency="USD"):
        self.amount = amount
        self.currency = currency
    def __repr__(self):
        return f"Money({self.amount!r}, {self.currency!r})"
    def __str__(self):
        return f"{self.currency} {self.amount:.2f}"
    def __add__(self, other):  # +
        if self.currency != other.currency:
            raise ValueError("Currency mismatch")
        return Money(self.amount + other.amount, self.currency)
    def __eq__(self, other):  # ==
        if not isinstance(other, Money):
            return NotImplemented  # lets Python try the other operand, then fall back to False
        return self.amount == other.amount and self.currency == other.currency
    def __hash__(self):  # defining __eq__ sets __hash__ to None, so restore it for dict keys/sets
        return hash((self.amount, self.currency))
    def __lt__(self, other):  # < (enables sorted())
        return self.amount < other.amount
    def __bool__(self):  # bool(obj)
        return self.amount != 0
    def __len__(self):  # len(obj)
        return int(self.amount)
    def __contains__(self, item):  # item in obj
        pass  # placeholder: no containment logic is defined for Money
# Closure: inner function that captures variables from enclosing scope
def make_counter(start=0):
count = start # captured by the closure
def counter():
nonlocal count # 'nonlocal' required to rebind (not just read)
count += 1
return count
return counter
c = make_counter(10)
c() # 11
c() # 12
# Classic gotcha — loop variable capture:
funcs = [lambda: i for i in range(5)]
[f() for f in funcs] # [4, 4, 4, 4, 4] — all capture same 'i' (late binding!)
# Fix: capture current value in default argument:
funcs = [lambda i=i: i for i in range(5)]
[f() for f in funcs] # [0, 1, 2, 3, 4]
# LEGB rule: Python resolves names in this order:
# Local → Enclosing → Global → Built-in
x = "global"
def outer():
x = "enclosing"
def inner():
x = "local" # creates LOCAL x, shadows enclosing
print(x) # "local"
inner()
print(x) # "enclosing"
outer()
What is the functools module? Explain lru_cache, partial, and reduce. (Medium)
# lru_cache: memoisation decorator (Least Recently Used cache)
from functools import lru_cache, cache
@lru_cache(maxsize=128) # cache up to 128 results
def fib(n):
if n < 2: return n
return fib(n-1) + fib(n-2)
fib(100) # fast — cached intermediate results
fib.cache_info() # CacheInfo(hits=98, misses=101, maxsize=128, currsize=101)
@cache # Python 3.9+: unbounded cache (equivalent to lru_cache(maxsize=None))
def expensive(n): ...
# partial: pre-fill some arguments of a function
from functools import partial
parse_binary = partial(int, base=2)  # int() accepts base as a keyword; math.log() takes no keyword arguments
parse_binary("1000") # 8 (same as int("1000", base=2))
def multiply(x, y): return x * y
double = partial(multiply, y=2)
double(5) # 10
# reduce: fold a sequence to a single value
from functools import reduce
product = reduce(lambda acc, x: acc * x, [1,2,3,4,5]) # 120
# Other useful functools:
# total_ordering: define __eq__ + one comparison, auto-derives the rest
# singledispatch: function overloading based on first argument type
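Since `singledispatch` is only mentioned by name above, here is a short sketch of how it dispatches on the type of the first argument. The `describe` function and its messages are made up for illustration.

```python
from functools import singledispatch

@singledispatch
def describe(value):
    # Fallback used when no more specific implementation is registered
    return f"object: {value!r}"

@describe.register
def _(value: int):
    return f"int: {value}"

@describe.register
def _(value: list):
    return f"list of {len(value)} items"

print(describe(3))        # int: 3
print(describe([1, 2]))   # list of 2 items
print(describe("x"))      # object: 'x'
```

Dispatch follows the class hierarchy, so a `bool` argument would use the `int` implementation unless a `bool` version is registered as well.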
| Model | Best for | GIL impact |
|---|---|---|
| threading | I/O-bound tasks, simple shared state | Released during I/O, so threads overlap |
| multiprocessing | CPU-bound tasks | Bypassed (separate processes) |
| asyncio | Many concurrent I/O tasks (thousands) | Single thread, so no GIL contention |
# concurrent.futures: high-level, thread or process pool
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# I/O-bound: ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_url, url) for url in urls]
    results = [f.result() for f in futures]
# CPU-bound: ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
    results = list(executor.map(cpu_intensive, data_chunks))
How does asyncio work? Explain the event loop, coroutines, and tasks. (Medium)
# asyncio: single-threaded cooperative concurrency
# Event loop: scheduler that runs coroutines
# Coroutine: async function — can be suspended (await) without blocking the loop
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.json() # awaits non-blocking I/O
async def main():
import aiohttp
async with aiohttp.ClientSession() as session:
# Run 100 requests concurrently (not sequentially):
tasks = [asyncio.create_task(fetch(session, url)) for url in urls]
results = await asyncio.gather(*tasks) # wait for all
asyncio.run(main()) # Python 3.7+: creates event loop, runs main, closes it
# Key primitives:
# await expr: suspend coroutine until awaitable completes
# asyncio.create_task(): schedule coroutine to run concurrently
# asyncio.gather(*coros): run multiple coroutines concurrently, collect results
# asyncio.wait(): run with timeout, partial completion options
# asyncio.sleep(0): yield control back to event loop (prevents starvation)
# Async context managers and iterators:
async with aiofiles.open("data.txt") as f:
async for line in f: # async iteration
process(line)
Use asyncio.to_thread() to run blocking code in a thread pool.
asyncio.gather vs asyncio.wait vs TaskGroup? (Medium)
# asyncio.gather(*coros, return_exceptions=False):
# - Runs all concurrently
# - Returns results in same order as input
# - By default: the first exception propagates to the caller immediately; the other awaitables are NOT cancelled and keep running
# - return_exceptions=True: exceptions returned as results (not raised)
results = await asyncio.gather(task1(), task2(), task3(),
return_exceptions=True)
# asyncio.wait(tasks, timeout=None, return_when=ALL_COMPLETED):  (ALL_COMPLETED is the default)
# - Returns (done, pending) sets
# - More control: FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED
done, pending = await asyncio.wait(tasks, timeout=5.0,
return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
# asyncio.TaskGroup (Python 3.11+) — PREFERRED for structured concurrency:
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(coroutine1())
task2 = tg.create_task(coroutine2())
# All tasks completed by here
result1 = task1.result()
result2 = task2.result()
# If any task raises: all others are cancelled, ExceptionGroup is raised
# Much safer than gather for structured code
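The `return_exceptions=True` behaviour of `gather` is easy to verify with a runnable sketch. The coroutines `ok` and `boom` are hypothetical stand-ins for real work.

```python
import asyncio

async def ok(value):
    await asyncio.sleep(0.01)
    return value

async def boom():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    # Exceptions come back as items in the result list instead of being raised
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)

results = asyncio.run(main())
print(results)   # [1, ValueError('boom'), 3]

failures = [r for r in results if isinstance(r, Exception)]
print(len(failures))   # 1
```

Results keep the order of the arguments, so each failure can be matched to the call that produced it.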
# threading.Lock: mutual exclusion
import threading
class Counter:
def __init__(self):
self._count = 0
self._lock = threading.Lock()
def increment(self):
with self._lock: # acquires lock, releases on exit (even if exception)
self._count += 1
return self._count
# threading.RLock: reentrant lock (same thread can acquire multiple times)
# threading.Semaphore: limits concurrent access to N
# threading.Event: signal between threads (set/wait/clear)
# threading.Condition: wait for a condition, notify one/all waiters
# queue.Queue: thread-safe FIFO (preferred for producer-consumer)
from queue import Queue, PriorityQueue, LifoQueue
q = Queue(maxsize=100)
# Producer:
def producer():
    for item in source:
        q.put(item)  # blocks if full (maxsize reached)
    q.put(None)  # sentinel to signal done
# Consumer:
def consumer():
    while True:
        item = q.get()  # blocks if empty
        if item is None:
            q.task_done()  # account for the sentinel too, or q.join() never returns
            break
        process(item)
        q.task_done()  # signal item processed
q.join() # wait until all items processed (task_done called for each)
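A complete, runnable version of the producer-consumer pattern might look like the sketch below. The doubling step stands in for real processing, and `task_done()` is called for every `get()`, including the sentinel, so that `q.join()` can return.

```python
import threading
from queue import Queue

q = Queue(maxsize=10)
results = []
SENTINEL = None   # marks the end of the stream

def producer(items):
    for item in items:
        q.put(item)          # blocks while the queue is full
    q.put(SENTINEL)

def consumer():
    while True:
        item = q.get()       # blocks while the queue is empty
        try:
            if item is SENTINEL:
                break
            results.append(item * 2)   # placeholder for real processing
        finally:
            q.task_done()    # runs for every get(), including the sentinel

worker = threading.Thread(target=consumer)
worker.start()
producer(range(5))
q.join()        # returns once task_done() has been called for every item
worker.join()

print(results)  # [0, 2, 4, 6, 8]
```

With several consumers, put one sentinel per consumer so that each of them sees the shutdown signal.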
What is multiprocessing and how do you share state between processes? (Medium)
# Processes have SEPARATE memory, so they can't share Python objects directly
# Options for inter-process communication (IPC):
from multiprocessing import Process, Queue, Pipe, Manager, Value, Array
# 1. Queue (process-safe):
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
result = q.get() # blocks until worker puts something
p.join()
# 2. Pipe (two endpoints, faster than Queue for 2 processes):
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
parent_conn.send("hello")
print(parent_conn.recv())
# 3. Shared memory (Manager — slower, proxied):
with Manager() as manager:
shared_dict = manager.dict()
shared_list = manager.list()
p = Process(target=worker, args=(shared_dict,))
# 4. Value/Array — low-level shared C types (fast, no manager):
counter = Value("i", 0) # shared integer
arr = Array("d", range(10)) # shared double array
# 5. Pool.map for parallel data processing:
from multiprocessing import Pool
def process_chunk(chunk):
return [transform(item) for item in chunk]
with Pool(processes=4) as pool:
results = pool.map(process_chunk, data_chunks) # blocks until all done
# or pool.imap() for lazy/streaming results
What is asyncio.to_thread and when do you use it? (Medium)
# Problem: calling blocking code inside an async function freezes the event loop
async def bad_handler(request):
data = requests.get(url).json() # BLOCKS event loop for entire duration!
return data
# Solution 1: asyncio.to_thread (Python 3.9+)
# Runs blocking function in a thread pool without blocking the event loop
async def good_handler(request):
data = await asyncio.to_thread(requests.get, url) # runs in thread
return data.json()
# Solution 2: loop.run_in_executor (older, more control)
import asyncio
async def handler_with_executor():
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
    result = await loop.run_in_executor(None, blocking_function, arg1, arg2)
    # None = use default ThreadPoolExecutor
    # Or pass your own: executor = ThreadPoolExecutor(max_workers=5)
    return result
# Solution 3: Use async libraries (preferred):
# requests → httpx (async) or aiohttp
# psycopg2 → asyncpg or psycopg3 (async)
# redis-py → redis.asyncio (built into redis-py 4.2+; the standalone aioredis package is deprecated)
# smtplib → aiosmtplib
# Rule of thumb:
# Use async libraries where available
# Use asyncio.to_thread for legacy blocking code or file I/O
# Use ProcessPoolExecutor for CPU-bound work inside async apps
# asyncio has its own primitives (NOT the threading ones):
import asyncio
# asyncio.Lock: mutual exclusion for coroutines
lock = asyncio.Lock()  # create ONE lock and share it; a new Lock() per call protects nothing
async def update():
    async with lock:
        await shared_resource.modify()
# asyncio.Semaphore: limit concurrent coroutines (rate limiting)
sem = asyncio.Semaphore(10)  # max 10 concurrent
async def limited_fetch(url):
    async with sem:
        return await fetch(url)
# Useful for rate-limiting API calls:
tasks = [limited_fetch(url) for url in urls]  # e.g. 1000 URLs
await asyncio.gather(*tasks)  # max 10 at a time
# asyncio.Event: one-to-many signalling
ready = asyncio.Event()
async def waiter():
await ready.wait() # blocks until set
print("ready!")
async def setter():
await asyncio.sleep(1)
ready.set()
# asyncio.Queue: async producer-consumer
q = asyncio.Queue(maxsize=100)
await q.put(item) # awaitable put
item = await q.get() # awaitable get
q.task_done()
await q.join() # wait until all task_done() called
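To confirm that a semaphore really caps concurrency, the sketch below tracks how many jobs are inside the guarded section at once. `limited_job` and its sleep are placeholders for real I/O, and the limit of 3 is arbitrary.

```python
import asyncio

async def main():
    sem = asyncio.Semaphore(3)   # at most 3 jobs inside the guarded section
    running = 0
    peak = 0

    async def limited_job(i):
        nonlocal running, peak
        async with sem:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)   # placeholder for real I/O
            running -= 1
            return i * i

    results = await asyncio.gather(*(limited_job(i) for i in range(10)))
    return results, peak

results, peak = asyncio.run(main())
print(results)      # squares of 0..9, in input order
print(peak <= 3)    # True: the semaphore limit was never exceeded
```

The semaphore is created inside `main()` so that it belongs to the running event loop, which avoids surprises on older Python versions.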
What are async for and async with? (Easy)
# async with: asynchronous context manager
# Calls __aenter__ and __aexit__ (both coroutines)
async with aiohttp.ClientSession() as session:
response = await session.get(url)
# __aexit__ awaited here — allows async cleanup (like closing connections)
# async for: asynchronous iterator
# Object implements __aiter__ and __anext__ (coroutine)
async for record in database.stream("SELECT * FROM events"):
process(record)
# Custom async iterator:
class AsyncCounter:
def __init__(self, stop):
self.current = 0
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.1) # simulate async work
value = self.current
self.current += 1
return value
async for i in AsyncCounter(5):
print(i)
# async generator (simpler):
async def async_range(n):
for i in range(n):
await asyncio.sleep(0.1)
yield i
async for i in async_range(5):
print(i)
# asyncio.timeout (Python 3.11+) — preferred:
async def fetch_with_timeout(url):
try:
async with asyncio.timeout(5.0): # 5 second timeout
return await fetch(url)
except TimeoutError:
return None
# asyncio.wait_for (all versions):
try:
result = await asyncio.wait_for(fetch(url), timeout=5.0)
except asyncio.TimeoutError:
print("Timed out")
# Timeout a group of tasks:
async def main():
try:
async with asyncio.timeout(10.0):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(work(i)) for i in range(100)]
except TimeoutError:
print("Overall 10s timeout exceeded; remaining tasks cancelled")
# Reschedule deadline (Python 3.11+):
async with asyncio.timeout(5) as timeout_cm:
    await phase1()
    timeout_cm.reschedule(asyncio.get_running_loop().time() + 5)  # new deadline: 5s from now
    await phase2()
Operations like counter += 1 are not thread-safe (read-modify-write). Always use Lock or threading.local.
Calling requests.get(), time.sleep(), or CPU-heavy code inside an async function blocks the event loop. Fix: use asyncio.to_thread() or asyncio.sleep().
result = coroutine() creates a coroutine object but never runs it. Fix: result = await coroutine().
asyncio.create_task() exceptions are not raised unless you await the task or check task.exception(). Fix: use TaskGroup or add task.add_done_callback.
What are TypeVar, Generic, Protocol? (Medium)
# Type hints (PEP 484): checked by mypy/pyright, not enforced at runtime
from typing import Optional, Union, List, Dict, Tuple, Any, Callable
from typing import TypeVar, Generic, Protocol, overload

# Basic:
def greet(name: str) -> str:
    return f"Hello, {name}"

def find_user(id: int) -> Optional[User]:  # may return None
    return db.get(id)

# TypeVar: generic placeholder (like Java's <T>)
T = TypeVar("T")

def first(items: list[T]) -> T:  # Python 3.12: def first[T](items: list[T]) -> T
    return items[0]

first([1, 2, 3])    # inferred: int
first(["a", "b"])   # inferred: str

# Generic class:
class Stack(Generic[T]):
    def __init__(self) -> None:
        self._items: list[T] = []

    def push(self, item: T) -> None:
        self._items.append(item)

    def pop(self) -> T:
        return self._items.pop()

s: Stack[int] = Stack()

# Protocol (structural subtyping: duck typing with type safety):
class Drawable(Protocol):
    def draw(self) -> None: ...

class Circle:  # Does NOT inherit Drawable
    def draw(self) -> None:
        print("O")

def render(shape: Drawable) -> None:
    shape.draw()

render(Circle())  # works: Circle satisfies the protocol structurally
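Two points from this answer are easy to verify at runtime: hints are not enforced by the interpreter, and a Protocol can be made usable with `isinstance()` via `typing.runtime_checkable`. The sketch below uses hypothetical names (`SupportsDraw`, `Square`, `double`) that do not appear in the guide.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsDraw(Protocol):  # hypothetical protocol for illustration
    def draw(self) -> str: ...


class Square:  # does not inherit SupportsDraw
    def draw(self) -> str:
        return "[]"


def double(x: int) -> int:
    return x * 2


# Hints are not enforced: a str argument is accepted at runtime.
hint_ignored = double("ab")

# runtime_checkable enables isinstance(); it only checks that the
# method exists, not that its signature matches.
is_drawable = isinstance(Square(), SupportsDraw)
not_drawable = isinstance(42, SupportsDraw)
print(hint_ignored, is_drawable, not_drawable)
```

A static checker such as mypy would flag `double("ab")`; the interpreter does not, which is why type checking usually runs as a separate CI step.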
itertools and collections highlights? (Medium)
# itertools: lazy iterator building blocks and combinatorial tools
import itertools

# chain: flatten multiple iterables
list(itertools.chain([1,2], [3,4], [5]))  # [1,2,3,4,5]
# groupby: group consecutive elements with same key (sort first to merge all equal keys)
for key, group in itertools.groupby("AAABBCCC"):
    print(key, list(group))  # A ['A','A','A'], B ['B','B'], C ['C','C','C']
# islice: slice a generator (assumes an infinite fibonacci() generator is defined)
list(itertools.islice(fibonacci(), 10))  # first 10 Fibonacci numbers
# product: cartesian product
list(itertools.product("AB", [1,2]))  # [('A',1),('A',2),('B',1),('B',2)]
# combinations / permutations
list(itertools.combinations([1,2,3], 2))  # [(1,2),(1,3),(2,3)]
list(itertools.permutations([1,2,3], 2))  # [(1,2),(1,3),(2,1),(2,3),(3,1),(3,2)]
# accumulate (running totals; addition is the default, so the lambda is optional):
list(itertools.accumulate([1,2,3,4], lambda acc, x: acc + x))  # [1,3,6,10]

# collections highlights:
from collections import deque, OrderedDict, ChainMap

deque(maxlen=5)  # circular buffer; O(1) append and appendleft / pop / popleft

# ChainMap: search multiple dicts in order (great for layered config):
defaults = {"theme": "dark", "lang": "en"}
user_prefs = {"lang": "fr"}
settings = ChainMap(user_prefs, defaults)
settings["theme"]  # "dark" (from defaults)
settings["lang"]   # "fr" (from user_prefs)
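Because `groupby` only groups consecutive elements, unsorted input produces repeated keys. The following sketch, using made-up sample data, shows that effect alongside the drop-oldest behaviour of a bounded `deque`.

```python
from collections import deque
from itertools import groupby

words = ["apple", "bob", "avocado", "banana", "cherry"]  # sample data


def initial(word: str) -> str:
    return word[0]


# Without sorting, the "a" and "b" words are not adjacent, so each key repeats.
unsorted_keys = [key for key, _ in groupby(words, key=initial)]

# Sorting by the same key first yields exactly one group per initial letter.
grouped = {
    key: list(group)
    for key, group in groupby(sorted(words, key=initial), key=initial)
}

# deque(maxlen=3) silently drops the oldest entry once it is full.
recent = deque(maxlen=3)
for n in range(5):
    recent.append(n)

print(unsorted_keys, grouped, list(recent))
```

`sorted` is stable, so words keep their original relative order inside each group.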
# datetime module:
from datetime import datetime, date, time, timedelta, timezone
import zoneinfo # Python 3.9+
# Naive datetime (no timezone):
now = datetime.now()
today = date.today()
# Timezone-aware (always prefer in production):
now_utc = datetime.now(timezone.utc)
now_ny = datetime.now(zoneinfo.ZoneInfo("America/New_York"))
# Parse string:
dt = datetime.strptime("2026-06-24 15:30:00", "%Y-%m-%d %H:%M:%S")
dt = datetime.fromisoformat("2026-06-24T15:30:00+05:30") # ISO 8601
# Format:
dt.strftime("%B %d, %Y") # "June 24, 2026"
dt.isoformat() # "2026-06-24T15:30:00+05:30" (the offset parsed above is preserved)
# Arithmetic:
tomorrow = today + timedelta(days=1)
duration = datetime(2026, 12, 31) - datetime.now()
duration.days # days remaining
# Convert between timezones:
utc_dt = datetime.now(timezone.utc)
ist_dt = utc_dt.astimezone(zoneinfo.ZoneInfo("Asia/Kolkata"))
# Prefer: python-dateutil for parsing arbitrary formats
# Prefer: arrow or pendulum for ergonomic timezone handling
from dateutil.relativedelta import relativedelta
next_month = today + relativedelta(months=1)
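The advice to prefer timezone-aware datetimes can be illustrated with a short sketch. It assumes a fixed +05:30 offset (named `IST` here for illustration) instead of `zoneinfo`, so it does not depend on a tz database being installed.

```python
from datetime import datetime, timedelta, timezone

IST = timezone(timedelta(hours=5, minutes=30))  # fixed offset, illustrative only

utc_dt = datetime(2026, 6, 24, 10, 0, tzinfo=timezone.utc)
ist_dt = utc_dt.astimezone(IST)

# Aware datetimes compare by instant, regardless of their offsets.
same_instant = utc_dt == ist_dt
ist_text = ist_dt.isoformat()

# Ordering a naive datetime against an aware one is an error.
naive_dt = datetime(2026, 6, 24, 10, 0)
try:
    naive_dt < utc_dt
    mixed_comparison = "allowed"
except TypeError:
    mixed_comparison = "TypeError"

print(same_instant, ist_text, mixed_comparison)
```

A fixed offset ignores daylight-saving rules, so real applications would normally use `zoneinfo.ZoneInfo` as shown above.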
pathlib and how does it compare to os.path? (Easy)
# pathlib.Path (Python 3.4+): object-oriented, cross-platform
from pathlib import Path
p = Path("/home/user/projects/blog")
p.name # "blog"
p.stem # "blog" (no extension)
p.suffix # "" (no extension)
p.parent # Path("/home/user/projects")
p.parts # ('/', 'home', 'user', 'projects', 'blog')
# Navigation with / operator:
config = p / "config" / "settings.json"
# File operations:
config.exists() # True/False
config.is_file()
config.is_dir()
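config.parent.mkdir(parents=True, exist_ok=True) # create the parent directory; mkdir on config itself would create a directory named settings.json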
# Read/write:
text = config.read_text(encoding="utf-8")
config.write_text(json.dumps(data), encoding="utf-8")
bytes_data = config.read_bytes()
# Glob:
list(p.glob("**/*.html")) # all HTML files recursively
list(p.rglob("*.py")) # rglob = recursive glob
# vs os.path:
# os.path.join(base, "config", "settings.json") ← string manipulation
# Path(base) / "config" / "settings.json" ← object composition (preferred)
# Stat:
stat = config.stat()
stat.st_size # file size in bytes
stat.st_mtime # last modified timestamp
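The operations above can be exercised end to end in a temporary directory. This is a minimal sketch; the `config/settings.json` layout and the stored values are assumptions made for the example.

```python
import json
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    config = base / "config" / "settings.json"  # hypothetical layout

    # Create the containing directory, then write and read the file back.
    config.parent.mkdir(parents=True, exist_ok=True)
    config.write_text(json.dumps({"theme": "dark"}), encoding="utf-8")
    loaded = json.loads(config.read_text(encoding="utf-8"))

    # rglob searches recursively; relative_to gives portable display paths.
    found = sorted(p.relative_to(base).as_posix() for p in base.rglob("*.json"))
    suffix, stem = config.suffix, config.stem
    size = config.stat().st_size

print(loaded, found, suffix, stem, size)
```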
dataclasses? (Medium)
# Pydantic v2: fast validation, serialisation, and settings management
from pydantic import BaseModel, field_validator, model_validator, Field
from typing import Annotated

class Address(BaseModel):
    street: str
    city: str
    zip_code: str

class User(BaseModel):
    id: int
    name: str = Field(min_length=1, max_length=50)
    email: str
    age: Annotated[int, Field(ge=0, le=150)]
    address: Address
    tags: list[str] = []  # safe here: Pydantic copies mutable defaults per instance

    @field_validator("email")
    @classmethod
    def validate_email(cls, v: str) -> str:
        if "@" not in v:
            raise ValueError("Invalid email")
        return v.lower()

# Parsing (auto-coerces types in the default lax mode):
user = User(id="1", name="Alice", email="ALICE@EX.COM",
            age=30, address={"street": "123 Main", "city": "NY", "zip_code": "10001"})
user.id     # 1 (int, coerced from "1")
user.email  # "alice@ex.com" (lowercased by validator)

# Serialisation:
user.model_dump()       # dict
user.model_dump_json()  # JSON string

# dataclasses vs Pydantic:
# dataclasses: lightweight, no runtime validation, no coercion, stdlib
# Pydantic: validation, coercion, JSON serialisation, FastAPI integration
# Use dataclasses for internal data; Pydantic for API models and config
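The "no runtime validation, no coercion" side of the comparison can be seen with the standard library alone. In this sketch `PlainUser` and `CheckedUser` are hypothetical models; the second adds a manual check in `__post_init__`, which is roughly what Pydantic automates.

```python
from dataclasses import dataclass, field


@dataclass
class PlainUser:  # hypothetical model: annotations are not checked
    id: int
    tags: list = field(default_factory=list)  # mutable defaults need a factory


@dataclass
class CheckedUser:  # hypothetical model with hand-written validation
    id: int

    def __post_init__(self) -> None:
        if not isinstance(self.id, int):
            raise TypeError("id must be an int")


plain = PlainUser(id="1")  # accepted; id stays the string "1"

try:
    CheckedUser(id="1")
    outcome = "accepted"
except TypeError:
    outcome = "rejected"

print(repr(plain.id), outcome)
```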
# Virtual environments: isolated Python + packages per project
python -m venv .venv # create
source .venv/bin/activate # activate (Linux/Mac)
.venv\Scripts\activate # activate (Windows)
pip install requests # installs into .venv only
deactivate # leave venv
# requirements.txt (legacy):
pip freeze > requirements.txt
pip install -r requirements.txt
# pyproject.toml (modern standard — PEP 517/518/621):
[project]
name = "myapp"
version = "1.0.0"
requires-python = ">=3.11"
dependencies = [
    "fastapi>=0.100",
    "pydantic>=2.0",
    "sqlalchemy>=2.0",
]
[project.optional-dependencies]
dev = ["pytest", "black", "mypy", "ruff"]
# Tools:
# pip: basic package manager
# pip-tools: pin transitive dependencies (pip-compile)
# poetry: project management + publishing + venvs
# uv (Astral, 2024+): Rust-based pip/venv replacement
# → uv pip install requests (Astral reports roughly 10-100x faster than pip)
# pdm, hatch: other modern alternatives
# Use logging, not print() — levels, handlers, formatters, structured output
import logging
# Module-level logger (preferred pattern):
logger = logging.getLogger(__name__) # name = "mypackage.module"
# Basic configuration (application entry point only):
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    handlers=[logging.StreamHandler(), logging.FileHandler("app.log")],
)
# Usage:
logger.debug("Cache miss for key %s", key) # lazy formatting (no f-string!)
logger.info("User %s logged in", user_id)
logger.warning("Rate limit approaching: %d/min", count)
logger.error("Payment failed: %s", error)
logger.exception("Unexpected error") # logs + full traceback
# Structured logging (JSON, for log aggregation systems):
import structlog
log = structlog.get_logger()
log.info("order_placed", order_id=123, user_id=456, amount=99.99)
# Output once structlog is configured with JSONRenderer: {"event": "order_placed", "order_id": 123, ...}
# Rules:
# Library code: logging.getLogger(__name__), NEVER basicConfig
# Application entry point: configure handlers/formatters
# Use % formatting in log calls (not f-strings) — lazy eval if level not enabled
# Never log PII/passwords/tokens
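The rule about `%`-style arguments can be demonstrated by counting how often an object is converted to a string. This is a small sketch; the `Expensive` class and the `demo.lazy` logger name are made up for the example.

```python
import logging


class Expensive:
    """Counts how many times it is rendered to a string."""

    calls = 0

    def __str__(self) -> str:
        Expensive.calls += 1
        return "expensive-value"


logger = logging.getLogger("demo.lazy")  # hypothetical logger name
logger.setLevel(logging.INFO)            # DEBUG messages are disabled
logger.addHandler(logging.NullHandler())
logger.propagate = False

obj = Expensive()

logger.debug("value=%s", obj)  # level check fails first: __str__ never runs
calls_after_lazy = Expensive.calls

logger.debug(f"value={obj}")   # the f-string is built before debug() is called
calls_after_fstring = Expensive.calls

print(calls_after_lazy, calls_after_fstring)
```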
importlib and module system work? (Hard)
# Import system:
# 1. Check sys.modules cache (already imported?)
# 2. Find module: sys.meta_path finders (PathFinder, BuiltinImporter, FrozenImporter)
# 3. Load: read source/bytecode, compile, execute module body
# 4. Cache in sys.modules
import sys
"json" in sys.modules # True if json already imported
# Relative imports (only inside packages):
# from . import sibling_module
# from .. import parent_module
# from .utils import helper
# Lazy import (defer cost until first use):
def get_numpy():
    import numpy  # only loaded when this function is called first time
    return numpy
# importlib: programmatic imports
import importlib
mod = importlib.import_module("json")
importlib.reload(mod) # reload (rarely needed — forces re-execution)
# Dynamic plugin loading:
def load_plugin(name):
    module = importlib.import_module(f"plugins.{name}")
    return module.Plugin()
# __init__.py: marks directory as a regular package (directories without it can still be imported as namespace packages); can expose public API:
# mypackage/__init__.py:
# from .core import MainClass → users do: from mypackage import MainClass
# __all__: controls what 'from module import *' exports:
__all__ = ["PublicClass", "public_function"] # private names excluded
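The caching step and programmatic imports can be checked with the standard library. In the sketch below, `load_attr` is a hypothetical helper and the `"module:attribute"` string format is an assumption chosen for the example.

```python
import importlib
import sys

# import_module returns the cached module object from sys.modules.
json_mod = importlib.import_module("json")
same_object = json_mod is sys.modules["json"]


def load_attr(dotted: str):
    """Hypothetical helper: resolve 'package.module:attribute' to an object."""
    module_name, _, attr = dotted.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


dumps = load_attr("json:dumps")
encoded = dumps({"ok": True})
print(same_object, encoded)
```

A plugin loader like `load_plugin` above works the same way, with the module name built from configuration rather than hard-coded.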
# pytest: no boilerplate, rich assertions, fixtures, plugins
# test_order.py:
import pytest
from myapp.orders import Order, InsufficientFundsError

def test_order_total():
    order = Order(items=[("Widget", 9.99), ("Gadget", 29.99)])
    assert order.total == pytest.approx(39.98)  # float comparison with tolerance

def test_order_raises_on_empty():
    with pytest.raises(ValueError, match="at least one item"):
        Order(items=[])

@pytest.mark.parametrize("amount,expected", [
    (100, 90),   # 10% discount
    (50, 50),    # no discount
    (200, 170),  # 15% discount
])
def test_discount(amount, expected):
    assert apply_discount(amount) == expected

# Fixtures (reusable setup):
@pytest.fixture
def sample_order():
    return Order(items=[("Widget", 9.99)])

@pytest.fixture(scope="session")  # created once per test session
def db():
    conn = create_test_database()
    yield conn    # code after yield runs as teardown
    conn.close()

def test_save_order(sample_order, db):
    db.save(sample_order)
    loaded = db.find(sample_order.id)
    assert loaded == sample_order

# Mocking:
from unittest.mock import patch, MagicMock

def test_email_sent():
    with patch("myapp.orders.send_email") as mock_email:
        order = Order(items=[("Widget", 9.99)])
        order.place()
        # A bare ... after a keyword argument is a SyntaxError, so check the call and the keyword separately
        mock_email.assert_called_once()
        assert mock_email.call_args.kwargs["subject"] == "Order Confirmed"
# pytest-asyncio: run async test functions
# pip install pytest-asyncio
import pytest
import pytest_asyncio

# Mark individual test:
@pytest.mark.asyncio
async def test_fetch_user():
    async with aiohttp.ClientSession() as session:
        user = await fetch_user(session, user_id=1)
        assert user.name == "Alice"

# Or configure globally in pyproject.toml:
# [tool.pytest.ini_options]
# asyncio_mode = "auto" # auto-detect async tests

@pytest_asyncio.fixture
async def async_client():
    # httpx 0.28 removed the app= shortcut; pass an ASGITransport instead
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        yield client

# With asyncio_mode = "auto" no marker is needed on the tests below:
async def test_api_endpoint(async_client):
    response = await async_client.get("/users/1")
    assert response.status_code == 200

# Mock async functions:
from unittest.mock import AsyncMock, patch

async def test_with_async_mock():
    mock_fetch = AsyncMock(return_value={"id": 1, "name": "Alice"})
    with patch("myapp.service.fetch_user", mock_fetch):
        result = await my_service.process_user(1)
    mock_fetch.assert_awaited_once_with(1)
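`AsyncMock` can also be tried without pytest or any application code. The sketch below injects the mock directly instead of patching; `process_user` is a hypothetical service function written for the example.

```python
import asyncio
from unittest.mock import AsyncMock


async def process_user(fetch, user_id: int) -> str:
    """Hypothetical service function that awaits an injected fetcher."""
    user = await fetch(user_id)
    return user["name"].upper()


mock_fetch = AsyncMock(return_value={"id": 1, "name": "Alice"})
result = asyncio.run(process_user(mock_fetch, 1))

# Raises AssertionError unless the mock was awaited exactly once with 1.
mock_fetch.assert_awaited_once_with(1)
await_count = mock_fetch.await_count
print(result, await_count)
```

Passing the dependency in as an argument avoids `patch` target strings entirely, which some teams find easier to keep correct during refactors.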
# 1. cProfile: function-level profiling (deterministic)
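python -m cProfile -o profile.out my_script.py # save stats to a file (-s is ignored when -o is given)
python -m cProfile -s cumulative my_script.py # or print directly, sorted by cumulative time
python -m pstats profile.out # interactive analysis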
# Or in code:
import cProfile
import pstats
profiler = cProfile.Profile()
profiler.enable()
expensive_function()
profiler.disable()
stats = pstats.Stats(profiler).sort_stats("cumulative")
stats.print_stats(20) # top 20 functions
# 2. line_profiler: line-by-line timing (pip install line_profiler)
@profile  # decorator added by kernprof
def slow_function():
    result = []
    for i in range(1000):  # ← line-level timing here
        result.append(i**2)
    return result
# kernprof -l -v my_script.py
# 3. memory_profiler: memory usage per line
# @memory_profiler.profile decorator
# 4. timeit: microbenchmarks
import timeit
timeit.timeit("sorted([3,1,2])", number=100_000)
# 5. py-spy: sampling profiler; very low overhead, no code changes, attaches to a running process
# py-spy top --pid 12345
# py-spy record -o profile.svg --pid 12345
# 6. scalene: CPU + memory + GPU profiler in one
# python -m scalene my_script.py
# 1. Use built-ins and stdlib (implemented in C)
sum(lst)        # faster than: total = 0; for x in lst: total += x
"".join(parts)  # faster than string concatenation in loop
set_a & set_b   # set intersection: average O(min(m,n)) vs O(m*n) nested loop

# 2. List comprehension vs loop:
squares = [x**2 for x in range(1000)]  # faster than append-loop

# 3. Local variable lookup faster than global:
def fast():
    local_range = range  # cache built-in as local
    return [local_range(i) for i in range(1000)]

# 4. Avoid repeated attribute lookup in tight loops:
import math
for _ in range(1000):
    math.sqrt(2)  # slower: global lookup + attribute access each time
sqrt = math.sqrt  # cache once
for _ in range(1000):
    sqrt(2)       # direct call

# 5. Use appropriate data structures:
# O(1) set lookup vs O(n) list search:
valid_ids = set(db.fetch_all_ids())  # build once
if user_id in valid_ids: ...         # O(1) on average

# 6. numpy for numeric computation:
import numpy as np
a = np.array([1,2,3,4,5])
a * 2  # vectorised C loop; often 10-100x faster than a Python loop on large arrays
# 7. PyPy, Cython, or Numba for CPU-intensive pure Python code
# 8. Lazy evaluation with generators
# 9. lru_cache for pure functions called repeatedly
# 10. Avoid creating unnecessary objects (reuse, preallocate)
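Items 8 and 9 are listed without code, so here is a minimal sketch of both. The `fib` and `squares` functions are illustrative examples, not part of the guide.

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def fib(n: int) -> int:
    """Naive recursion made linear by memoisation."""
    return n if n < 2 else fib(n - 1) + fib(n - 2)


value = fib(30)
info = fib.cache_info()  # named tuple: hits, misses, maxsize, currsize


def squares(limit: int):
    """Generator: yields one value at a time instead of building a list."""
    for i in range(limit):
        yield i * i


total = sum(squares(1_000))  # consumes the generator lazily
print(value, info.misses, total)
```

`lru_cache` is only safe for pure functions with hashable arguments; cached results are never invalidated unless `fib.cache_clear()` is called.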
# FastAPI: async REST framework, automatic OpenAPI docs, Pydantic integration
# pip install fastapi uvicorn[standard]
from fastapi import FastAPI, HTTPException, Depends, status
from pydantic import BaseModel, ConfigDict
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated

app = FastAPI(title="Order API", version="1.0.0")

class OrderCreate(BaseModel):
    product_id: int
    quantity: int = 1

class OrderResponse(BaseModel):
    # Pydantic v2 style; the nested "class Config" form is deprecated
    model_config = ConfigDict(from_attributes=True)  # allow ORM models (SQLAlchemy)
    id: int
    product_id: int
    quantity: int
    total: float

# Order (the ORM model) and get_db (the session dependency) are defined elsewhere
@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(order_id: int, db: Annotated[AsyncSession, Depends(get_db)]):
    order = await db.get(Order, order_id)
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    return order

@app.post("/orders", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
async def create_order(payload: OrderCreate, db: Annotated[AsyncSession, Depends(get_db)]):
    order = Order(**payload.model_dump())
    db.add(order)
    await db.commit()
    await db.refresh(order)
    return order

# Run: uvicorn main:app --reload
# Docs: http://localhost:8000/docs (Swagger) or /redoc
# SQLAlchemy 2.0 async (with asyncpg / aiosqlite):
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import select

engine = create_async_engine("postgresql+asyncpg://user:pass@host/db", echo=True)
AsyncSessionFactory = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(nullable=False)
    email: Mapped[str] = mapped_column(unique=True)

# Create tables:
async def create_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

# Dependency for FastAPI:
async def get_db():
    async with AsyncSessionFactory() as session:
        yield session

# Query:
async def get_user(session: AsyncSession, user_id: int) -> User | None:
    result = await session.execute(select(User).where(User.id == user_id))
    return result.scalar_one_or_none()

# Insert:
async def create_user(session: AsyncSession, name: str, email: str) -> User:
    user = User(name=name, email=email)
    session.add(user)
    await session.commit()
    await session.refresh(user)
    return user
# In-process cache (single instance):
from functools import lru_cache
from cachetools import TTLCache, LRUCache

ttl_cache = TTLCache(maxsize=1000, ttl=60)  # expires after 60s

# Redis (distributed, multi-instance):
import redis.asyncio as aioredis
from typing import Any
import json

class RedisCache:
    def __init__(self, url: str):
        self.redis = aioredis.from_url(url)

    async def get(self, key: str) -> Any | None:
        value = await self.redis.get(key)
        return json.loads(value) if value else None

    async def set(self, key: str, value: Any, ttl: int = 300):
        await self.redis.setex(key, ttl, json.dumps(value))

    async def delete(self, key: str):
        await self.redis.delete(key)

# Cache-aside pattern (the application checks the cache, then falls back to the DB):
# cache is a RedisCache instance; User here is a Pydantic model
async def get_user(user_id: int) -> User:
    cache_key = f"user:{user_id}"
    cached = await cache.get(cache_key)
    if cached is not None:
        return User(**cached)
    user = await db.fetch_user(user_id)
    await cache.set(cache_key, user.model_dump(), ttl=300)
    return user

# Cache invalidation strategies:
# 1. TTL: expire after N seconds (simple, eventual consistency)
# 2. Write-through: update cache on every DB write
# 3. Write-around: only cache on read, skip on write
# 4. Event-driven: invalidate on message from Kafka/Redis pub-sub
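The cache-aside flow with TTL expiry can be tried without Redis. The sketch below is an illustrative in-process stand-in, not the guide's implementation: `TTLStore`, `load_user`, and the injectable fake clock are all assumptions made so that expiry is deterministic.

```python
import time
from typing import Any, Callable


class TTLStore:
    """Tiny in-process TTL cache (illustrative stand-in for Redis)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._data = {}  # key -> (expires_at, value)
        self._clock = clock

    def get(self, key: str) -> Any:
        entry = self._data.get(key)
        if entry is None or entry[0] <= self._clock():
            self._data.pop(key, None)  # drop missing or expired entries
            return None
        return entry[1]

    def set(self, key: str, value: Any, ttl: float) -> None:
        self._data[key] = (self._clock() + ttl, value)


db_reads = 0


def load_user(user_id: int) -> dict:
    """Hypothetical slow lookup standing in for a database query."""
    global db_reads
    db_reads += 1
    return {"id": user_id, "name": "Alice"}


def get_user(cache: TTLStore, user_id: int) -> dict:
    key = f"user:{user_id}"
    cached = cache.get(key)
    if cached is not None:        # cache hit
        return cached
    user = load_user(user_id)     # cache miss: read the source of truth
    cache.set(key, user, ttl=300)
    return user


now = [0.0]  # fake clock so expiry is deterministic
cache = TTLStore(clock=lambda: now[0])
get_user(cache, 1)
get_user(cache, 1)
reads_while_fresh = db_reads
now[0] = 301.0  # advance past the 300s TTL
get_user(cache, 1)
reads_after_expiry = db_reads
print(reads_while_fresh, reads_after_expiry)
```

Injecting the clock keeps the example testable without sleeping; in production the TTL would be enforced by Redis itself via `setex`.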
# Option 1: FastAPI BackgroundTasks (lightweight, in-process)
from fastapi import BackgroundTasks

@app.post("/orders")
async def create_order(payload: OrderCreate, bg: BackgroundTasks):
    order = await save_order(payload)
    bg.add_task(send_confirmation_email, order.id)  # runs after response sent
    return order

# Option 2: Celery (production; distributed task queue)
# pip install celery redis
from celery import Celery

celery = Celery("tasks", broker="redis://localhost:6379/0",
                backend="redis://localhost:6379/1")

@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, order_id: int):
    try:
        email_service.send(order_id)
    except EmailError as e:
        raise self.retry(exc=e)

# Dispatch from FastAPI:
send_email.delay(order_id=123)
send_email.apply_async(args=[123], countdown=30)  # run after 30s

# Option 3: ARQ (async Redis Queue; lighter than Celery)
from arq import create_pool
from arq.connections import RedisSettings

async def send_email(ctx, order_id: int):
    await email_service.async_send(order_id)

class WorkerSettings:
    functions = [send_email]
    redis_settings = RedisSettings()

# Dispatch (inside a coroutine):
redis = await create_pool(RedisSettings())
await redis.enqueue_job("send_email", order_id=123)
# pydantic-settings: type-safe configuration from env vars (12-factor app)
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import SecretStr

class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )
    app_name: str = "MyService"
    debug: bool = False
    database_url: str
    redis_url: str = "redis://localhost:6379"
    secret_key: SecretStr  # never exposed in repr/logs
    max_connections: int = 10
    allowed_origins: list[str] = ["*"]

settings = Settings()  # reads env vars + .env file

# .env file (not committed to git):
# DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db
# SECRET_KEY=supersecret

# Access:
settings.database_url                   # "postgresql+asyncpg://..."
settings.secret_key.get_secret_value()  # explicit: reveals SecretStr

# Singleton pattern (avoid re-reading env on every call):
from functools import lru_cache

@lru_cache(maxsize=1)
def get_settings() -> Settings:
    return Settings()

# FastAPI Depends integration:
from typing import Annotated
from fastapi import Depends

def get_db_url(settings: Annotated[Settings, Depends(get_settings)]):
    return settings.database_url
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Callable, Protocol

# 1. Singleton: module-level global (Python idiom):
# settings.py is already a singleton; just import it

# 2. Factory: classmethod or callable:
class Parser:
    @classmethod
    def for_format(cls, fmt: str) -> "Parser":
        return {"json": JSONParser, "csv": CSVParser}[fmt]()

# 3. Strategy: callables / Protocol:
def process(data, transform: Callable[[str], str]) -> str:
    return transform(data)

process(data, str.upper)
process(data, lambda s: s.strip())

# 4. Observer: callbacks or event bus:
class EventBus:
    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event, handler):
        self._handlers[event].append(handler)

    def publish(self, event, **data):
        for h in self._handlers[event]:
            h(**data)

bus = EventBus()
bus.subscribe("order_placed", send_email)
bus.subscribe("order_placed", update_inventory)
bus.publish("order_placed", order_id=123)

# 5. Repository: abstract data access:
class OrderRepository(Protocol):
    async def get(self, id: int) -> Order | None: ...
    async def save(self, order: Order) -> Order: ...

class PostgresOrderRepo:
    async def get(self, id: int) -> Order | None:
        return await db.get(Order, id)

# 6. Dependency injection via constructors or FastAPI Depends:
class OrderService:
    def __init__(self, repo: OrderRepository, cache: CacheService):
        self.repo = repo
        self.cache = cache

# 7. Template Method: base class with hooks:
class Report(ABC):
    def generate(self) -> str:  # template method
        data = self.fetch_data()
        return self.format(data)

    @abstractmethod
    def fetch_data(self): ...

    @abstractmethod
    def format(self, data): ...
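The Observer and Template Method snippets above reference names that are not defined in the guide, so here is a self-contained sketch of both. `TotalReport` and the lambda handlers are hypothetical; the `EventBus` and `Report` shapes follow the patterns shown above.

```python
from abc import ABC, abstractmethod
from collections import defaultdict


class Report(ABC):
    def generate(self) -> str:  # template method: fixed skeleton
        return self.format(self.fetch_data())

    @abstractmethod
    def fetch_data(self) -> list: ...

    @abstractmethod
    def format(self, data: list) -> str: ...


class TotalReport(Report):  # hypothetical concrete report
    def fetch_data(self) -> list:
        return [3, 4, 5]

    def format(self, data: list) -> str:
        return f"total={sum(data)}"


class EventBus:
    def __init__(self) -> None:
        self._handlers = defaultdict(list)

    def subscribe(self, event, handler) -> None:
        self._handlers[event].append(handler)

    def publish(self, event, **data) -> None:
        for handler in self._handlers[event]:
            handler(**data)


received = []
bus = EventBus()
bus.subscribe("order_placed", lambda order_id: received.append(("email", order_id)))
bus.subscribe("order_placed", lambda order_id: received.append(("inventory", order_id)))
bus.publish("order_placed", order_id=123)

report_text = TotalReport().generate()

try:
    Report()  # abstract classes cannot be instantiated
    abstract_ok = True
except TypeError:
    abstract_ok = False

print(received, report_text, abstract_ok)
```

Handlers run synchronously in subscription order here; an async or queued bus would need its own error handling and ordering guarantees.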