Celery: Distributed Task Queue with Python and Redis (2026)

Published June 6, 2026 • 14 min read

Celery is the most widely deployed distributed task queue in the Python ecosystem. It decouples long-running work — sending emails, processing images, generating reports, syncing with third-party APIs — from your HTTP request cycle, keeping response times fast and servers responsive. This guide covers the full Celery stack: Redis broker, workers, Beat scheduler, retry strategies, Flower monitoring, Django integration, and the canvas primitives that let you build complex task workflows.

Celery Architecture

Understanding the four components prevents most production surprises:

| Component | Role | Common Choice |
| --- | --- | --- |
| Broker | Receives task messages from producers; queues them for workers | Redis, RabbitMQ |
| Worker | Pulls tasks from broker, executes them | Celery worker process |
| Result Backend | Stores task results/status for retrieval by producers | Redis, PostgreSQL, Django DB |
| Beat | Periodic task scheduler; puts tasks on the broker on a schedule | Celery Beat process |

Note: The broker and result backend can share one Redis instance in development. In production, prefer separate Redis instances so a surge in stored results cannot exhaust the memory the broker queue depends on. Separate databases within one instance keep the keys apart but still share memory and CPU.
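As a minimal sketch of keeping the two roles on separate instances, the URLs can be read from the environment so that development and production differ only in configuration. The variable names BROKER_REDIS_URL and RESULT_REDIS_URL, the helper redis_urls, and the production hostnames are all assumptions made up for illustration.

```python
import os

# Development fallbacks: one local Redis, two logical databases.
DEV_BROKER = "redis://localhost:6379/0"
DEV_BACKEND = "redis://localhost:6379/1"


def redis_urls(env=None):
    """Return (broker_url, backend_url), preferring values from the environment."""
    env = os.environ if env is None else env
    broker = env.get("BROKER_REDIS_URL", DEV_BROKER)
    backend = env.get("RESULT_REDIS_URL", DEV_BACKEND)
    return broker, backend


# Example: production points each role at its own Redis host (hypothetical hostnames).
prod_env = {
    "BROKER_REDIS_URL": "redis://broker-redis:6379/0",
    "RESULT_REDIS_URL": "redis://results-redis:6379/0",
}
broker_url, backend_url = redis_urls(prod_env)
```

The two returned values would then be passed as the broker and backend arguments of Celery(...).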

Setup with Redis

# The redis extra installs the Redis client; kombu already ships as a Celery dependency
pip install "celery[redis]"

# Run Redis locally (Docker)
docker run -d -p 6379:6379 --name redis redis:7-alpine

# celery_app.py
from celery import Celery

app = Celery(
    'myproject',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    include=['myproject.tasks.email', 'myproject.tasks.media'],
)

app.conf.update(
    task_serializer          = 'json',
    accept_content           = ['json'],
    result_serializer        = 'json',
    timezone                 = 'UTC',
    enable_utc               = True,
    task_track_started       = True,
    task_acks_late           = True,   # ack after execution, not on receipt
    worker_prefetch_multiplier = 1,    # each worker process reserves one task at a time (safer)
    result_expires           = 3600,   # delete results after 1 hour
)

# Start a worker (development)
celery -A celery_app worker --loglevel=info

# Start with concurrency (production: match CPU cores or 2× for I/O tasks)
celery -A celery_app worker --concurrency=4 --loglevel=warning

# Inspect active workers
celery -A celery_app inspect active

Defining and Calling Tasks

Two real-world tasks, sending a welcome email and processing an uploaded image:

# tasks/email.py
from celery_app import app
from celery.utils.log import get_task_logger
import smtplib
from email.message import EmailMessage

logger = get_task_logger(__name__)

@app.task(bind=True, name='tasks.send_welcome_email',
          autoretry_for=(smtplib.SMTPException,),
          retry_backoff=True, max_retries=3)
def send_welcome_email(self, user_id: int, email: str, username: str) -> dict:
    logger.info(f"Sending welcome email to {email} (attempt {self.request.retries + 1})")
    msg = EmailMessage()
    msg['Subject'] = f"Welcome to Techoral, {username}!"
    msg['From']    = 'noreply@techoral.com'
    msg['To']      = email
    msg.set_content(f"Hi {username},\n\nYour account is ready.")
    with smtplib.SMTP('smtp.example.com', 587) as smtp:
        smtp.starttls()
        smtp.login('user', 'pass')
        smtp.send_message(msg)
    return {'status': 'sent', 'user_id': user_id}

# tasks/media.py
from celery_app import app
from PIL import Image
import os

@app.task(name='tasks.process_image', bind=True, max_retries=2)
def process_image(self, image_path: str, output_dir: str) -> dict:
    """Resize uploaded image to thumbnail and web sizes."""
    try:
        with Image.open(image_path) as img:
            sizes = {'thumbnail': (150, 150), 'web': (800, 600)}
            results = {}
            for name, size in sizes.items():
                img_copy = img.copy()
                img_copy.thumbnail(size, Image.LANCZOS)
                out_path = os.path.join(output_dir, f"{name}_{os.path.basename(image_path)}")
                img_copy.save(out_path, optimize=True, quality=85)
                results[name] = out_path
        return {'status': 'ok', 'files': results}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)

# Calling tasks
from tasks.email import send_welcome_email
from tasks.media import process_image

# Fire and forget
send_welcome_email.delay(user_id=42, email='alice@example.com', username='alice')

# Get a result handle
result = process_image.delay('/uploads/photo.jpg', '/processed/')
# Poll for result (blocks until done or timeout)
output = result.get(timeout=60)

# Schedule for a specific time
from datetime import datetime, timezone, timedelta
send_welcome_email.apply_async(
    args=[42, 'alice@example.com', 'alice'],
    eta=datetime.now(timezone.utc) + timedelta(minutes=5),
    queue='email',   # route to a specific queue
)

Celery Beat — Periodic Tasks

# celery_app.py — add beat schedule
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-expired-sessions': {
        'task': 'tasks.maintenance.cleanup_sessions',
        'schedule': crontab(hour=3, minute=0),  # daily at 3 AM UTC
    },
    'send-weekly-digest': {
        'task': 'tasks.email.send_weekly_digest',
        'schedule': crontab(day_of_week='monday', hour=9, minute=0),
    },
    'refresh-cache-every-5min': {
        'task': 'tasks.cache.refresh_homepage_cache',
        'schedule': 300.0,   # seconds (float)
    },
    'process-pending-exports': {
        'task': 'tasks.exports.process_pending',
        'schedule': crontab(minute='*/15'),  # every 15 minutes
        'args': (),
        'kwargs': {'batch_size': 50},
    },
}

# Start Beat (run exactly ONE Beat process per cluster)
celery -A celery_app beat --loglevel=info

# Combined worker + beat for development only
celery -A celery_app worker --beat --loglevel=info

Pro Tip: Never run more than one Beat process against the same broker: each one would submit every scheduled task, producing duplicates. In Kubernetes, use a single-replica Deployment for Beat, separate from the worker Deployment.

Retries with Exponential Backoff

import requests
from celery_app import app

@app.task(
    bind=True,
    name='tasks.sync_external_api',
    # autoretry_for: exception classes that trigger an automatic retry
    autoretry_for=(requests.exceptions.RequestException, ConnectionError),
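    # retry_backoff: exponential backoff with a 2-second factor: up to 2s, 4s, 8s, 16s...
    retry_backoff=2,
    # retry_backoff_max: cap the wait time at 600 seconds (also the default)
    retry_backoff_max=600,
    # retry_jitter: pick a random delay between 0 and the backoff value to avoid a thundering herd
    retry_jitter=True,
    max_retries=5,
    # default_retry_delay applies to manual self.retry() calls without a countdown;
    # the backoff settings above determine the delay for automatic retries
    default_retry_delay=60,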
)
def sync_external_api(self, resource_id: int) -> dict:
    resp = requests.get(
        f'https://api.partner.com/resources/{resource_id}',
        timeout=10,
    )
    resp.raise_for_status()
    data = resp.json()
    # ... process data ...
    return {'synced': resource_id, 'status': data['status']}

# Manual retry inside exception handler
# external_service and TemporaryServiceError are placeholders for your own client code
@app.task(bind=True, max_retries=3)
def call_flaky_service(self, payload: dict):
    try:
        return external_service(payload)
    except TemporaryServiceError as exc:
        # Exponential: 60, 120, 240 seconds
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
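
To make the delay arithmetic concrete, here is a standalone sketch of exponential backoff with a cap and optional jitter. It is a plain-Python approximation of the behaviour described in the comments above, not Celery's own code, and backoff_delay is a hypothetical helper name.

```python
import random


def backoff_delay(retries, factor=2, maximum=600, jitter=False):
    """Delay in seconds before retry number `retries` (0-based)."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: any whole number of seconds from 0 up to the computed delay.
        delay = random.randint(0, delay)
    return delay


# Upper bounds for ten consecutive retries with factor=2 and a 600-second cap.
schedule = [backoff_delay(n) for n in range(10)]
print(schedule)
```

Without jitter, the delay doubles on each attempt until it reaches the cap. With jitter enabled, each value becomes an upper bound rather than the exact wait.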

Flower — Real-Time Monitoring

pip install flower

# Start Flower (port 5555)
celery -A celery_app flower --port=5555

# With basic auth
celery -A celery_app flower --basic-auth=admin:password --port=5555

# Flower as a Docker service (global Celery options such as --broker go before the subcommand)
docker run -p 5555:5555 mher/flower \
  celery --broker=redis://redis:6379/0 flower

Flower shows active workers, task throughput, success/failure rates, active vs. queued tasks, and lets you revoke tasks from the UI. Access it at http://localhost:5555.

Django + Celery Integration

# myproject/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()  # finds tasks.py in every INSTALLED_APPS app

# myproject/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)

# settings.py — Celery config with CELERY_ prefix
CELERY_BROKER_URL        = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND    = 'django-db'  # stores results in Django ORM
CELERY_CACHE_BACKEND     = 'default'    # only consulted when results use the 'django-cache' backend
CELERY_TASK_SERIALIZER   = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT    = ['json']
CELERY_TIMEZONE          = 'UTC'

# Add to INSTALLED_APPS for django-db backend
# 'django_celery_results',
# 'django_celery_beat',  # if using Beat with DB-backed schedule

# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import Order

@shared_task(bind=True, max_retries=3, autoretry_for=(Exception,), retry_backoff=True)
def fulfill_order(self, order_id: int) -> str:
    order = Order.objects.select_related('user').get(pk=order_id)
    # ... process payment, update inventory ...
    order.status = 'fulfilled'
    order.save(update_fields=['status'])
    send_mail(
        subject=f'Order #{order_id} Confirmed',
        message=f'Hi {order.user.first_name}, your order is on the way!',
        from_email='orders@techoral.com',
        recipient_list=[order.user.email],
    )
    return f'Order {order_id} fulfilled'

Task Canvas: chain, group, chord

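from celery import chain, group, chord
# generate_thumbnail, upload_to_cdn, and send_notification are assumed to be
# defined as tasks alongside process_image; they are not shown in this guide
from tasks.media import process_image, generate_thumbnail, upload_to_cdn
from tasks.email import send_notification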

# chain: sequential pipeline — output of each step feeds into the next
pipeline = chain(
    process_image.s('/uploads/photo.jpg', '/tmp/'),
    generate_thumbnail.s(),
    upload_to_cdn.s(bucket='media-bucket'),
)
pipeline.delay()

# group: parallel execution of independent tasks
parallel = group(
    process_image.s(f'/uploads/img{i}.jpg', '/tmp/')
    for i in range(10)
)
result = parallel.delay()
outputs = result.get()  # list of 10 results

# chord: parallel tasks + callback when ALL complete
# Great for: fan-out processing with a final aggregation step
header = group(
    process_image.s(f'/uploads/img{i}.jpg', '/tmp/')
    for i in range(10)
)
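# The callback receives the list of header results as its first positional argument
callback = send_notification.s(user_id=42, message="All images processed!")
# Calling chord(header)(callback) submits the workflow and returns an AsyncResult,
# so no further .delay() call is needed
chord_result = chord(header)(callback)
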
Note: Chords require a result backend. With Redis as the result backend, chords work out of the box. The Django DB backend also supports chords but can generate many database reads, so prefer a Redis result backend for chord-heavy workflows.
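
The data flow of the three primitives can be imitated in plain Python, which may help when reasoning about what each task receives. This is only an analogy that runs in a single process with concurrent.futures, not Celery itself; resize, tag, and total_bytes are invented stand-ins for real tasks.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def resize(path):
    return {"path": path, "bytes": len(path) * 100}  # stand-in for real work


def tag(result):
    return {**result, "tagged": True}


def total_bytes(results):
    return sum(r["bytes"] for r in results)


# chain: each step receives the previous step's return value
chained = reduce(lambda value, step: step(value), [resize, tag], "a.jpg")

# group: independent calls run in parallel; results come back in submission order
paths = ["a.jpg", "bb.jpg", "ccc.jpg"]
with ThreadPoolExecutor() as pool:
    grouped = list(pool.map(resize, paths))

# chord: the callback receives the list of group results as its first argument
chord_total = total_bytes(grouped)
```

The same rule applies in Celery: a chord callback's signature must accept the list of header results before any arguments bound with .s().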

Frequently Asked Questions

What is the difference between .delay() and .apply_async()?
.delay(*args, **kwargs) is a shortcut for .apply_async(args, kwargs) with no extra options. Use .apply_async() when you need to set eta, countdown, expires, queue, priority, or custom headers. Both return an AsyncResult you can poll.
How do I route different tasks to different queues?
Define a task_routes dict in your Celery config that maps task names to queue names, then start a separate worker for each queue, for example --queues=email for one worker and --queues=default for another. This lets you scale email workers independently of CPU-heavy image-processing workers.
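
A routing table along these lines might look like the sketch below. The queue names media and exports are assumptions, and queue_for is a hypothetical helper that only illustrates how names map to queues; it is not how Celery resolves routes internally.

```python
from fnmatch import fnmatch

# Shape accepted by the task_routes setting: task name (or glob pattern) -> routing options.
task_routes = {
    "tasks.send_welcome_email": {"queue": "email"},
    "tasks.process_image": {"queue": "media"},
    "tasks.exports.*": {"queue": "exports"},
}


def queue_for(task_name, routes, default="celery"):
    """Illustrative lookup only: first matching pattern wins, else the default queue."""
    for pattern, options in routes.items():
        if fnmatch(task_name, pattern):
            return options["queue"]
    return default
```

The dict would be assigned to app.conf.task_routes, and each worker started with the --queues value for the queue it should serve. Tasks that match no route go to the default queue, which Celery names celery unless configured otherwise.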
How do I handle task idempotency?
Design tasks to be safe to run multiple times. For database writes, use update_or_create() instead of create(). For external API calls, check state before sending. Store a deduplication key (e.g., celery_task_id) on your model and skip execution if already processed.
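
One way to sketch the deduplication-key idea is to claim the key in a table with a uniqueness constraint before doing the work. The example uses an in-memory SQLite database so it runs on its own; the table name, the key format, and send_once are assumptions for illustration, and a real task would use the project's own database.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed (dedup_key TEXT PRIMARY KEY)")
sent = []  # stands in for the real side effect, e.g. an outgoing email


def send_once(dedup_key, recipient):
    """Perform the side effect only the first time a given key is seen."""
    with conn:  # commits on success, rolls back if the body raises
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed (dedup_key) VALUES (?)", (dedup_key,)
        )
        if cur.rowcount == 0:
            return "skipped"  # key already recorded by an earlier run
        sent.append(recipient)
        return "sent"


first = send_once("welcome-email:42", "alice@example.com")
second = send_once("welcome-email:42", "alice@example.com")  # simulated redelivery
```

Because the key is inserted inside the same transaction as the work, a failure rolls the claim back and a later retry can proceed.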
Should I use Celery or FastAPI's BackgroundTasks?
Use BackgroundTasks for lightweight work that must run within the same process and doesn't need retry logic or result tracking (e.g., logging, analytics pings). Use Celery when tasks are heavy, need retries, need to be scheduled, need result persistence, or need to run on separate worker machines.
Why do tasks get lost when a worker crashes?
By default (task_acks_late=False), the worker acknowledges a message just before it starts executing the task, so if the worker crashes mid-task the message is lost. Set task_acks_late=True to acknowledge only after the task has run, which gives at-least-once delivery and therefore requires idempotent tasks. If the pool process executing the task is killed, the message is requeued only when task_reject_on_worker_lost=True is also set.
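
The acknowledgement behaviour discussed above can be collected into a small settings fragment. This is a sketch under assumptions rather than a recommended production profile: the name reliability_settings is made up, and the visibility timeout shown is the Redis transport's documented default of one hour.

```python
# Settings sketch for at-least-once delivery; apply with app.conf.update(**reliability_settings).
reliability_settings = {
    "task_acks_late": True,              # acknowledge after the task has run
    "task_reject_on_worker_lost": True,  # requeue if the executing process is killed
    "worker_prefetch_multiplier": 1,     # avoid reserving tasks the worker may never reach
    # Redis transport: unacknowledged messages are redelivered after this many seconds,
    # so the value should exceed the longest expected task runtime.
    "broker_transport_options": {"visibility_timeout": 3600},
}
```

With these settings a task may run more than once after a crash, so they only make sense alongside idempotent task bodies.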