Python FastAPI Background Tasks and Lifespan Events

FastAPI provides three mechanisms for background work: the built-in BackgroundTasks for lightweight post-response work (sending emails, logging), the lifespan context manager for startup/shutdown hooks (opening DB pools, starting background workers), and asyncio.create_task() for long-running concurrent tasks that outlive a single request. This guide covers all three with practical patterns for periodic jobs, health checks, and resource lifecycle management.

BackgroundTasks

BackgroundTasks runs after the response is sent to the client. Use it for work that doesn't need to block the response: sending email confirmations, writing audit logs, updating search indexes. The tasks run in the same process and event loop — don't use for CPU-heavy work or tasks that must survive server restarts.

import asyncio
from fastapi import FastAPI, BackgroundTasks, Depends
from pydantic import BaseModel

# Application instance; the route decorators below register their handlers on it.
app = FastAPI()


async def send_welcome_email(email: str, name: str):
    """Announce the welcome email for ``name`` at ``email``.

    Stand-in for a real mail call: it hands control back to the event loop
    once and then prints the message it would send.
    """
    # Zero-length sleep: let other coroutines run before doing the work.
    await asyncio.sleep(0)
    message = f"Sending welcome email to {email} for {name}"
    print(message)
    # await email_client.send(to=email, subject="Welcome!", ...)


def log_signup(user_id: int, source: str):
    """Print a signup record for ``user_id`` and the channel it came from.

    This is a plain (non-async) function; it stands in for an analytics write.
    """
    record = f"Logging signup: user_id={user_id} source={source}"
    print(record)
    # analytics_db.insert(...)


# Request body accepted by the POST /signup handler.
class SignupRequest(BaseModel):
    email: str  # address handed to the welcome-email task
    name: str  # name used when creating the user and in the welcome email
    source: str = "organic"  # signup origin passed to log_signup; defaults to "organic"


@app.post("/signup", status_code=201)
async def signup(req: SignupRequest, background_tasks: BackgroundTasks):
    """Create the user, queue the follow-up work, and reply with id and email.

    The user is created before anything is queued (the call is awaited), so
    the queued tasks can rely on the new id.
    """
    new_user_id = await create_user(req.email, req.name)

    # Queue order matters: welcome email first, then the signup log entry.
    follow_ups = (
        (send_welcome_email, (req.email, req.name)),
        (log_signup, (new_user_id, req.source)),
    )
    for func, args in follow_ups:
        background_tasks.add_task(func, *args)

    # The handler returns without waiting for the queued work.
    return {"user_id": new_user_id, "email": req.email}


async def create_user(email: str, name: str) -> int:
    """Stub for user creation: ignores both arguments and returns the id 42."""
    stub_user_id = 42
    return stub_user_id
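BackgroundTasks limitations: Tasks run in the same process — if the server restarts, pending tasks are lost. For durability, use Celery or a dedicated task queue. Don't run CPU-intensive work inside an async background task — it blocks the event loop; move the heavy call into asyncio.to_thread() or a thread pool instead. (Plain def tasks are already run in the thread pool, so they do not block the loop, but they still compete for CPU in the same process.)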

Lifespan Context Manager

The lifespan parameter replaces the deprecated @app.on_event("startup") / @app.on_event("shutdown") decorators. Code before yield runs at startup; code after yield runs at shutdown. Use it to open connection pools, load ML models, start background consumers, and ensure clean teardown.

import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
import httpx
import redis.asyncio as aioredis


# App-level shared resources.
# Both clients start as None; lifespan() assigns them at startup and closes
# them at shutdown.
_http_client: httpx.AsyncClient | None = None
_redis: aioredis.Redis | None = None
# Tasks created at startup; lifespan() cancels and awaits them at shutdown.
_background_tasks: list[asyncio.Task] = []


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own the shared HTTP client, Redis client, and cache-warmer task.

    Code before ``yield`` runs at startup. Teardown lives in a ``finally``
    block so it also runs when startup fails part-way or an exception is
    thrown in at ``yield``; otherwise the pools and the worker task would
    leak. Module state is reset afterwards so a second run of the lifespan
    (for example, in tests) starts clean.
    """
    global _http_client, _redis

    # ---- STARTUP ----
    print("Starting up...")

    try:
        # HTTP client with connection pool
        _http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=5, read=30, write=10, pool=5),
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
        )

        # Redis connection pool
        _redis = aioredis.from_url(
            "redis://localhost:6379", decode_responses=True, max_connections=20
        )

        # Start background worker
        _background_tasks.append(asyncio.create_task(background_cache_warmer()))

        print("Startup complete")

        yield   # Application is running
    finally:
        # ---- SHUTDOWN ----
        print("Shutting down...")

        # Cancel background tasks and wait for them to finish unwinding.
        for task in _background_tasks:
            task.cancel()
        await asyncio.gather(*_background_tasks, return_exceptions=True)
        # Drop finished tasks so a later startup does not re-await them.
        _background_tasks.clear()

        # Close whichever pools were actually opened, then forget them so
        # the globals never point at a closed client.
        if _http_client is not None:
            await _http_client.aclose()
            _http_client = None
        if _redis is not None:
            await _redis.aclose()
            _redis = None

        print("Shutdown complete")


# Pass the lifespan context manager defined above to the application.
app = FastAPI(lifespan=lifespan)


async def background_cache_warmer():
    """Warm the cache every five minutes until the task is cancelled.

    Raises:
        asyncio.CancelledError: re-raised after logging, so the task ends in
            the "cancelled" state instead of looking like a normal return.
            Callers that cancel it should await it with
            ``gather(..., return_exceptions=True)`` or catch the error.
    """
    try:
        while True:
            print("Warming cache...")
            await asyncio.sleep(300)   # run every 5 minutes
    except asyncio.CancelledError:
        print("Cache warmer cancelled")
        # Propagate so the cancellation is visible to whoever awaits the task.
        raise


# Access shared resources in handlers
@app.get("/status")
async def status():
    """Report Redis health from the result of a PING on the shared client."""
    if await _redis.ping():
        return {"redis": "ok"}
    return {"redis": "error"}

asyncio.create_task() for Long-Running Jobs

import asyncio
from fastapi import FastAPI, Request
import uuid

# Application instance for the report endpoints below.
app = FastAPI()
# job_id -> task for jobs started by generate_report(); entries are removed by
# a done-callback registered there.
_running_jobs: dict[str, asyncio.Task] = {}


async def long_running_report(job_id: str, params: dict):
    """Stand-in for a report job: waits 60 seconds, logging start and finish.

    ``params`` is accepted but not used by this simulation. A cancellation is
    logged and then re-raised so the task ends up cancelled.
    """
    print(f"[{job_id}] Starting report generation")
    try:
        await asyncio.sleep(60)   # simulate work
    except asyncio.CancelledError:
        print(f"[{job_id}] Job cancelled")
        raise
    else:
        print(f"[{job_id}] Report complete")
        # Store result in Redis or DB


@app.post("/reports/generate")
async def generate_report(params: dict):
    """Start a report job in the background and return its generated id."""
    job_id = str(uuid.uuid4())

    def _forget(_finished_task):
        # Clean up completed tasks: drop the registry entry once the job ends.
        _running_jobs.pop(job_id, None)

    job = asyncio.create_task(long_running_report(job_id, params))
    _running_jobs[job_id] = job
    job.add_done_callback(_forget)

    return {"job_id": job_id, "status": "running"}


@app.get("/reports/{job_id}/status")
async def job_status(job_id: str):
    """Report the state of a job tracked in ``_running_jobs``.

    Returns a dict whose ``status`` is one of ``not_found_or_complete`` (no
    registry entry), ``running``, ``cancelled``, ``failed`` (with ``error``),
    or ``complete``.
    """
    task = _running_jobs.get(job_id)
    if task is None:
        return {"status": "not_found_or_complete"}
    if not task.done():
        return {"status": "running"}
    # Check cancellation first: Task.exception() raises CancelledError on a
    # cancelled task, which would otherwise escape the handler as an error.
    if task.cancelled():
        return {"status": "cancelled"}
    exc = task.exception()
    if exc is not None:
        return {"status": "failed", "error": str(exc)}
    return {"status": "complete"}


@app.delete("/reports/{job_id}")
async def cancel_job(job_id: str):
    """Request cancellation of a tracked job that has not finished yet."""
    job = _running_jobs.get(job_id)
    # Unknown ids and already-finished jobs are both reported as not found.
    if not job or job.done():
        return {"status": "not_found"}
    job.cancel()
    return {"status": "cancelled"}

Periodic Background Jobs

import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
import time


async def periodic(func, interval_seconds: float, name: str = ""):
    """Call ``await func()`` repeatedly, aiming for one run per interval.

    Args:
        func: zero-argument coroutine function to run each cycle.
        interval_seconds: target time between the starts of consecutive runs.
        name: label used in the failure message.

    Ordinary exceptions from ``func`` are reported and the loop carries on.

    Raises:
        asyncio.CancelledError: always propagated, whether the cancellation
            lands during ``func()`` or during the sleep, so the task reliably
            ends in the cancelled state.
    """
    while True:
        start = time.monotonic()
        try:
            await func()
        except Exception as e:
            # CancelledError is not an Exception subclass, so it passes through.
            print(f"Periodic task {name} failed: {e}")
        # Sleep for remaining interval (accounting for execution time)
        elapsed = time.monotonic() - start
        await asyncio.sleep(max(0, interval_seconds - elapsed))


async def cleanup_expired_sessions():
    """Placeholder job: announce that expired sessions are being cleaned."""
    notice = "Cleaning expired sessions..."
    print(notice)
    # await db.execute("DELETE FROM sessions WHERE expires_at < NOW()")


async def refresh_exchange_rates():
    """Placeholder job: announce that exchange rates are being refreshed."""
    notice = "Refreshing exchange rates..."
    print(notice)
    # rates = await forex_api.get_rates()
    # await cache.set("exchange_rates", rates, ex=3600)


async def aggregate_analytics():
    """Placeholder job: announce the hourly analytics aggregation."""
    notice = "Aggregating hourly analytics..."
    print(notice)


# Periodic-job tasks created by lifespan() at startup and cancelled at shutdown.
_tasks: list[asyncio.Task] = []


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the periodic jobs for as long as the application is up.

    Three ``periodic`` tasks are started before ``yield``. The ``finally``
    block cancels and awaits them even when startup fails part-way or an
    exception is thrown in at ``yield``, then empties ``_tasks`` so a later
    run of the lifespan does not re-await finished tasks.
    """
    schedule = (
        (cleanup_expired_sessions, 300, "session-cleanup"),
        (refresh_exchange_rates, 3600, "exchange-rates"),
        (aggregate_analytics, 3600, "analytics"),
    )
    try:
        for job, interval_seconds, label in schedule:
            _tasks.append(asyncio.create_task(periodic(job, interval_seconds, label)))
        yield
    finally:
        for task in _tasks:
            task.cancel()
        # return_exceptions=True: a cancelled task must not abort the shutdown.
        await asyncio.gather(*_tasks, return_exceptions=True)
        _tasks.clear()


# Pass the lifespan context manager defined above to the application.
app = FastAPI(lifespan=lifespan)

APScheduler Integration

pip install apscheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from contextlib import asynccontextmanager
from fastapi import FastAPI

# Module-level scheduler configured with the Asia/Kolkata timezone; jobs are
# added and the scheduler is started/stopped in lifespan().
scheduler = AsyncIOScheduler(timezone="Asia/Kolkata")


async def daily_digest():
    """Placeholder job: announce that the daily digest emails are going out."""
    notice = "Sending daily digest emails..."
    print(notice)


async def hourly_cleanup():
    """Placeholder job: announce the hourly cleanup run."""
    notice = "Running hourly cleanup..."
    print(notice)


async def weekly_report():
    """Placeholder job: announce that the weekly report is being generated."""
    notice = "Generating weekly report..."
    print(notice)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Register the cron jobs, run the scheduler while the app is up, stop it after.

    Three details differ from a naive setup:

    * Each ``CronTrigger`` is given the scheduler's timezone explicitly. A
      trigger object built directly does not inherit the scheduler's
      timezone, so without this the schedules would follow the machine's
      local timezone.
    * Jobs get fixed ids with ``replace_existing=True`` so that running the
      lifespan again (for example, in tests) replaces them instead of adding
      duplicates.
    * Shutdown sits in ``finally`` so the scheduler is stopped even if an
      exception is thrown in at ``yield``.
    """
    tz = scheduler.timezone
    cron_jobs = (
        ("daily-digest", daily_digest, CronTrigger(hour=9, minute=0, timezone=tz)),  # 9am daily
        ("hourly-cleanup", hourly_cleanup, CronTrigger(minute=0, timezone=tz)),  # every hour
        (
            "weekly-report",
            weekly_report,
            CronTrigger(day_of_week="mon", hour=8, timezone=tz),  # Monday 8am
        ),
    )
    for job_id, func, trigger in cron_jobs:
        scheduler.add_job(func, trigger, id=job_id, replace_existing=True)

    scheduler.start()
    try:
        yield
    finally:
        scheduler.shutdown(wait=False)


# Pass the lifespan context manager defined above to the application.
app = FastAPI(lifespan=lifespan)


@app.get("/jobs")
def list_jobs():
    """List every job known to the scheduler with its id, name, and next run time."""
    listing = []
    for job in scheduler.get_jobs():
        entry = {
            "id": job.id,
            "name": job.name,
            # Stringified so the value is always serializable in the response.
            "next_run": str(job.next_run_time),
        }
        listing.append(entry)
    return listing

Sharing State Between Requests and Background

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import asyncio

# Use app.state for shared mutable state
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Seed ``app.state`` with the shared containers before the app serves requests.

    Nothing runs after ``yield``; the state is simply left in place.
    """
    shared = app.state
    shared.cache = {}
    shared.job_results = {}
    # Counters start at zero.
    shared.metrics = {"requests": 0, "errors": 0}
    yield

# Pass the lifespan context manager defined above to the application.
app = FastAPI(lifespan=lifespan)

@app.middleware("http")
async def track_requests(request: Request, call_next):
    """Count every request, and count an error when the downstream call raises."""
    request.app.state.metrics["requests"] += 1
    try:
        response = await call_next(request)
    except Exception:
        # Record the failure, then let the exception continue upward unchanged.
        request.app.state.metrics["errors"] += 1
        raise
    else:
        return response

@app.get("/metrics")
async def get_metrics(request: Request):
    """Return the counters dict stored on ``app.state.metrics``."""
    counters = request.app.state.metrics
    return counters

Production Patterns

import asyncio
from contextlib import asynccontextmanager

# Pattern: Supervisor that restarts crashed background tasks
class TaskSupervisor:
    """Keep named background coroutines running, restarting them when they stop.

    Register coroutine factories with ``add()``, launch them with ``start()``
    and cancel them with ``stop()``. Each factory runs inside its own
    supervising task, which calls the factory again after ``restart_delay``
    seconds whenever it raises or returns.
    """

    def __init__(self):
        # name -> (coro_factory, restart_delay, task); task is None until spawned.
        self._tasks: dict[str, tuple] = {}

    def add(self, name: str, coro_factory, restart_delay: float = 5.0):
        """Register ``coro_factory`` under ``name``; it is not started until ``start()``."""
        self._tasks[name] = (coro_factory, restart_delay, None)

    async def start(self):
        """Spawn a supervising task for every registered name not already running."""
        # Snapshot the names: _spawn() rewrites entries while we iterate.
        for name in list(self._tasks):
            await self._spawn(name)

    async def _spawn(self, name: str):
        """Create the supervising task for ``name`` unless one is still running."""
        coro_factory, delay, existing = self._tasks[name]
        if existing is not None and not existing.done():
            # A second start() must not orphan the task that is already
            # running; the old handle would be lost and stop() could not
            # cancel it.
            return
        task = asyncio.create_task(self._supervised(name, coro_factory, delay))
        self._tasks[name] = (coro_factory, delay, task)

    async def _supervised(self, name, factory, delay):
        """Run ``factory()`` forever, pausing ``delay`` seconds between runs.

        Crashes are reported and followed by a restart. A normal return is
        also followed by the delay, so a factory that finishes without ever
        suspending cannot spin in a loop that never yields to the event loop.
        ``CancelledError`` is not caught, so cancellation ends the task
        wherever it lands.
        """
        while True:
            try:
                await factory()
            except Exception as e:
                print(f"Task {name} crashed: {e}. Restarting in {delay}s...")
            await asyncio.sleep(delay)

    async def stop(self):
        """Cancel every spawned task and wait until all of them have finished."""
        spawned = [task for _, _, task in self._tasks.values() if task is not None]
        for task in spawned:
            task.cancel()
        # return_exceptions=True: collect the CancelledErrors instead of raising.
        await asyncio.gather(*spawned, return_exceptions=True)

Frequently Asked Questions

BackgroundTasks vs Celery — when to use each?
Use BackgroundTasks for lightweight fire-and-forget tasks that run within the same process, don't need retry logic, and can be lost on server restart (email notifications, cache invalidation). Use Celery when you need durability, retries, rate limiting, distributed execution, or tasks that might take minutes.
How do I run CPU-heavy work in a background task?
Never run CPU-heavy code in an async function — it blocks the event loop and prevents other requests from being served. Use await asyncio.to_thread(cpu_function, args) to run it in a thread pool, or use ProcessPoolExecutor via loop.run_in_executor() for true parallel CPU execution.
How do I test lifespan code?
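Use from fastapi.testclient import TestClient with with TestClient(app) as client: — entering the with block runs the lifespan startup and leaving it runs the shutdown. httpx.AsyncClient does not trigger lifespan events on its own: for async tests, create it with httpx.AsyncClient(transport=httpx.ASGITransport(app=app), base_url="http://test") and wrap the app in LifespanManager from the asgi-lifespan package so that startup and shutdown actually run.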