Python Caching Strategies: Redis, Memcached and functools

Caching is the single most effective performance optimization for most web applications — it can reduce database load by 90% and response times from hundreds of milliseconds to under one millisecond. Python's ecosystem offers in-process caching with functools.lru_cache, distributed caching with Redis and Memcached, and HTTP-level caching with FastAPI-Cache. This guide covers all layers with cache-aside, write-through, invalidation patterns, and monitoring.

functools.lru_cache and cache

In-process caching is the simplest form. lru_cache memoizes function results in memory — perfect for expensive pure functions called repeatedly with the same arguments (config loading, regex compilation, DB schema lookups).

from functools import lru_cache, cache
import time

# @cache (Python 3.9+) — equivalent to lru_cache(maxsize=None)
@cache
def fibonacci(n: int) -> int:
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

print(fibonacci(50))  # instant, even though it's exponential recursion

# @lru_cache — bounded size, evicts least recently used
@lru_cache(maxsize=256)
def get_country_code(name: str) -> str:
    import pycountry
    country = pycountry.countries.search_fuzzy(name)[0]
    return country.alpha_2

# Cache stats
print(get_country_code.cache_info())
# CacheInfo(hits=5, misses=3, maxsize=256, currsize=3)

# Clear the cache
get_country_code.cache_clear()

# Async memoization (lru_cache doesn't work with async)
# Use async-lru or aiocache:
from async_lru import alru_cache

@alru_cache(maxsize=128)
async def fetch_user(user_id: int) -> dict:
    async with db.connection() as conn:
        return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)

# TTL-based in-process cache
import threading
from datetime import datetime, timedelta

class TTLCache:
    def __init__(self, ttl_seconds: int = 300):
        self._cache: dict = {}
        self._ttl = ttl_seconds
        self._lock = threading.Lock()

    def get(self, key: str):
        with self._lock:
            item = self._cache.get(key)
            if item and datetime.utcnow() < item["expires"]:
                return item["value"]
            self._cache.pop(key, None)
            return None

    def set(self, key: str, value):
        with self._lock:
            self._cache[key] = {
                "value": value,
                "expires": datetime.utcnow() + timedelta(seconds=self._ttl),
            }

    def invalidate(self, key: str):
        with self._lock:
            self._cache.pop(key, None)

Redis Caching

Redis is the standard distributed cache for Python web applications. It survives process restarts, is shared across multiple app instances, and supports atomic operations, pub/sub, and data structures beyond simple key-value pairs.

import redis.asyncio as aioredis
import json
import os

redis = aioredis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379"))

async def cache_get(key: str) -> dict | None:
    value = await redis.get(key)
    return json.loads(value) if value else None

async def cache_set(key: str, value: dict, ttl: int = 300) -> None:
    await redis.setex(key, ttl, json.dumps(value))

async def cache_delete(key: str) -> None:
    await redis.delete(key)

async def cache_delete_pattern(pattern: str) -> int:
    """Delete all keys matching a pattern (e.g., 'user:*')."""
    keys = await redis.keys(pattern)
    if keys:
        return await redis.delete(*keys)
    return 0

# Cache-aside pattern
async def get_user(user_id: int) -> dict:
    cache_key = f"user:{user_id}"

    # 1. Try cache first
    cached = await cache_get(cache_key)
    if cached:
        return cached

    # 2. Cache miss — fetch from DB
    user = await db.fetch_user(user_id)
    if not user:
        return None

    # 3. Store in cache with TTL
    await cache_set(cache_key, user, ttl=300)
    return user

# Pipeline — batch multiple Redis commands in one round trip
async def get_multiple_users(user_ids: list[int]) -> list[dict]:
    async with redis.pipeline() as pipe:
        for uid in user_ids:
            pipe.get(f"user:{uid}")
        results = await pipe.execute()

    users = []
    missing_ids = []
    for uid, cached in zip(user_ids, results):
        if cached:
            users.append(json.loads(cached))
        else:
            missing_ids.append(uid)

    # Fetch missing from DB in one query
    if missing_ids:
        db_users = await db.fetch_users_batch(missing_ids)
        async with redis.pipeline() as pipe:
            for user in db_users:
                pipe.setex(f"user:{user['id']}", 300, json.dumps(user))
        await pipe.execute()
        users.extend(db_users)

    return users

Building a Cache Decorator

import functools
import hashlib
import json
import inspect

