Python Pika: RabbitMQ Messaging with AMQP

Pika is the official Python AMQP client for RabbitMQ. RabbitMQ's exchange-queue-binding model is more flexible than Kafka for task distribution patterns: work queues, pub/sub fan-out, topic routing, and RPC-over-messaging. This guide implements the four main messaging patterns with Pika, adds dead-letter exchange (DLX) handling for failed messages, and shows the async aio-pika library for FastAPI integration.

Installation and Connection

pip install pika aio-pika
import pika
import os

RABBITMQ_URL = os.environ.get("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/")

def get_connection() -> pika.BlockingConnection:
    params = pika.URLParameters(RABBITMQ_URL)
    params.heartbeat = 600
    params.blocked_connection_timeout = 300
    return pika.BlockingConnection(params)

# Simple test
conn = get_connection()
channel = conn.channel()
print("Connected to RabbitMQ")
conn.close()
RabbitMQ concepts: Exchange receives published messages and routes them to queues based on binding rules. Queue stores messages until consumed. Binding links an exchange to a queue with a routing key. Consumer subscribes to a queue and receives messages. Producers never write to queues directly — always to exchanges.

Work Queue (Task Queue)

A work queue distributes tasks across multiple worker processes. Each message is delivered to exactly one worker. Set prefetch_count=1 so workers only receive one task at a time — preventing fast workers from hoarding messages while slow workers idle.

import pika
import json
import time

QUEUE = "task_queue"


def publisher():
    """Send tasks to the work queue."""
    conn = get_connection()
    ch = conn.channel()
    ch.queue_declare(
        queue=QUEUE,
        durable=True,   # survive broker restarts
    )

    for i in range(10):
        task = {"task_id": i, "payload": f"process item {i}", "priority": i % 3}
        ch.basic_publish(
            exchange="",
            routing_key=QUEUE,
            body=json.dumps(task).encode(),
            properties=pika.BasicProperties(
                delivery_mode=pika.DeliveryMode.Persistent,  # persist to disk
            ),
        )
        print(f"Published task {i}")

    conn.close()


def worker():
    """Process tasks from the work queue."""
    conn = get_connection()
    ch = conn.channel()
    ch.queue_declare(queue=QUEUE, durable=True)
    ch.basic_qos(prefetch_count=1)  # process one message at a time

    def callback(ch, method, properties, body):
        task = json.loads(body)
        print(f"Processing task {task['task_id']}...")
        try:
            time.sleep(0.1)  # simulate work
            print(f"Task {task['task_id']} complete")
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"Task failed: {e}")
            # nack and requeue=False → goes to DLX if configured
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    ch.basic_consume(queue=QUEUE, on_message_callback=callback)
    print("Worker started. Waiting for tasks...")
    ch.start_consuming()


if __name__ == "__main__":
    import sys
    if sys.argv[1] == "publish":
        publisher()
    else:
        worker()

Publish/Subscribe with Fanout

A fanout exchange delivers every message to all bound queues. Use it for broadcasting events — order placed notification should go to email service, SMS service, analytics service, and inventory service simultaneously.

import pika
import json


EXCHANGE = "order.events"


def setup_fanout(channel):
    channel.exchange_declare(exchange=EXCHANGE, exchange_type="fanout", durable=True)


def publish_event(event_type: str, payload: dict):
    conn = get_connection()
    ch = conn.channel()
    setup_fanout(ch)

    message = {"event": event_type, "data": payload}
    ch.basic_publish(
        exchange=EXCHANGE,
        routing_key="",    # ignored by fanout
        body=json.dumps(message).encode(),
        properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
    )
    conn.close()
    print(f"Published event: {event_type}")


def subscribe(service_name: str, handler):
    """Each service gets its own durable queue bound to the fanout exchange."""
    conn = get_connection()
    ch = conn.channel()
    setup_fanout(ch)

    queue_name = f"{EXCHANGE}.{service_name}"
    ch.queue_declare(queue=queue_name, durable=True)
    ch.queue_bind(exchange=EXCHANGE, queue=queue_name)

    def callback(ch, method, properties, body):
        event = json.loads(body)
        try:
            handler(event)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"{service_name} failed to process: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    ch.basic_qos(prefetch_count=10)
    ch.basic_consume(queue=queue_name, on_message_callback=callback)
    print(f"{service_name} listening on {queue_name}")
    ch.start_consuming()


# Usage
publish_event("order.placed", {"order_id": "ord-123", "amount": 99.99})
# subscribe("email-service", lambda e: print(f"Email: {e}"))
# subscribe("analytics", lambda e: print(f"Analytics: {e}"))

Topic Routing

Topic exchanges route messages by pattern-matching the routing key. * matches one word, # matches zero or more words. This enables fine-grained routing: send order.europe.placed to European order handlers only.

import pika
import json


def setup_topic_exchange(ch, exchange: str = "events.topic"):
    ch.exchange_declare(exchange=exchange, exchange_type="topic", durable=True)
    return exchange


def publish_topic(routing_key: str, payload: dict, exchange: str = "events.topic"):
    conn = get_connection()
    ch = conn.channel()
    setup_topic_exchange(ch, exchange)
    ch.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=json.dumps(payload).encode(),
        properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
    )
    conn.close()


