Python JWT Authentication: Access Tokens and Refresh Flow
JSON Web Tokens (JWT) are the dominant mechanism for stateless API authentication. A short-lived access token grants API access; a long-lived refresh token lets clients obtain new access tokens without re-authentication. This guide covers PyJWT, token generation and validation, the refresh flow, FastAPI integration, and security pitfalls to avoid.
Table of Contents
JWT Anatomy
A JWT is three Base64URL-encoded sections joined by dots: header.payload.signature. The header specifies the algorithm. The payload (claims) carries the data — sub (subject), exp (expiry), iat (issued at), and any custom claims. The signature is an HMAC or RSA signature over the first two sections, preventing tampering. JWTs are not encrypted by default — anyone can read the payload. Never put secrets in a JWT.
import base64
import json
# Manually decode a JWT (without verification — for illustration only)
def decode_jwt_payload(token: str) -> dict:
    """Return the payload (claims) of a JWT without verifying its signature.

    For illustration and debugging only: nothing here proves the token is
    authentic, so the result must never be used for authorization decisions.

    Args:
        token: Dot-separated JWT string; the second segment is the payload.

    Returns:
        The JSON-decoded payload segment.

    Raises:
        ValueError: If the token has no payload segment, or the segment is
            not valid base64url / JSON (the base64 and JSON errors raised
            by the standard library are ``ValueError`` subclasses).
    """
    segments = token.split(".")
    if len(segments) < 2:
        # Without this check a dot-less string would surface as IndexError,
        # unlike every other malformed-token failure.
        raise ValueError("Malformed JWT: missing payload segment")
    payload_b64 = segments[1]
    # JWT uses base64url without padding; restore it to a multiple of 4.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))
# Example output for a typical JWT:
# {
# "sub": "user_123",
# "iat": 1718265600,
# "exp": 1718269200,
# "type": "access",
# "roles": ["user"]
# }
PyJWT Basics: Encode and Decode
import jwt
import os
from datetime import datetime, timedelta, timezone
# HS256 signing secret, read from the JWT_SECRET environment variable.
# The lookup raises KeyError at import time when the variable is unset.
SECRET_KEY = os.environ["JWT_SECRET"] # min 32 random bytes
# Algorithm name passed to both jwt.encode and jwt.decode below.
ALGORITHM = "HS256"
def create_token(payload: dict, expires_in: timedelta) -> str:
    """Sign ``payload`` as a JWT that expires ``expires_in`` from now.

    Args:
        payload: Claims to embed; the caller's mapping is not modified.
        expires_in: Lifetime of the token, measured from the current UTC time.

    Returns:
        The encoded token produced by ``jwt.encode`` with the module's
        ``SECRET_KEY`` and ``ALGORITHM``.
    """
    issued_at = datetime.now(timezone.utc)
    claims = dict(payload)
    # Written last so they override any caller-supplied "iat"/"exp".
    claims["iat"] = issued_at
    claims["exp"] = issued_at + expires_in
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> dict:
    """Verify ``token`` and return its claims.

    The token must be signed with ``SECRET_KEY`` using ``ALGORITHM`` and must
    carry the ``exp``, ``iat`` and ``sub`` claims.

    Args:
        token: Encoded JWT string.

    Returns:
        The decoded claims.

    Raises:
        ValueError: If the token has expired or is otherwise invalid. The
            original PyJWT exception is attached as ``__cause__``.
    """
    try:
        return jwt.decode(
            token,
            SECRET_KEY,
            algorithms=[ALGORITHM],
            options={"require": ["exp", "iat", "sub"]},
        )
    except jwt.ExpiredSignatureError as exc:
        # Handled before the generic case so expiry gets its own message.
        raise ValueError("Token has expired") from exc
    except jwt.InvalidTokenError as exc:
        raise ValueError(f"Invalid token: {exc}") from exc
