Python Rate Limiting: Token Bucket and Sliding Window
Rate limiting protects your API from abuse, prevents runaway clients from overloading your database, and enforces fair usage across tenants. Python offers several rate limiting strategies — fixed window, sliding window log, sliding window counter, and token bucket — each with different trade-offs in accuracy, memory, and fairness. This guide covers pure-Python implementations, Redis-backed distributed rate limiting, and integrating with FastAPI using slowapi.
Table of Contents
Rate Limiting Algorithms Compared
| Algorithm | Memory | Accuracy | Burst Handling | Best For |
|---|---|---|---|---|
| Fixed Window | O(1) | Low (boundary spikes) | Poor | Simple quotas |
| Sliding Window Log | O(requests) | High | Good | Accurate limiting |
| Sliding Window Counter | O(1) | Medium | Good | High-traffic APIs |
| Token Bucket | O(1) | High | Excellent | Burst-friendly APIs |
| Leaky Bucket | O(1) | High | None | Smooth output rate |
Token Bucket Implementation
The token bucket is the most practical rate limiting algorithm. Tokens accumulate at a fixed rate up to a maximum capacity. Each request consumes one token. Bursts are allowed up to the bucket capacity; after that, requests are rejected until tokens refill. This gives clients freedom to burst while enforcing a sustainable average rate.
import time
import threading
from dataclasses import dataclass, field
@dataclass
class TokenBucket:
    """Thread-safe token bucket rate limiter.

    The bucket starts full. Tokens are added lazily on every call, in
    proportion to the time elapsed since the previous refill, and are capped
    at ``capacity``.

    Raises:
        ValueError: If ``capacity`` or ``refill_rate`` is negative.
    """

    capacity: float  # max tokens (burst size)
    refill_rate: float  # tokens per second
    tokens: float = field(init=False)
    last_refill: float = field(init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False)

    def __post_init__(self):
        if self.capacity < 0:
            raise ValueError("capacity must be non-negative")
        if self.refill_rate < 0:
            raise ValueError("refill_rate must be non-negative")
        self.tokens = self.capacity
        self.last_refill = time.monotonic()

    def _refill(self):
        """Credit tokens for the time elapsed since the last refill.

        Callers must already hold ``self._lock``.
        """
        now = time.monotonic()
        elapsed = now - self.last_refill
        added = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + added)
        self.last_refill = now

    def consume(self, tokens: float = 1.0) -> bool:
        """Try to take ``tokens`` from the bucket.

        Returns:
            True if enough tokens were available (they are deducted),
            False otherwise (the bucket is left unchanged).

        Raises:
            ValueError: If ``tokens`` is negative; a negative amount would
                otherwise add tokens to the bucket instead of removing them.
        """
        if tokens < 0:
            raise ValueError("tokens must be non-negative")
        with self._lock:
            self._refill()
            if self.tokens < tokens:
                return False
            self.tokens -= tokens
            return True

    @property
    def available(self) -> float:
        """Number of tokens currently in the bucket, after refilling."""
        with self._lock:
            self._refill()
            return self.tokens
# Usage: 15 back-to-back requests against a bucket that allows a burst of 10.
# The loop is expected to run much faster than the 2 tokens/sec refill, so
# roughly the first 10 requests pass and the remainder are rejected.
limiter = TokenBucket(capacity=10, refill_rate=2)  # 10 burst, 2/sec sustained
for i in range(15):
    allowed = limiter.consume()
    status = "✓ allowed" if allowed else "✗ rejected"
    print(f"Request {i+1}: {status}")
