Python Background Tasks: Celery Beat and APScheduler

Most production applications need recurring background work: sending daily digest emails, syncing data from external APIs, cleaning up expired records, generating reports, and running health checks. Python has two dominant approaches — Celery Beat for distributed task queues with scheduling, and APScheduler for in-process scheduling that works without a broker. This guide covers both with FastAPI integration patterns.

Celery Setup with Redis

# Quote the package spec so shells such as zsh don't treat [redis] as a glob.
pip install "celery[redis]" flower redis
# celery_app.py
from celery import Celery
import os

# Broker and result backend URLs come from the environment, falling back to
# two separate databases (0 and 1) on a local Redis server.
# NOTE(review): tasks are registered under the module path they are imported
# from, and the beat schedule and routes in this guide refer to
# "myapp.tasks.<name>" — so tasks.py must be importable as myapp.tasks.
celery = Celery(
    "myapp",
    broker=os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
    include=["myapp.tasks"],  # task modules imported when the worker starts
)

celery.conf.update(
    task_serializer="json",
    accept_content=["json"],    # reject messages in any other format
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,    # expose a STARTED state while a task runs
    # Ack only after the task finishes, so a task held by a lost worker is
    # redelivered; tasks must therefore tolerate running twice.
    # NOTE(review): if only the child process dies, Celery still acks unless
    # task_reject_on_worker_lost=True — confirm which behavior is wanted.
    task_acks_late=True,        # re-queue on worker crash
    worker_prefetch_multiplier=1,  # fair task distribution
    result_expires=3600,        # delete results after 1 hour
)

# tasks.py
from celery_app import celery
import time

@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, to: str, subject: str, body: str):
    """Deliver one email message.

    If delivery raises, the task re-enqueues itself with exponential backoff
    (60 s, 120 s, 240 s, ...) until the ``max_retries`` limit of 3 is reached.
    ``body`` is not used by this placeholder implementation.

    Returns:
        dict: ``{"status": "sent", "to": to}`` on success.
    """
    try:
        # Placeholder for real delivery (e.g. via smtplib).
        print(f"Sending email to {to}: {subject}")
    except Exception as err:
        backoff_seconds = 60 * 2 ** self.request.retries
        raise self.retry(exc=err, countdown=backoff_seconds)
    return {"status": "sent", "to": to}

@celery.task
def process_report(report_id: int) -> dict:
    """Build report *report_id* inside a worker and return a summary dict.

    The work is simulated with a two-second sleep; the row count in the
    summary is a fixed placeholder value.
    """
    time.sleep(2)  # stand-in for the real report generation
    summary = dict(report_id=report_id, status="complete", rows=1500)
    return summary

# Dispatch tasks. Guarded so that merely importing this module (the worker
# imports myapp.tasks through ``include``) neither sends an email nor blocks
# waiting on a result.
if __name__ == "__main__":
    result = send_email.delay("alice@example.com", "Welcome!", "Thanks for signing up.")
    print(result.id)       # task ID
    print(result.status)   # PENDING / STARTED / SUCCESS / FAILURE

    # Wait for result (don't do this in HTTP handlers)
    data = process_report.delay(42).get(timeout=30)
# Start worker (-A names the module that defines the Celery app: celery_app.py)
celery -A celery_app worker --loglevel=info --concurrency=4

# Start worker for specific queue (consumes only tasks routed to "emails";
# see Task Routing below)
celery -A celery_app worker -Q emails --loglevel=info

Celery Beat: Periodic Tasks

# celery_app.py — add beat schedule
from celery.schedules import crontab

# Entry names are free-form labels; "task" must be a registered task name.
# crontab() times are evaluated in the app timezone, which conf.update sets
# to UTC.
# NOTE(review): generate_weekly_report, sync_data and run_billing are not
# defined in this guide — they must exist in myapp.tasks.
celery.conf.beat_schedule = {
    # Every 60 seconds (a plain number is an interval in seconds, not tied to
    # the wall clock; use crontab() for clock-aligned runs)
    "health-check-every-minute": {
        "task": "myapp.tasks.health_check",
        "schedule": 60.0,
    },
    # Every day at 8am UTC
    "daily-digest-email": {
        "task": "myapp.tasks.send_daily_digest",
        "schedule": crontab(hour=8, minute=0),
        "args": (),  # positional arguments for the task (none)
    },
    # Every Monday at 9am UTC
    "weekly-report": {
        "task": "myapp.tasks.generate_weekly_report",
        "schedule": crontab(hour=9, minute=0, day_of_week="monday"),
    },
    # Every 15 minutes, on the quarter hour (:00, :15, :30, :45)
    "sync-external-data": {
        "task": "myapp.tasks.sync_data",
        "schedule": crontab(minute="*/15"),
        "kwargs": {"source": "crm"},  # called as sync_data(source="crm")
    },
    # First day of each month at midnight UTC
    "monthly-billing": {
        "task": "myapp.tasks.run_billing",
        "schedule": crontab(hour=0, minute=0, day_of_month=1),
    },
}

# Periodic tasks in tasks.py
@celery.task
def health_check():
    """Probe the local web app's /health endpoint and summarize the result.

    Returns:
        dict: ``{"status": "ok", "code": <int>}`` for a successful (< 400)
        response, ``{"status": "error", "code": <int>}`` for a 4xx/5xx
        response, or ``{"status": "error", "error": <message>}`` when the
        request itself fails (connection refused, timeout, ...).
    """
    import requests
    try:
        r = requests.get("http://localhost:8000/health", timeout=5)
    except requests.RequestException as e:
        # Report the failure in the result instead of raising, so the
        # periodic task always completes and carries the diagnosis.
        return {"status": "error", "error": str(e)}
    # A server that answers 500/503 is reachable but not healthy.
    status = "ok" if r.ok else "error"
    return {"status": status, "code": r.status_code}

@celery.task
def send_daily_digest():
    """Fan the daily digest email out to every subscribed user.

    Each email is dispatched as its own ``send_email`` task, so a failed
    delivery retries independently of the others.

    Returns:
        dict: ``{"sent": n}`` where *n* is the number of emails queued
        (not confirmed deliveries).
    """
    from myapp.db import get_users, get_daily_stats
    users = get_users(subscribed_to_digest=True)
    stats = get_daily_stats()
    # The digest body depends only on the shared stats, so render it once
    # rather than once per recipient.
    # NOTE(review): format_digest is not defined in this guide — confirm it is
    # importable in this module.
    digest = format_digest(stats)
    queued = 0  # counted while iterating, so users need not support len()
    for user in users:
        send_email.delay(user.email, "Daily Digest", digest)
        queued += 1
    return {"sent": queued}
# Start beat scheduler (run ONE instance only). Beat only sends due tasks to
# the broker; a worker must be running to execute them.
celery -A celery_app beat --loglevel=info

# Combined worker + beat (development only)
celery -A celery_app worker --beat --loglevel=info
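One beat instance only: Run exactly one Celery Beat process per deployment. Multiple Beat processes running simultaneously will duplicate scheduled task executions. In Kubernetes, deploy Beat as a separate Deployment with replicas: 1 and strategy type Recreate. The default RollingUpdate strategy starts the new pod before stopping the old one, so two Beat processes briefly overlap during every rollout.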

Task Routing and Priorities

# Separate queues for different task types (keys are exact task names)
celery.conf.task_routes = {
    "myapp.tasks.send_email": {"queue": "emails"},
    "myapp.tasks.process_report": {"queue": "reports"},
    "myapp.tasks.health_check": {"queue": "monitoring"},
}

# Or route by task name pattern.
# NOTE: this assignment replaces the dict above rather than merging with it,
# so use one form or the other. With this version, health_check is no longer
# routed to "monitoring".
celery.conf.task_routes = {
    "myapp.tasks.send_*": {"queue": "emails"},
    "myapp.tasks.*_report": {"queue": "reports"},
}

# Dispatch to specific queue
send_email.apply_async(
    args=["user@example.com", "Alert!", "Something happened"],
    queue="emails",
    # On the Redis broker, 0 is the HIGHEST priority and 9 the lowest (the
    # reverse of RabbitMQ), so an urgent alert uses 0. Per the Celery routing
    # docs, Redis also needs
    # broker_transport_options={"queue_order_strategy": "priority"}.
    priority=0,
    countdown=300,     # delay 5 minutes
    expires=3600,      # discard if not executed within 1 hour
)

# Start workers for specific queues
# celery -A celery_app worker -Q emails --concurrency=8
# celery -A celery_app worker -Q reports --concurrency=2

Monitoring with Flower

# Start Flower web dashboard
celery -A celery_app flower --port=5555
# Open: http://localhost:5555

# With basic auth
# NOTE(review): credentials given on the command line are visible to other
# users in the process list; use a secrets mechanism in production.
celery -A celery_app flower --basic-auth=admin:secret --port=5555
# Programmatic task inspection
from celery_app import celery

inspect = celery.control.inspect()

# Each inspect call broadcasts to the workers and returns a dict keyed by
# worker name, or None when no worker replies.

# Active tasks
active = inspect.active()

# Reserved tasks: received (prefetched) by a worker but not yet started;
# this is not the whole broker queue
reserved = inspect.reserved()

# Worker statistics
stats = inspect.stats()

# Revoke (cancel) a task; terminate=True also kills it if already running
celery.control.revoke(task_id="abc-123", terminate=True)

# Inspect a failed task. A stored result cannot be "retried" directly; to run
# the work again, dispatch the task anew with its original arguments.
from celery.result import AsyncResult
# Bind to this app explicitly so the lookup uses the result backend
# configured in celery_app.py.
result = AsyncResult("abc-123", app=celery)
print(result.state)    # FAILURE
print(result.info)     # exception info

APScheduler: In-Process Scheduling

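APScheduler runs inside your Python process — no broker, no worker processes. Perfect for applications that need scheduled tasks but don't need distributed execution. Works great with FastAPI via the async scheduler. Because the scheduler lives in the web process, every process runs its own copy. If you serve the app with several workers (for example uvicorn --workers 4 or Gunicorn), jobs can run once per worker unless you start the scheduler in only one process.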

# The code below uses the APScheduler 3.x API (AsyncIOScheduler, job stores,
# executors). 4.x redesigned these, so stay on 3.x. RedisJobStore also needs
# the redis client.
pip install "apscheduler>=3,<4" redis
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.asyncio import AsyncIOExecutor
import asyncio

# Configure with Redis job store (persists jobs across restarts).
# db=2 keeps scheduler state apart from the Celery broker (db 0) and result
# backend (db 1) defaults used earlier.
# NOTE(review): persistent job stores serialize jobs, so job callables must be
# importable module-level functions (no lambdas or closures). See APScheduler docs.
jobstores = {
    "default": RedisJobStore(host="localhost", port=6379, db=2),
}

scheduler = AsyncIOScheduler(
    jobstores=jobstores,
    executors={"default": AsyncIOExecutor()},
    # coalesce: collapse a backlog of missed runs into a single run;
    # max_instances=1: never run a job concurrently with itself
    job_defaults={"coalesce": True, "max_instances": 1},
    timezone="UTC",
)

# Add jobs
async def send_digests():
    """Daily job body; currently a placeholder that only logs to stdout."""
    message = "Sending daily digests..."
    print(message)

async def sync_crm():
    """CRM sync job body; currently a placeholder that only logs to stdout."""
    message = "Syncing CRM data..."
    print(message)

# Interval trigger
# A fixed id plus replace_existing=True matters with a persistent job store:
# on restart, the job already saved in Redis is overwritten instead of
# colliding with (or duplicating) this definition.
scheduler.add_job(
    sync_crm,
    trigger=IntervalTrigger(minutes=15),
    id="sync_crm",
    replace_existing=True,
    misfire_grace_time=300,  # run up to 5 min late before skipping
)

# Cron trigger: daily at 08:00 in the scheduler timezone (UTC)
scheduler.add_job(
    send_digests,
    trigger=CronTrigger(hour=8, minute=0),
    id="daily_digest",
    replace_existing=True,
)

# One-time scheduled job
from datetime import datetime, timedelta, timezone


def send_reminder():
    """Print a one-off reminder (placeholder for real notification logic)."""
    print("Reminder!")


# The default job store is Redis, which serializes jobs, so the callable must
# be a named module-level function; a lambda cannot be stored there.
# replace_existing=True keeps a restart from failing with ConflictingIdError
# while the job is still persisted. Note that it also re-arms the 24 h delay.
scheduler.add_job(
    send_reminder,
    trigger="date",
    run_date=datetime.now(timezone.utc) + timedelta(hours=24),
    id="one_time_reminder",
    replace_existing=True,
)

FastAPI Integration

from fastapi import FastAPI
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the APScheduler instance for as long as the app is serving.

    The scheduler is started before the app begins handling requests and is
    shut down when the app exits, including when it exits because of an error.
    """
    scheduler.start()
    try:
        yield
    finally:
        # Without finally, an exception thrown into the generator at `yield`
        # would skip shutdown and leave the scheduler running.
        scheduler.shutdown()


app = FastAPI(lifespan=lifespan)

# Trigger tasks from HTTP endpoints
@app.post("/tasks/sync")
async def trigger_sync():
    """Ask the scheduler to run ``sync_crm`` once, as soon as possible.

    No trigger is passed, so the job runs immediately. The fixed id with
    ``replace_existing=True`` means repeated requests overwrite a pending
    manual run instead of stacking up new ones.
    """
    job_options = {"id": "manual_sync", "replace_existing": True}
    scheduler.add_job(sync_crm, **job_options)
    return {"status": "scheduled"}

@app.get("/tasks")
async def list_jobs():
    """List every scheduled job with its id, next run time and trigger.

    ``next_run`` is the string form of the job's next run time, or JSON
    ``null`` for a paused job (whose ``next_run_time`` is None).
    """
    return [
        {
            "id": job.id,
            # Emit null rather than the literal text "None" for paused jobs.
            "next_run": (
                str(job.next_run_time) if job.next_run_time is not None else None
            ),
            "trigger": str(job.trigger),
        }
        for job in scheduler.get_jobs()
    ]

@app.delete("/tasks/{job_id}")
async def remove_job(job_id: str):
    """Delete the scheduled job *job_id*.

    Returns:
        dict: ``{"removed": job_id}`` on success.

    Raises:
        HTTPException: 404 when no job has that id. APScheduler raises
        JobLookupError in that case, which would otherwise surface as a 500.
    """
    from apscheduler.jobstores.base import JobLookupError
    from fastapi import HTTPException

    try:
        scheduler.remove_job(job_id)
    except JobLookupError:
        raise HTTPException(
            status_code=404, detail=f"No job with id {job_id!r}"
        ) from None
    return {"removed": job_id}

# Celery + FastAPI — dispatch tasks from HTTP handlers
from celery_app import celery
from myapp.tasks import send_email, process_report

@app.post("/reports/{report_id}/generate")
async def generate_report(report_id: int):
    """Enqueue ``process_report`` for *report_id* and return immediately.

    The handler never waits on the task. Clients poll
    ``GET /reports/tasks/{task_id}`` with the returned id.
    """
    queued = process_report.delay(report_id)
    payload = {"task_id": queued.id, "status": "queued"}
    return payload

@app.get("/reports/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Report the state of Celery task *task_id* as JSON.

    If the task raised (FAILURE, or RETRY while it waits to retry),
    ``AsyncResult.result`` holds the exception instance, which has no useful
    JSON form. In that case ``"result"`` is null and the exception is
    summarized in an ``"error"`` string.
    """
    result = celery.AsyncResult(task_id)
    status = result.status
    value = result.result
    if isinstance(value, BaseException):
        # NOTE(review): this exposes exception text to API clients; trim it
        # if task errors may contain sensitive data.
        return {
            "task_id": task_id,
            "status": status,
            "result": None,
            "error": f"{type(value).__name__}: {value}",
        }
    return {"task_id": task_id, "status": status, "result": value}

Celery Beat vs APScheduler

| Feature | Celery Beat | APScheduler |
|---|---|---|
| Requires broker | Yes (Redis/RabbitMQ) | No (optional Redis store) |
| Distributed workers | Yes | No |
| In-process | No | Yes |
| Task persistence | Yes (broker) | Optional (jobstore) |
| Best for | High-volume, distributed | Simple, single-process |
| FastAPI integration | Via .delay() | Native async |

Frequently Asked Questions

Can I use Celery without Redis?
Yes. Celery supports RabbitMQ, Amazon SQS, and even a database broker (via celery[sqlalchemy]). Redis is the most popular choice because it also serves as a result backend and is easy to set up.
What happens if a Beat task misses its scheduled time?
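It depends on the scheduler.

Celery Beat records each entry's last run time in its schedule file (celerybeat-schedule by default). When Beat restarts after downtime, an overdue entry runs once; missed intervals are not replayed. This only works if the schedule file survived the restart; otherwise the entry is treated as new and nothing is caught up.

APScheduler skips a run that starts later than the job's misfire_grace_time (set per job, as for sync_crm above). With coalesce=True in job_defaults, a backlog of missed runs collapses into a single run.

Note that coalesce and misfire_grace_time are APScheduler options. Celery's task_default_rate_limit and task_annotations do not control missed schedules.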
How do I prevent duplicate task execution?
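Celery Beat with only one Beat process prevents duplicate scheduling. For task-level deduplication, use a task lock in Redis: atomically create a key before executing, and delete it on completion. Use SET with NX plus an expiry, so a crashed worker cannot hold the lock forever. A separate check followed by a set is racy, because two workers can both see the key missing. Libraries like celery-once automate this pattern.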