# Usage: mint a 15-minute token for user_123, then decode it back into claims.
token = create_token({"sub": "user_123", "roles": ["admin"]}, timedelta(minutes=15))
claims = decode_token(token)
print(claims["sub"]) # user_123
print(claims["roles"]) # ['admin']
Access Token + Refresh Token Flow
The dual-token pattern uses a short-lived access token (15 minutes) for API requests and a long-lived refresh token (7–30 days) stored server-side. When the access token expires, the client uses the refresh token to get a new pair. Each refresh rotates the refresh token, invalidating the old one.
import secrets
from datetime import timedelta
# Lifetime of an access token; also reported to clients as "expires_in".
ACCESS_TOKEN_EXPIRE = timedelta(minutes=15)
# Lifetime of a refresh token; used as the TTL of its Redis entry.
REFRESH_TOKEN_EXPIRE = timedelta(days=7)
class TokenService:
    """Issue, rotate and revoke access/refresh token pairs.

    Access tokens are JWTs built by ``create_token``. Refresh tokens are
    opaque random strings stored in Redis under ``refresh:<token>`` with the
    owning user id as the value.
    """

    def __init__(self, redis_client):
        # The client must provide ``get``, ``setex`` and ``delete``.
        self.redis = redis_client

    def create_token_pair(self, user_id: str, roles: list[str]) -> dict:
        """Create a fresh access token and refresh token for ``user_id``.

        Args:
            user_id: Stored as the access token's ``sub`` claim and as the
                value of the refresh token's Redis entry.
            roles: Stored as the access token's ``roles`` claim.

        Returns:
            A dict with ``access_token``, ``refresh_token``, ``token_type``
            and ``expires_in`` (access-token lifetime in seconds).
        """
        access_token = create_token(
            {"sub": user_id, "roles": roles, "type": "access"},
            ACCESS_TOKEN_EXPIRE,
        )
        refresh_token = secrets.token_urlsafe(48)  # opaque random token
        # Store the refresh token with a TTL so it expires on its own.
        key = f"refresh:{refresh_token}"
        self.redis.setex(key, int(REFRESH_TOKEN_EXPIRE.total_seconds()), user_id)
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer",
            "expires_in": int(ACCESS_TOKEN_EXPIRE.total_seconds()),
        }

    def rotate_refresh_token(self, old_refresh_token: str) -> dict:
        """Exchange a refresh token for a new token pair (single use).

        Args:
            old_refresh_token: The refresh token presented by the client.

        Returns:
            A new token pair, shaped as in ``create_token_pair``.

        Raises:
            ValueError: If the token is unknown, expired, or was already
                consumed by another request.
        """
        key = f"refresh:{old_refresh_token}"
        user_id = self.redis.get(key)
        if not user_id:
            raise ValueError("Invalid or expired refresh token")
        # Rotation: each refresh token is single-use. DEL reports how many
        # keys it removed; zero means another request consumed this token
        # between our GET and DEL, so it must not be honoured a second time.
        if not self.redis.delete(key):
            raise ValueError("Invalid or expired refresh token")
        # Redis clients may hand back bytes; normalise before reuse.
        user_id = user_id.decode() if isinstance(user_id, bytes) else user_id
        return self.create_token_pair(user_id, roles=self._get_roles(user_id))

    def revoke_refresh_token(self, refresh_token: str) -> None:
        """Delete the stored entry for ``refresh_token``, if there is one."""
        self.redis.delete(f"refresh:{refresh_token}")

    def _get_roles(self, user_id: str) -> list[str]:
        """Return the roles for ``user_id`` (placeholder: always ``["user"]``)."""
        # In production, look up roles from DB
        return ["user"]
FastAPI Integration
FastAPI's dependency injection makes JWT authentication clean and reusable. Define a dependency that extracts and validates the token, then use it on any route that requires authentication.
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
# Application instance that the route decorators below attach to.
app = FastAPI()
# Bearer-token scheme; injected with Depends(security) to obtain the credentials.
security = HTTPBearer()
class TokenClaims(BaseModel):
    # Subset of the decoded JWT claims that the route handlers consume.
    # Subject claim; in this file it carries the user id.
    sub: str
    # Role names checked by require_role; empty when the claim is absent.
    roles: list[str] = []
    # Token kind marker; defaults to "access" when the claim is absent.
    type: str = "access"
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> TokenClaims:
    """Dependency that validates the bearer token and returns its claims.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.

    Returns:
        The token's claims as a ``TokenClaims`` model.

    Raises:
        HTTPException: 401 if the token fails ``decode_token`` or its
            ``type`` claim is not ``"access"``. Both responses carry the
            ``WWW-Authenticate: Bearer`` challenge header.
    """
    try:
        payload = decode_token(credentials.credentials)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
    if payload.get("type") != "access":
        # Same status constant and challenge header as the failure above,
        # so clients see one consistent 401 shape.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not an access token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return TokenClaims(**payload)
