Python Caching Strategies: Redis, Memcached and functools
Caching is the single most effective performance optimization for most web applications — it can reduce database load by 90% and response times from hundreds of milliseconds to under one millisecond. Python's ecosystem offers in-process caching with functools.lru_cache, distributed caching with Redis and Memcached, and HTTP-level caching with FastAPI-Cache. This guide covers all layers with cache-aside, write-through, invalidation patterns, and monitoring.
Table of Contents
functools.lru_cache and cache
In-process caching is the simplest form. lru_cache memoizes function results in memory — perfect for expensive pure functions called repeatedly with the same arguments (config loading, regex compilation, DB schema lookups).
from functools import lru_cache, cache
import time
# @cache (Python 3.9+) — equivalent to lru_cache(maxsize=None)
@cache
def fibonacci(n: int) -> int:
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
print(fibonacci(50)) # instant, even though it's exponential recursion
# @lru_cache — bounded size, evicts least recently used
@lru_cache(maxsize=256)
def get_country_code(name: str) -> str:
import pycountry
country = pycountry.countries.search_fuzzy(name)[0]
return country.alpha_2
# Cache stats
print(get_country_code.cache_info())
# CacheInfo(hits=5, misses=3, maxsize=256, currsize=3)
# Clear the cache
get_country_code.cache_clear()
# Async memoization (lru_cache doesn't work with async)
# Use async-lru or aiocache:
from async_lru import alru_cache
@alru_cache(maxsize=128)
async def fetch_user(user_id: int) -> dict:
async with db.connection() as conn:
return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
# TTL-based in-process cache
import threading
from datetime import datetime, timedelta
class TTLCache:
def __init__(self, ttl_seconds: int = 300):
self._cache: dict = {}
self._ttl = ttl_seconds
self._lock = threading.Lock()
def get(self, key: str):
with self._lock:
item = self._cache.get(key)
if item and datetime.utcnow() < item["expires"]:
return item["value"]
self._cache.pop(key, None)
return None
def set(self, key: str, value):
with self._lock:
self._cache[key] = {
"value": value,
"expires": datetime.utcnow() + timedelta(seconds=self._ttl),
}
def invalidate(self, key: str):
with self._lock:
self._cache.pop(key, None)
Redis Caching
Redis is the standard distributed cache for Python web applications. It survives process restarts, is shared across multiple app instances, and supports atomic operations, pub/sub, and data structures beyond simple key-value pairs.
import redis.asyncio as aioredis
import json
import os
redis = aioredis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379"))
async def cache_get(key: str) -> dict | None:
value = await redis.get(key)
return json.loads(value) if value else None
async def cache_set(key: str, value: dict, ttl: int = 300) -> None:
await redis.setex(key, ttl, json.dumps(value))
async def cache_delete(key: str) -> None:
await redis.delete(key)
async def cache_delete_pattern(pattern: str) -> int:
"""Delete all keys matching a pattern (e.g., 'user:*')."""
keys = await redis.keys(pattern)
if keys:
return await redis.delete(*keys)
return 0
# Cache-aside pattern
async def get_user(user_id: int) -> dict:
cache_key = f"user:{user_id}"
# 1. Try cache first
cached = await cache_get(cache_key)
if cached:
return cached
# 2. Cache miss — fetch from DB
user = await db.fetch_user(user_id)
if not user:
return None
# 3. Store in cache with TTL
await cache_set(cache_key, user, ttl=300)
return user
# Pipeline — batch multiple Redis commands in one round trip
async def get_multiple_users(user_ids: list[int]) -> list[dict]:
async with redis.pipeline() as pipe:
for uid in user_ids:
pipe.get(f"user:{uid}")
results = await pipe.execute()
users = []
missing_ids = []
for uid, cached in zip(user_ids, results):
if cached:
users.append(json.loads(cached))
else:
missing_ids.append(uid)
# Fetch missing from DB in one query
if missing_ids:
db_users = await db.fetch_users_batch(missing_ids)
async with redis.pipeline() as pipe:
for user in db_users:
pipe.setex(f"user:{user['id']}", 300, json.dumps(user))
await pipe.execute()
users.extend(db_users)
return users
Building a Cache Decorator
import functools
import hashlib
import json
import inspect
def redis_cache(ttl: int = 300, key_prefix: str = ""):
"""Decorator that caches async function results in Redis."""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Build cache key from function name + arguments
sig = inspect.signature(func)
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
arg_str = json.dumps(dict(bound.arguments), sort_keys=True, default=str)
key_hash = hashlib.md5(arg_str.encode()).hexdigest()[:12]
cache_key = f"{key_prefix or func.__name__}:{key_hash}"
# Check cache
cached = await redis.get(cache_key)
if cached:
return json.loads(cached)
# Execute function
result = await func(*args, **kwargs)
# Cache result
if result is not None:
await redis.setex(cache_key, ttl, json.dumps(result, default=str))
return result
async def invalidate(*args, **kwargs):
sig = inspect.signature(func)
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
arg_str = json.dumps(dict(bound.arguments), sort_keys=True, default=str)
key_hash = hashlib.md5(arg_str.encode()).hexdigest()[:12]
cache_key = f"{key_prefix or func.__name__}:{key_hash}"
await redis.delete(cache_key)
wrapper.invalidate = invalidate
return wrapper
return decorator
@redis_cache(ttl=600, key_prefix="product")
async def get_product(product_id: int) -> dict:
return await db.fetch_product(product_id)
# Use it
product = await get_product(42)
# Invalidate
await get_product.invalidate(42)
FastAPI-Cache
pip install fastapi-cache2[redis]
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
r = aioredis.from_url("redis://localhost:6379")
FastAPICache.init(RedisBackend(r), prefix="fastapi-cache:")
yield
app = FastAPI(lifespan=lifespan)
@app.get("/products/{product_id}")
@cache(expire=300)
async def get_product(product_id: int):
# Result cached for 5 minutes — key = URL + query params
return await db.fetch_product(product_id)
@app.get("/stats/daily")
@cache(expire=3600, namespace="stats")
async def daily_stats():
return await db.compute_daily_stats()
# Custom cache key builder
def user_cache_key(func, namespace: str = "", *args, **kwargs):
# Separate cache per user
user_id = kwargs.get("current_user", {}).get("id", "anon")
return f"{namespace}:{func.__name__}:{user_id}"
@app.get("/me/feed")
@cache(expire=60, key_builder=user_cache_key)
async def my_feed(current_user=Depends(get_current_user)):
return await db.fetch_feed(current_user["id"])
Cache Patterns
import asyncio
# Write-through — update cache and DB simultaneously
async def update_product(product_id: int, data: dict) -> dict:
# Write to DB
updated = await db.update_product(product_id, data)
# Write to cache immediately
await cache_set(f"product:{product_id}", updated, ttl=300)
return updated
# Write-behind / write-back — write to cache first, DB later
async def log_event(event: dict) -> None:
await redis.rpush("events:queue", json.dumps(event))
# Background worker drains queue to DB in batches
# Stampede prevention — only one request rebuilds the cache
_locks: dict[str, asyncio.Lock] = {}
async def get_with_lock(key: str, fetch_fn) -> dict:
cached = await cache_get(key)
if cached:
return cached
if key not in _locks:
_locks[key] = asyncio.Lock()
async with _locks[key]:
# Double-check after acquiring lock
cached = await cache_get(key)
if cached:
return cached
result = await fetch_fn()
await cache_set(key, result, ttl=300)
return result
Cache Invalidation
# Tag-based invalidation — group related cache entries
async def set_with_tags(key: str, value: dict, tags: list[str], ttl: int = 300):
async with redis.pipeline() as pipe:
pipe.setex(key, ttl, json.dumps(value))
for tag in tags:
pipe.sadd(f"tag:{tag}", key)
pipe.expire(f"tag:{tag}", ttl + 60)
await pipe.execute()
async def invalidate_tag(tag: str):
keys = await redis.smembers(f"tag:{tag}")
if keys:
await redis.delete(*keys, f"tag:{tag}")
# Usage
await set_with_tags(
f"product:{product_id}", product,
tags=[f"category:{product['category_id']}", "products"],
ttl=600,
)
# When a category changes, invalidate all its products:
await invalidate_tag(f"category:{category_id}")
Monitoring Cache Effectiveness
import time
class CacheMetrics:
def __init__(self):
self.hits = 0
self.misses = 0
self.total_time_saved = 0.0
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total else 0
def record(self, hit: bool, time_saved_ms: float = 0):
if hit:
self.hits += 1
self.total_time_saved += time_saved_ms
else:
self.misses += 1
metrics = CacheMetrics()
# Redis INFO command for built-in stats
async def get_redis_stats() -> dict:
info = await redis.info("stats")
return {
"hits": info["keyspace_hits"],
"misses": info["keyspace_misses"],
"hit_rate": info["keyspace_hits"] / (info["keyspace_hits"] + info["keyspace_misses"] + 1),
"evicted_keys": info["evicted_keys"],
}
Frequently Asked Questions
- Redis vs Memcached — which to choose?
- Redis supports richer data types (lists, sets, sorted sets, hashes), persistence, pub/sub, and Lua scripting. Memcached is simpler, slightly faster for pure key-value caching, and uses less memory. In 2026, choose Redis for new projects — its additional capabilities are almost always useful and it has better ecosystem support.
- How do I avoid cache stampede under high load?
- Use probabilistic early expiration: check if the TTL is below a threshold and refresh early with some probability. Or use the lock pattern shown above. Redis 7+ supports
GETEXwhich atomically gets a value and extends its TTL, helping with lock-free refresh patterns. - What is an appropriate TTL for different data types?
- User sessions: 30 minutes to 24 hours. Static reference data (country codes, categories): 1-24 hours. User profiles: 5-30 minutes. Search results: 1-5 minutes. Real-time data: 1-60 seconds. Always keep TTL shorter than how long stale data would cause problems.