# Per-client bucket (one bucket per IP/user)
class ClientRateLimiter:
    """Keep one ``TokenBucket`` per client ID, created on first use.

    Every bucket is built with the same ``capacity`` and ``refill_rate``.
    """

    def __init__(self, capacity: float, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self._lock = threading.Lock()
        self._buckets: dict[str, TokenBucket] = {}

    def is_allowed(self, client_id: str) -> bool:
        """Consume one token from ``client_id``'s bucket.

        Returns:
            True if the client's bucket had a token available, else False.
        """
        with self._lock:
            bucket = self._buckets.get(client_id)
            if bucket is None:
                # First request from this client: start it with a fresh bucket.
                bucket = TokenBucket(self.capacity, self.refill_rate)
                self._buckets[client_id] = bucket
            return bucket.consume()
# Shared per-client limiter: every client bucket is created with capacity 20
# and a refill rate of 5 tokens per second.
rate_limiter = ClientRateLimiter(capacity=20, refill_rate=5)
# The first call for a new client creates its bucket and takes one token.
print(rate_limiter.is_allowed("user_123")) # True
Sliding Window with Redis
For distributed systems where multiple app instances serve requests, use Redis for shared rate limit state. The sliding window log uses a Redis sorted set — request timestamps as scores — to count requests within the past N seconds.
import time
import redis.asyncio as aioredis
# Module-level client created from redis.asyncio; the rate-limit helpers in
# this file read it as a global.
# NOTE: the name `redis` shadows the top-level `redis` package in this module.
redis = aioredis.from_url("redis://localhost:6379")
async def sliding_window_check(
    key: str,
    limit: int,
    window_seconds: int,
) -> tuple[bool, int, int]:
    """Check and record one request against a sliding-window log in Redis.

    Uses a sorted set — score=timestamp, member=unique request ID.

    Args:
        key: Redis key of the sorted set holding this client's request log.
        limit: Maximum number of requests allowed inside the window.
        window_seconds: Length of the sliding window, in seconds.

    Returns:
        ``(allowed, current_count, retry_after_seconds)``. When allowed,
        ``current_count`` includes this request and ``retry_after_seconds``
        is 0. When rejected, ``current_count`` is the number of requests
        already in the window and ``retry_after_seconds`` is the time until
        the oldest logged request leaves the window (at least 1).
    """
    import math
    import uuid

    now = time.time()
    window_start = now - window_seconds
    # The timestamp alone is not unique: two requests stamped with the same
    # float would collapse into one sorted-set member and be undercounted.
    member = f"{now}:{uuid.uuid4().hex}"

    async with redis.pipeline() as pipe:
        # Remove expired requests
        pipe.zremrangebyscore(key, 0, window_start)
        # Count current requests (before this one is added)
        pipe.zcard(key)
        # Add this request
        pipe.zadd(key, {member: now})
        # Set TTL so idle keys disappear on their own
        pipe.expire(key, window_seconds + 1)
        # Oldest surviving entry, used to compute an accurate retry delay
        pipe.zrange(key, 0, 0, withscores=True)
        _, current_count, _, _, oldest = await pipe.execute()

    if current_count < limit:
        return True, current_count + 1, 0

    # Rejected: take the optimistic entry back out so that denied requests do
    # not keep the window full and block the client indefinitely.
    await redis.zrem(key, member)
    oldest_score = oldest[0][1] if oldest else now
    retry_after = max(1, math.ceil(oldest_score + window_seconds - now))
    return False, current_count, retry_after
# Redis Lua script for atomic fixed-window counter (faster, O(1) memory)
# Inputs: KEYS[1] is the counter key, ARGV[1] the request limit, ARGV[2] the
# value passed to EXPIRE for the window.
# The script INCRs the key and sets the expiry only when the counter has just
# been created (value 1), so the TTL is not extended by later requests.
# Returns a three-element array: {1, count, 0} when the count is within the
# limit, or {0, count, TTL of the key} when the limit is exceeded.
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, window)
end
if current > limit then
return {0, current, redis.call('TTL', key)}
end
return {1, current, 0}
"""
async def fixed_window_check(key: str, limit: int, window: int):
    """Run ``RATE_LIMIT_SCRIPT`` for ``key`` and decode its three-part reply.

    Returns:
        ``(allowed, count, retry_after)`` where ``allowed`` is the script's
        first element converted to ``bool`` and the other two elements are
        passed through unchanged.
    """
    run_script = redis.register_script(RATE_LIMIT_SCRIPT)
    reply = await run_script(keys=[key], args=[limit, window])
    allowed, count, retry_after = reply
    return bool(allowed), count, retry_after
slowapi: FastAPI Rate Limiting
pip install slowapi
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# Limiter whose key function is get_remote_address, i.e. one counter per client address.
limiter = Limiter(key_func=get_remote_address) # limit by IP
app = FastAPI()
# Store the limiter on the application state and register slowapi's handler
# for RateLimitExceeded errors.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/api/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
    # Example endpoint decorated with a single "100/minute" limit; the
    # `request` parameter is accepted but not read by the handler body.
    payload = {"data": "your data here"}
    return payload
@app.get("/api/search")
@limiter.limit("10/second;100/minute;1000/hour")
async def search(request: Request, q: str):
    # Three limits are declared in one string, separated by ';'.
    # The query `q` is accepted but this example always returns no results.
    hits = []
    return {"results": hits}
# Custom key: limit per authenticated user
def get_user_id(request: Request) -> str:
    """Return the rate-limit key for ``request``.

    Uses ``str(user.id)`` when ``request.state.user`` is present and truthy;
    otherwise falls back to the client's remote address.
    """
    user = getattr(request.state, "user", None)
    if not user:
        return get_remote_address(request)
    return str(user.id)
# Second Limiter instance, keyed by get_user_id instead of the remote address.
# NOTE(review): this instance is not the one assigned to app.state.limiter —
# confirm slowapi needs no further registration for it.
limiter_by_user = Limiter(key_func=get_user_id)
@app.get("/api/expensive")
@limiter_by_user.limit("10/minute")
async def expensive_op(request: Request):
    # Decorated with limiter_by_user, so the "10/minute" limit is counted per
    # key returned by that limiter's key function.
    body = {"status": "ok"}
    return body
Per-User and Per-Endpoint Limits
import asyncio
from fastapi import FastAPI, Request, HTTPException, Depends
from typing import Annotated
app = FastAPI()
# Tiered rate limits: free vs paid
# Maps a tier name to its quotas: "requests_per_minute" and "requests_per_day".
LIMITS = {
"free": {"requests_per_minute": 60, "requests_per_day": 1_000},
"starter": {"requests_per_minute": 300, "requests_per_day": 10_000},
"pro": {"requests_per_minute": 1000, "requests_per_day": 100_000},
}
async def check_rate_limit(request: Request, user=Depends(get_current_user)):
    """FastAPI dependency enforcing per-minute and per-day quotas for ``user``.

    Increments two fixed-window counters in Redis (one per minute, one per
    day) and rejects the request when either exceeds the quota of the user's
    tier in ``LIMITS``.

    On success, stores ``rate_limit``, ``rate_limit_remaining`` and
    ``rate_limit_reset`` on ``request.state`` for later use when building the
    response.

    Raises:
        HTTPException: 429 when the per-minute or per-day quota is exceeded.
    """
    tier = getattr(user, "tier", "free")
    # An unrecognised tier falls back to the most restrictive quota instead of
    # raising KeyError, which would surface as an HTTP 500.
    limits = LIMITS.get(tier, LIMITS["free"])
    minute_limit = limits["requests_per_minute"]
    day_limit = limits["requests_per_day"]

    # Read the clock once so both window keys and reset times are consistent.
    now = int(time.time())
    minute_reset = (now // 60 + 1) * 60
    day_reset = (now // 86400 + 1) * 86400
    minute_key = f"rl:{user.id}:minute:{now // 60}"
    day_key = f"rl:{user.id}:day:{now // 86400}"

    async with redis.pipeline() as pipe:
        pipe.incr(minute_key)
        pipe.expire(minute_key, 61)
        pipe.incr(day_key)
        pipe.expire(day_key, 86401)
        minute_count, _, day_count, _ = await pipe.execute()

    if minute_count > minute_limit:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded: too many requests per minute",
            headers={
                "X-RateLimit-Limit": str(minute_limit),
                "X-RateLimit-Remaining": "0",
                # Seconds until the current minute window rolls over (1..60).
                "Retry-After": str(minute_reset - now),
            },
        )
    if day_count > day_limit:
        raise HTTPException(
            status_code=429,
            detail="Daily quota exceeded",
            headers={
                "X-RateLimit-Limit": str(day_limit),
                "X-RateLimit-Remaining": "0",
                # Seconds until the current day window rolls over.
                "Retry-After": str(day_reset - now),
            },
        )

    # Attach rate limit info to response via request state
    request.state.rate_limit = minute_limit
    request.state.rate_limit_remaining = minute_limit - minute_count
    request.state.rate_limit_reset = minute_reset
@app.get("/api/query", dependencies=[Depends(check_rate_limit)])
async def run_query(request: Request, q: str):
    # check_rate_limit is declared as a route dependency; the handler reads
    # the remaining quota that dependency stores on request.state.
    remaining = request.state.rate_limit_remaining
    return {"results": [], "remaining": remaining}
Client-Side Rate Limiting
import asyncio
import time
from collections import deque
class AsyncRateLimiter:
    """Client-side rate limiter for outgoing API calls.

    Allows at most ``calls`` acquisitions within any ``period``-second window,
    tracked as a deque of monotonic timestamps. Usable directly via
    ``await limiter.acquire()`` or as an async context manager.

    Raises:
        ValueError: If ``calls`` is less than 1.
    """

    def __init__(self, calls: int, period: float):
        # With calls <= 0 the "window full" branch in acquire() would index an
        # empty deque and fail with IndexError, so reject it up front.
        if calls < 1:
            raise ValueError("calls must be at least 1")
        self.calls = calls
        self.period = period
        self._timestamps: deque[float] = deque()
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a call is permitted, then record it."""
        # The lock is held across the sleep, so waiters proceed one at a time.
        async with self._lock:
            now = time.monotonic()
            # Remove timestamps outside the window
            while self._timestamps and now - self._timestamps[0] >= self.period:
                self._timestamps.popleft()
            if len(self._timestamps) >= self.calls:
                # Wait until oldest timestamp expires
                sleep_time = self.period - (now - self._timestamps[0])
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
            # Record the actual time of the call, after any waiting.
            self._timestamps.append(time.monotonic())

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        # Nothing to release: the slot is consumed on entry.
        pass