def require_role(role: str):
    """Build a dependency that only admits users holding ``role``.

    The returned callable resolves the current user through
    ``get_current_user`` and raises a 403 ``HTTPException`` when ``role`` is
    not among the user's roles; otherwise it returns the user's claims.
    """

    def checker(user: TokenClaims = Depends(get_current_user)) -> TokenClaims:
        if role in user.roles:
            return user
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return checker
@app.post("/auth/login")
async def login(username: str, password: str):
    # Issues a token pair for the hard-coded demo account; anything else is a 401.
    # NOTE(review): bare `str` parameters are presumably read from the query
    # string, which would put credentials in the URL — confirm and consider a
    # request body or form instead.
    # Verify credentials against DB (simplified)
    if username != "admin" or password != "secret":
        raise HTTPException(status_code=401, detail="Invalid credentials")
    token_service = TokenService(redis_client)
    return token_service.create_token_pair("user_123", ["admin"])
@app.post("/auth/refresh")
async def refresh(refresh_token: str):
    # Exchanges a refresh token for a new token pair; an invalid or expired
    # token becomes a 401.
    # Build the service here, as the login handler does: no `token_service`
    # name is visible at module level, so the bare reference raised NameError.
    token_service = TokenService(redis_client)
    try:
        return token_service.rotate_refresh_token(refresh_token)
    except ValueError as e:
        raise HTTPException(status_code=401, detail=str(e)) from e
@app.get("/users/me")
async def get_me(user: TokenClaims = Depends(get_current_user)):
    # Returns the authenticated caller's id and roles, taken from the token claims.
    return dict(user_id=user.sub, roles=user.roles)
@app.delete("/users/{user_id}")
async def delete_user(
    user_id: str,
    admin: TokenClaims = Depends(require_role("admin")),
):
    # `admin` is unused in the body; declaring it is what runs the role check.
    # Placeholder handler: it echoes the id back and deletes nothing.
    response = {"deleted": user_id}
    return response
Token Blacklisting with Redis
Access tokens are stateless, so by default they cannot be revoked: once issued, a token stays valid until it expires. For logout or security incidents, maintain a Redis blocklist of revoked JTI (JWT ID) claims and check it on every request.
import uuid
def create_access_token(user_id: str, roles: list[str]) -> str:
    """Create an access token that carries a unique ``jti`` claim.

    The ``jti`` is a random UUID4 string, which lets an individual token be
    identified later (for example in a revocation blocklist).
    """
    claims = {
        "sub": user_id,
        "roles": roles,
        "type": "access",
        "jti": str(uuid.uuid4()),  # unique token ID
    }
    return create_token(claims, ACCESS_TOKEN_EXPIRE)
def is_token_revoked(jti: str, redis_client) -> bool:
    """Report whether ``jti`` has an entry under ``blocklist:<jti>``."""
    blocklist_key = f"blocklist:{jti}"
    matches = redis_client.exists(blocklist_key)
    return matches > 0
def revoke_token(jti: str, redis_client, ttl_seconds: int) -> None:
    """Add ``jti`` to the blocklist for ``ttl_seconds`` seconds.

    Args:
        jti: The token's unique ID claim.
        redis_client: Client providing ``setex``.
        ttl_seconds: How long the entry is kept. Presumably the token's
            remaining lifetime, so the entry lapses when the token would
            have expired anyway; verify against callers.
    """
    if ttl_seconds <= 0:
        # Redis rejects SETEX with a non-positive expiry. A token with no
        # lifetime left is already refused by expiry validation, so there
        # is nothing to record.
        return
    redis_client.setex(f"blocklist:{jti}", ttl_seconds, "1")
