This guide covers the most frequently asked Apache Kafka interview questions in 2026 — from core architecture and replication to Kafka Streams, Schema Registry, Kafka Connect, and production operations including KRaft mode.
Apache Kafka is a distributed event streaming platform — a high-throughput, fault-tolerant, horizontally scalable commit log. It solves the problem of connecting many data producers and consumers without tight coupling.
Before Kafka: point-to-point integrations created an N×M mesh of pipelines — N sources each writing to M destinations. Adding one system required updating every producer that feeds it.
With Kafka: producers write to topics; consumers read from topics. Add a consumer without changing any producer. The commit log persists data (default 7 days), so consumers can replay, backfill, or process at their own pace.
Use cases: real-time analytics, event sourcing, CDC (change data capture), activity tracking, microservice communication, log aggregation, metrics pipelines, stream processing.
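Key guarantees: durable (replicated and persisted to disk), ordered (within a partition), high-throughput (hundreds of thousands to millions of events/sec, depending on hardware, message size, and configuration), low-latency (single-digit-millisecond latencies are achievable on a well-tuned cluster).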
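Each partition has one leader replica and zero or more followers. The ISR (in-sync replicas) is the set of replicas that have caught up with the leader within replica.lag.time.max.ms. Only ISR members can be elected leader.
# Topic "orders" with 3 partitions, replication factor 3: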
Partition 0: Leader=Broker1, Followers=[Broker2, Broker3], ISR=[1,2,3]
Partition 1: Leader=Broker2, Followers=[Broker1, Broker3], ISR=[2,1,3]
Partition 2: Leader=Broker3, Followers=[Broker1, Broker2], ISR=[3,1,2]
# Leader handles all produce/fetch requests for its partitions
# Followers replicate from the leader's log
# ISR: set of replicas that are fully caught up with the leader
# - A follower is removed from ISR if it falls behind by > replica.lag.time.max.ms (default 30s)
# - min.insync.replicas: minimum ISR size for a produce to succeed
# (prevents "blind" acks when most replicas are down)
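The interaction between acks=all and min.insync.replicas described above can be sketched with a small model. This is an illustrative simplification, not Kafka code: the class `IsrModel` and its method names are hypothetical, and the real broker check happens inside the partition leader.

```java
import java.util.Set;

/** Simplified model of the leader-side check for acks=all produce requests (not Kafka code). */
class IsrModel {
    /** With acks=all, the write is rejected when the ISR is smaller than min.insync.replicas. */
    static boolean acceptsAcksAll(Set<Integer> isr, int minInsyncReplicas) {
        return isr.size() >= minInsyncReplicas;
    }

    /** Replicas that can drop out of a full ISR before acks=all writes start failing. */
    static int toleratedFailures(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor - minInsyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(acceptsAcksAll(Set.of(1, 2, 3), 2)); // full ISR
        System.out.println(acceptsAcksAll(Set.of(1), 2));       // only the leader is left
        System.out.println(toleratedFailures(3, 2));
    }
}
```

With replication factor 3 and min.insync.replicas=2, one replica can fall out of the ISR and producers using acks=all keep working; a second loss makes writes fail rather than be acknowledged by the leader alone.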
Unclean leader election: unclean.leader.election.enable=false (default) — only ISR members can become leader. Setting to true allows a lagging replica to become leader (availability over consistency), risking data loss.
ZooKeeper (legacy): stored cluster metadata — broker registration, topic configurations, partition assignments, controller election, ACLs. Separate ZooKeeper quorum required alongside Kafka cluster.
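Problems with ZooKeeper: a second distributed system to deploy, secure, and operate; slow controller failover, because a new controller must reload all metadata from ZK; and a practical ceiling on the number of partitions per cluster.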
KRaft mode (Kafka 3.3+ production-ready, ZK removed in Kafka 4.0): Kafka manages its own metadata using a Raft consensus protocol. A subset of brokers act as controllers and store metadata in an internal __cluster_metadata topic.
# KRaft: no separate ZooKeeper process
# Brokers are: broker role, controller role, or both (combined mode for dev)
# Metadata log replicated via Raft: controller failover is much faster than in ZK mode,
# because standby controllers already hold the metadata log
# Designed to support far more partitions per cluster than ZK mode
# server.properties:
process.roles=broker,controller # combined mode (dev/small clusters)
node.id=1
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
Kafka's throughput comes from several design choices that maximise hardware efficiency:
Zero-copy transfer: Kafka uses sendfile() to move data directly from the page cache to the NIC buffer in kernel space, with no copies through user space and 2 context switches instead of 4.
The Controller is a special broker (one per cluster in ZK mode; a quorum in KRaft) that manages cluster-wide administrative operations such as partition leader election and topic creation and deletion.
In ZooKeeper mode: any broker can become controller — elected via ZK ephemeral node. If controller fails, ZK triggers re-election among remaining brokers. On election, new controller must reload all metadata from ZK (slow for large clusters).
In KRaft mode: the controller quorum (3 or 5 nodes) runs Raft. Leadership is explicit and fast to fail over. Metadata changes are a Raft log — faster, atomic, and auditable.
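The choice of 3 or 5 controllers follows from majority-quorum arithmetic, which a short sketch makes concrete. The class `QuorumMath` is hypothetical and only illustrates the general Raft majority rule, not any Kafka API.

```java
/** Majority-quorum arithmetic for a Raft-style controller quorum (illustrative only). */
class QuorumMath {
    /** Smallest number of voters that forms a majority. */
    static int majority(int voters) {
        return voters / 2 + 1;
    }

    /** Voters that can fail while a majority is still reachable. */
    static int toleratedFailures(int voters) {
        return voters - majority(voters);
    }

    public static void main(String[] args) {
        for (int n : new int[] {3, 4, 5}) {
            System.out.println(n + " voters: majority=" + majority(n)
                    + ", tolerated failures=" + toleratedFailures(n));
        }
    }
}
```

Four voters tolerate no more failures than three, which is why odd quorum sizes are the usual choice.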
# Partition storage on disk:
/kafka-logs/orders-0/ ← partition 0 of topic "orders"
00000000000000000000.log ← segment file (records)
00000000000000000000.index ← sparse offset index
00000000000000000000.timeindex ← timestamp index
00000000000001234567.log ← next segment (after roll)
producer-snapshot ← idempotence tracking
# Segments roll when: segment.bytes (default 1GB) or segment.ms exceeded
# Old segments deleted when: retention.bytes OR retention.ms exceeded (default 7 days)
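To illustrate why the offset index can stay sparse, here is a minimal sketch of a lookup that finds the nearest indexed offset at or below the target and scans forward from there. The class `SparseIndexSketch` and the byte positions are made up for illustration; Kafka's real index is a memory-mapped binary file, not a TreeMap.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy sparse offset index: only some offsets are indexed (illustrative, not Kafka's format). */
class SparseIndexSketch {
    // offset -> byte position in the .log file, recorded only every so often
    private final TreeMap<Long, Long> index = new TreeMap<>();

    void add(long offset, long position) {
        index.put(offset, position);
    }

    /** File position from which to start scanning the segment for targetOffset. */
    long scanStart(long targetOffset) {
        Map.Entry<Long, Long> entry = index.floorEntry(targetOffset);
        return entry == null ? 0L : entry.getValue();
    }

    public static void main(String[] args) {
        SparseIndexSketch idx = new SparseIndexSketch();
        idx.add(0L, 0L);
        idx.add(100L, 4096L);
        idx.add(200L, 8192L);
        // Offset 150 is not indexed, so the scan starts at the entry for offset 100.
        System.out.println(idx.scanStart(150L));
    }
}
```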
Log compaction (cleanup.policy=compact): instead of deleting old records by time/size, Kafka retains only the latest record per key. Older records with the same key are garbage-collected. Useful for changelog topics, CDC state stores.
# Before compaction: After compaction:
key=user1 → {"age":25} ← deleted
key=user1 → {"age":26} ← deleted
key=user1 → {"age":27} ← retained (latest)
key=user2 → {"city":"NY"} ← retained (only entry)
# Tombstone: produce a record with key=K, value=null → deletes K after compaction
# Use compact+delete: cleanup.policy=compact,delete — compact AND delete old segments
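The effect of compaction shown above can be reproduced with a few lines of Java. This is a sketch of the outcome only, under the assumption that records are simple key/value string pairs; `CompactionSketch` is a hypothetical name, and the real log cleaner works segment by segment and keeps tombstones around for delete.retention.ms before removing them.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Models the end state of log compaction: latest value per key, tombstones delete the key. */
class CompactionSketch {
    /** Each record is a {key, value} pair; a null value is a tombstone. */
    static Map<String, String> compact(List<String[]> records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : records) {
            String key = record[0];
            String value = record[1];
            if (value == null) {
                latest.remove(key);     // tombstone: key disappears after compaction
            } else {
                latest.put(key, value); // newer value replaces the older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> log = List.of(
                new String[] {"user1", "age=25"},
                new String[] {"user1", "age=26"},
                new String[] {"user1", "age=27"},
                new String[] {"user2", "city=NY"});
        System.out.println(compact(log)); // user1 keeps only age=27, user2 keeps city=NY
    }
}
```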
enable.idempotence=true) — broker deduplicates retried produce requests using sequence numbers.// EOS producer config:
enable.idempotence=true
transactional.id=my-app-producer-1 // unique per producer instance
acks=all
retries=Integer.MAX_VALUE
max.in.flight.requests.per.connection=5 // up to 5 with idempotence
// EOS consumer config:
isolation.level=read_committed
Partition count determines parallelism: within a single consumer group, the maximum number of consumer instances that can process a topic concurrently equals the partition count.
# Formula (rough starting point):
partitions = max(throughput_target / max_producer_throughput_per_partition,
throughput_target / max_consumer_throughput_per_partition)
# Rule of thumb: ~1 partition per 10 MB/s or per consumer thread you plan to run
# Typical: 6–24 partitions for most topics; 100+ for high-volume topics
# Considerations:
# ✅ More partitions = more parallelism, faster recovery via parallel rebalance
# ❌ More partitions = more file handles, memory (ISR lists), controller overhead
# ❌ More partitions = slower leader election if broker fails (more leaders to elect)
# ❌ Partitions are hard to reduce later — plan ahead but don't over-partition
# Commonly cited upper limits (guidance, not hard limits):
# ~4,000 partitions per broker and ~200,000 per cluster (ZK mode)
# KRaft is designed to scale to millions of partitions per cluster (main scalability win)
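The sizing formula above translates directly into code. The sketch below assumes throughput is measured in MB/s and rounds up to a whole number of partitions; the class and parameter names are hypothetical, and the per-partition figures must come from your own benchmarks.

```java
/** Rough starting point for partition count, following the max(producer, consumer) formula. */
class PartitionSizing {
    static int estimate(double targetMBps,
                        double producerMBpsPerPartition,
                        double consumerMBpsPerPartition) {
        double forProducers = targetMBps / producerMBpsPerPartition;
        double forConsumers = targetMBps / consumerMBpsPerPartition;
        // The slower side dictates how many partitions are needed.
        return (int) Math.ceil(Math.max(forProducers, forConsumers));
    }

    public static void main(String[] args) {
        // Target 100 MB/s; a partition sustains 10 MB/s on produce but only 5 MB/s on consume.
        System.out.println(estimate(100, 10, 5)); // prints 20
    }
}
```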
# A Kafka record consists of:
# - Topic (routing metadata, not stored in the record itself)
# - Partition (assigned by producer or partitioner)
# - Offset (assigned by broker after write)
# - Timestamp (CreateTime by producer OR LogAppendTime by broker)
# - Headers: list of key-value pairs (String key, byte[] value)
# — useful for routing metadata, correlation IDs, trace context
# - Key: optional byte[] — used for partitioning and compaction
# - Value: byte[] — the actual payload (JSON, Avro, Protobuf, raw bytes)
# - Compression codec (per batch)
# Null key: records round-robined (or sticky-partitioned) across partitions
# With key: records with same key always go to same partition
# → order preserved for that key
# → useful for user events (key=userId), entity state (key=orderId)
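The "same key, same partition" rule comes from hashing the key bytes and taking the result modulo the partition count. The sketch below shows that shape only: it uses `Arrays.hashCode` as a stand-in hash, so its partition numbers will not match Kafka's murmur2-based default partitioner. `KeyPartitioningSketch` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hash-mod partitioning with a stand-in hash (Kafka itself uses murmur2 on the key bytes). */
class KeyPartitioningSketch {
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);        // stand-in for murmur2(keyBytes)
        return (hash & 0x7fffffff) % numPartitions;  // clear the sign bit, then mod
    }

    public static void main(String[] args) {
        // The same key maps to the same partition on every call...
        System.out.println(partitionFor("user-42", 6) == partitionFor("user-42", 6));
        // ...but the mapping depends on the partition count, so adding partitions can move keys.
        System.out.println(partitionFor("user-42", 6) + " vs " + partitionFor("user-42", 12));
    }
}
```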
A Kafka cluster is a set of brokers sharing a common metadata store (ZooKeeper or KRaft). Scaling is horizontal — add brokers, then redistribute partitions.
# Scale-out steps:
1. Add new broker(s) to the cluster (auto-registers in ZK/KRaft)
2. Reassign partitions to include new broker using kafka-reassign-partitions.sh
or Cruise Control (automated rebalancing tool from LinkedIn)
3. Throttle replica data movement so reassignment does not starve producers/consumers
(--throttle on kafka-reassign-partitions.sh, which sets leader.replication.throttled.rate and follower.replication.throttled.rate)
# kafka-reassign-partitions.sh:
# Step 1: generate reassignment plan
kafka-reassign-partitions.sh --bootstrap-server broker:9092 \
--generate --topics-to-move-json-file topics.json \
--broker-list "1,2,3,4"
# Step 2: execute
kafka-reassign-partitions.sh --bootstrap-server broker:9092 \
--execute --reassignment-json-file reassignment.json \
--throttle 50000000 # 50 MB/s replication throttle
Tiered Storage (KIP-405; early access in Kafka 3.6, production-ready since Kafka 3.9) enables Kafka to offload older log segments to remote object storage (S3, GCS, Azure Blob) while keeping recent hot data on local broker disk.
# Without tiered storage:
# Retention limited by local disk. Large retention → large, expensive brokers.
# With tiered storage:
# Local disk: recent segments (hours/days of hot data)
# Remote storage: older segments (weeks/months of cold data)
# Consumers can fetch from remote when offset is older than local retention
# Brokers stay small/cheap; retention essentially unlimited
# Config:
remote.log.storage.system.enable=true
remote.log.metadata.manager.class.name=... # plugin
remote.log.storage.manager.class.name=... # plugin (S3, GCS, etc.)
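# Per topic (tiering is opt-in per topic):
# remote.storage.enable=true
# local.retention.ms=86400000 # keep about 1 day on local disk; retention.ms still bounds total retention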
# Confluent Cloud and MSK already support tiered storage
Tiered storage enables Kafka as a long-term event store: full history stays available to new consumers without bloating broker disk. Note that tiered storage applies to topics with cleanup.policy=delete; compacted topics are not supported.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// Durability & reliability:
props.put("acks", "all"); // wait for all in-sync replicas to ack (most durable)
// "1" = leader only; "0" = no ack
props.put("retries", Integer.MAX_VALUE); // keep retrying until delivery.timeout.ms expires
props.put("enable.idempotence", "true"); // no duplicates from retries within a producer session
// Throughput:
props.put("linger.ms", "5"); // wait up to 5ms to fill a batch
props.put("batch.size", "65536"); // 64KB batch (default 16KB)
props.put("compression.type", "lz4"); // compress batches (snappy/lz4/zstd)
props.put("buffer.memory", "67108864"); // 64MB producer buffer
props.put("max.block.ms", "60000"); // how long send() blocks if buffer full
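// Latency (alternative profile: these calls override the values set above):
props.put("linger.ms", "0"); // send immediately (low-latency)
props.put("acks", "1"); // leader ack only; incompatible with enable.idempotence=true, so disable idempotence if you use it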
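Explicit partition: set the partition in the ProducerRecord. Overrides all partitioner logic.
Key-based (default when a key is present): murmur2(key) % numPartitions. The same key always goes to the same partition, as long as the partition count doesn't change.
Sticky (default for null keys): stays on one partition until the batch is full or linger.ms expires, then switches. Better batching than pure round-robin.
Custom: implement the Partitioner interface. Useful for hot key avoidance, tenant isolation, or custom routing logic.
// Custom partitioner to avoid hot partition for "LARGE_CUSTOMER" key: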
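public class SmartPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int partitions = cluster.partitionCountForTopic(topic);
        if ("LARGE_CUSTOMER".equals(key)) {
            // spread across the first half of the partitions (gives up per-key ordering for this key)
            return ThreadLocalRandom.current().nextInt(Math.max(1, partitions / 2));
        }
        if (keyBytes == null) {
            // no key: any partition will do
            return ThreadLocalRandom.current().nextInt(partitions);
        }
        return Utils.toPositive(Utils.murmur2(keyBytes)) % partitions;
    }
    @Override public void configure(Map<String, ?> configs) { }
    @Override public void close() { }
}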
# Idempotent producer (enable.idempotence=true):
# Each producer gets a unique Producer ID (PID) assigned by the broker.
# Each record in a batch gets a monotonically increasing sequence number (SN).
# Broker tracks: last SN per (PID, partition).
# On retry: broker sees same PID+SN → deduplicates (discards duplicate, acks original)
# Without idempotence:
# Producer sends batch → network glitch → broker acked but response lost
# Producer retries → broker writes DUPLICATE record
# Result: duplicate processing downstream
# With idempotence:
# Same scenario — broker sees same PID+SN → discards duplicate → acks
# No duplicate on retry.
# Limitations:
# - Per session only: if producer restarts, new PID → different session
# (use transactional.id for cross-restart idempotence)
# - Per partition: dedup within a partition, not across topics/partitions
# - max.in.flight.requests.per.connection ≤ 5 (with idempotence, ordering preserved)
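The deduplication rule can be modelled in a few lines. This is a deliberately simplified sketch: it tracks only the last sequence number per (PID, partition) and treats anything at or below it as a retry. The real broker also rejects sequence gaps and remembers several recent batches; `DedupSketch` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of broker-side dedup for the idempotent producer (not the real broker logic). */
class DedupSketch {
    private final Map<String, Integer> lastSequence = new HashMap<>(); // "pid:partition" -> last SN
    private int appended = 0;

    /** Returns true if the batch is appended, false if it is recognised as a retry. */
    boolean append(long producerId, int partition, int sequence) {
        String key = producerId + ":" + partition;
        Integer last = lastSequence.get(key);
        if (last != null && sequence <= last) {
            return false; // same PID + SN already written: discard, but still ack the producer
        }
        lastSequence.put(key, sequence);
        appended++;
        return true;
    }

    int appendedCount() {
        return appended;
    }

    public static void main(String[] args) {
        DedupSketch broker = new DedupSketch();
        System.out.println(broker.append(7L, 0, 0)); // first attempt is written
        System.out.println(broker.append(7L, 0, 0)); // retry after a lost ack is discarded
        System.out.println(broker.appendedCount());
    }
}
```

A restarted producer receives a new PID, so in this model its first batch would be written again; that is the per-session limitation noted above.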
// Kafka transactions: atomically write to multiple partitions
// AND commit consumer offsets — all or nothing
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
// Produce to multiple topics atomically:
producer.send(new ProducerRecord<>("output-topic-1", key, value1));
producer.send(new ProducerRecord<>("output-topic-2", key, value2));
// Commit consumer offset as part of the transaction
// (consumer's offset is only visible to read_committed consumers after commit)
producer.sendOffsetsToTransaction(offsetsMap, consumerGroupMetadata);
producer.commitTransaction(); // all visible atomically
} catch (ProducerFencedException e) {
producer.close(); // another instance with same transactional.id took over
} catch (Exception e) {
producer.abortTransaction(); // rolls back all writes in this transaction
}
Use cases: Kafka Streams exactly-once processing (reads from input, writes to output, commits offset — atomically), financial double-entry bookkeeping across topics, fan-out (write to N topics atomically).
acks=all vs acks=1 vs acks=0?
| acks | Durability | Latency | Risk |
|---|---|---|---|
| 0 | None (fire and forget) | Lowest | Message lost on any failure |
| 1 | Leader only | Low | Lost if leader fails before replication |
| all (-1) | All ISR replicas | Higher | No loss as long as ≥1 ISR survives |
Use acks=all with min.insync.replicas=2 (on broker/topic) for meaningful durability. With RF=3 and min.insync.replicas=2, the cluster tolerates 1 broker failure without data loss. With min.insync.replicas=1, acks=all is only as durable as acks=1 once the ISR shrinks to the leader.
# Producer internal buffer: buffer.memory (default 32MB)
# If buffer full and broker slow: send() blocks for max.block.ms
# After max.block.ms: send() throws a TimeoutException
# (the legacy block.on.buffer.full setting no longer exists in the producer)
# Strategies for backpressure:
# 1. Increase buffer.memory for burst absorption
# 2. Increase max.block.ms for temporary broker slowness
# 3. Add more partitions (increases parallel write paths)
# 4. Tune linger.ms/batch.size for better throughput
# 5. Use async send with callback and implement producer-side rate limiting:
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// handle error: log, dead letter, circuit break
errorHandler.handle(exception, record);
}
});
# 6. Application-level semaphore to limit in-flight requests:
Semaphore semaphore = new Semaphore(1000);
semaphore.acquire();
producer.send(record, (meta, ex) -> semaphore.release());
A Dead Letter Queue (DLQ) is a topic where messages that cannot be processed successfully are routed after exhausting retries. Prevents poison-pill records from blocking a partition forever.
# Pattern:
# 1. Consumer processes record — fails (deserialization error, validation, DB down)
# 2. Retry logic (exponential backoff): retry.topic.orders.retry-1, .retry-2
# 3. After max retries: route to dead-letter topic "orders.DLQ"
# 4. DLQ message includes original record + error info in headers:
# - X-Exception-Message, X-Exception-Stacktrace, X-Failed-At (timestamp),
# X-Original-Topic, X-Original-Partition, X-Original-Offset
# Spring Kafka DLQ config:
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<?, ?> template) {
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(template,
(record, ex) -> new TopicPartition(record.topic() + ".DLQ", -1));
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
backOff.setInitialInterval(1_000L);
backOff.setMultiplier(2.0);
return new DefaultErrorHandler(recoverer, backOff);
}
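To see what the back-off settings above mean in practice, the retry delays can be computed by hand. The sketch assumes no cap on the interval and uses a hypothetical helper class, `BackoffSchedule`; it does not call Spring Kafka.

```java
import java.util.ArrayList;
import java.util.List;

/** Computes the delays of an exponential back-off (illustrative, no max-interval cap). */
class BackoffSchedule {
    static List<Long> delays(long initialMs, double multiplier, int maxRetries) {
        List<Long> schedule = new ArrayList<>();
        double next = initialMs;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            schedule.add((long) next);
            next *= multiplier; // each retry waits longer than the previous one
        }
        return schedule;
    }

    public static void main(String[] args) {
        // initial interval 1s, multiplier 2.0, 3 retries
        System.out.println(delays(1_000L, 2.0, 3)); // prints [1000, 2000, 4000]
    }
}
```

With these values a failing record is retried after 1s, 2s, and 4s, and only then handed to the dead-letter recoverer.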
| Codec | Ratio | CPU | Best for |
|---|---|---|---|
| none | 1× | None | Already-compressed data (JPEG, video) |
| lz4 | 2–3× | Very low | Default recommendation — fast + good ratio |
| snappy | 2–3× | Low | Google's codec, similar to lz4 |
| gzip | 3–5× | High | Max compression, bandwidth-constrained |
| zstd | 3–4× | Medium | Best ratio/CPU balance — recommended for Kafka 2.1+ |
# Default limit is about 1MB: message.max.bytes (broker-wide); max.message.bytes (per-topic override)
# Recommended: keep messages small (< 1MB), use Kafka as routing + metadata layer
# For large payloads (images, videos, large JSON):
# Option 1: Claim Check Pattern (preferred)
# - Store large payload in S3/GCS/MinIO
# - Produce only a pointer (URL/key) to Kafka
# - Consumer fetches from object store using the pointer
producer.send(new ProducerRecord<>("photos", photo.getId(),
objectStore.upload(photo.getData()))); // URL in message, data in S3
# Option 2: Tune limits (last resort — increases broker memory pressure)
# broker: message.max.bytes=10485760 (10MB)
# producer: max.request.size=10485760
# consumer: max.partition.fetch.bytes=10485760
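# topic (the topic-level override is max.message.bytes):
kafka-configs.sh --bootstrap-server broker:9092 --alter \
--entity-type topics --entity-name large-topic \
--add-config max.message.bytes=10485760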
// ProducerInterceptor: intercept records before send and after ack/error
// Use for: adding trace headers, metrics, audit logging, schema validation
public class TracingInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Add trace context header before the record is sent
        record.headers().add("X-Trace-ID", currentTraceId().getBytes(StandardCharsets.UTF_8));
        return record; // must return a ProducerRecord
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            metricsRegistry.incrementErrorCounter();
        } else {
            // metadata.timestamp() is the record timestamp, so this approximates produce-to-ack latency
            metricsRegistry.recordLatency(System.currentTimeMillis() - metadata.timestamp());
        }
    }
    @Override public void close() { /* cleanup */ }
    @Override public void configure(Map<String, ?> configs) { /* init */ }
}
// Config:
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.example.TracingInterceptor,com.example.MetricsInterceptor");
A consumer group is a set of consumers that collectively consume a topic. Kafka ensures each partition is consumed by exactly one consumer within the group — providing parallelism with ordering guarantees.
# Topic "orders": 6 partitions
# Group "order-service" with 3 consumers:
Consumer-1 → P0, P1
Consumer-2 → P2, P3
Consumer-3 → P4, P5
# Add 4th consumer: each gets 1-2 partitions
# Add 7th consumer: one consumer is idle (partitions < consumers)
# Remove Consumer-2: rebalance → Consumer-1 or Consumer-3 takes P2, P3
# Multiple groups can consume the same topic independently:
# Group "analytics" reads all 6 partitions (its own offsets)
# Group "order-service" reads all 6 partitions (separate offsets)
# → pub/sub fanout with ordering
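The assignment shown above (6 partitions over 3 consumers) follows a simple range rule for a single topic, sketched below. This is an approximation written for illustration: `RangeAssignmentSketch` is a hypothetical class, the consumer list is assumed to be already sorted, and the real RangeAssignor works per topic on sorted member IDs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Range-style assignment of one topic's partitions to consumers (illustrative only). */
class RangeAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = numPartitions / consumers.size();  // every consumer gets at least this many
        int extra = numPartitions % consumers.size(); // the first 'extra' consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("Consumer-1", "Consumer-2", "Consumer-3"), 6));
        // With 7 consumers and 6 partitions the last consumer receives an empty list (idle).
        System.out.println(assign(List.of("c1", "c2", "c3", "c4", "c5", "c6", "c7"), 6).get("c7"));
    }
}
```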
Assignment strategies:
RangeAssignor (default): assigns contiguous partitions per topic. Consumer 1 gets P0–P1, Consumer 2 gets P2–P3. May be uneven if the partition count is not divisible by the consumer count.
RoundRobinAssignor: round-robin across all partitions and consumers. More even.
StickyAssignor: minimises partition movement on rebalance. Preserves existing assignments where possible.
CooperativeStickyAssignor: incremental rebalancing (described in the rebalancing notes that follow).
A rebalance is triggered when a consumer joins, leaves, or crashes, or when a topic's partition count changes. During an eager (classic) rebalance, all consumers stop consuming, revoke all partitions, rejoin the group, and get new partition assignments. This is the "stop-the-world" problem: seconds of processing pause for large groups.
# Incremental Cooperative Rebalancing (Kafka 2.4+, CooperativeStickyAssignor):
# Phase 1: Kafka identifies which partitions need to move → consumers revoke ONLY those
# Phase 2: Revoked partitions re-assigned to new consumers
# Consumers not involved in the rebalance continue processing!
# 0 downtime for consumers that keep their partitions
# Config:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
CooperativeStickyAssignor.class.getName());
# Triggers that cause rebalances:
# - Consumer fails to send heartbeat within session.timeout.ms (default 45s)
# - Consumer poll() not called within max.poll.interval.ms (default 5 min)
# - Consumer joins or leaves the group
# - New partitions added to a subscribed topic
Tune max.poll.interval.ms to match your worst-case processing time. If processing one batch takes 10 minutes but max.poll.interval.ms=300000 (5 minutes), Kafka assumes the consumer died and triggers a rebalance.
# Offsets stored in __consumer_offsets topic (internal, compacted)
# Groups commit offsets to track their position in each partition
# AUTO COMMIT (enable.auto.commit=true, default):
# Kafka commits current offset every auto.commit.interval.ms (default 5s)
# Risk: records processed but offset not yet committed → crash → reprocessed after restart (at-least-once)
# Risk: offsets committed for records handed to another thread but not yet processed → crash → records skipped (at-most-once)
# MANUAL COMMIT (preferred for production):
props.put("enable.auto.commit", "false");
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    // Commit after processing all records in the batch (pick ONE of the two):
    consumer.commitSync(); // synchronous: blocks until committed
    // OR:
    consumer.commitAsync((offsets, exception) -> { // async: higher throughput, no automatic retry
        if (exception != null) log.error("Commit failed for {}", offsets, exception);
    });
}
// Commit specific offsets (fine-grained), e.g. from inside the record loop:
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)); // +1: next record to fetch
consumer.commitSync(offsets);
# Consumer lag = (partition's latest offset) - (consumer group's committed offset)
# Lag=0: consumer is caught up
# Lag growing: consumer processing slower than producer is producing
# → scale consumers, optimise processing, or increase batch sizes
# Monitor via kafka-consumer-groups.sh:
kafka-consumer-groups.sh --bootstrap-server broker:9092 \
--describe --group order-service
# Output:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
order-service orders 0 1234567 1234589 22
order-service orders 1 2345678 2345678 0
# Metrics endpoints:
# JMX: kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<client-id> (attribute records-lag-max)
# Prometheus: via JMX exporter or Confluent Platform REST API
# Burrow (LinkedIn): dedicated consumer lag monitoring service
# Grafana dashboards: kafka-overview, consumer-lag dashboards
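The lag definition above is a per-partition subtraction, which the following sketch reproduces using the numbers from the sample output. `LagSketch` is a hypothetical helper working on plain maps; a real monitor would obtain the offsets from the AdminClient or the CLI.

```java
import java.util.Map;
import java.util.TreeMap;

/** Computes consumer lag per partition from log-end and committed offsets (illustrative). */
class LagSketch {
    static Map<Integer, Long> lagByPartition(Map<Integer, Long> logEndOffsets,
                                             Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : logEndOffsets.entrySet()) {
            Long committed = committedOffsets.get(entry.getKey());
            if (committed == null) {
                continue; // no committed offset yet: lag is undefined for this partition
            }
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    static long total(Map<Integer, Long> lag) {
        long sum = 0L;
        for (long value : lag.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<Integer, Long> lag = lagByPartition(
                Map.of(0, 1234589L, 1, 2345678L),   // LOG-END-OFFSET
                Map.of(0, 1234567L, 1, 2345678L));  // CURRENT-OFFSET
        System.out.println(lag);        // prints {0=22, 1=0}
        System.out.println(total(lag)); // prints 22
    }
}
```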
# Problem: a single poll loop processes the records of all its partitions sequentially
# Solution: per-partition thread pool with ordering preserved per partition
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    consumer.pause(consumer.assignment()); // pause fetching while this batch is processed
    // Submit each partition's records to its own task
    Map<TopicPartition, Future<?>> futures = new HashMap<>();
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        futures.put(partition, executor.submit(() -> processAll(partitionRecords)));
    }
    // Wait for all partitions to complete. poll() is not called while waiting,
    // so the whole batch must finish within max.poll.interval.ms.
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (Map.Entry<TopicPartition, Future<?>> entry : futures.entrySet()) {
        entry.getValue().get(); // wait (throws InterruptedException / ExecutionException)
        List<ConsumerRecord<String, String>> pr = records.records(entry.getKey());
        ConsumerRecord<?, ?> last = pr.get(pr.size() - 1);
        offsets.put(entry.getKey(), new OffsetAndMetadata(last.offset() + 1));
    }
    consumer.commitSync(offsets);
    consumer.resume(consumer.assignment());
}
# Seek API: override the consumer's position before polling
# Use for: replaying data, skipping to a specific offset, debugging
// Seek to the beginning (replay all):
consumer.subscribe(List.of("orders"));
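consumer.poll(Duration.ZERO); // triggers group join; assignment may not be complete yet, so production code should seek from ConsumerRebalanceListener.onPartitionsAssigned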
consumer.seekToBeginning(consumer.assignment());
// Seek to end (skip all existing data, only new messages):
consumer.seekToEnd(consumer.assignment());
// Seek to specific offset:
consumer.seek(new TopicPartition("orders", 0), 1234567L);
// Seek to timestamp (find offset at/after a given time):
Map<TopicPartition, Long> timestamps = new HashMap<>();
consumer.assignment().forEach(tp -> timestamps.put(tp, targetTimestampMs));
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(timestamps);
result.forEach((tp, oat) -> {
if (oat != null) consumer.seek(tp, oat.offset());
});
// ConsumerInterceptor:
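public class AuditInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        records.forEach(r -> audit.log(r.topic(), r.offset()));
        return records;
    }
    @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }
    @Override public void close() { }
    @Override public void configure(Map<String, ?> configs) { }
}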
Kafka uses a pull model — consumers request data from brokers at their own pace. Brokers do not push data to consumers.
Batching: consumers fetch up to max.partition.fetch.bytes per partition per fetch, aggregating records into efficient batches.
Long polling: if no data is available, the broker waits up to fetch.max.wait.ms (default 500ms) before responding empty. Avoids tight polling loops.
RabbitMQ uses a push model (the broker pushes messages to consumers). Consumers can therefore be overwhelmed by bursts, so the prefetch count (basicQos) must be configured carefully. Kafka's pull model is generally more resilient under varying loads.
What is auto.offset.reset and when does it apply?
# auto.offset.reset applies ONLY when:
# 1. A new consumer group starts (no committed offset exists for the group)
# 2. The committed offset is out of range (data was deleted due to retention)
# Values:
# "latest" (default): start consuming new messages only — ignore history
# → use for: real-time processing where history doesn't matter
# "earliest": replay from beginning of the topic (or earliest retained offset)
# → use for: initial load, audit, data pipeline backfills
# "none": throw NoOffsetForPartitionException — fail fast (requires existing offset)
# Example:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
# Gotcha: auto.offset.reset has NO effect for an existing group with committed offsets
# To force replay of an existing group: kafka-consumer-groups.sh --reset-offsets
kafka-consumer-groups.sh --bootstrap-server broker:9092 \
--group my-group --topic orders --reset-offsets --to-earliest --execute
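The rules for when auto.offset.reset applies can be summarised as a small decision function. This is a sketch of the logic only, with hypothetical names (`OffsetResetSketch`, `startingOffset`); the real consumer resolves positions through the group coordinator and partition leaders.

```java
/** Decision logic for a consumer's starting position (illustrative model, not client code). */
class OffsetResetSketch {
    /**
     * @param committed committed offset for the group, or null if none exists
     * @param logStart  earliest offset still retained in the partition
     * @param logEnd    offset of the next record to be written
     * @param policy    value of auto.offset.reset: "earliest", "latest" or "none"
     */
    static long startingOffset(Long committed, long logStart, long logEnd, String policy) {
        boolean valid = committed != null && committed >= logStart && committed <= logEnd;
        if (valid) {
            return committed; // a valid committed offset always wins; the policy is ignored
        }
        switch (policy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalStateException("no valid offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(50L, 10, 100, "latest"));    // existing group: 50
        System.out.println(startingOffset(null, 10, 100, "earliest")); // new group: 10
        System.out.println(startingOffset(5L, 10, 100, "latest"));     // out of range: 100
    }
}
```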
# Poison pill: a record that consistently fails to deserialize or process
# Without handling: the consumer fails on the same record after every restart, so the partition never makes progress
# Solution 1: ErrorHandlingDeserializer (Spring Kafka)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
KafkaAvroDeserializer.class);
# If deserialization fails: wraps error in DeserializationException header,
# passes null value → Spring's error handler routes to DLQ
# Solution 2: Try-catch in deserializer + send to DLQ
public class SafeDeserializer implements Deserializer<MyEvent> {
public MyEvent deserialize(String topic, byte[] data) {
try { return objectMapper.readValue(data, MyEvent.class); }
catch (Exception e) {
dlqProducer.send(new ProducerRecord<>(topic + ".DLQ", data));
return null; // return null to skip in processing
}
}
}
# Solution 3: skip offset manually
consumer.seek(new TopicPartition(record.topic(), record.partition()),
record.offset() + 1); // skip this record
# Throughput-focused (batch large, poll infrequently):
fetch.min.bytes=1048576 # wait until 1MB available (default 1)
fetch.max.wait.ms=500 # wait up to 500ms for data
max.partition.fetch.bytes=10485760 # 10MB per partition per fetch (default 1MB)
max.poll.records=2000 # process 2000 records per poll (default 500)
# Latency-focused (get records ASAP):
fetch.min.bytes=1 # return as soon as any data available (default)
fetch.max.wait.ms=0 # no waiting
max.poll.records=100 # small batches, process quickly
# Reliability:
session.timeout.ms=30000 # heartbeat timeout (default 45s) — lower = faster failure detection
heartbeat.interval.ms=3000 # heartbeat frequency (should be < session.timeout.ms / 3)
max.poll.interval.ms=600000 # max time between polls before considered dead (default 5min)
# increase for long-running batch processing
# Memory:
receive.buffer.bytes=1048576 # TCP receive buffer for fetches
Kafka Streams is a Java library for building stateful stream processing applications on top of Kafka. Unlike a raw consumer, it provides:
High-level DSL: filter, map, flatMap, groupBy, aggregate, join, windowing
Exactly-once processing (processing.guarantee=exactly_once_v2)
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders");
orders
.filter((key, order) -> order.getStatus().equals("COMPLETED"))
.mapValues(order -> new OrderSummary(order.getId(), order.getTotal()))
.to("order-summaries", Produced.with(Serdes.String(), orderSummarySerde));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
// KStream: unbounded sequence of records (append log)
// Each record is independent. Great for events: clicks, transactions, log lines.
KStream<String, PageView> views = builder.stream("page-views");
// key=userId, value={page, timestamp}
// Every record consumed and processed independently
// KTable: changelog stream — only latest value per key is retained (like a database table)
// Represents the current state of an entity.
// The underlying Kafka topic should be compacted so the full table can always be rebuilt.
KTable<String, UserProfile> users = builder.table("user-profiles");
// key=userId, value=UserProfile (latest)
// Old records for same key are replaced in the local RocksDB store
// Stream-Table Join (enrich a stream with table data):
KStream<String, EnrichedView> enriched = views.join(
users,
(view, profile) -> new EnrichedView(view, profile.getCountry())
);
// For each PageView event, looks up the user's profile in the KTable (local store)
// Very efficient — no Kafka round-trip, local lookup
// Tumbling window: fixed-size, non-overlapping (e.g. 1-minute buckets)
KTable<Windowed<String>, Long> pageViewsPerMinute = views
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.count(Materialized.as("page-views-store"));
// Hopping window: fixed-size, overlapping (e.g. 5-min window every 1 min)
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
.advanceBy(Duration.ofMinutes(1)))
// Session window: dynamic, closes after inactivity gap
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
// Sliding window: window moves continuously; event appears in all windows that contain it
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
// Grace period: accept out-of-order records that arrive after the window end
// Without grace: records arriving after the window end are dropped
// With ofSizeAndGrace(size, Duration.ofMinutes(2)): accept records up to 2 min late
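Which window a record falls into is plain timestamp arithmetic. The sketch below shows it for tumbling and hopping windows, assuming epoch-aligned windows of the form [start, start + size) and non-negative timestamps in milliseconds. `WindowMath` is a hypothetical helper, not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;

/** Window-boundary arithmetic for tumbling and hopping windows (illustrative only). */
class WindowMath {
    /** Start of the single tumbling window that contains the timestamp. */
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Starts of all hopping windows [start, start + size) that contain the timestamp. */
    static List<Long> hoppingStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long latest = timestampMs - (timestampMs % advanceMs); // latest window start <= timestamp
        for (long start = latest; start > timestampMs - sizeMs && start >= 0; start -= advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An event at 2m05s lands in the 1-minute tumbling window starting at 2m00s.
        System.out.println(tumblingStart(125_000L, 60_000L)); // prints 120000
        // An event at 5m30s belongs to five overlapping 5-minute windows advancing every minute.
        System.out.println(hoppingStarts(330_000L, 300_000L, 60_000L).size()); // prints 5
    }
}
```

A record in a hopping window is counted once per overlapping window, so size / advance gives the usual number of windows each record updates.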
# State store: local RocksDB embedded database per Streams instance
# Stores aggregation results, join lookups, windowed counts
# Changelog topic: Kafka topic that mirrors the state store contents
# Every state store write → appended to changelog topic
# On failure/restart: Streams restores state store by replaying changelog
# Fault-tolerance flow:
# 1. Instance-A processes partitions 0-2, stores state in RocksDB
# 2. Instance-A crashes
# 3. Rebalance: Instance-B now owns partitions 0-2
# 4. Instance-B restores state by consuming changelog topic from last checkpoint
# 5. Resumes processing (warm standby = Instance-B had already been replicating changelog)
# Standby replicas (for fast recovery):
num.standby.replicas=1 # each partition's state has 1 warm copy on another instance
# On failover: the standby takes over after replaying only the small tail of the changelog it has not yet applied
# Interactive queries: query state store directly without Kafka round-trip
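# (assumes the queried store is a non-windowed count; a windowed store needs QueryableStoreTypes.windowStore())
ReadOnlyKeyValueStore<String, Long> store =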
streams.store(StoreQueryParameters.fromNameAndType(
"page-views-store", QueryableStoreTypes.keyValueStore()));
Long count = store.get("userId123");
# Set timestamp type on a topic (CreateTime, i.e. event time, is the default):
kafka-configs.sh --bootstrap-server broker:9092 --alter \
--entity-type topics --entity-name orders \
--add-config message.timestamp.type=LogAppendTime # ingestion time
# Kafka Streams uses the timestamp embedded in each record by default for windowing
# (CreateTime unless the topic is set to LogAppendTime)
# Override with custom extractor:
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
WallclockTimestampExtractor.class); // use processing time
# ksqlDB: SQL interface for stream processing on top of Kafka
# Runs as a separate cluster; Kafka topics as tables/streams
-- Create a stream from a topic:
CREATE STREAM orders (
order_id VARCHAR,
user_id VARCHAR,
total DOUBLE
) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='AVRO');
-- Real-time filtering:
CREATE STREAM large_orders AS
SELECT * FROM orders WHERE total > 1000;
-- Windowed aggregation:
CREATE TABLE orders_per_minute AS
SELECT user_id, COUNT(*) AS order_count
FROM orders
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY user_id;
-- Push query (continuous output to client):
SELECT * FROM large_orders EMIT CHANGES;
-- Pull query (point-in-time state lookup):
SELECT order_count FROM orders_per_minute WHERE user_id = 'user123';
Use Kafka Streams when: complex business logic, Java/Kotlin ecosystem, custom state stores, fine-grained control, processing embedded inside a microservice.
Use ksqlDB when: SQL-first team, rapid prototyping, operational analytics, no Java expertise needed, quick dashboards from Kafka data.
# Repartitioning: writing to an intermediate topic and reading back
# Happens automatically when groupBy() changes the key of a record
# because aggregations must process all records for a key on the SAME partition
KStream<String, Order> orders = builder.stream("orders"); // key=orderId
// Change key to userId for grouping:
KTable<String, Long> ordersByUser = orders
.groupBy((orderId, order) -> order.getUserId()) // ← TRIGGERS repartition
.count();
# What Kafka Streams does automatically:
# 1. Creates an internal repartition topic, e.g. "<application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition"
# 2. Writes all records to repartition topic with new key (userId)
# 3. Reads from repartition topic for aggregation (now ordered by userId per partition)
# Cost: extra Kafka I/O + extra latency
# Avoid unnecessary repartitions by choosing the right initial key
# Name a repartition topic to make it reusable:
.selectKey((k,v) -> v.getUserId(), Named.as("by-user"))
.repartition(Repartitioned.as("orders-by-user").withNumberOfPartitions(12))
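Why a key change forces a repartition can be seen with a toy partitioner. The sketch below is an assumption-laden illustration: it uses CRC32 as a stand-in hash (Kafka's default partitioner uses murmur2), and `partition_for` and `partitions_per_user` are hypothetical helpers.

```python
import zlib
from collections import defaultdict
from typing import Callable, Dict, Iterable, Set, Tuple

NUM_PARTITIONS = 4


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in hash for illustration; Kafka's default partitioner uses murmur2.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partitions_per_user(records: Iterable[Tuple[str, str]],
                        key_fn: Callable[[str, str], str]) -> Dict[str, Set[int]]:
    """For each user, collect the partitions their records land on under key_fn."""
    seen: Dict[str, Set[int]] = defaultdict(set)
    for order_id, user_id in records:
        seen[user_id].add(partition_for(key_fn(order_id, user_id)))
    return dict(seen)


# (order_id, user_id) pairs: 30 orders spread over 3 users
records = [(f"order-{i}", f"user-{i % 3}") for i in range(30)]

# Keyed by orderId: a user's orders are scattered by the order hash.
by_order_key = partitions_per_user(records, key_fn=lambda order_id, user_id: order_id)

# Keyed by userId (after repartitioning): every user maps to exactly one partition.
by_user_key = partitions_per_user(records, key_fn=lambda order_id, user_id: user_id)
```

With the order ID as key, nothing ties a user's records to one partition, so a per-user count cannot be computed locally. Rekeying by user ID and writing through a repartition topic restores that guarantee.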
# exactly_once_v2 (EOS-v2; config name introduced in Kafka 3.0, replacing exactly_once_beta. The default guarantee is still at_least_once):
# Atomically: read from input topic + update state store + write to output topic
# All in one transaction, committed together
# If Streams crashes mid-processing: transaction aborts, no partial writes visible
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
# Requires Kafka broker >= 2.5
# EOS-v2 vs EOS-v1:
# EOS-v1 (exactly_once): one transactional producer per task (roughly one per input partition), so resource use grows with partition count
# EOS-v2: one transactional producer per stream thread, which is much more efficient
# Trade-offs:
# ✅ No duplicates on failure/retry
# ✅ State store consistent with output topic
# ❌ ~10–15% throughput overhead vs at_least_once
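# ❌ Higher end-to-end latency: read_committed consumers see output only after each commit (commit.interval.ms must stay below transaction.timeout.ms)
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); // commit every 100ms (already the default when EOS is enabled)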
# Shorter = lower latency but more transaction overhead
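The atomic "output plus offset" commit described above can be modelled with a toy transaction object. This is a simplified sketch under stated assumptions: `ToyTransaction` and `process` are hypothetical, there is no broker involved, and a crash is simulated by aborting the pending work.

```python
from typing import List, Optional


class ToyTransaction:
    """Buffers output records and the input offset; both become visible only on commit."""

    def __init__(self) -> None:
        self.output: List[int] = []   # committed output records
        self.offset: int = 0          # committed input position
        self._pending: List[int] = []
        self._pending_offset: Optional[int] = None

    def send(self, record: int, next_offset: int) -> None:
        self._pending.append(record)
        self._pending_offset = next_offset

    def commit(self) -> None:
        # Output and offset are published together, never one without the other.
        self.output.extend(self._pending)
        if self._pending_offset is not None:
            self.offset = self._pending_offset
        self.abort()  # reuse abort() to clear the buffers

    def abort(self) -> None:
        self._pending = []
        self._pending_offset = None


def process(inputs: List[int], txn: ToyTransaction, crash_at: Optional[int] = None) -> None:
    """Double each input starting at the committed offset; commit every 2 records."""
    for i in range(txn.offset, len(inputs)):
        if crash_at == i:
            txn.abort()   # simulated crash: uncommitted work is discarded
            return
        txn.send(inputs[i] * 2, next_offset=i + 1)
        if (i + 1) % 2 == 0:
            txn.commit()
    txn.commit()


inputs = [1, 2, 3, 4, 5]
txn = ToyTransaction()
process(inputs, txn, crash_at=3)  # crashes with the result for input index 2 still uncommitted
process(inputs, txn)              # restart resumes from the committed offset
```

Because the buffered result and the offset advance are discarded together, the restart reprocesses index 2 without producing a duplicate in the committed output.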
Kafka Connect is a framework for streaming data between Kafka and external systems (databases, S3, Elasticsearch, etc.) without writing custom producers/consumers. Connectors are reusable plugins.
# Architecture:
# - Connect Worker: JVM process that runs connectors/tasks
# - Source Connector: pulls data from external system → Kafka topic
# - Sink Connector: reads from Kafka topic → pushes to external system
# - Task: unit of work within a connector (parallelism)
# Distributed mode (production — multiple workers, fault-tolerant):
# Workers coordinate via Kafka internal topics:
# config.storage.topic, offset.storage.topic, status.storage.topic
# Deploy a JDBC Source Connector via REST API:
curl -X POST http://connect:8083/connectors -H 'Content-Type: application/json' -d '{
"name": "jdbc-source-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://db:5432/shop",
"table.whitelist": "orders",
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id",
"topic.prefix": "db.",
"tasks.max": "3"
}
}'
# Popular connectors: Debezium (CDC), S3 Sink, Elasticsearch Sink, JDBC, MongoDB
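The `timestamp+incrementing` mode in the connector config above boils down to a cursor of (last timestamp, last ID). The sketch below models that cursor over an in-memory list; `poll` and the row layout are illustrative assumptions, not the connector's actual implementation.

```python
from typing import List, Tuple

Row = Tuple[int, int, str]  # (id, updated_at, status)


def poll(rows: List[Row], last_ts: int, last_id: int) -> Tuple[List[Row], int, int]:
    """Return rows newer than the (last_ts, last_id) cursor, plus the advanced cursor.

    A row qualifies if its timestamp is later, or equal with a higher id.
    """
    new_rows = sorted(
        (r for r in rows if r[1] > last_ts or (r[1] == last_ts and r[0] > last_id)),
        key=lambda r: (r[1], r[0]),
    )
    if new_rows:
        last_id, last_ts = new_rows[-1][0], new_rows[-1][1]
    return new_rows, last_ts, last_id


table: List[Row] = [(1, 100, "PENDING"), (2, 100, "PENDING")]
first, ts, rid = poll(table, last_ts=0, last_id=0)

# Row 1 is updated and row 3 is inserted, both at timestamp 150.
table = [(1, 150, "SHIPPED"), (2, 100, "PENDING"), (3, 150, "PENDING")]
second, ts, rid = poll(table, ts, rid)

# Nothing changed since the last poll.
third, ts, rid = poll(table, ts, rid)
```

The timestamp catches updates to existing rows, and the ID breaks ties between rows that share a timestamp. Note that a query-based source like this cannot see hard deletes.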
CDC (Change Data Capture) streams database changes (INSERT/UPDATE/DELETE) as events to Kafka in near real-time. Debezium is the leading CDC tool — reads database transaction logs, not query results.
# Debezium reads the database binlog / WAL / redo log:
# PostgreSQL → WAL (logical replication)
# MySQL → binlog
# MongoDB → change streams (backed by the oplog)
# SQL Server → CDC tables
# Debezium Postgres Connector produces events to: server.schema.tableName
# Each event includes: before, after, op (c=create, u=update, d=delete), ts_ms
# Example output to "myserver.public.orders" topic:
{
"op": "u",
"before": { "id": 123, "status": "PENDING" },
"after": { "id": 123, "status": "SHIPPED" },
"ts_ms": 1719100000000,
"source": { "db": "shop", "table": "orders" }
}
# Use cases:
# - Sync database to Elasticsearch / Redis / data warehouse
# - Invalidate caches on DB change
# - Event sourcing: all DB changes as events
# - Microservice data consistency via outbox pattern
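As a sketch of the "sync to a cache" use case, the function below applies change events shaped like the example above to an in-memory dictionary. `apply_change` is a hypothetical consumer-side helper; the `r` op code (snapshot read) is handled in addition to the three listed above.

```python
from typing import Any, Dict

Cache = Dict[int, Dict[str, Any]]


def apply_change(cache: Cache, event: Dict[str, Any]) -> None:
    """Apply one change event to a cache keyed by row id."""
    op = event["op"]
    if op in ("c", "u", "r"):        # create, update, or snapshot read
        row = event["after"]
        cache[row["id"]] = row       # upsert the latest row image
    elif op == "d":                  # delete: only "before" is populated
        cache.pop(event["before"]["id"], None)


cache: Cache = {}
events = [
    {"op": "c", "before": None, "after": {"id": 123, "status": "PENDING"}},
    {"op": "u", "before": {"id": 123, "status": "PENDING"},
     "after": {"id": 123, "status": "SHIPPED"}},
    {"op": "c", "before": None, "after": {"id": 124, "status": "PENDING"}},
    {"op": "d", "before": {"id": 124, "status": "PENDING"}, "after": None},
]
for event in events:
    apply_change(cache, event)

# Redelivery of the update (at-least-once) leaves the cache unchanged.
apply_change(cache, events[1])
```

Because each event carries the full row image, applying it is an idempotent upsert or delete, which is what makes at-least-once delivery safe for this kind of consumer.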
Schema Registry stores and validates schemas for Kafka messages (Avro, Protobuf, JSON Schema). Producers register a schema; consumers look up the schema to deserialize records. Prevents breaking changes.
# Message format with Avro + Schema Registry:
# [magic byte (1)] [schema-id (4 bytes)] [avro payload]
# Consumer looks up schema-id in registry → deserializes payload
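The framing above is simple enough to reproduce with the standard library: one magic byte (0), a 4-byte big-endian schema ID, then the serialized payload. The `encode` and `decode` helpers below are illustrative; the payload is treated as opaque bytes rather than real Avro.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")  # 1-byte magic + 4-byte big-endian schema id = 5 bytes


def encode(schema_id: int, payload: bytes) -> bytes:
    """Prefix an already-serialized payload with the 5-byte header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


def decode(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, payload)."""
    magic, schema_id = HEADER.unpack(message[:HEADER.size])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER.size:]


framed = encode(42, b"abc")
schema_id, payload = decode(framed)
```

A consumer would use the extracted `schema_id` to fetch (and cache) the writer schema from the registry before decoding the payload.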
# Schema compatibility modes:
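# BACKWARD (default): new schema can read data written by old schema
# → can delete fields; add optional fields (with defaults); upgrade consumers first
# FORWARD: old schema can read data written by new schema
# → can add fields; delete optional fields (with defaults); upgrade producers first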
# FULL: both backward + forward compatible
# NONE: no compatibility check (dangerous in production)
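The compatibility modes can be illustrated with a deliberately simplified model: a schema is just a map from field name to "has a default", and only field additions and removals are considered (no type changes or promotions). All names below are assumptions for the sketch, not Schema Registry APIs.

```python
from typing import Dict

Fields = Dict[str, bool]  # field name -> True if the field declares a default


def can_read(reader: Fields, writer: Fields) -> bool:
    """A reader can decode the writer's data if every reader field
    that the writer did not produce has a default to fall back on."""
    return all(has_default for name, has_default in reader.items() if name not in writer)


def is_backward(new: Fields, old: Fields) -> bool:
    return can_read(reader=new, writer=old)   # new schema reads old data


def is_forward(new: Fields, old: Fields) -> bool:
    return can_read(reader=old, writer=new)   # old schema reads new data


def is_full(new: Fields, old: Fields) -> bool:
    return is_backward(new, old) and is_forward(new, old)


old = {"order_id": False, "total": False}
add_optional = {"order_id": False, "total": False, "currency": True}
add_required = {"order_id": False, "total": False, "currency": False}
drop_required = {"order_id": False}
```

Under this model, adding a field with a default is fully compatible, adding a field without one breaks BACKWARD, and dropping a field without a default breaks FORWARD.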
# Register schema:
curl -X POST http://registry:8081/subjects/orders-value/versions \
-H 'Content-Type: application/vnd.schemaregistry.v1+json' \
-d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",..."}'
# Producer config:
props.put("schema.registry.url", "http://registry:8081");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
# Consumer config:
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("specific.avro.reader", "true"); // deserialize to generated Java class
# MirrorMaker 2 (MM2): built on Kafka Connect framework
# Replicates topics across Kafka clusters (active-passive or active-active)
# Use cases:
# - Disaster recovery (primary → secondary cluster)
# - Geo-replication (US cluster → EU cluster)
# - Cloud migration
# - Multi-region active-active
# mm2.properties:
clusters = source, target
source.bootstrap.servers = source-kafka:9092
target.bootstrap.servers = target-kafka:9092
source->target.enabled = true
source->target.topics = orders, payments, user-events
source->target.groups = order-service, payment-service
# MM2 features:
# - Offset translation: source offset → target offset mapping (kept in the offset-syncs topic; translated group offsets are emitted to the checkpoints topic)
# - Consumer group offset sync: consumer groups can fail over to target cluster
# - Topic auto-creation on target
# - Heartbeat topic for measuring replication lag
# - Replication topics prefixed: "source.orders" on target cluster
# Active-active: both clusters produce; MM2 replicates in both directions
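# Avoids infinite replication loops: remote topics carry the source-cluster prefix (e.g. "source.orders"), so MM2 recognises them and does not replicate them back to their origin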
# 1. Encryption in transit: TLS
listeners=SASL_SSL://:9093
ssl.keystore.location=/certs/kafka.keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=/certs/kafka.truststore.jks
ssl.truststore.password=changeit
# 2. Authentication (who are you?):
# SASL/PLAIN: username+password (simple, SSL required)
# SASL/SCRAM-SHA-256: password with challenge-response
# SASL/GSSAPI (Kerberos): enterprise SSO
# mTLS: certificate-based auth (client cert = identity)
# 3. Authorization (what can you do?): ACLs
kafka-acls.sh --bootstrap-server broker:9093 \
--add --allow-principal User:order-service \
--operation Write --topic orders
kafka-acls.sh --bootstrap-server broker:9093 \
--add --allow-principal User:analytics \
--operation Read --topic orders --group analytics-group
# 4. Encryption at rest:
# Kafka doesn't encrypt data at rest natively
# Use OS-level encryption (LUKS) or cloud KMS (AWS KMS, GCP CMEK)
# Or encrypt before writing (application-level encryption)
# 5. Audit logging:
# Confluent Platform: built-in audit log topic
# Open source: use authorizer log (kafka-authorizer.log)
# Broker JMX metrics (critical):
# kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec → throughput
# kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec → bandwidth
# kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec → fan-out
# kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions → ALERT if > 0
# kafka.controller:type=KafkaController,name=OfflinePartitionsCount → ALERT if > 0
# kafka.controller:type=KafkaController,name=ActiveControllerCount → ALERT if != 1
# kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce → produce latency
# kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs → disk flush time
# Consumer metrics:
# records-lag-max per (group, topic, partition) → ALERT on growing lag
# fetch-latency-avg
# commit-rate
# Producer metrics:
# record-error-rate → ALERT if > 0
# record-send-rate
# batch-size-avg (low = batching inefficient)
# request-latency-avg
# OS-level:
# Disk I/O utilisation (iostat) — Kafka is I/O heavy
# Network throughput
# JVM GC pauses (should use G1GC or ZGC, avoid full GC)
# Open file descriptors (Kafka opens many log files)
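Consumer lag, the metric most worth alerting on, is just the gap between a partition's log end offset and the group's committed offset. The sketch below assumes those two numbers have already been collected (for example by a monitoring job); `partition_lag` and `is_growing` are hypothetical helpers.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


def partition_lag(log_end: Dict[TopicPartition, int],
                  committed: Dict[TopicPartition, int]) -> Dict[TopicPartition, int]:
    """Lag per partition = log end offset - committed offset (0 if never committed)."""
    return {tp: end - committed.get(tp, 0) for tp, end in log_end.items()}


def is_growing(samples: List[int]) -> bool:
    """True if lag rose on every consecutive sample, i.e. the consumer is falling behind."""
    return len(samples) >= 2 and all(b > a for a, b in zip(samples, samples[1:]))


log_end = {("orders", 0): 1500, ("orders", 1): 900}
committed = {("orders", 0): 1200, ("orders", 1): 900}

lag = partition_lag(log_end, committed)
max_lag = max(lag.values())
```

Alerting on the trend rather than a fixed threshold avoids false alarms from short bursts: a lag that spikes and then drains is healthy, while one that rises on every sample is not.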
# Broker:
num.network.threads=8 # network I/O threads (≥ CPUs / 2)
num.io.threads=16 # disk I/O threads (≥ CPUs)
socket.send.buffer.bytes=1048576 # 1MB TCP send buffer
socket.receive.buffer.bytes=1048576 # 1MB TCP receive buffer
socket.request.max.bytes=104857600 # 100MB max request size
# Log/storage:
log.flush.interval.messages=9223372036854775807 # Long.MAX_VALUE (the default): let the OS flush, don't fsync every message
# log.flush.interval.ms: leave unset (default) and rely on replication, not synchronous flushes
log.segment.bytes=1073741824 # 1GB segment files
num.recovery.threads.per.data.dir=8 # faster startup recovery
# JVM:
# Heap: 4–6GB (no more — large heaps cause GC pauses, page cache more important)
# GC: G1GC or ZGC
# -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:G1HeapRegionSize=16M
# OS:
# file descriptor limit: ulimit -n 1000000
# vm.swappiness=1 # keep broker memory resident; swap only as a last resort
# net.core.rmem_max=134217728 # large TCP receive buffer
# noatime mount option for Kafka log dirs (reduce inode writes)
The Outbox Pattern solves the dual-write problem in microservices: atomically updating a database AND publishing a Kafka event (without distributed transactions).
# Problem: service writes to DB + produces to Kafka — one may fail
# → inconsistency: DB updated but event lost (or vice versa)
# Outbox Pattern solution:
# 1. Write DB update AND outbox record in same local DB transaction:
BEGIN;
UPDATE orders SET status='SHIPPED' WHERE id=123;
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', 123, 'OrderShipped', '{"orderId":123,...}');
COMMIT;
# 2. Debezium CDC connector reads the outbox table changes from WAL
# 3. Publishes to Kafka topic (guaranteed at-least-once, idempotent consumer handles duplicates)
# 4. Outbox records deleted/compacted after publish
# Benefits:
# - Atomicity via local DB transaction (no 2PC, no distributed transaction)
# - Events always consistent with DB state
# - Debezium Outbox Router SMT (Single Message Transform) extracts routing from outbox record
# Debezium transforms outbox record into proper Kafka event:
# topic = routed by aggregate_type; default pattern "outbox.event.<aggregate_type>" (e.g., "outbox.event.Order"), configurable via route.topic.replacement
# key = aggregate_id
# value = payload
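Step 1 of the pattern, the single local transaction, can be demonstrated with an in-memory SQLite database. This is a minimal sketch under stated assumptions: the schema mirrors the SQL above, `ship_order` is a hypothetical service function, and the CDC relay is not modelled.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        aggregate_type TEXT NOT NULL,
        aggregate_id INTEGER NOT NULL,
        event_type TEXT NOT NULL,
        payload TEXT NOT NULL
    );
    INSERT INTO orders (id, status) VALUES (123, 'PENDING');
""")


def ship_order(conn: sqlite3.Connection, order_id: int, fail: bool = False) -> None:
    """Update the order and write the outbox record in one local transaction."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.execute("UPDATE orders SET status = 'SHIPPED' WHERE id = ?", (order_id,))
        if fail:
            raise RuntimeError("simulated failure before the outbox insert")
        conn.execute(
            "INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload) "
            "VALUES (?, ?, ?, ?)",
            ("Order", order_id, "OrderShipped", json.dumps({"orderId": order_id})),
        )


def snapshot(conn: sqlite3.Connection):
    """Return (order status, number of outbox rows) for inspection."""
    status = conn.execute("SELECT status FROM orders WHERE id = 123").fetchone()[0]
    outbox_rows = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
    return status, outbox_rows


try:
    ship_order(conn, 123, fail=True)
except RuntimeError:
    pass
after_failure = snapshot(conn)   # the update was rolled back with the transaction

ship_order(conn, 123)
after_success = snapshot(conn)   # status change and outbox record committed together
```

The failed attempt leaves neither the status change nor an event behind, so the outbox can never disagree with the order table.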
# Under-replicated partition: a partition where ISR < replication factor
# Causes: broker failure, slow broker, network issues, GC pause
# Detection:
kafka-topics.sh --bootstrap-server broker:9092 \
--describe --under-replicated-partitions
# Broker failure recovery (automatic):
# 1. Controller detects broker down (ZK/KRaft heartbeat timeout)
# 2. Controller elects new leaders for all partitions where failed broker was leader
# 3. Producers/consumers fetch new metadata and reconnect
# Recovery time: seconds to minutes depending on partition count
# Broker comes back online:
# 1. Broker rejoins, starts as follower for its former partitions
# 2. Fetches from leaders to catch up (becomes ISR member when caught up)
# 3. Preferred leader election: re-elect original leader if desired
auto.leader.rebalance.enable=true # automatic preferred leader election (default true)
# Or manually:
kafka-leader-election.sh --bootstrap-server broker:9092 \
--election-type preferred --all-topic-partitions
# Throttle recovery to avoid impacting producers/consumers:
kafka-configs.sh --bootstrap-server broker:9092 \
--entity-type brokers --entity-name 3 \
--alter --add-config 'follower.replication.throttled.rate=50000000' # 50MB/s
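The definition above (ISR smaller than the replica set) and its interaction with `min.insync.replicas` can be expressed directly. The sketch below works on a hand-written snapshot of partition metadata; the dictionary layout and helper names are assumptions for illustration.

```python
from typing import Dict, List

# partition id -> assigned replicas and current in-sync replicas (broker ids)
PartitionInfo = Dict[str, List[int]]


def under_replicated(partitions: Dict[int, PartitionInfo]) -> List[int]:
    """Partitions whose ISR is smaller than the assigned replica set."""
    return sorted(p for p, info in partitions.items()
                  if len(info["isr"]) < len(info["replicas"]))


def accepts_acks_all(isr: List[int], min_insync_replicas: int) -> bool:
    """An acks=all produce succeeds only while the ISR is at least min.insync.replicas."""
    return len(isr) >= min_insync_replicas


# Snapshot after broker 3 has failed.
partitions = {
    0: {"replicas": [1, 2, 3], "isr": [1, 2]},
    1: {"replicas": [2, 1, 4], "isr": [2, 1, 4]},
    2: {"replicas": [3, 1, 2], "isr": [1, 2]},
}

urp = under_replicated(partitions)
still_writable = accepts_acks_all(partitions[0]["isr"], min_insync_replicas=2)
```

With replication factor 3 and `min.insync.replicas=2`, losing one broker leaves partitions under-replicated but still writable; losing a second would cause `acks=all` produces to be rejected.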
| Aspect | Kafka | RabbitMQ | Pulsar |
|---|---|---|---|
| Model | Log / commit log | Traditional queue / exchange | Unified log + queue |
| Throughput | Millions/sec per broker | Tens of thousands/sec | Millions/sec |
| Retention | Days/weeks (log-based) | Until consumed (deleted) | Tiered (BookKeeper + S3) |
| Replay | Yes (seek to any offset) | No for classic queues (once consumed, gone); RabbitMQ Streams support replay | Yes |
| Ordering | Per partition | Per queue | Per partition |
| Routing | Topic / partitioner | Flexible exchanges (topic/fanout/headers) | Topic / subscription types |
| Best for | High-volume streaming, CDC, event sourcing | Task queues, RPC, complex routing, low latency | Multi-tenancy, geo-replication, hybrid queue+stream |