Python WebSockets: Real-Time Communication with FastAPI
WebSockets provide a persistent, full-duplex connection between client and server — perfect for chat applications, live dashboards, collaborative editing, and real-time notifications. FastAPI supports WebSockets natively with @app.websocket(), giving you the same async programming model as HTTP endpoints. This guide covers the fundamentals, connection management, room-based chat, JWT authentication, and horizontal scaling with Redis pub/sub.
Table of Contents
WebSocket Basics in FastAPI
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json
app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Echo each JSON text frame back to the client with a server timestamp.

    Every valid frame is answered with ``{"echo": <message>, "ts": <epoch>}``.
    A frame that is not valid JSON is answered with ``{"error": "invalid JSON"}``
    and the connection stays open, instead of the handler crashing.
    """
    # Imported once per connection rather than on every received frame.
    import time

    # Accept the WebSocket handshake
    await websocket.accept()
    try:
        while True:
            # Receive one text frame (binary frames are handled by /ws/binary).
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                await websocket.send_text(json.dumps({"error": "invalid JSON"}))
                continue
            # Echo back with a timestamp
            response = {"echo": message, "ts": time.time()}
            await websocket.send_text(json.dumps(response))
    except WebSocketDisconnect:
        print("Client disconnected")
# Send different data types
@app.websocket("/ws/binary")
async def binary_endpoint(websocket: WebSocket):
    """Accept the connection and echo every received binary payload unchanged."""
    await websocket.accept()
    async for chunk in websocket.iter_bytes():
        # Placeholder for real processing (images, audio, etc.); the payload
        # is currently sent straight back to the client.
        await websocket.send_bytes(chunk)
Connection Manager
A connection manager tracks active WebSocket connections and provides broadcast capabilities. Keep one manager instance per application (or per room for multi-room apps).
from typing import Optional
import asyncio
class ConnectionManager:
    """In-memory registry of active WebSocket connections keyed by id.

    Provides targeted sends and broadcasts. Sockets whose send fails are
    dropped from the registry. Removal after a failed send is identity-checked
    so that a client which reconnected under the same id while the send was
    in flight is not evicted by the cleanup of its old socket.
    """

    def __init__(self):
        # Map of connection_id -> WebSocket
        self._connections: dict[str, WebSocket] = {}
        # Guards mutations of the registry.
        self._lock = asyncio.Lock()

    async def connect(self, connection_id: str, ws: WebSocket) -> None:
        """Accept *ws* and register it under *connection_id*.

        An existing entry with the same id is replaced.
        """
        await ws.accept()
        async with self._lock:
            self._connections[connection_id] = ws

    async def disconnect(
        self, connection_id: str, ws: Optional[WebSocket] = None
    ) -> None:
        """Remove *connection_id* from the registry.

        Args:
            connection_id: Id to remove; unknown ids are ignored.
            ws: When given, the entry is removed only if it is still this
                exact socket. When omitted, the entry is removed
                unconditionally.
        """
        async with self._lock:
            if ws is None or self._connections.get(connection_id) is ws:
                self._connections.pop(connection_id, None)

    async def send_to(self, connection_id: str, data: dict) -> bool:
        """Send *data* as JSON to one connection.

        Returns:
            True if the send succeeded; False if the id is unknown or the
            send failed (in which case the socket is dropped).
        """
        ws = self._connections.get(connection_id)
        if ws is None:
            return False
        try:
            await ws.send_json(data)
        except Exception:
            # Best effort: any send failure marks the socket as dead.
            await self.disconnect(connection_id, ws)
            return False
        return True

    async def broadcast(self, data: dict, exclude: Optional[str] = None) -> None:
        """Send *data* as JSON to every connection except *exclude*.

        Sockets that fail to send are removed after the loop completes.
        """
        dead = []
        # Iterate over a snapshot: the registry may change while awaiting.
        for cid, ws in list(self._connections.items()):
            if cid == exclude:
                continue
            try:
                await ws.send_json(data)
            except Exception:
                dead.append((cid, ws))
        for cid, ws in dead:
            await self.disconnect(cid, ws)

    @property
    def count(self) -> int:
        """Number of currently registered connections."""
        return len(self._connections)
manager = ConnectionManager()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(client_id: str, ws: WebSocket):
    """Register the client, relay its JSON messages to everyone, announce leave.

    Cleanup runs in ``finally`` so the connection is unregistered and the
    "leave" event is broadcast on every exit path, not only on a clean
    ``WebSocketDisconnect`` (for example when a client sends invalid JSON).

    NOTE: this path pattern also matches any single-segment path under
    ``/ws/``; routes are matched in registration order, so more specific
    ``/ws/...`` routes need to be registered before this one.
    """
    await manager.connect(client_id, ws)
    await manager.broadcast(
        {"type": "join", "user": client_id, "count": manager.count}
    )
    try:
        while True:
            data = await ws.receive_json()
            await manager.broadcast(
                {"type": "message", "from": client_id, "data": data}
            )
    except WebSocketDisconnect:
        pass
    finally:
        await manager.disconnect(client_id)
        await manager.broadcast(
            {"type": "leave", "user": client_id, "count": manager.count}
        )
Room-Based Chat
from collections import defaultdict
class RoomManager:
    """Tracks WebSocket connections per room and broadcasts within a room.

    Empty rooms are removed from the registry, including rooms that become
    empty because their remaining sockets failed during a broadcast.
    """

    def __init__(self):
        # room_id -> {connection_id -> WebSocket}
        self._rooms: dict[str, dict[str, WebSocket]] = defaultdict(dict)

    async def join(self, room_id: str, user_id: str, ws: WebSocket) -> None:
        """Accept *ws*, add it to the room and notify the other members."""
        await ws.accept()
        self._rooms[room_id][user_id] = ws
        await self.broadcast_room(
            room_id, {"type": "join", "user": user_id}, exclude=user_id
        )

    async def leave(self, room_id: str, user_id: str) -> None:
        """Remove *user_id* from the room and notify the remaining members.

        Unknown rooms and users are ignored. The room entry is deleted once
        its last member leaves.
        """
        # .get() avoids creating an entry for a room that does not exist.
        members = self._rooms.get(room_id)
        if members is None:
            return
        members.pop(user_id, None)
        if not members:
            self._rooms.pop(room_id, None)
        else:
            await self.broadcast_room(room_id, {"type": "leave", "user": user_id})

    async def broadcast_room(
        self, room_id: str, data: dict, exclude: Optional[str] = None
    ) -> None:
        """Send *data* as JSON to every member of the room except *exclude*.

        Sockets that fail to send are removed from the room afterwards.
        """
        dead = []
        # Iterate over a snapshot: membership may change while awaiting.
        for uid, ws in list(self._rooms.get(room_id, {}).items()):
            if uid == exclude:
                continue
            try:
                await ws.send_json(data)
            except Exception:
                dead.append((uid, ws))
        if not dead:
            return
        # Re-fetch without indexing so a room deleted during the sends is
        # not recreated by the defaultdict.
        members = self._rooms.get(room_id)
        if members is None:
            return
        for uid, ws in dead:
            # Only drop the socket that actually failed; the user may have
            # rejoined with a new socket in the meantime.
            if members.get(uid) is ws:
                del members[uid]
        if not members:
            self._rooms.pop(room_id, None)

    def room_members(self, room_id: str) -> list[str]:
        """Return the user ids currently in the room (empty if unknown)."""
        return list(self._rooms.get(room_id, {}).keys())
room_manager = RoomManager()


@app.websocket("/ws/rooms/{room_id}/{user_id}")
async def room_endpoint(room_id: str, user_id: str, ws: WebSocket):
    """Join the user to a room and relay their chat messages to its members.

    Incoming frames must be JSON objects; their ``"text"`` field is broadcast
    to the room. The user is removed from the room in ``finally`` so that
    membership is cleaned up on every exit path, not only on a clean
    ``WebSocketDisconnect``.
    """
    await room_manager.join(room_id, user_id, ws)
    try:
        while True:
            data = await ws.receive_json()
            if not isinstance(data, dict):
                # Valid JSON that is not an object has no "text" field;
                # drop it rather than crash on data.get().
                continue
            await room_manager.broadcast_room(
                room_id,
                {"type": "message", "from": user_id, "text": data.get("text")},
            )
    except WebSocketDisconnect:
        pass
    finally:
        await room_manager.leave(room_id, user_id)
WebSocket Authentication
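Browser WebSocket clients cannot attach custom HTTP headers (such as Authorization) to the handshake request, and no HTTP headers are exchanged once the connection is open. Authenticate either with a query parameter on the initial connection or with the first message sent after connecting. Tokens passed in the query string can end up in server and proxy logs, so keep them short-lived.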
from fastapi import Query, HTTPException
import jwt
def decode_token(token: str) -> dict:
    """Decode *token* as an HS256 JWT using ``SECRET_KEY`` and return its claims.

    Any exception raised by ``jwt.decode`` propagates to the caller.
    """
    accepted_algorithms = ["HS256"]
    claims = jwt.decode(token, SECRET_KEY, algorithms=accepted_algorithms)
    return claims
@app.websocket("/ws/secure")
async def secure_ws(websocket: WebSocket, token: str = Query(...)):
    """Authenticated echo endpoint: validates a JWT from the ``token`` query param.

    The connection is rejected when the token fails validation or carries no
    ``sub`` claim. Once accepted, the client receives a ``connected`` message
    followed by an ``ack`` for every JSON message it sends.
    """
    # Validate JWT before accepting the connection
    try:
        claims = decode_token(token)
        # A correctly signed token without "sub" is still unusable here, so
        # treat it as unauthorized instead of letting KeyError escape.
        user_id = claims["sub"]
    except (jwt.InvalidTokenError, KeyError):
        # NOTE: per the ASGI spec, closing before accept() rejects the
        # handshake with HTTP 403, so the client does not see this close code.
        await websocket.close(code=4001, reason="Unauthorized")
        return
    await websocket.accept()
    await websocket.send_json({"type": "connected", "user_id": user_id})
    try:
        while True:
            data = await websocket.receive_json()
            # Process authenticated message
            await websocket.send_json({"type": "ack", "received": data})
    except WebSocketDisconnect:
        pass
Scaling with Redis Pub/Sub
When running multiple FastAPI workers or instances, each process has its own in-memory connection manager. To broadcast across all instances, use Redis pub/sub: each instance subscribes to the same channel, and any message published by one instance is received by all.
import asyncio
import redis.asyncio as aioredis
import json
redis_client = aioredis.from_url("redis://localhost:6379")


async def redis_listener(channel: str, local_manager: ConnectionManager):
    """Subscribe to Redis channel and broadcast to local WebSocket connections.

    Args:
        channel: Redis pub/sub channel to subscribe to.
        local_manager: Manager whose connections receive each decoded message.

    Payloads that are not valid JSON are logged and skipped so that a single
    bad message does not stop the listener. The pub/sub object is used as an
    async context manager so it is released when the task is cancelled or
    fails.
    """
    import logging

    log = logging.getLogger(__name__)
    async with redis_client.pubsub() as pubsub:
        await pubsub.subscribe(channel)
        async for message in pubsub.listen():
            # Skip non-data events such as the subscribe confirmation.
            if message["type"] != "message":
                continue
            try:
                data = json.loads(message["data"])
            except (json.JSONDecodeError, UnicodeDecodeError):
                log.warning("Skipping malformed message on channel %s", channel)
                continue
            await local_manager.broadcast(data)
# Start the listener as a background task on app startup
@app.on_event("startup")
async def startup():
    """Launch the Redis listener for the global chat channel."""
    # The event loop keeps only weak references to tasks, so a task whose
    # result is discarded can be garbage-collected mid-execution. Keep a
    # strong reference on the application state for the app's lifetime.
    app.state.redis_listener_task = asyncio.create_task(
        redis_listener("chat:global", manager)
    )
@app.websocket("/ws/scalable/{client_id}")
async def scalable_ws(client_id: str, ws: WebSocket):
    """Register the client locally and publish its messages to Redis.

    Messages are published to the ``chat:global`` channel rather than
    broadcast directly. The connection is unregistered in ``finally`` so it
    is removed on every exit path (invalid JSON, a failed publish), not only
    on a clean ``WebSocketDisconnect``.
    """
    await manager.connect(client_id, ws)
    try:
        while True:
            data = await ws.receive_json()
            if not isinstance(data, dict):
                # Valid JSON that is not an object has no "text" field;
                # drop it rather than crash on data.get().
                continue
            # Publish to Redis — all instances receive it
            await redis_client.publish("chat:global", json.dumps({
                "type": "message",
                "from": client_id,
                "text": data.get("text"),
            }))
    except WebSocketDisconnect:
        pass
    finally:
        await manager.disconnect(client_id)
JavaScript Client
// Reconnecting WebSocket client
class ChatClient {
  /**
   * Open a connection immediately and reconnect with exponential backoff
   * (1s doubling up to 30s) after any close that was not requested through
   * disconnect().
   *
   * @param {string} url   WebSocket endpoint; may already contain a query string.
   * @param {string} token Auth token, sent URL-encoded as the `token` query parameter.
   */
  constructor(url, token) {
    const separator = url.includes("?") ? "&" : "?";
    this.url = `${url}${separator}token=${encodeURIComponent(token)}`;
    this.ws = null;
    this.reconnectDelay = 1000;
    this.reconnectTimer = null; // handle of the pending reconnect, if any
    this.manualClose = false;   // set by disconnect() to suppress reconnects
    this.connect();
  }

  /** Open a new socket and wire up its event handlers. */
  connect() {
    this.manualClose = false;
    this.reconnectTimer = null;
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log("Connected");
      this.reconnectDelay = 1000; // reset backoff
    };

    this.ws.onmessage = (event) => {
      let data;
      try {
        data = JSON.parse(event.data);
      } catch (err) {
        // A non-JSON frame must not throw out of the event handler.
        console.warn("Ignoring non-JSON message:", event.data);
        return;
      }
      this.onMessage(data);
    };

    this.ws.onclose = (event) => {
      // Closing a socket that is still connecting can report an abnormal
      // code, so the manualClose flag is checked as well as the close code.
      if (this.manualClose || event.code === 1000) {
        return;
      }
      this.reconnectTimer = setTimeout(() => this.connect(), this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
    };
  }

  /** Send a chat message; silently dropped unless the socket is open. */
  send(text) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ text }));
    }
  }

  /** Message hook; override or reassign to handle incoming data. */
  onMessage(data) {
    console.log("Received:", data);
  }

  /** Close the connection for good and cancel any scheduled reconnect. */
  disconnect() {
    this.manualClose = true;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    this.ws?.close(1000, "Client disconnect");
  }
}
// Example usage: the constructor opens the connection immediately and sends
// the token as the `token` query parameter.
const client = new ChatClient("ws://localhost:8000/ws/secure", "your-jwt-token");
Error Handling and Reconnection
WebSocket connections drop due to network issues, timeouts, and server restarts. Implement heartbeats (ping/pong) to detect stale connections, and exponential backoff reconnection on the client.
import asyncio
from fastapi import WebSocket, WebSocketDisconnect
PING_INTERVAL = 30  # seconds


@app.websocket("/ws/heartbeat/{client_id}")
async def heartbeat_ws(client_id: str, ws: WebSocket):
    """Acknowledge client messages while sending periodic JSON pings.

    A background task sends ``{"type": "ping"}`` every ``PING_INTERVAL``
    seconds. Client ``{"type": "pong"}`` replies are consumed silently;
    every other JSON message is answered with an ``ack``.

    NOTE: a missing pong is not enforced here. An unresponsive peer is only
    noticed when a send or receive on the socket fails.
    """
    await ws.accept()

    async def send_ping():
        # Stop pinging as soon as a send fails (socket closed or broken).
        while True:
            await asyncio.sleep(PING_INTERVAL)
            try:
                await ws.send_json({"type": "ping"})
            except Exception:
                break

    ping_task = asyncio.create_task(send_ping())
    try:
        while True:
            data = await ws.receive_json()
            # Only JSON objects can carry a "type"; other valid JSON values
            # (lists, strings, numbers) are acknowledged like any message
            # instead of crashing on data.get().
            if isinstance(data, dict) and data.get("type") == "pong":
                continue  # client acknowledged heartbeat
            await ws.send_json({"type": "ack", "received": data})
    except WebSocketDisconnect:
        pass
    finally:
        ping_task.cancel()
Frequently Asked Questions
- How many WebSocket connections can FastAPI handle?
- A single uvicorn worker handles thousands of concurrent WebSocket connections (typically 5,000–20,000) since each connection is just a coroutine in the event loop — no thread per connection. Scale horizontally for more capacity, using Redis pub/sub to synchronize state.
- Should I use WebSockets or Server-Sent Events (SSE)?
- Use SSE when communication is server-to-client only (notifications, live feeds) — SSE is simpler, works over plain HTTP, and reconnects automatically. Use WebSockets when you need bidirectional communication (chat, collaborative editing, games).
- How do I handle WebSocket message ordering?
- WebSocket messages within a single connection are always ordered. Across multiple connections or server instances, ordering is not guaranteed. Include a sequence number or timestamp in your messages and let the client sort if ordering matters.