# Updated get_current_user with blocklist check
def get_current_user_checked(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    redis=Depends(get_redis),
) -> TokenClaims:
    """Dependency that validates the bearer token and checks the blocklist.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.
        redis: Redis client supplied by the ``get_redis`` dependency.

    Returns:
        The token's claims as a ``TokenClaims`` model.

    Raises:
        HTTPException: 401 if the token fails ``decode_token``, is not an
            access token, or its ``jti`` is on the blocklist.
    """
    try:
        payload = decode_token(credentials.credentials)
    except ValueError as e:
        # Left uncaught, a bad token would surface as a 500 rather than a 401.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="Not an access token")
    jti = payload.get("jti")
    # Tokens without a jti cannot be blocklisted and pass through unchecked.
    if jti and is_token_revoked(jti, redis):
        raise HTTPException(status_code=401, detail="Token has been revoked")
    return TokenClaims(**payload)
RS256 with RSA Key Pairs
HS256 uses a shared secret — whoever can verify can also create tokens. RS256 uses asymmetric keys: the private key signs tokens (only your auth server), the public key verifies them (all services). This is the right choice for multi-service architectures.
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
# Generate RSA key pair (do this once, store securely)
# NOTE: as written this runs at import time, so every process start creates a
# new key pair and tokens signed by an earlier run will no longer verify.
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048,
    backend=default_backend(),
)
# Public half of the pair, used for verification.
public_key = private_key.public_key()
# Sign with private key (auth server only)
def create_rs256_token(payload: dict, expires_in: timedelta) -> str:
    """Sign ``payload`` with the module's RSA private key using RS256.

    ``iat`` is set to the current UTC time and ``exp`` to ``iat`` plus
    ``expires_in``; both override same-named keys in ``payload``.
    """
    from datetime import datetime, timezone

    issued_at = datetime.now(timezone.utc)
    claims = dict(payload)
    claims.update(iat=issued_at, exp=issued_at + expires_in)
    return jwt.encode(claims, private_key, algorithm="RS256")
# Verify with public key (any service)
def verify_rs256_token(token: str) -> dict:
    """Verify an RS256-signed token with the public key and return its claims.

    ``exp`` and ``iat`` are required, matching what ``create_rs256_token``
    always sets; otherwise a token minted without an expiry would be accepted
    indefinitely.

    Raises:
        jwt.InvalidTokenError: (or a subclass) if the signature, expiry or
            required claims do not check out.
    """
    return jwt.decode(
        token,
        public_key,
        algorithms=["RS256"],
        options={"require": ["exp", "iat"]},
    )
Security Best Practices
- Never log tokens — they're credentials. Redact `Authorization` headers from access logs.
- Use HTTPS always — JWTs sent over plain HTTP can be intercepted and replayed.
- Short access token lifetime — 15 minutes is standard. Don't use hours.
- Store refresh tokens server-side — opaque tokens in Redis, not JWTs. This allows revocation.
- Specify algorithm explicitly — always pass `algorithms=["HS256"]` to `decode()`. Never use `algorithms=["none"]`.
- Include audience claim — use `aud` to prevent tokens for service A from being used on service B.
- Rotate refresh tokens — one refresh token, one use. Reuse of an old token indicates theft; revoke the entire token family.
Frequently Asked Questions
- Should I store the refresh token in localStorage or a cookie?
- Use an `HttpOnly; Secure; SameSite=Strict` cookie for the refresh token. This prevents JavaScript access (XSS protection) and automatic cross-site submission (CSRF protection). Store the short-lived access token in memory only.
- How do I handle token refresh on the client side?
- Use an HTTP interceptor (Axios, fetch wrapper) that catches 401 responses, calls `/auth/refresh` to get a new access token, then retries the original request transparently.
- Can I put user permissions in the JWT?
- Yes, but only if you can accept permission changes taking effect after the current token expires. For immediate revocation of permissions, check a fast cache (Redis) on each request using the `sub` claim as the key.