Python Rate Limiting: Token Bucket and Sliding Window
Rate limiting controls how many requests a client can make to your API within a time window, protecting services from abuse, accidental overload, and denial-of-service attacks. Python provides multiple approaches: in-memory limiters for single-process applications, Redis-backed distributed limiters for multi-instance deployments, and battle-tested libraries like slowapi that integrate directly into FastAPI and Flask. Understanding the underlying algorithms — token bucket, sliding window, and fixed window — helps you choose the right strategy for each endpoint.
Table of Contents
Rate Limiting Algorithms Compared
Each algorithm offers different trade-offs between burst handling, memory usage, and accuracy. The fixed window counter is simplest but allows burst traffic at window boundaries. The sliding window log is accurate but memory-intensive. The token bucket and sliding window counter offer the best balance of accuracy, burst allowance, and efficiency.
Algorithm | Burst handling | Memory | Accuracy | Complexity
------------------|----------------|---------|----------|-----------
Fixed Window | Poor (2x burst)| O(1) | Low | Very Low
Sliding Window Log| No burst | O(n) | Exact | Medium
Sliding Window | Approximate | O(1) | High | Low
Token Bucket | Controlled | O(1) | High | Low
Leaky Bucket | None | O(1) | Exact | Low
Token Bucket Implementation
The token bucket algorithm maintains a bucket that fills with tokens at a constant rate up to a maximum capacity. Each request consumes one token. If the bucket is empty, the request is rejected. This naturally allows short bursts (up to the bucket capacity) while enforcing a long-term average rate. It's the algorithm used by most cloud providers (AWS API Gateway, Stripe, GitHub).
import asyncio
import time
from dataclasses import dataclass, field
from collections import defaultdict
import threading
@dataclass
class TokenBucket:
"""Thread-safe token bucket rate limiter."""
capacity: float # max tokens
refill_rate: float # tokens added per second
_tokens: float = field(init=False)
_last_refill: float = field(init=False)
_lock: threading.Lock = field(default_factory=threading.Lock, init=False)
def __post_init__(self):
self._tokens = self.capacity
self._last_refill = time.monotonic()
def _refill(self):
now = time.monotonic()
elapsed = now - self._last_refill
added = elapsed * self.refill_rate
self._tokens = min(self.capacity, self._tokens + added)
self._last_refill = now
def consume(self, tokens: float = 1.0) -> bool:
"""Return True if request is allowed, False if rate limited."""
with self._lock:
self._refill()
if self._tokens >= tokens:
self._tokens -= tokens
return True
return False
@property
def available(self) -> float:
with self._lock:
self._refill()
return self._tokens
# Per-client rate limiter using a dict of buckets
class RateLimiter:
def __init__(self, capacity: int = 10, rate: float = 1.0):
self.capacity = capacity
self.rate = rate
self._buckets: dict[str, TokenBucket] = defaultdict(
lambda: TokenBucket(capacity=self.capacity, refill_rate=self.rate)
)
def is_allowed(self, client_id: str) -> bool:
return self._buckets[client_id].consume()
# Usage
limiter = RateLimiter(capacity=10, rate=2.0) # burst 10, refill 2/sec
for i in range(15):
allowed = limiter.is_allowed("user-123")
print(f"Request {i+1}: {'OK' if allowed else 'RATE LIMITED'}")
time.sleep(0.1)
Sliding Window with Redis
Redis-backed sliding window rate limiting works across multiple application instances and survives process restarts. The sliding window counter stores the count in a sorted set keyed by client ID, with request timestamps as scores. A Lua script runs the check atomically on the Redis server, preventing race conditions that would occur if the check and update were two separate commands.
import redis.asyncio as aioredis
import time
redis_client = aioredis.from_url("redis://localhost:6379")
SLIDING_WINDOW_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local request_id = ARGV[4]
-- Remove timestamps outside the window
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
-- Count requests in the current window
local count = redis.call('ZCARD', key)
if count < limit then
-- Add this request
redis.call('ZADD', key, now, request_id)
redis.call('EXPIRE', key, math.ceil(window / 1000))
return {1, limit - count - 1, 0} -- allowed, remaining, retry_after
else
-- Find when the oldest request will expire
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
local retry_after = math.ceil((tonumber(oldest[2]) + window - now) / 1000)
return {0, 0, retry_after} -- denied, remaining, retry_after
end
"""
_script = None
async def sliding_window_rate_limit(
client_id: str,
limit: int = 100,
window_ms: int = 60_000, # 1 minute
) -> tuple[bool, int, int]:
"""
Returns (allowed, remaining, retry_after_seconds).
Uses atomic Lua script to prevent race conditions.
"""
global _script
if _script is None:
_script = redis_client.register_script(SLIDING_WINDOW_SCRIPT)
now_ms = int(time.time() * 1000)
request_id = f"{now_ms}-{id(object())}"
key = f"ratelimit:{client_id}"
result = await _script(
keys=[key],
args=[now_ms, window_ms, limit, request_id]
)
allowed = bool(result[0])
remaining = int(result[1])
retry_after = int(result[2])
return allowed, remaining, retry_after
slowapi with FastAPI
slowapi is a rate limiting library for FastAPI and Starlette, inspired by Flask-Limiter. It uses the limits library under the hood, which supports multiple storage backends (in-memory, Redis, memcached). Decorating routes with @limiter.limit() is the simplest way to add rate limiting without writing middleware from scratch.
from fastapi import FastAPI, Request, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
# Use IP address as the key by default
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
# Use Redis for distributed rate limiting
from limits.storage import RedisStorage
limiter_with_redis = Limiter(
key_func=get_remote_address,
storage_uri="redis://localhost:6379",
)
@app.get("/api/public")
@limiter.limit("100/minute")
async def public_endpoint(request: Request):
return {"message": "OK"}
@app.get("/api/search")
@limiter.limit("10/minute;50/hour") # multiple limits
async def search_endpoint(request: Request, q: str):
return {"query": q, "results": []}
@app.post("/api/login")
@limiter.limit("5/minute") # strict limit on auth endpoints
async def login_endpoint(request: Request):
return {"token": "..."}
# Custom key function — rate limit per authenticated user
def get_user_id(request: Request) -> str:
user = getattr(request.state, "user", None)
if user:
return f"user:{user.id}"
return get_remote_address(request)
user_limiter = Limiter(key_func=get_user_id)
Per-User and Per-Endpoint Limits
Production APIs typically enforce different limits per user tier (free vs paid) and per endpoint sensitivity (read vs write, public vs authenticated). This requires a dynamic limit function that reads the user's plan from the database or JWT claims and returns the appropriate limit string.
from fastapi import Depends
from functools import lru_cache
PLAN_LIMITS = {
"free": {"api": "60/hour", "search": "10/minute"},
"pro": {"api": "1000/hour", "search": "100/minute"},
"enterprise": {"api": "10000/hour","search": "1000/minute"},
}
def get_api_limit(request: Request) -> str:
"""Dynamic limit based on authenticated user's plan."""
user = getattr(request.state, "user", None)
plan = user.plan if user else "free"
return PLAN_LIMITS.get(plan, PLAN_LIMITS["free"])["api"]
# FastAPI middleware that enforces per-user limits
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
class DynamicRateLimitMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
user_id = getattr(request.state, "user_id", request.client.host)
endpoint = request.url.path
allowed, remaining, retry_after = await sliding_window_rate_limit(
client_id=f"{user_id}:{endpoint}",
limit=self._get_limit(request),
window_ms=60_000,
)
if not allowed:
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded"},
headers={
"Retry-After": str(retry_after),
"X-RateLimit-Remaining": "0",
},
)
response = await call_next(request)
response.headers["X-RateLimit-Remaining"] = str(remaining)
return response
def _get_limit(self, request: Request) -> int:
user = getattr(request.state, "user", None)
plan = user.plan if user else "free"
return {"free": 60, "pro": 1000, "enterprise": 10000}.get(plan, 60)
Rate Limit Response Headers
Standard rate limit headers let clients handle throttling gracefully by backing off before hitting the limit. The RateLimit headers draft standard (IETF draft-polli-ratelimit-headers) proposes RateLimit-Limit, RateLimit-Remaining, and RateLimit-Reset. Always include Retry-After in 429 responses so clients know exactly when to retry.
from fastapi import Response
from datetime import datetime, timezone
def add_rate_limit_headers(
response: Response,
limit: int,
remaining: int,
reset_at: datetime,
retry_after: int | None = None,
):
"""Add standard rate limit headers to a response."""
response.headers["X-RateLimit-Limit"] = str(limit)
response.headers["X-RateLimit-Remaining"] = str(max(0, remaining))
response.headers["X-RateLimit-Reset"] = str(int(reset_at.timestamp()))
if retry_after is not None:
response.headers["Retry-After"] = str(retry_after)
# 429 response with proper headers
def rate_limit_response(retry_after: int) -> JSONResponse:
return JSONResponse(
status_code=429,
content={
"detail": "Too Many Requests",
"retry_after": retry_after,
},
headers={
"Retry-After": str(retry_after),
"Content-Type": "application/json",
},
)
Testing Rate Limiters
Rate limiting logic is critical enough to warrant dedicated unit tests. Test that the limiter allows requests under the limit, blocks requests over the limit, and resets correctly after the window expires. Use freezegun or time.monotonic patching to control time in tests without actually sleeping.
import pytest
from unittest.mock import patch
import time
@pytest.fixture
def bucket():
return TokenBucket(capacity=5, refill_rate=1.0)
def test_allows_requests_within_limit(bucket):
for _ in range(5):
assert bucket.consume() is True
def test_blocks_when_empty(bucket):
for _ in range(5):
bucket.consume()
assert bucket.consume() is False # 6th request denied
def test_refills_over_time(bucket):
for _ in range(5):
bucket.consume() # drain the bucket
# Simulate 3 seconds passing
with patch("time.monotonic", return_value=time.monotonic() + 3):
bucket._refill()
assert bucket._tokens == pytest.approx(3.0, abs=0.1)
assert bucket.consume() is True
@pytest.mark.asyncio
async def test_sliding_window_redis():
"""Integration test with real Redis (requires running instance)."""
client_id = "test-user-123"
# Clean up from previous runs
await redis_client.delete(f"ratelimit:{client_id}")
for i in range(5):
allowed, remaining, _ = await sliding_window_rate_limit(
client_id, limit=5, window_ms=60_000
)
assert allowed is True
assert remaining == 4 - i
allowed, remaining, retry_after = await sliding_window_rate_limit(
client_id, limit=5, window_ms=60_000
)
assert allowed is False
assert retry_after > 0