Python Redis: Caching, Sessions and Pub/Sub with redis-py (2026)

Redis is the go-to in-memory data store for Python applications that need sub-millisecond latency — whether you are caching database query results, managing user sessions, coordinating distributed workers, or building a real-time event pipeline. The redis-py library is the official Python client, and this guide covers everything from basic data-type operations to production patterns like distributed locking, Redis Streams, and the RedisJSON and RediSearch modules.

Installation and Connection Setup

Install redis-py with the optional hiredis extra, a C response parser that can be up to roughly 10x faster on large replies. Async support (redis.asyncio) is part of redis-py itself, so it needs no separate extra:

pip install "redis[hiredis]>=5.0"

Connect to a local or remote Redis instance:

import redis

# Synchronous client
r = redis.Redis(
    host="localhost",
    port=6379,
    db=0,
    password="your-secret",
    decode_responses=True,   # return str instead of bytes
    socket_timeout=5,
    socket_connect_timeout=5,
)

# Verify connection
r.ping()  # returns True

# Async client (redis.asyncio, bundled with redis-py since 4.2)
import redis.asyncio as aioredis

async def get_async_client():
    return await aioredis.from_url(
        "redis://localhost:6379/0",
        decode_responses=True,
        max_connections=20,
    )
Tip: Always set decode_responses=True when storing string data. Without it, redis-py returns raw bytes and you must call .decode("utf-8") everywhere.

Core Data Type Operations

Redis has five primary data structures. Here are the most important Python operations for each.

Strings

# Basic get/set
r.set("user:1:name", "Alice")
r.get("user:1:name")           # "Alice"

# Atomic increment (perfect for counters)
r.incr("page:views:home")
r.incrby("page:views:home", 5)

# Set with expiry (seconds)
r.setex("session:abc123", 3600, "user_id=42")

# Bulk operations — faster than N individual calls
r.mset({"k1": "v1", "k2": "v2"})
r.mget(["k1", "k2"])            # ["v1", "v2"]

Lists

# Task queue — producer pushes left, consumer pops right
r.lpush("tasks:email", "task:101", "task:102")
task = r.brpop("tasks:email", timeout=10)  # blocking pop; returns (key, value) or None on timeout

# Append-only log
r.rpush("events:user:1", "login", "purchase", "logout")
r.lrange("events:user:1", 0, -1)  # all items

Hashes

# Store an object without JSON serialisation overhead
r.hset("user:1", mapping={
    "name": "Alice",
    "email": "alice@example.com",
    "plan": "pro",
})

r.hget("user:1", "plan")       # "pro"
r.hgetall("user:1")            # full dict
r.hincrby("user:1", "login_count", 1)

Sets and Sorted Sets

# Unique tags per article
r.sadd("article:42:tags", "python", "redis", "caching")
r.smembers("article:42:tags")

# Leaderboard: sorted set with scores
r.zadd("leaderboard", {"alice": 9800, "bob": 8700, "carol": 9100})
r.zrevrange("leaderboard", 0, 2, withscores=True)  # top 3
r.zincrby("leaderboard", 150, "bob")               # update score

Caching Patterns: Cache-Aside and TTL Management

Cache-aside (lazy loading) is the most common pattern. The application checks the cache first; on a miss it fetches from the database and populates the cache.

import json
import hashlib
from functools import wraps

def cache_aside(ttl=300):
    """Decorator implementing cache-aside with JSON serialisation."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build a deterministic cache key
            key_data = f"{func.__name__}:{args}:{sorted(kwargs.items())}"
            cache_key = "cache:" + hashlib.md5(key_data.encode()).hexdigest()

            cached = r.get(cache_key)
            if cached is not None:  # a miss is None; any stored JSON string is a hit
                return json.loads(cached)

            result = func(*args, **kwargs)
            r.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator


@cache_aside(ttl=600)
def get_user_profile(user_id: int) -> dict:
    # Expensive DB call only happens on cache miss
    return db.query("SELECT * FROM users WHERE id = %s", user_id)
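Cache Invalidation: Use a version-prefix strategy for bulk invalidation. Build keys as v2:user:profile:{id}, keep the current version number in a separate config key, and increment it when you need to invalidate an entire cache namespace without calling FLUSHDB. Keys written under the old version are simply never read again and disappear when their TTL expires.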
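
The version-prefix idea can be sketched as below. This is a minimal illustration, not a definitive implementation: the helper names versioned_key and invalidate_namespace and the cache:version:{namespace} config key are hypothetical, and client is assumed to be a redis.Redis instance created with decode_responses=True.

```python
def versioned_key(client, namespace: str, suffix: str) -> str:
    """Build a cache key that embeds the namespace's current version."""
    # Hypothetical config key holding the version counter for this namespace.
    # A missing counter is treated as version 0, so the first INCR moves to 1.
    version = client.get(f"cache:version:{namespace}") or "0"
    return f"v{version}:{namespace}:{suffix}"


def invalidate_namespace(client, namespace: str) -> int:
    """Bump the version; keys built under the old version are never read again."""
    # INCR is atomic, so concurrent invalidations cannot lose an update.
    return client.incr(f"cache:version:{namespace}")
```

Reading the version costs one extra GET per lookup. Entries under old versions are not deleted; they rely on their TTL to leave memory, so this approach assumes every cached key is written with an expiry.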

Write-Through Pattern

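def update_user(user_id: int, data: dict):
    # Write to the DB first, then refresh the cache entry.
    # The DB write and the cache update are not atomic with each other.
    db.execute("UPDATE users SET ... WHERE id=%s", user_id)
    # MULTI/EXEC pipeline: HSET and EXPIRE are applied together in one round-trip
    pipe = r.pipeline(transaction=True)
    pipe.hset(f"user:{user_id}", mapping=data)
    pipe.expire(f"user:{user_id}", 3600)
    pipe.execute()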

Session Storage

import secrets
import json

SESSION_TTL = 86400  # 24 hours

def create_session(user_id: int, metadata: dict) -> str:
    session_id = secrets.token_urlsafe(32)
    payload = {"user_id": user_id, **metadata}
    r.setex(f"session:{session_id}", SESSION_TTL, json.dumps(payload))
    return session_id

def get_session(session_id: str) -> dict | None:
    data = r.get(f"session:{session_id}")
    if not data:
        return None
    # Sliding expiry — reset TTL on every access
    r.expire(f"session:{session_id}", SESSION_TTL)
    return json.loads(data)

def delete_session(session_id: str):
    r.delete(f"session:{session_id}")

Distributed Locking with SET NX

When multiple workers must not process the same job simultaneously, a Redis lock provides a lightweight mutex without a dedicated lock server.

import uuid
import time
import contextlib

@contextlib.contextmanager
def redis_lock(lock_name: str, timeout: int = 10, retry_delay: float = 0.1):
    """
    Context manager for a Redis distributed lock.
    Uses SET NX PX (atomic set-if-not-exists with millisecond expiry).
    """
    lock_key = f"lock:{lock_name}"
    lock_value = str(uuid.uuid4())  # unique per caller — prevents accidental release
    deadline = time.monotonic() + timeout

    acquired = False
    try:
        while time.monotonic() < deadline:
            acquired = r.set(
                lock_key, lock_value,
                nx=True,           # only set if key does not exist
                px=timeout * 1000  # auto-expire in ms (safety net for crashes)
            )
            if acquired:
                break
            time.sleep(retry_delay)

        if not acquired:
            raise TimeoutError(f"Could not acquire lock '{lock_name}' within {timeout}s")

        yield

    finally:
        if acquired:
            # Lua script: only delete if we own the lock (atomic check-and-delete)
            lua = """
            if redis.call('GET', KEYS[1]) == ARGV[1] then
                return redis.call('DEL', KEYS[1])
            else
                return 0
            end
            """
            r.eval(lua, 1, lock_key, lock_value)


# Usage
with redis_lock("process:order:42", timeout=30):
    process_order(42)
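Tip: For higher reliability across Redis failover, consider the Redlock algorithm, which requires acquiring the lock on a majority (N/2+1) of N independent Redis nodes. Third-party implementations exist (for example redlock-py and pottery). For a single instance, redis-py also ships a built-in lock via r.lock().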

Pub/Sub Messaging

# Publisher
def publish_event(channel: str, event: dict):
    r.publish(channel, json.dumps(event))

publish_event("notifications:user:42", {
    "type": "order_shipped",
    "order_id": 1001,
    "ts": "2026-06-05T10:00:00Z"
})

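# Subscriber: handlers are invoked from a background thread
def message_handler(message):
    # Channel subscriptions deliver "message", pattern subscriptions "pmessage"
    if message["type"] in ("message", "pmessage"):
        payload = json.loads(message["data"])
        print(f"Received: {payload}")

pubsub = r.pubsub()
pubsub.subscribe(**{"notifications:user:42": message_handler})

# Pattern subscriptions (wildcard). Register them with a handler before starting
# the thread; a channel matched by both subscriptions is delivered twice.
pubsub.psubscribe(**{"notifications:user:*": message_handler})

# Non-blocking thread
thread = pubsub.run_in_thread(sleep_time=0.01, daemon=True)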

Pub/Sub is fire-and-forget: a message published while a subscriber is disconnected is never delivered to that subscriber. Use Redis Streams for durable message delivery.

Redis Streams for Event Sourcing

# Producer — append to stream
event_id = r.xadd("orders", {
    "order_id": "1001",
    "user_id": "42",
    "total": "149.99",
    "status": "placed",
})
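print(event_id)  # e.g. "1717500000000-0" (<millisecond timestamp>-<sequence>)

# Create consumer group for distributed consumption
# (raises redis.exceptions.ResponseError "BUSYGROUP" if the group already exists)
r.xgroup_create("orders", "fulfillment-workers", id="0", mkstream=True)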

# Consumer — read new messages (blocking)
messages = r.xreadgroup(
    groupname="fulfillment-workers",
    consumername="worker-1",
    streams={"orders": ">"},  # ">" means undelivered messages only
    count=10,
    block=5000,  # wait up to 5s
)

for stream_name, entries in (messages or []):
    for entry_id, fields in entries:
        print(f"Processing order {fields['order_id']}")
        # Acknowledge after successful processing
        r.xack("orders", "fulfillment-workers", entry_id)

# Check pending (unacknowledged) messages
pending = r.xpending("orders", "fulfillment-workers")
print(pending)

RedisJSON and RediSearch

RedisJSON — Store and Query JSON Documents

from redis.commands.json.path import Path

# Store a JSON document
r.json().set("product:1", Path.root_path(), {
    "name": "Mechanical Keyboard",
    "price": 129.99,
    "tags": ["electronics", "peripherals"],
    "specs": {"switches": "Cherry MX Blue", "layout": "TKL"}
})

# Atomic update of a nested field
r.json().set("product:1", "$.price", 119.99)
r.json().arrappend("product:1", "$.tags", "gaming")

# Retrieve specific path
price = r.json().get("product:1", "$.price")  # [119.99]

RediSearch — Full-Text Search

from redis.commands.search.field import TextField, NumericField, TagField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import NumericFilter, Query

# Create an index on JSON documents
schema = (
    TextField("$.name", as_name="name"),
    NumericField("$.price", as_name="price"),
    TagField("$.tags[*]", as_name="tags"),
)
r.ft("products").create_index(
    schema,
    definition=IndexDefinition(prefix=["product:"], index_type=IndexType.JSON)
)

# Full-text search with a numeric range filter (price between 50 and 200)
results = r.ft("products").search(
    Query("keyboard")
    .add_filter(NumericFilter("price", 50, 200))
    .sort_by("price")
    .return_fields("name", "price")  # expose the fields as doc attributes
)
for doc in results.docs:
    print(doc.name, doc.price)

Connection Pooling in Production

import redis

# Create a shared pool (do this at module level / app startup)
pool = redis.ConnectionPool(
    host="redis.internal",
    port=6379,
    db=0,
    password="secret",
    max_connections=50,
    decode_responses=True,
    socket_keepalive=True,
    socket_keepalive_options={},
    health_check_interval=30,
)

# Clients are cheap to create: each command borrows a connection from the pool and returns it afterwards
def get_redis():
    return redis.Redis(connection_pool=pool)

# With Sentinel (high-availability setup)
from redis.sentinel import Sentinel

sentinel = Sentinel(
    [("sentinel1", 26379), ("sentinel2", 26379), ("sentinel3", 26379)],
    socket_timeout=0.5,
)
master = sentinel.master_for("mymaster", socket_timeout=0.5, decode_responses=True)
slave = sentinel.slave_for("mymaster", socket_timeout=0.5, decode_responses=True)
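Production Tip: Size max_connections to at least your application's peak thread/coroutine concurrency. When the pool is exhausted, ConnectionPool raises ConnectionError rather than waiting; use redis.BlockingConnectionPool if callers should wait for a free connection instead. Use health_check_interval to detect stale connections in long-lived processes.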

Frequently Asked Questions

What is the difference between redis-py and aioredis?

Since redis-py 4.2, async support is built in as redis.asyncio — the standalone aioredis package is now deprecated. Use redis.asyncio.Redis or redis.asyncio.from_url() for async/await applications.

How do I handle Redis connection failures gracefully?

Wrap Redis calls in try/except for redis.exceptions.ConnectionError and redis.exceptions.TimeoutError. Use a circuit-breaker library (e.g., pybreaker) to stop hammering a down Redis server and fall back to the database.
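
The circuit-breaker idea can be sketched in a few lines of plain Python. This is an illustrative sketch, not pybreaker's API: the class name CircuitBreaker and its parameters are hypothetical, and the exception types to catch are passed in by the caller (for redis-py that would be redis.exceptions.ConnectionError and redis.exceptions.TimeoutError).

```python
import time


class CircuitBreaker:
    """After max_failures consecutive errors, skip the call for reset_after seconds."""

    def __init__(self, errors, max_failures=3, reset_after=30.0):
        self.errors = errors              # tuple of exception types treated as failures
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = None             # monotonic time when the breaker opened

    def call(self, func, fallback):
        # While open, go straight to the fallback until the cool-down has elapsed.
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                return fallback()
            self.opened_at = None         # half-open: allow one trial call
        try:
            result = func()
        except self.errors:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            return fallback()
        self.failures = 0                 # a success closes the breaker again
        return result
```

A call site might look like breaker.call(lambda: r.get(key), lambda: load_from_db(key)), where load_from_db is a hypothetical database fallback. Because the failure count is only reset on success, a failed trial call after the cool-down reopens the breaker immediately.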

Is Redis Pub/Sub suitable for production event pipelines?

Pub/Sub is fine for ephemeral notifications (live dashboards, chat). For durable, at-least-once delivery — especially across restarts — use Redis Streams with consumer groups, which provide message acknowledgment and replay.
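
Acknowledgment and replay with a consumer group can be sketched as follows. The helper name drain_stream and its parameters are hypothetical. The sketch assumes client is a redis.Redis instance with decode_responses=True and the default (RESP2) reply shape, and that the consumer group already exists.

```python
def drain_stream(client, stream, group, consumer, handler, count=10):
    """Replay this consumer's unacknowledged entries, then read new ones.

    Returns the number of entries acknowledged.
    """
    acked = 0
    # "0" re-reads entries already delivered to this consumer but never acked;
    # ">" reads entries that have not been delivered to any consumer yet.
    for start_id in ("0", ">"):
        reply = client.xreadgroup(
            groupname=group,
            consumername=consumer,
            streams={stream: start_id},
            count=count,
        )
        for _stream_name, entries in (reply or []):
            for entry_id, fields in entries:
                try:
                    handler(fields)
                except Exception:
                    # Leave the entry unacknowledged so it stays pending
                    # and is replayed on the next run.
                    continue
                acked += client.xack(stream, group, entry_id)
    return acked
```

Running this once at worker startup, before entering the normal blocking read loop, gives at-least-once behaviour: an entry whose handler failed or whose worker crashed is seen again, so handlers should be idempotent.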

How do I pipeline multiple commands to reduce round-trips?

pipe = r.pipeline()
pipe.hset("user:1", mapping={"name": "Alice"})
pipe.expire("user:1", 3600)
pipe.incr("stats:users:total")
pipe.execute()  # single round-trip

What is the best key naming convention for Redis?

Use colon-separated namespaces: object-type:id:field, e.g. user:42:profile, session:abc123, cache:product:list:page:1. Keep keys short — every byte of key name is stored in memory per key. Avoid spaces and special characters.
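
One way to enforce this convention is a small key-builder helper. The sketch below is illustrative only: the function name make_key and the set of allowed characters are assumptions, not part of redis-py.

```python
import re

# Assumed policy: letters, digits, underscore, dot and hyphen are allowed in a part.
_SAFE_PART = re.compile(r"[A-Za-z0-9_.\-]+")


def make_key(*parts) -> str:
    """Join parts into a colon-separated key, rejecting spaces and special characters."""
    if not parts:
        raise ValueError("at least one key part is required")
    cleaned = [str(part) for part in parts]
    for part in cleaned:
        # fullmatch also rejects empty parts and parts containing a colon
        if not _SAFE_PART.fullmatch(part):
            raise ValueError(f"unsafe key part: {part!r}")
    return ":".join(cleaned)
```

For example, make_key("user", 42, "profile") produces user:42:profile. Building every key through one helper keeps the namespace layout consistent across the codebase.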