def subscribe_topic(queue: str, binding_pattern: str, handler, exchange: str = "events.topic"):
    conn = get_connection()
    ch = conn.channel()
    setup_topic_exchange(ch, exchange)

    ch.queue_declare(queue=queue, durable=True)
    ch.queue_bind(exchange=exchange, queue=queue, routing_key=binding_pattern)

    def callback(ch, method, properties, body):
        data = json.loads(body)
        try:
            handler(data, method.routing_key)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    ch.basic_consume(queue=queue, on_message_callback=callback)
    ch.start_consuming()


# Routing examples:
# publish_topic("order.europe.placed", {...})
# publish_topic("order.asia.cancelled", {...})
# subscribe_topic("all-orders", "order.#", handler)       # all order events
# subscribe_topic("europe-orders", "order.europe.*", h)   # only Europe events
# subscribe_topic("placed-orders", "*.*.placed", h)       # only placed events

Dead-Letter Exchange

Configure a dead-letter exchange (DLX) to capture failed messages — those that were rejected (nacked without requeue), expired, or exceeded the queue's max-length. This prevents message loss and gives operators visibility into processing failures.

import pika
import json


def setup_dlx(ch):
    """Create DLX exchange and dead-letter queue."""
    # Dead-letter exchange
    ch.exchange_declare(exchange="dlx", exchange_type="direct", durable=True)

    # Dead-letter queue — operators inspect this for failed messages
    ch.queue_declare(
        queue="dead.letters",
        durable=True,
        arguments={
            "x-message-ttl": 7 * 24 * 60 * 60 * 1000,  # keep 7 days
        },
    )
    ch.queue_bind(exchange="dlx", queue="dead.letters", routing_key="failed")


def setup_work_queue_with_dlx(ch, queue_name: str):
    """Work queue that sends failures to DLX."""
    setup_dlx(ch)

    ch.queue_declare(
        queue=queue_name,
        durable=True,
        arguments={
            "x-dead-letter-exchange": "dlx",
            "x-dead-letter-routing-key": "failed",
            "x-message-ttl": 60 * 60 * 1000,           # messages expire after 1h
            "x-max-length": 100_000,                     # max queue depth
        },
    )
    return queue_name


conn = get_connection()
ch = conn.channel()
setup_work_queue_with_dlx(ch, "orders.processing")

# Any message nacked with requeue=False or expired goes to dead.letters queue
# Monitor dead.letters to detect systematic failures
conn.close()

Async with aio-pika and FastAPI

import asyncio
import json
import aio_pika
from contextlib import asynccontextmanager
from fastapi import FastAPI

_connection: aio_pika.abc.AbstractConnection | None = None
_channel: aio_pika.abc.AbstractChannel | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global _connection, _channel
    _connection = await aio_pika.connect_robust(
        "amqp://guest:guest@localhost/",
        heartbeat=600,
    )
    _channel = await _connection.channel()
    await _channel.set_qos(prefetch_count=10)
    yield
    await _connection.close()


app = FastAPI(lifespan=lifespan)


@app.post("/orders")
async def create_order(order: dict):
    await _channel.default_exchange.publish(
        aio_pika.Message(
            body=json.dumps(order).encode(),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        ),
        routing_key="orders",
    )
    return {"status": "queued"}


async def start_consumer():
    conn = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    channel = await conn.channel()
    await channel.set_qos(prefetch_count=5)

    queue = await channel.declare_queue("orders", durable=True)

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                order = json.loads(message.body)
                print(f"Processing order: {order}")
                await asyncio.sleep(0.1)   # async work here

Production Patterns

import pika
import time
import logging

log = logging.getLogger(__name__)


class RobustConsumer:
    """Auto-reconnecting RabbitMQ consumer."""

    def __init__(self, url: str, queue: str, handler):
        self.url = url
        self.queue = queue
        self.handler = handler

    def run(self):
        while True:
            try:
                conn = pika.BlockingConnection(pika.URLParameters(self.url))
                ch = conn.channel()
                ch.queue_declare(queue=self.queue, durable=True)
                ch.basic_qos(prefetch_count=1)

                def callback(ch, method, properties, body):
                    try:
                        self.handler(body)
                        ch.basic_ack(delivery_tag=method.delivery_tag)
                    except Exception as e:
                        log.exception("Handler failed")
                        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

                ch.basic_consume(queue=self.queue, on_message_callback=callback)
                log.info(f"Consuming from {self.queue}")
                ch.start_consuming()

            except pika.exceptions.AMQPConnectionError as e:
                log.warning(f"Connection lost: {e}. Reconnecting in 5s...")
                time.sleep(5)
            except KeyboardInterrupt:
                break

Frequently Asked Questions

RabbitMQ vs Kafka — when to use each?
Use RabbitMQ for task queues, RPC, complex routing, and when messages should be deleted after consumption. Use Kafka for high-throughput event streaming, log aggregation, replay, and when consumers need to process historical events. RabbitMQ is easier to set up; Kafka scales to millions of events/second.
What is the difference between ack and nack?
basic_ack tells RabbitMQ the message was processed successfully — it is removed from the queue. basic_nack(requeue=True) returns it to the front of the queue for retry. basic_nack(requeue=False) discards it (goes to DLX if configured). Never leave messages unacked — the queue holds them in memory until the connection closes.
How do I monitor queue depth?
Use the RabbitMQ Management API (http://localhost:15672/api/queues) or the rabbitmq-management plugin. For production, export RabbitMQ metrics to Prometheus via rabbitmq_prometheus plugin and alert when queue depth exceeds N messages.