Python OAuth2 with FastAPI: Google and GitHub Login
OAuth2 social login lets users authenticate with existing accounts (Google, GitHub, Microsoft) rather than creating new credentials. The OAuth2 Authorization Code flow redirects users to the provider, which returns an authorization code that your server exchanges for an access token and user profile. FastAPI and Authlib make this clean and production-ready. This guide covers Google and GitHub OAuth2, PKCE for public clients, session management, and account linking.
Table of Contents
OAuth2 Authorization Code Flow
The Authorization Code flow is the most secure OAuth2 grant type for server-side applications. It uses a two-step exchange: the authorization code returned from the provider is short-lived and single-use, and the actual access token never travels through the browser. Understanding the flow helps debug issues and implement security checks like state validation.
"""
Authorization Code Flow steps:
1. User clicks "Login with Google"
2. Server generates state (random nonce) and stores in session
3. Server redirects user to Google auth URL with:
- client_id
- redirect_uri (must match registered URI)
- scope (what data you need: email, profile)
- state (CSRF protection)
- response_type=code
4. User authenticates with Google and grants permissions
5. Google redirects to /auth/google/callback?code=AUTH_CODE&state=STATE
6. Server validates state matches session (CSRF check)
7. Server exchanges code for access_token (server-to-server, no browser)
8. Server fetches user profile from Google API
9. Server creates/updates user in database
10. Server creates session or JWT and redirects user to app
"""
# Key security checks:
# - Validate state parameter (CSRF protection)
# - Verify the redirect_uri exactly matches what's registered
# - Use HTTPS for all redirect URIs in production
# - Store tokens server-side, never expose provider tokens to client
Setup: FastAPI + Authlib + Starlette Sessions
Authlib is the standard Python library for OAuth2 and OpenID Connect. It handles the authorization URL generation, state management, and token exchange. Starlette's session middleware (using signed cookies) stores the OAuth state and user session server-side.
pip install fastapi uvicorn authlib httpx starlette itsdangerous python-multipart
import os
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import RedirectResponse
from starlette.middleware.sessions import SessionMiddleware
from authlib.integrations.starlette_client import OAuth
import httpx
app = FastAPI(title="OAuth2 Demo")
# Session middleware — signs cookie with secret key
app.add_middleware(
SessionMiddleware,
secret_key=os.environ["SESSION_SECRET"], # 32+ byte random secret
same_site="lax",
https_only=True, # True in production
)
# Configure OAuth providers
oauth = OAuth()
oauth.register(
name="google",
client_id=os.environ["GOOGLE_CLIENT_ID"],
client_secret=os.environ["GOOGLE_CLIENT_SECRET"],
server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
client_kwargs={"scope": "openid email profile"},
)
oauth.register(
name="github",
client_id=os.environ["GITHUB_CLIENT_ID"],
client_secret=os.environ["GITHUB_CLIENT_SECRET"],
authorize_url="https://github.com/login/oauth/authorize",
access_token_url="https://github.com/login/oauth/access_token",
userinfo_endpoint="https://api.github.com/user",
client_kwargs={"scope": "user:email"},
)
Google OAuth2 Integration
Google uses OpenID Connect (OIDC) on top of OAuth2, which means it returns a standardized ID token containing user identity claims alongside the access token. Authlib handles the OpenID Connect discovery document, token verification, and userinfo fetching automatically when you use the server_metadata_url.
@app.get("/auth/google/login")
async def google_login(request: Request):
"""Redirect user to Google OAuth2 consent screen."""
redirect_uri = str(request.url_for("google_callback"))
return await oauth.google.authorize_redirect(request, redirect_uri)
@app.get("/auth/google/callback", name="google_callback")
async def google_callback(request: Request):
"""Handle the callback from Google with authorization code."""
try:
# Exchange code for token (validates state automatically)
token = await oauth.google.authorize_access_token(request)
except Exception as e:
raise HTTPException(status_code=400, detail=f"OAuth error: {e}")
# ID token contains user info (already verified by Authlib)
user_info = token.get("userinfo")
if not user_info:
# Fallback: fetch from userinfo endpoint
user_info = await oauth.google.userinfo(token=token)
google_user = {
"provider": "google",
"provider_id": user_info["sub"],
"email": user_info["email"],
"name": user_info.get("name"),
"picture": user_info.get("picture"),
"email_verified": user_info.get("email_verified", False),
}
# Create or update user in your database
user = await upsert_oauth_user(google_user)
# Create session
request.session["user_id"] = str(user["id"])
request.session["provider"] = "google"
return RedirectResponse(url="/dashboard")
async def upsert_oauth_user(oauth_data: dict) -> dict:
"""Create user if not exists, or return existing user."""
# Check by provider + provider_id (most reliable)
# user = await db.users.find_one({"provider": oauth_data["provider"],
# "provider_id": oauth_data["provider_id"]})
# if not user:
# # Also check by email (for account linking)
# user = await db.users.find_one({"email": oauth_data["email"]})
# if not user:
# user = await db.users.insert_one(oauth_data)
return {**oauth_data, "id": "user_123"}
GitHub OAuth2 Integration
GitHub's OAuth2 doesn't use OpenID Connect, so the user profile must be fetched separately from the GitHub API. GitHub users may also have private email addresses — a second API call to /user/emails is needed to retrieve the primary verified email.
@app.get("/auth/github/login")
async def github_login(request: Request):
redirect_uri = str(request.url_for("github_callback"))
return await oauth.github.authorize_redirect(request, redirect_uri)
@app.get("/auth/github/callback", name="github_callback")
async def github_callback(request: Request):
try:
token = await oauth.github.authorize_access_token(request)
except Exception as e:
raise HTTPException(status_code=400, detail=f"GitHub OAuth error: {e}")
access_token = token["access_token"]
# Fetch user profile
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/vnd.github.v3+json",
}
profile_resp = await client.get("https://api.github.com/user", headers=headers)
profile = profile_resp.json()
# GitHub may not expose email in public profile
email = profile.get("email")
if not email:
emails_resp = await client.get("https://api.github.com/user/emails", headers=headers)
emails = emails_resp.json()
# Find primary verified email
primary = next(
(e for e in emails if e.get("primary") and e.get("verified")), None
)
email = primary["email"] if primary else None
github_user = {
"provider": "github",
"provider_id": str(profile["id"]),
"email": email,
"name": profile.get("name") or profile.get("login"),
"picture": profile.get("avatar_url"),
"username": profile.get("login"),
"email_verified": True, # GitHub only exposes verified emails
}
user = await upsert_oauth_user(github_user)
request.session["user_id"] = str(user["id"])
request.session["provider"] = "github"
return RedirectResponse(url="/dashboard")
User Creation and Account Linking
Account linking allows users who signed up with email to later log in with Google, or who signed up with Google to also link their GitHub account. The key insight: email is the primary identity key. If a user with the same email already exists, link the new OAuth provider to that account rather than creating a duplicate.
from pydantic import BaseModel
from typing import Optional
class OAuthAccount(BaseModel):
provider: str # "google" | "github"
provider_id: str # Provider's unique user ID
access_token: str # Provider access token (optional storage)
class User(BaseModel):
id: str
email: str
name: Optional[str]
picture: Optional[str]
oauth_accounts: list[OAuthAccount] = []
created_at: str
async def find_or_create_user(oauth_data: dict) -> User:
"""Find existing user or create new one, handling account linking."""
# 1. Try to find by exact provider match (most reliable)
existing = await db.users.find_one({
"oauth_accounts": {
"$elemMatch": {
"provider": oauth_data["provider"],
"provider_id": oauth_data["provider_id"]
}
}
})
if existing:
return User(**existing)
# 2. Try to find by email (account linking)
if oauth_data.get("email") and oauth_data.get("email_verified"):
existing = await db.users.find_one({"email": oauth_data["email"]})
if existing:
# Link this OAuth account to existing user
await db.users.update_one(
{"_id": existing["_id"]},
{"$push": {"oauth_accounts": {
"provider": oauth_data["provider"],
"provider_id": oauth_data["provider_id"],
}}}
)
return User(**existing)
# 3. Create new user
new_user = {
"email": oauth_data.get("email"),
"name": oauth_data.get("name"),
"picture": oauth_data.get("picture"),
"oauth_accounts": [{
"provider": oauth_data["provider"],
"provider_id": oauth_data["provider_id"],
}],
}
result = await db.users.insert_one(new_user)
new_user["id"] = str(result.inserted_id)
return User(**new_user)
PKCE for Enhanced Security
PKCE (Proof Key for Code Exchange, pronounced "pixie") adds an extra layer of protection for public clients (SPAs, mobile apps) where a client secret cannot be stored securely. The client generates a random code_verifier, sends its SHA-256 hash (code_challenge) with the authorization request, and presents the original verifier during token exchange. An intercepted authorization code is useless without the verifier.
import hashlib
import base64
import secrets
import os
def generate_pkce_pair() -> tuple[str, str]:
"""Generate (code_verifier, code_challenge) for PKCE."""
code_verifier = base64.urlsafe_b64encode(os.urandom(32)).rstrip(b"=").decode()
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()
return code_verifier, code_challenge
@app.get("/auth/google/pkce-login")
async def google_pkce_login(request: Request):
"""OAuth2 with PKCE — for SPA or mobile app backends."""
code_verifier, code_challenge = generate_pkce_pair()
request.session["pkce_verifier"] = code_verifier
redirect_uri = str(request.url_for("google_pkce_callback"))
return await oauth.google.authorize_redirect(
request,
redirect_uri,
code_challenge=code_challenge,
code_challenge_method="S256",
)
@app.get("/auth/google/pkce-callback", name="google_pkce_callback")
async def google_pkce_callback(request: Request):
code_verifier = request.session.pop("pkce_verifier", None)
if not code_verifier:
raise HTTPException(status_code=400, detail="Missing PKCE verifier")
token = await oauth.google.authorize_access_token(
request,
code_verifier=code_verifier,
)
user_info = token.get("userinfo") or await oauth.google.userinfo(token=token)
# ... rest of user handling
return {"email": user_info["email"]}
Production Patterns
Production OAuth2 requires careful handling of error states, session storage at scale, and token refresh for long-lived features. Common pitfalls include expired provider tokens when users return after weeks, CSRF attacks via forged state parameters, and open redirect vulnerabilities in the post-login redirect.
from fastapi import Depends
from fastapi.responses import JSONResponse
# Protected route using session
async def get_session_user(request: Request) -> dict:
user_id = request.session.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Not authenticated")
user = await db.users.find_one({"_id": user_id})
if not user:
request.session.clear()
raise HTTPException(status_code=401, detail="User not found")
return user
@app.get("/dashboard")
async def dashboard(user: dict = Depends(get_session_user)):
return {"welcome": user["name"], "email": user["email"]}
@app.get("/auth/logout")
async def logout(request: Request):
request.session.clear()
return RedirectResponse(url="/")
# Safe redirect after login (prevent open redirect)
from urllib.parse import urlparse
ALLOWED_HOSTS = {"techoral.com", "www.techoral.com", "localhost"}
def safe_redirect_url(next_url: str, default: str = "/") -> str:
if not next_url:
return default
parsed = urlparse(next_url)
# Only allow relative URLs or known hosts
if parsed.netloc and parsed.netloc not in ALLOWED_HOSTS:
return default
return next_url
@app.get("/auth/google/login-v2")
async def google_login_v2(request: Request, next: str = "/"):
safe_next = safe_redirect_url(next)
request.session["next_url"] = safe_next
redirect_uri = str(request.url_for("google_callback_v2"))
return await oauth.google.authorize_redirect(request, redirect_uri)
@app.get("/auth/google/callback-v2", name="google_callback_v2")
async def google_callback_v2(request: Request):
token = await oauth.google.authorize_access_token(request)
user_info = token.get("userinfo")
# ... handle user
next_url = request.session.pop("next_url", "/")
return RedirectResponse(url=safe_redirect_url(next_url))