def redis_cache(ttl: int = 300, key_prefix: str = ""):
    """Decorator that caches async function results in Redis."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Build cache key from function name + arguments
            sig = inspect.signature(func)
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            arg_str = json.dumps(dict(bound.arguments), sort_keys=True, default=str)
            key_hash = hashlib.md5(arg_str.encode()).hexdigest()[:12]
            cache_key = f"{key_prefix or func.__name__}:{key_hash}"

            # Check cache
            cached = await redis.get(cache_key)
            if cached:
                return json.loads(cached)

            # Execute function
            result = await func(*args, **kwargs)

            # Cache result
            if result is not None:
                await redis.setex(cache_key, ttl, json.dumps(result, default=str))

            return result

        async def invalidate(*args, **kwargs):
            sig = inspect.signature(func)
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            arg_str = json.dumps(dict(bound.arguments), sort_keys=True, default=str)
            key_hash = hashlib.md5(arg_str.encode()).hexdigest()[:12]
            cache_key = f"{key_prefix or func.__name__}:{key_hash}"
            await redis.delete(cache_key)

        wrapper.invalidate = invalidate
        return wrapper
    return decorator

@redis_cache(ttl=600, key_prefix="product")
async def get_product(product_id: int) -> dict:
    return await db.fetch_product(product_id)

# Use it
product = await get_product(42)

# Invalidate
await get_product.invalidate(42)

FastAPI-Cache

pip install fastapi-cache2[redis]
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    r = aioredis.from_url("redis://localhost:6379")
    FastAPICache.init(RedisBackend(r), prefix="fastapi-cache:")
    yield

app = FastAPI(lifespan=lifespan)

@app.get("/products/{product_id}")
@cache(expire=300)
async def get_product(product_id: int):
    # Result cached for 5 minutes — key = URL + query params
    return await db.fetch_product(product_id)

@app.get("/stats/daily")
@cache(expire=3600, namespace="stats")
async def daily_stats():
    return await db.compute_daily_stats()

# Custom cache key builder
def user_cache_key(func, namespace: str = "", *args, **kwargs):
    # Separate cache per user
    user_id = kwargs.get("current_user", {}).get("id", "anon")
    return f"{namespace}:{func.__name__}:{user_id}"

@app.get("/me/feed")
@cache(expire=60, key_builder=user_cache_key)
async def my_feed(current_user=Depends(get_current_user)):
    return await db.fetch_feed(current_user["id"])

Cache Patterns

import asyncio

# Write-through — update cache and DB simultaneously
async def update_product(product_id: int, data: dict) -> dict:
    # Write to DB
    updated = await db.update_product(product_id, data)
    # Write to cache immediately
    await cache_set(f"product:{product_id}", updated, ttl=300)
    return updated

# Write-behind / write-back — write to cache first, DB later
async def log_event(event: dict) -> None:
    await redis.rpush("events:queue", json.dumps(event))
    # Background worker drains queue to DB in batches

# Stampede prevention — only one request rebuilds the cache
_locks: dict[str, asyncio.Lock] = {}

async def get_with_lock(key: str, fetch_fn) -> dict:
    cached = await cache_get(key)
    if cached:
        return cached

    if key not in _locks:
        _locks[key] = asyncio.Lock()

    async with _locks[key]:
        # Double-check after acquiring lock
        cached = await cache_get(key)
        if cached:
            return cached
        result = await fetch_fn()
        await cache_set(key, result, ttl=300)
        return result

Cache Invalidation

# Tag-based invalidation — group related cache entries
async def set_with_tags(key: str, value: dict, tags: list[str], ttl: int = 300):
    async with redis.pipeline() as pipe:
        pipe.setex(key, ttl, json.dumps(value))
        for tag in tags:
            pipe.sadd(f"tag:{tag}", key)
            pipe.expire(f"tag:{tag}", ttl + 60)
        await pipe.execute()

async def invalidate_tag(tag: str):
    keys = await redis.smembers(f"tag:{tag}")
    if keys:
        await redis.delete(*keys, f"tag:{tag}")

# Usage
await set_with_tags(
    f"product:{product_id}", product,
    tags=[f"category:{product['category_id']}", "products"],
    ttl=600,
)
# When a category changes, invalidate all its products:
await invalidate_tag(f"category:{category_id}")

Monitoring Cache Effectiveness

import time

class CacheMetrics:
    def __init__(self):
        self.hits = 0
        self.misses = 0
        self.total_time_saved = 0.0

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0

    def record(self, hit: bool, time_saved_ms: float = 0):
        if hit:
            self.hits += 1
            self.total_time_saved += time_saved_ms
        else:
            self.misses += 1

metrics = CacheMetrics()

# Redis INFO command for built-in stats
async def get_redis_stats() -> dict:
    info = await redis.info("stats")
    return {
        "hits": info["keyspace_hits"],
        "misses": info["keyspace_misses"],
        "hit_rate": info["keyspace_hits"] / (info["keyspace_hits"] + info["keyspace_misses"] + 1),
        "evicted_keys": info["evicted_keys"],
    }

Frequently Asked Questions

Redis vs Memcached — which to choose?
Redis supports richer data types (lists, sets, sorted sets, hashes), persistence, pub/sub, and Lua scripting. Memcached is simpler, slightly faster for pure key-value caching, and uses less memory. In 2026, choose Redis for new projects — its additional capabilities are almost always useful and it has better ecosystem support.
How do I avoid cache stampede under high load?
Use probabilistic early expiration: check if the TTL is below a threshold and refresh early with some probability. Or use the lock pattern shown above. Redis 7+ supports GETEX which atomically gets a value and extends its TTL, helping with lock-free refresh patterns.
What is an appropriate TTL for different data types?
User sessions: 30 minutes to 24 hours. Static reference data (country codes, categories): 1-24 hours. User profiles: 5-30 minutes. Search results: 1-5 minutes. Real-time data: 1-60 seconds. Always keep TTL shorter than how long stale data would cause problems.