Python Redis: Caching, Sessions and Pub/Sub with redis-py (2026)
Redis is the go-to in-memory data store for Python applications that need sub-millisecond latency — whether you are caching database query results, managing user sessions, coordinating distributed workers, or building a real-time event pipeline. The redis-py library is the official Python client, and this guide covers everything from basic data-type operations to production patterns like distributed locking, Redis Streams, and the RedisJSON and RediSearch modules.
Table of Contents
Installation and Connection Setup
Install redis-py with the optional hiredis extra (C parser, ~10x faster for parsing replies). Async support (`redis.asyncio`) is built into redis-py itself and needs no extra:
pip install "redis[hiredis]>=5.0"
# Async support (redis.asyncio) ships with redis-py itself. There is no
# "asyncio" extra, so the command above covers both sync and async use.
Connect to a local or remote Redis instance:
import redis

# Blocking (synchronous) client.
r = redis.Redis(
    # Where the server lives.
    host="localhost",
    port=6379,
    db=0,
    # Hard-coded for the example only; load real credentials from config.
    password="your-secret",
    # Give up after 5 seconds instead of hanging on connect or on a reply.
    socket_connect_timeout=5,
    socket_timeout=5,
    # Hand back str instead of bytes.
    decode_responses=True,
)

# Round-trip check: PING returns True and raises if the server is unreachable.
r.ping()
# Async client. redis.asyncio is bundled with redis-py (>= 4.2). It is not
# limited to Python 3.11+; any Python supported by your redis-py release works.
import redis.asyncio as aioredis


async def get_async_client():
    """Create an asyncio Redis client for the local server (database 0).

    ``aioredis.from_url`` is an ordinary synchronous factory, not a
    coroutine, so nothing needs to be awaited here. Connections are taken
    from the client's pool, capped at ``max_connections``, when commands
    are awaited.

    The function stays ``async`` so existing ``await get_async_client()``
    call sites keep working. Close the client on shutdown with
    ``await client.aclose()`` (redis-py >= 5.0.1).
    """
    return aioredis.from_url(
        "redis://localhost:6379/0",
        decode_responses=True,
        max_connections=20,
    )
Tip: pass `decode_responses=True` when you store string data. Without it, redis-py returns raw `bytes`, and you must call `.decode("utf-8")` on every result.
Core Data Type Operations
Redis has five primary data structures. Here are the most important Python operations for each.
Strings
# Plain get/set of a string value
name_key = "user:1:name"
r.set(name_key, "Alice")
r.get(name_key)  # -> "Alice"

# INCR / INCRBY run atomically on the server, so concurrent counters never lose updates
views_key = "page:views:home"
r.incr(views_key)
r.incrby(views_key, 5)

# SETEX: store a value that Redis deletes after 3600 seconds
r.setex("session:abc123", 3600, "user_id=42")

# MSET / MGET move several keys in one round trip instead of N calls
r.mset({"k1": "v1", "k2": "v2"})
r.mget(["k1", "k2"])  # -> ["v1", "v2"]
Lists
# Work queue: producers LPUSH on the left and consumers BRPOP from the right (FIFO)
queue_key = "tasks:email"
r.lpush(queue_key, "task:101", "task:102")
# Blocks for up to 10 s; returns (list_name, item), or None on timeout
task = r.brpop(queue_key, timeout=10)

# Append-only event log: RPUSH appends and LRANGE 0 -1 reads everything back
log_key = "events:user:1"
r.rpush(log_key, "login", "purchase", "logout")
r.lrange(log_key, 0, -1)
Hashes
# A hash keeps an object's fields directly, so no JSON encode/decode is needed
user_key = "user:1"
profile_fields = {
    "name": "Alice",
    "email": "alice@example.com",
    "plan": "pro",
}
r.hset(user_key, mapping=profile_fields)
r.hget(user_key, "plan")  # -> "pro"
r.hgetall(user_key)  # -> dict of every field
r.hincrby(user_key, "login_count", 1)  # per-field counter
Sets and Sorted Sets
# Set: unordered, duplicate-free tags for one article
tags_key = "article:42:tags"
r.sadd(tags_key, "python", "redis", "caching")
r.smembers(tags_key)

# Sorted set as a leaderboard: member -> score
board_key = "leaderboard"
r.zadd(board_key, {"alice": 9800, "bob": 8700, "carol": 9100})
r.zrevrange(board_key, 0, 2, withscores=True)  # top three, highest score first
r.zincrby(board_key, 150, "bob")  # add 150 to bob's score
Caching Patterns: Cache-Aside and TTL Management
Cache-aside (lazy loading) is the most common pattern. The application checks the cache first; on a miss it fetches from the database and populates the cache.
import json
import hashlib
from functools import wraps
def cache_aside(ttl=300, client=None):
    """Decorator implementing cache-aside with JSON serialisation.

    On each call the wrapper builds a key from the function's identity and
    its arguments. On a hit it returns the cached value. On a miss it calls
    the function and stores the result with SETEX, so the entry expires
    after *ttl* seconds.

    Args:
        ttl: Lifetime of a cached entry, in seconds.
        client: Redis client to use. ``None`` (the default) means the
            module-level client ``r``, looked up at call time.

    Returns:
        A decorator. Values round-trip through JSON, so the wrapped function
        must return JSON-serialisable data, and tuples come back as lists.
    """
    def decorator(func):
        # Module + qualified name, so same-named functions in different
        # modules or classes never share cache entries.
        func_id = f"{func.__module__}.{func.__qualname__}"

        @wraps(func)
        def wrapper(*args, **kwargs):
            conn = r if client is None else client
            # Arguments are keyed by their repr(). Objects with the default
            # repr embed a memory address and will therefore never hit.
            key_data = f"{func_id}:{args}:{sorted(kwargs.items())}"
            # MD5 only shortens the key (not a security use). The flag keeps
            # it working on FIPS-restricted Python builds.
            digest = hashlib.md5(key_data.encode(), usedforsecurity=False).hexdigest()
            cache_key = "cache:" + digest
            cached = conn.get(cache_key)
            # A miss is None; any stored JSON text (even "0" or "null") is a hit.
            if cached is not None:
                return json.loads(cached)
            result = func(*args, **kwargs)
            conn.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator
@cache_aside(ttl=600)
def get_user_profile(user_id: int) -> dict:
    """Return the user's profile row, cached in Redis for 10 minutes.

    The database is only queried on a cache miss.
    """
    sql = "SELECT * FROM users WHERE id = %s"
    # NOTE(review): DB-API drivers usually expect a parameter sequence such
    # as (user_id,); confirm what this db helper accepts.
    return db.query(sql, user_id)
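Tip: put a version prefix in your keys, e.g. `v2:user:profile:{id}`, and increment the version in a config key when you need to flush the entire cache namespace without calling FLUSHDB. Keys with the old version are never read again and simply age out through their TTL.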
Write-Through Pattern
def update_user(user_id: int, data: dict):
    """Write-through update: persist *data* to the database, then refresh the cache.

    The database is written first, so the cache never holds data the
    database did not accept. The HSET and its EXPIRE are sent together in
    one MULTI/EXEC pipeline. This is one round trip and cannot leave the
    hash without a TTL if the process dies between the two commands.

    An empty *data* skips the cache step, because redis-py rejects HSET
    with no fields.
    """
    # NOTE(review): DB-API drivers usually expect a parameter sequence such
    # as (user_id,); confirm what this db helper accepts.
    db.execute("UPDATE users SET ... WHERE id=%s", user_id)
    if not data:
        return
    key = f"user:{user_id}"
    with r.pipeline(transaction=True) as pipe:
        pipe.hset(key, mapping=data)
        pipe.expire(key, 3600)
        pipe.execute()
Session Storage
import secrets
import json
SESSION_TTL = 86400  # 24 hours


def create_session(user_id: int, metadata: dict) -> str:
    """Create a session for *user_id* and return its ID.

    The payload is stored as JSON under ``session:<id>`` and expires after
    SESSION_TTL seconds. The ID comes from ``secrets.token_urlsafe(32)``
    (32 random bytes).

    *metadata* is merged into the payload, but ``user_id`` always comes from
    the argument. A ``"user_id"`` key in *metadata* is ignored rather than
    allowed to impersonate another user. ``None`` metadata is treated as
    empty.
    """
    session_id = secrets.token_urlsafe(32)
    extra = {k: v for k, v in (metadata or {}).items() if k != "user_id"}
    payload = {"user_id": user_id, **extra}
    r.setex(f"session:{session_id}", SESSION_TTL, json.dumps(payload))
    return session_id
def get_session(session_id: str) -> dict | None:
data = r.get(f"session:{session_id}")
if not data:
return None
# Sliding expiry — reset TTL on every access
r.expire(f"session:{session_id}", SESSION_TTL)
return json.loads(data)
def delete_session(session_id: str):
    """Remove the session key immediately; returns None."""
    session_key = f"session:{session_id}"
    r.delete(session_key)
Distributed Locking with SET NX
When multiple workers must not process the same job simultaneously, a Redis lock provides a lightweight mutex without a dedicated lock server.
import uuid
import time
import contextlib
@contextlib.contextmanager
def redis_lock(lock_name: str, timeout: int = 10, retry_delay: float = 0.1):
"""
Context manager for a Redis distributed lock.
Uses SET NX PX (atomic set-if-not-exists with millisecond expiry).
"""
lock_key = f"lock:{lock_name}"
lock_value = str(uuid.uuid4()) # unique per caller — prevents accidental release
deadline = time.monotonic() + timeout
acquired = False
try:
while time.monotonic() < deadline:
acquired = r.set(
lock_key, lock_value,
nx=True, # only set if key does not exist
px=timeout * 1000 # auto-expire in ms (safety net for crashes)
)
if acquired:
break
time.sleep(retry_delay)
if not acquired:
raise TimeoutError(f"Could not acquire lock '{lock_name}' within {timeout}s")
yield
finally:
if acquired:
# Lua script: only delete if we own the lock (atomic check-and-delete)
lua = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
"""
r.eval(lua, 1, lock_key, lock_value)
# Usage: only one worker at a time may process a given order
order_id = 42
with redis_lock(f"process:order:{order_id}", timeout=30):
    process_order(order_id)
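Warning: a lock held on a single Redis instance can be lost if that instance fails over before the lock key is replicated. For multi-node safety, use the Redlock algorithm (for example via the redis-py-redlock package), which acquires locks on N/2+1 independent Redis nodes.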
Pub/Sub Messaging
# Publisher
def publish_event(channel: str, event: dict):
    """Publish *event*, serialised as JSON, on *channel*.

    Pub/Sub is fire-and-forget: only clients subscribed at the moment of
    publishing receive the message.
    """
    r.publish(channel, json.dumps(event))


# Subscriber — dispatched from a background thread
def message_handler(message):
    """Decode and print the JSON payload of a channel or pattern message."""
    # Exact-channel deliveries arrive as "message"; PSUBSCRIBE deliveries
    # arrive as "pmessage". Subscribe confirmations are skipped.
    if message["type"] in ("message", "pmessage"):
        payload = json.loads(message["data"])
        print(f"Received: {payload}")


pubsub = r.pubsub()
# Register every subscription WITH its handler before starting the thread.
# run_in_thread() only dispatches to registered handlers; messages for a
# handler-less subscription are read and dropped. Adding subscriptions from
# the main thread while the worker reads the same PubSub is also not safe.
pubsub.subscribe(**{"notifications:user:42": message_handler})
# Pattern subscription (wildcard). A message on notifications:user:42 matches
# both subscriptions and is therefore delivered twice.
pubsub.psubscribe(**{"notifications:user:*": message_handler})

# Non-blocking worker thread; call thread.stop() on shutdown
thread = pubsub.run_in_thread(sleep_time=0.01, daemon=True)

# Publish only after subscribing — anything published earlier is simply lost.
publish_event("notifications:user:42", {
    "type": "order_shipped",
    "order_id": 1001,
    "ts": "2026-06-05T10:00:00Z"
})
Pub/Sub is fire-and-forget: messages published while a subscriber is offline (or not yet subscribed) are lost. Use Redis Streams for durable message delivery.
Redis Streams for Event Sourcing
from redis.exceptions import ResponseError

# Producer — XADD appends an entry and Redis generates its ID
event_id = r.xadd("orders", {
    "order_id": "1001",
    "user_id": "42",
    "total": "149.99",
    "status": "placed",
})
print(event_id)  # e.g. "1717500000000-0"

# Consumer group for distributed consumption. id="0" makes the group start
# from the beginning of the stream; mkstream creates the stream if missing.
# XGROUP CREATE fails with BUSYGROUP when the group already exists (every run
# after the first), so that specific error is treated as success.
try:
    r.xgroup_create("orders", "fulfillment-workers", id="0", mkstream=True)
except ResponseError as exc:
    if "BUSYGROUP" not in str(exc):
        raise

# Consumer — read up to 10 entries, blocking for at most 5 s
messages = r.xreadgroup(
    groupname="fulfillment-workers",
    consumername="worker-1",
    streams={"orders": ">"},  # ">" means entries never delivered to this group
    count=10,
    block=5000,
)
# Nothing is returned when the block timeout expires, hence the `or []`
for stream_name, entries in messages or []:
    for entry_id, fields in entries:
        # str keys assume decode_responses=True (they are bytes otherwise)
        print(f"Processing order {fields['order_id']}")
        # Acknowledge only after successful processing; un-ACKed entries
        # remain in the group's pending list
        r.xack("orders", "fulfillment-workers", entry_id)

# Summary of delivered-but-unacknowledged entries
pending = r.xpending("orders", "fulfillment-workers")
print(pending)
RedisJSON and RediSearch
RedisJSON — Store and Query JSON Documents
from redis.commands.json.path import Path

product_key = "product:1"
json_cmds = r.json()  # RedisJSON command helper bound to the client

# Write the whole document at the root path
json_cmds.set(product_key, Path.root_path(), {
    "name": "Mechanical Keyboard",
    "price": 129.99,
    "tags": ["electronics", "peripherals"],
    "specs": {"switches": "Cherry MX Blue", "layout": "TKL"},
})

# JSONPath writes touch only the addressed field on the server
json_cmds.set(product_key, "$.price", 119.99)
json_cmds.arrappend(product_key, "$.tags", "gaming")

# "$"-style paths return a list of matches
price = json_cmds.get(product_key, "$.price")  # [119.99]
RediSearch — Full-Text Search
from redis.commands.search.field import TextField, NumericField, TagField
from redis.commands.search.query import Query
from redis.exceptions import ResponseError

try:
    from redis.commands.search.indexDefinition import IndexDefinition, IndexType
except ImportError:
    # Newer redis-py releases use a snake_case module name
    from redis.commands.search.index_definition import IndexDefinition, IndexType

# Create an index on JSON documents whose keys start with "product:"
schema = (
    TextField("$.name", as_name="name"),
    NumericField("$.price", as_name="price", sortable=True),  # sortable: cheap SORTBY
    TagField("$.tags[*]", as_name="tags"),
)
try:
    r.ft("products").create_index(
        schema,
        definition=IndexDefinition(prefix=["product:"], index_type=IndexType.JSON),
    )
except ResponseError as exc:
    # FT.CREATE fails when the index already exists (e.g. on a second run)
    if "Index already exists" not in str(exc):
        raise

# Full-text search for "keyboard", restricted to 50 <= price <= 200.
# JSON hits only carry the raw document in `doc.json` unless fields are
# requested explicitly, so ask for the aliased attributes we print.
query = (
    Query("keyboard @price:[50 200]")
    .sort_by("price")
    .return_fields("name", "price")
)
results = r.ft("products").search(query)
for doc in results.docs:
    print(doc.name, doc.price)
Connection Pooling in Production
import redis

# One process-wide pool, created at module level / app startup and shared by
# every client object built from it.
pool = redis.ConnectionPool(
    host="redis.internal",
    port=6379,
    db=0,
    password="secret",
    decode_responses=True,
    max_connections=50,
    health_check_interval=30,
    socket_keepalive=True,
    socket_keepalive_options={},
)


def get_redis():
    """Return a client bound to the shared pool.

    Building the client is cheap: each command borrows a connection from
    the pool and returns it afterwards.
    """
    return redis.Redis(connection_pool=pool)


# High availability via Sentinel: the sentinels report which node is the
# current master (for writes) and which are replicas (for reads).
from redis.sentinel import Sentinel

sentinel = Sentinel(
    [
        ("sentinel1", 26379),
        ("sentinel2", 26379),
        ("sentinel3", 26379),
    ],
    socket_timeout=0.5,
)
master = sentinel.master_for("mymaster", decode_responses=True, socket_timeout=0.5)
slave = sentinel.slave_for("mymaster", decode_responses=True, socket_timeout=0.5)
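Tip: set `max_connections` to at least your application's thread/coroutine concurrency. With the default `ConnectionPool`, requesting a connection beyond this limit raises `ConnectionError` ("Too many connections"). Use `redis.BlockingConnectionPool` if callers should wait for a free connection instead. Use `health_check_interval` to detect stale connections in long-lived processes.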
Frequently Asked Questions
What is the difference between redis-py and aioredis?
Since redis-py 4.2, async support is built in as redis.asyncio — the standalone aioredis package is now deprecated. Use redis.asyncio.Redis or redis.asyncio.from_url() for async/await applications.
How do I handle Redis connection failures gracefully?
Wrap Redis calls in try/except for redis.exceptions.ConnectionError and redis.exceptions.TimeoutError. Use a circuit-breaker library (e.g., pybreaker) to stop hammering a down Redis server and fall back to the database.
Is Redis Pub/Sub suitable for production event pipelines?
Pub/Sub is fine for ephemeral notifications (live dashboards, chat). For durable, at-least-once delivery — especially across restarts — use Redis Streams with consumer groups, which provide message acknowledgment and replay.
How do I pipeline multiple commands to reduce round-trips?
# Queue three commands and send them together. redis-py pipelines default to
# transaction=True, so the batch is also wrapped in MULTI/EXEC.
with r.pipeline() as pipe:
    pipe.hset("user:1", mapping={"name": "Alice"})
    pipe.expire("user:1", 3600)
    pipe.incr("stats:users:total")
    pipe.execute()  # single round-trip; one reply per queued command
What is the best key naming convention for Redis?
Use colon-separated namespaces: object-type:id:field, e.g. user:42:profile, session:abc123, cache:product:list:page:1. Keep keys short — every byte of key name is stored in memory per key. Avoid spaces and special characters.