Python Kafka Consumer with confluent-kafka

The confluent-kafka library wraps librdkafka, a C library, which generally gives it much higher throughput than the pure-Python kafka-python package and makes it the usual choice for production Python consumers. Building a production Kafka consumer requires understanding consumer groups for horizontal scaling, offset commit strategies to avoid data loss or duplication, dead-letter queues for poison messages, and Schema Registry integration for Avro/Protobuf deserialization. This guide covers these patterns with working code.

Installation and Basic Consumer

pip install confluent-kafka
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import signal

running = True


def shutdown(sig, frame):
    # Let the poll loop exit so the finally block can close the consumer cleanly
    global running
    running = False


signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)


def process_order(data: dict):
    # Defined before the poll loop so it exists when the loop first calls it
    print(f"Processing order {data.get('order_id')}")


conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor-v1",
    "auto.offset.reset": "earliest",   # start from beginning if no committed offset
    "enable.auto.commit": False,        # commit manually after processing (at-least-once)
    "enable.partition.eof": True,       # emit _PARTITION_EOF events (off by default)
    "session.timeout.ms": 30000,
    "max.poll.interval.ms": 300000,     # max gap between poll() calls before eviction
}

consumer = Consumer(conf)
consumer.subscribe(["orders.placed", "orders.updated"])

try:
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f"Reached end of partition {msg.partition()}")
            else:
                raise KafkaException(msg.error())
            continue

        data = json.loads(msg.value().decode("utf-8"))
        print(f"[{msg.topic()}] partition={msg.partition()} offset={msg.offset()} key={msg.key()}")
        print(f"  payload: {data}")

        # Process the message
        process_order(data)

        # Commit only after successful processing
        consumer.commit(message=msg, asynchronous=False)

finally:
    consumer.close()   # leave the group promptly so partitions are reassigned
Auto-commit vs manual commit: with enable.auto.commit=True, the client commits offsets in the background on a timer (auto.commit.interval.ms) whether or not your processing has finished, so a crash after a commit but before processing completes silently loses those messages. Use enable.auto.commit=False and commit only after you have durably processed (or stored) the message.
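To make the failure window concrete, here is a toy, broker-free simulation (plain Python, no Kafka involved) of a consumer that crashes while handling one message. The `commit_first=True` case stands in for an auto-commit that happens to fire before processing finishes; the function and message names are illustrative only.

```python
from typing import List


def consume(log: List[str], start: int, crash_at: int,
            commit_first: bool, processed: List[str]) -> int:
    """Consume log[start:], 'crashing' while handling offset crash_at.

    Returns the committed offset (next offset to read) at the point of the crash,
    or at the end of the log if no crash happened.
    """
    committed = start
    for offset in range(start, len(log)):
        if commit_first:
            committed = offset + 1      # offset committed before the work is done
        if offset == crash_at:
            return committed            # crash: this message's work never finishes
        processed.append(log[offset])   # the actual "processing"
        if not commit_first:
            committed = offset + 1      # commit only after processing succeeded
    return committed


def run(commit_first: bool) -> List[str]:
    log = ["m0", "m1", "m2", "m3"]
    processed: List[str] = []
    resume_at = consume(log, 0, crash_at=2, commit_first=commit_first, processed=processed)
    # Restart from the committed offset; no crash this time
    consume(log, resume_at, crash_at=-1, commit_first=commit_first, processed=processed)
    return processed


print(run(commit_first=True))   # ['m0', 'm1', 'm3']  (m2 is lost)
print(run(commit_first=False))  # ['m0', 'm1', 'm2', 'm3']  (m2 is redelivered)
```

With process-then-commit, the interrupted message is simply read again after the restart, which is why the consumer above commits only after `process_order` returns.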

Consumer Groups and Partition Assignment

A consumer group allows horizontal scaling — Kafka assigns each partition to exactly one consumer in the group. Add more consumer instances to scale throughput; the maximum useful parallelism equals the number of partitions. Rebalances occur when consumers join or leave the group.
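The "one consumer per partition" rule is easiest to see with a simplified assignor. The sketch below hands partitions out round-robin; `assign_round_robin` is a hypothetical helper for illustration, not the range or cooperative-sticky algorithm librdkafka actually runs, but it shows the same ceiling: with more consumers than partitions, the extras get nothing.

```python
from typing import Dict, List


def assign_round_robin(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Give each partition to exactly one consumer, cycling through consumers in order."""
    members = sorted(consumers)
    assignment: Dict[str, List[int]] = {c: [] for c in members}
    for i, partition in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(partition)
    return assignment


print(assign_round_robin(list(range(6)), ["c1", "c2", "c3", "c4"]))
# {'c1': [0, 4], 'c2': [1, 5], 'c3': [2], 'c4': [3]}
print(assign_round_robin(list(range(3)), ["c1", "c2", "c3", "c4", "c5"]))
# {'c1': [0], 'c2': [1], 'c3': [2], 'c4': [], 'c5': []}
```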

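from confluent_kafka import Consumer, KafkaError, KafkaException, OFFSET_STORED


class RebalanceHandler:
    def on_assign(self, consumer, partitions):
        print(f"Assigned partitions: {[p.partition for p in partitions]}")
        # Optionally seek to a specific offset on assignment
        for p in partitions:
            p.offset = self._load_last_offset(p.topic, p.partition)
        # cooperative-sticky passes only the newly added partitions, which must be applied
        # with incremental_assign(); with an eager strategy (range, roundrobin) use assign()
        consumer.incremental_assign(partitions)

    def on_revoke(self, consumer, partitions):
        print(f"Revoking partitions: {[p.partition for p in partitions]}")
        # Commit offsets before giving up partitions
        try:
            consumer.commit(asynchronous=False)
        except KafkaException as e:
            # _NO_OFFSET only means nothing was consumed since the last commit
            if e.args[0].code() != KafkaError._NO_OFFSET:
                raise

    def on_lost(self, consumer, partitions):
        # Partitions were lost (e.g. session timeout) and may already be owned by another
        # consumer, so do not commit offsets for them
        print(f"Lost partitions: {[p.partition for p in partitions]}")

    def _load_last_offset(self, topic: str, partition: int) -> int:
        # Load from your own offset store (Redis, DB) for exactly-once;
        # OFFSET_STORED (-1000) means "resume from the group's committed offset"
        return OFFSET_STORED


handler = RebalanceHandler()

conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor-v1",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    # Cooperative sticky assignor: minimizes partition movements on rebalance
    "partition.assignment.strategy": "cooperative-sticky",
}

consumer = Consumer(conf)
consumer.subscribe(
    ["orders"],
    on_assign=handler.on_assign,
    on_revoke=handler.on_revoke,
    on_lost=handler.on_lost,
)

try:
    while True:
        msg = consumer.poll(timeout=1.0)   # rebalance callbacks are invoked from inside poll()
        if msg is None or msg.error():
            continue
        print(f"partition={msg.partition()} offset={msg.offset()}")
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()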

Offset Management

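Committing offsets correctly is central to consumer correctness. Commit before processing finishes and a crash loses those messages; commit infrequently and a crash forces you to reprocess everything since the last commit. Committing in batches, by message count or elapsed time, balances throughput against the amount of rework after a failure.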

from confluent_kafka import Consumer, TopicPartition
import json
import time

COMMIT_INTERVAL_SECONDS = 5
BATCH_SIZE = 500


def process(value: bytes) -> dict:
    # Defined before the loop so it exists when first called
    return json.loads(value)


conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor-v1",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
}

consumer = Consumer(conf)
consumer.subscribe(["orders"])

pending_offsets: dict[tuple[str, int], int] = {}
last_commit_time = time.monotonic()
batch_count = 0


def commit_pending() -> None:
    """Synchronously commit the next offset for every partition seen since the last commit."""
    offsets = [TopicPartition(t, p, o) for (t, p), o in pending_offsets.items()]
    if offsets:
        consumer.commit(offsets=offsets, asynchronous=False)
        print(f"Committed {len(offsets)} partition offsets")
    pending_offsets.clear()


try:
    while True:
        msg = consumer.poll(timeout=0.1)

        if msg and not msg.error():
            data = process(msg.value())

            # Track highest offset per (topic, partition)
            key = (msg.topic(), msg.partition())
            pending_offsets[key] = msg.offset() + 1  # +1 = next offset to consume
            batch_count += 1

        # Commit on batch size or time interval
        now = time.monotonic()
        should_commit = (
            batch_count >= BATCH_SIZE or
            (batch_count > 0 and now - last_commit_time >= COMMIT_INTERVAL_SECONDS)
        )

        if should_commit:
            commit_pending()
            batch_count = 0
            last_commit_time = now
finally:
    commit_pending()   # flush progress made since the last batch commit
    consumer.close()
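The commit trigger in the loop above can be pulled out into a small class with an injectable clock, which makes the batching rule testable without a broker. `CommitBatcher` is a hypothetical helper, not part of confluent-kafka; in a real loop you would turn `drain()`'s result into `TopicPartition` objects and pass them to `consumer.commit(offsets=..., asynchronous=False)`.

```python
import time
from typing import Callable, Dict, Tuple


class CommitBatcher:
    """Collects next-offset-to-commit per (topic, partition) and decides when to flush."""

    def __init__(self, batch_size: int = 500, interval_s: float = 5.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.batch_size = batch_size
        self.interval_s = interval_s
        self.clock = clock
        self.pending: Dict[Tuple[str, int], int] = {}
        self.count = 0
        self.last_flush = clock()

    def record(self, topic: str, partition: int, offset: int) -> None:
        # Kafka commit semantics: the committed offset is the *next* message to read
        self.pending[(topic, partition)] = offset + 1
        self.count += 1

    def should_flush(self) -> bool:
        if self.count == 0:
            return False
        return (self.count >= self.batch_size
                or self.clock() - self.last_flush >= self.interval_s)

    def drain(self) -> Dict[Tuple[str, int], int]:
        """Return the offsets to commit and reset the batch."""
        offsets, self.pending = self.pending, {}
        self.count = 0
        self.last_flush = self.clock()
        return offsets


# Usage with a fake clock
fake_now = [0.0]
batcher = CommitBatcher(batch_size=3, interval_s=5.0, clock=lambda: fake_now[0])
batcher.record("orders", 0, 10)
batcher.record("orders", 0, 11)
print(batcher.should_flush())   # False: 2 messages, 0s elapsed
fake_now[0] = 5.0
print(batcher.should_flush())   # True: interval reached
print(batcher.drain())          # {('orders', 0): 12}
```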

Error Handling and Dead-Letter Queue

Kafka consumers must handle two classes of errors: retriable (transient failures such as network timeouts or an unavailable database or downstream service) and non-retriable (deserialization failures, schema violations, business logic errors). Retry the first kind with backoff; send the second kind, and anything that still fails after its retries, to a dead-letter topic (DLQ) so a single bad message does not block the partition.
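The retry-or-DLQ decision can be isolated in one helper so the consumer loop only has to decide what to commit. This sketch assumes the same classification as the loop that follows (JSON, key, and value errors are non-retriable; everything else is retried with exponential backoff). `call_with_retries` is a hypothetical name, and `sleep` is injectable so the backoff can be tested without waiting.

```python
import json
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")

# Bad data: retrying will not change the outcome (JSONDecodeError is a ValueError subclass)
NON_RETRIABLE: Tuple[Type[Exception], ...] = (json.JSONDecodeError, KeyError, ValueError)


def call_with_retries(fn: Callable[[], T], max_attempts: int = 3, base_delay_s: float = 1.0,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Run fn(); re-raise non-retriable errors at once, retry others with backoff.

    Any exception that escapes is meant to be routed to the DLQ by the caller.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return fn()
        except NON_RETRIABLE:
            raise
        except Exception:
            if attempt >= max_attempts:
                raise
            sleep(base_delay_s * 2 ** (attempt - 1))   # 1s, 2s, 4s, ...


# Usage: a call that fails twice with a transient error, then succeeds
attempts = {"n": 0}


def flaky_db_write() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("database unavailable")
    return "ok"


delays = []
print(call_with_retries(flaky_db_write, sleep=delays.append), delays)   # ok [1.0, 2.0]
```

Keep the total backoff well below `max.poll.interval.ms`, since the consumer does not call `poll()` while it sleeps.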

from confluent_kafka import Consumer, Producer
import json
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone


@dataclass
class DeadLetterMessage:
    original_topic: str
    original_partition: int
    original_offset: int
    error_type: str
    error_message: str
    failed_at: str
    payload: str


dlq_producer = Producer({"bootstrap.servers": "localhost:9092"})
DLQ_TOPIC = "orders.dead-letter"


def process_order(data: dict):
    # Placeholder business logic; a payload without "order_id" raises KeyError (non-retriable)
    print(f"Processing order {data['order_id']}")


def send_to_dlq(msg, error: Exception):
    dlq = DeadLetterMessage(
        original_topic=msg.topic(),
        original_partition=msg.partition(),
        original_offset=msg.offset(),
        error_type=type(error).__name__,
        error_message=str(error),
        failed_at=datetime.now(timezone.utc).isoformat(),
        payload=(msg.value() or b"").decode("utf-8", errors="replace"),  # tombstones have no value
    )
    delivery_errors = []
    dlq_producer.produce(
        DLQ_TOPIC,
        key=msg.key(),
        value=json.dumps(asdict(dlq)).encode(),
        on_delivery=lambda err, _msg: delivery_errors.append(err) if err else None,
    )
    # flush() returns the number of messages still queued; if the DLQ write did not
    # succeed, raise so the caller never commits past a message that went nowhere
    if dlq_producer.flush(10) > 0 or delivery_errors:
        raise RuntimeError(f"DLQ delivery failed: {delivery_errors or 'timed out'}")


consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor-v1",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
})
consumer.subscribe(["orders"])

MAX_RETRIES = 3

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue

        retries = 0
        success = False

        while retries < MAX_RETRIES and not success:
            try:
                data = json.loads(msg.value())
                process_order(data)
                success = True
            except (json.JSONDecodeError, KeyError, ValueError) as e:
                # Non-retriable: bad data, go to DLQ immediately
                send_to_dlq(msg, e)
                success = True  # treat DLQ send as "handled"
                print(f"Sent to DLQ: {e}")
            except Exception as e:
                retries += 1
                if retries >= MAX_RETRIES:
                    send_to_dlq(msg, e)
                    print(f"Max retries exceeded, sent to DLQ: {e}")
                else:
                    # Exponential backoff (2s, then 4s); keep it well below max.poll.interval.ms
                    time.sleep(2 ** retries)

        # Reached only once the message was processed or safely written to the DLQ
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()
    dlq_producer.flush(10)

Avro with Schema Registry

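Schema Registry enforces schema compatibility across producers and consumers. Messages do not embed the full schema: each one starts with a 5-byte header (a magic byte of 0 followed by a 4-byte big-endian schema ID), and the consumer fetches the schema for that ID from the registry, caches it, and then deserializes the Avro payload.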
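A minimal sketch of what happens before Avro decoding, assuming Confluent's wire format (one magic byte, 0, then a 4-byte big-endian schema ID, then the Avro-encoded body): split off the header, then look the schema up by ID through a cache so the registry is queried once per schema. `split_frame`, `SchemaCache`, and the `fetch` callable are hypothetical stand-ins; in practice `AvroDeserializer` and `SchemaRegistryClient` do this for you.

```python
import struct
from typing import Callable, Dict, Tuple

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")   # magic byte + big-endian unsigned 32-bit schema ID (5 bytes)


def split_frame(raw: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_body) from a Schema Registry framed message."""
    if len(raw) < HEADER.size:
        raise ValueError("message shorter than the 5-byte Schema Registry header")
    magic, schema_id = HEADER.unpack_from(raw)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, raw[HEADER.size:]


class SchemaCache:
    """Fetch each schema once by ID; later messages with the same ID hit the cache."""

    def __init__(self, fetch: Callable[[int], str]) -> None:
        self._fetch = fetch              # e.g. a call to the registry's REST API
        self._by_id: Dict[int, str] = {}

    def get(self, schema_id: int) -> str:
        if schema_id not in self._by_id:
            self._by_id[schema_id] = self._fetch(schema_id)
        return self._by_id[schema_id]


# Usage with a fake registry and a hand-built frame
lookups = []
cache = SchemaCache(lambda sid: lookups.append(sid) or '{"type": "string"}')
frame = HEADER.pack(MAGIC_BYTE, 42) + b"\x06abc"   # schema ID 42, Avro-encoded string "abc"
schema_id, body = split_frame(frame)
print(schema_id, body)          # 42 b'\x06abc'
cache.get(schema_id)
cache.get(schema_id)
print(lookups)                  # [42]  (registry queried only once)
```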

pip install "confluent-kafka[avro]"
from confluent_kafka import DeserializingConsumer
from confluent_kafka.error import ConsumeError, ValueDeserializationError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

schema_registry_conf = {"url": "http://localhost:8081"}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

avro_deserializer = AvroDeserializer(
    schema_registry_client,
    schema_str=None,  # fetch the writer's schema from the registry by ID
    from_dict=lambda obj, ctx: obj,  # keep as dict
)

consumer_conf = {
    "bootstrap.servers": "localhost:9092",
    "key.deserializer": StringDeserializer("utf_8"),
    "value.deserializer": avro_deserializer,
    "group.id": "order-processor-avro",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(["orders.avro"])

try:
    while True:
        try:
            msg = consumer.poll(timeout=1.0)
        except ValueDeserializationError as e:
            # The payload could not be decoded; route it to a DLQ, then move on
            print(f"Skipping undecodable message: {e}")
            continue
        except ConsumeError as e:
            # DeserializingConsumer raises errors instead of returning msg.error()
            print(f"Error: {e}")
            continue
        if msg is None:
            continue

        order = msg.value()   # already deserialized dict
        print(f"Order: {order}")
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()

Async Processing Pattern

For I/O-bound processing (HTTP calls, DB writes), run a thread pool alongside the Kafka poll loop. Poll messages on the main thread, submit processing to the pool, and commit offsets only after the pool task completes.

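from concurrent.futures import ThreadPoolExecutor
import json
import time

from confluent_kafka import Consumer, TopicPartition

WORKERS = 8
BATCH_SIZE = 100


def process_order_io(data: dict):
    time.sleep(0.05)   # simulate DB write or HTTP call
    print(f"Processed {data.get('order_id')}")


def handle(raw: bytes):
    # Decode inside the worker so a malformed payload fails only its own task
    process_order_io(json.loads(raw))


consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor-threaded",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
})
consumer.subscribe(["orders"])

try:
    with ThreadPoolExecutor(max_workers=WORKERS) as pool:
        while True:
            # consume() returns up to BATCH_SIZE messages, or an empty list on timeout
            msgs = [m for m in consumer.consume(num_messages=BATCH_SIZE, timeout=1.0)
                    if not m.error()]
            if not msgs:
                continue

            # Run the batch in parallel; messages with the same key may be processed
            # concurrently, so per-key ordering is not preserved
            futures = [(pool.submit(handle, m.value()), m) for m in msgs]

            next_offsets = {}
            for future, m in futures:
                try:
                    future.result()   # wait for this task to finish
                except Exception as e:
                    print(f"Processing failed at offset {m.offset()}: {e}")
                    # send to DLQ and commit to move past poison message
                # consume() preserves order within a partition, so the last write wins
                next_offsets[(m.topic(), m.partition())] = m.offset() + 1

            # Commit only after every task in the batch has finished, so an in-flight
            # message can never be skipped by a commit of a later offset
            consumer.commit(
                offsets=[TopicPartition(t, p, o) for (t, p), o in next_offsets.items()],
                asynchronous=False,
            )
finally:
    consumer.close()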
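When tasks are submitted to a pool continuously, they finish out of order, so committing each message's offset as soon as its own task completes is unsafe: a committed offset of 12 means a restarted consumer resumes at 12, skipping any earlier message whose task had not finished. A common remedy is to commit only up to the first offset that is still running. The per-partition tracker below is a minimal, hypothetical sketch of that idea (one instance per (topic, partition)); it relies only on submission order, so gaps in offsets, such as those left by compaction or transaction markers, do not stall it.

```python
from collections import deque
from typing import Deque, Optional, Set


class PartitionOffsetTracker:
    """Tracks in-flight offsets for one partition and reports the safe commit point."""

    def __init__(self) -> None:
        self._in_order: Deque[int] = deque()   # submitted offsets, in poll order
        self._done: Set[int] = set()           # offsets whose task has finished

    def submitted(self, offset: int) -> None:
        self._in_order.append(offset)

    def completed(self, offset: int) -> None:
        self._done.add(offset)

    def committable(self) -> Optional[int]:
        """Offset to commit (last contiguously finished offset + 1), or None if unchanged."""
        last = None
        while self._in_order and self._in_order[0] in self._done:
            last = self._in_order.popleft()
            self._done.discard(last)
        return None if last is None else last + 1


# Usage: three messages finish in the order 11, 10, 12
tracker = PartitionOffsetTracker()
for off in (10, 11, 12):
    tracker.submitted(off)
tracker.completed(11)
print(tracker.committable())   # None: offset 10 is still running
tracker.completed(10)
print(tracker.committable())   # 12: offsets 10 and 11 are done
tracker.completed(12)
print(tracker.committable())   # 13
```

In a poll loop you would keep a dict of trackers keyed by (topic, partition), call `submitted()` when handing a message to the pool and `completed()` when its future finishes, and commit `TopicPartition(topic, partition, offset)` whenever `committable()` returns a value. Trackers for revoked partitions should be discarded in `on_revoke`.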

Producer Basics

from confluent_kafka import Producer
import json

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "acks": "all",                # wait for all ISR replicas to ack
    "enable.idempotence": True,   # retries cannot duplicate or reorder messages
    "retries": 5,
    "retry.backoff.ms": 500,
    "compression.type": "lz4",
    "batch.size": 65536,          # max bytes per batch
    "linger.ms": 5,               # wait up to 5ms to batch messages
})

def delivery_report(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()}[{msg.partition()}] @ {msg.offset()}")

# Produce a message
producer.produce(
    topic="orders",
    key="order-123",
    value=json.dumps({"order_id": "123", "amount": 99.99}).encode(),
    callback=delivery_report,
)
producer.flush()  # block until every queued message is delivered or has failed; serves delivery callbacks

Frequently Asked Questions

confluent-kafka vs kafka-python — which to use?
Use confluent-kafka for production. It wraps librdkafka (a C library) and typically delivers 5–10x higher throughput and lower latency than the pure-Python kafka-python. It also has first-class Schema Registry support. kafka-python has no native dependency, but confluent-kafka ships prebuilt wheels with librdkafka bundled for common platforms, so installation is rarely a hurdle; kafka-python is generally not a good fit for high-throughput production use.
How do I achieve exactly-once processing?
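True exactly-once requires (1) disabling auto-commit and (2) storing your output and the consumed offset atomically: for example, write the result to Postgres and record the offset in the same database transaction (then seek to that stored offset on partition assignment), or use Kafka transactions if you are writing back to Kafka. Without an atomic output-plus-offset write you get at-least-once delivery; making processing idempotent turns the resulting redeliveries into harmless no-ops.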
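Here is a minimal sketch of the "output and offset in one transaction" approach, using Python's built-in `sqlite3` in place of Postgres so it runs anywhere. The table and function names are hypothetical. On partition assignment you would read `stored_offset()` and seek to it (the `_load_last_offset` hook in the rebalance example is a natural place) instead of relying on Kafka's committed offsets.

```python
import sqlite3
from typing import Optional


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS orders (order_id TEXT PRIMARY KEY, amount REAL)")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS consumer_offsets ("
        "topic TEXT, part INTEGER, next_offset INTEGER, PRIMARY KEY (topic, part))"
    )
    return conn


def apply_message(conn: sqlite3.Connection, topic: str, partition: int,
                  offset: int, order: dict) -> None:
    """Write the business result and the next offset atomically."""
    with conn:  # commits both statements on success, rolls both back on any exception
        conn.execute(
            "INSERT OR REPLACE INTO orders (order_id, amount) VALUES (?, ?)",
            (order["order_id"], order["amount"]),
        )
        conn.execute(
            "INSERT OR REPLACE INTO consumer_offsets (topic, part, next_offset) VALUES (?, ?, ?)",
            (topic, partition, offset + 1),
        )


def stored_offset(conn: sqlite3.Connection, topic: str, partition: int) -> Optional[int]:
    """Where to resume for this partition, or None if nothing was processed yet."""
    row = conn.execute(
        "SELECT next_offset FROM consumer_offsets WHERE topic = ? AND part = ?",
        (topic, partition),
    ).fetchone()
    return None if row is None else row[0]


# Usage: a redelivered message overwrites its row rather than duplicating it
db = open_store()
apply_message(db, "orders", 0, 41, {"order_id": "A1", "amount": 99.99})
apply_message(db, "orders", 0, 41, {"order_id": "A1", "amount": 99.99})
print(stored_offset(db, "orders", 0))                            # 42
print(db.execute("SELECT COUNT(*) FROM orders").fetchone()[0])   # 1
```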
What is the right number of consumers per group?
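Maximum useful consumers = number of partitions; extra consumers sit idle. If processing is slow, first increase partitions (which triggers a rebalance), then add consumers up to the partition count. Adding partitions also changes which partition a given key hashes to, so per-key ordering is not guaranteed across that change.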