# Usage: call an external API at most 10 req/sec
# Module-level limiter instance: 10 calls per 1.0-second period.
api_limiter = AsyncRateLimiter(calls=10, period=1.0)
async def call_external_api(url: str):
    """GET ``url`` once ``api_limiter`` permits it and return the decoded JSON body.

    A new ``aiohttp.ClientSession`` is opened and closed for every call.
    """
    # NOTE(review): aiohttp is not imported in the visible part of this file —
    # confirm it is imported elsewhere.
    async with api_limiter:
        async with aiohttp.ClientSession() as session, session.get(url) as resp:
            payload = await resp.json()
    return payload
Rate Limit Headers
from fastapi import Response
from fastapi.middleware.base import BaseHTTPMiddleware
class RateLimitHeaderMiddleware(BaseHTTPMiddleware):
    """Add standard rate limit headers to all responses.

    Values come from ``request.state`` (``rate_limit``,
    ``rate_limit_remaining``, ``rate_limit_reset``) when set, with fallback
    defaults otherwise. Headers already present on the response are left
    untouched, so a 429 response that carries its own rate-limit headers is
    not overwritten with the defaults.
    """

    # NOTE(review): Starlette provides BaseHTTPMiddleware in
    # starlette.middleware.base — confirm the import path used by this file
    # resolves.

    async def dispatch(self, request, call_next):
        response = await call_next(request)

        # Populate from request state if set by rate limit dependency
        state = request.state
        limit = getattr(state, "rate_limit", 100)
        remaining = getattr(state, "rate_limit_remaining", 100)
        reset_time = getattr(state, "rate_limit_reset", int(time.time()) + 60)

        headers = response.headers
        # setdefault only fills in headers the handler did not already set.
        limit_value = headers.setdefault("X-RateLimit-Limit", str(limit))
        headers.setdefault("X-RateLimit-Remaining", str(max(0, remaining)))
        headers.setdefault("X-RateLimit-Reset", str(reset_time))
        # Build the policy from the limit actually reported in the header.
        headers.setdefault("X-RateLimit-Policy", f"{limit_value};w=60")
        return response
# Register the header middleware on the application.
app.add_middleware(RateLimitHeaderMiddleware)
Frequently Asked Questions
- What HTTP status code should I return for rate limit exceeded?
- Return `429 Too Many Requests` with a `Retry-After` header indicating how many seconds to wait. Include `X-RateLimit-Limit`, `X-RateLimit-Remaining`, and `X-RateLimit-Reset` headers on every response so clients can throttle proactively.
- How do I handle rate limiting in a load-balanced environment?
- Use Redis as the shared counter store — never in-process memory. All app instances write to the same Redis keys, so the rate limit is enforced across the entire fleet regardless of which instance handles the request.
- How should I rate limit webhooks?
- Apply rate limits on the sending side (client) and implement idempotency on the receiving side. Use exponential backoff when you receive 429 responses. Implement a dead-letter queue for deliveries that still fail after retries are exhausted.