Apache Kafka is a distributed streaming platform for building real-time data pipelines and streaming applications. This guide walks through the core patterns for using Kafka from Java: producing and consuming messages, stream processing, Spring integration, and error handling.
Key areas covered:

- Producing messages asynchronously and synchronously with the Java client
- Writing custom serializers for domain objects
- Consuming with manual offset commits, including batch processing
- Kafka Streams, including windowed aggregations
- Spring Kafka integration (configuration, producers, listeners)
- Error handling, retries, and dead letter queues
```java
@Slf4j // Lombok logger (assumed); the later examples use the same 'log' field
public class KafkaProducerExample {

    private final KafkaProducer<String, String> producer;
    private final String topic;

    public KafkaProducerExample(String bootstrapServers, String topic) {
        this.topic = topic;

        // Producer properties
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        // Additional configurations
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        this.producer = new KafkaProducer<>(props);
    }

    public void sendMessage(String key, String value) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, key, value);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                log.info("Message sent successfully: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
            } else {
                log.error("Error sending message", exception);
            }
        });
    }

    public void sendMessageSync(String key, String value) throws Exception {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, key, value);
        try {
            RecordMetadata metadata = producer.send(record).get(); // Block for result
            log.info("Message sent synchronously: topic={}, partition={}, offset={}",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } catch (Exception e) {
            log.error("Error sending message synchronously", e);
            throw e;
        }
    }

    public void close() {
        producer.close(); // Flush pending records and release client resources
    }
}
```
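A minimal usage sketch (broker address and topic name are placeholders; note that `sendMessageSync` declares `throws Exception`, so the calling method must handle it):

```java
KafkaProducerExample producer =
        new KafkaProducerExample("localhost:9092", "example-topic");
producer.sendMessage("user-42", "hello");       // async; result logged in callback
producer.sendMessageSync("user-42", "hello");   // blocks until acknowledged
producer.close();
```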
```java
public class CustomerSerializer implements Serializer<Customer> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Customer customer) {
        try {
            return mapper.writeValueAsBytes(customer);
        } catch (Exception e) {
            throw new SerializationException("Error serializing Customer", e);
        }
    }
}
```
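Consumers of this data will need the inverse operation. A matching deserializer sketch, assuming `Customer` is a plain Jackson-friendly POJO:

```java
public class CustomerDeserializer implements Deserializer<Customer> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Customer deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // Tombstone records carry a null payload
        }
        try {
            return mapper.readValue(data, Customer.class);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing Customer", e);
        }
    }
}
```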
```java
public class KafkaCustomerProducer {

    private final KafkaProducer<String, Customer> producer;

    public KafkaCustomerProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                CustomerSerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
    }

    public void sendCustomer(String key, Customer customer) {
        ProducerRecord<String, Customer> record =
                new ProducerRecord<>("customers", key, customer);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                log.info("Customer sent successfully");
            } else {
                log.error("Error sending customer", exception);
            }
        });
    }
}
```
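Usage is analogous to the string producer (the `Customer` constructor shown here is hypothetical):

```java
KafkaCustomerProducer producer = new KafkaCustomerProducer("localhost:9092");
producer.sendCustomer("cust-1", new Customer("cust-1", "Ada Lovelace"));
```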
```java
public class KafkaConsumerExample {

    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private volatile boolean running = true;

    public KafkaConsumerExample(String bootstrapServers,
                                String groupId,
                                String topic) {
        this.topic = topic;

        // Consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // Additional configurations
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

        this.consumer = new KafkaConsumer<>(props);
    }

    public void startConsuming() {
        try {
            consumer.subscribe(Collections.singletonList(topic));
            while (running) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                }
                // Manual commit after processing
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } finally {
            consumer.close();
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        log.info("Received message: topic={}, partition={}, offset={}, key={}, value={}",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
        // Process the message
    }

    public void stopConsuming() {
        running = false;
    }
}
```
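One way to run and stop this consumer (the thread handling here is illustrative; because `poll()` blocks for at most 100 ms, the `running` flag is checked frequently enough for a timely shutdown):

```java
public static void main(String[] args) {
    KafkaConsumerExample consumer = new KafkaConsumerExample(
            "localhost:9092", "example-group", "example-topic");

    Thread worker = new Thread(consumer::startConsuming, "consumer-worker");
    worker.start();

    // Stop cleanly when the JVM shuts down
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        consumer.stopConsuming();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }));
}
```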
```java
public class BatchConsumerExample {

    private final KafkaConsumer<String, String> consumer;
    private final int batchSize;
    private final BatchProcessor batchProcessor; // application-specific callback

    // The consumer is expected to be configured and subscribed by the caller
    public BatchConsumerExample(KafkaConsumer<String, String> consumer,
                                int batchSize,
                                BatchProcessor batchProcessor) {
        this.consumer = consumer;
        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public void processBatches() {
        List<ConsumerRecord<String, String>> batch = new ArrayList<>();
        try {
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    batch.add(record);
                    if (batch.size() >= batchSize) {
                        processBatch(batch);
                        // Note: commitSync() commits the consumer's position, i.e.
                        // everything returned by the last poll(), including records
                        // still waiting in this loop; see the offset-level variant below.
                        consumer.commitSync();
                        batch.clear();
                    }
                }
                if (!batch.isEmpty()) {
                    processBatch(batch);
                    consumer.commitSync();
                    batch.clear();
                }
            }
        } finally {
            consumer.close();
        }
    }

    private void processBatch(List<ConsumerRecord<String, String>> batch) {
        try {
            batchProcessor.process(batch);
        } catch (Exception e) {
            log.error("Error processing batch", e);
            // Implement retry or error handling logic
        }
    }
}
```
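If the position-level commit noted above is too coarse for your delivery guarantees, offsets can be committed per processed record instead; a sketch using the same record types:

```java
// Commit exactly the offsets of the records that were processed
private void commitProcessed(List<ConsumerRecord<String, String>> batch) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : batch) {
        // The committed offset is the *next* offset to read, hence +1
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    consumer.commitSync(offsets);
}
```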
```java
public class KafkaStreamsExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read from input topic
        KStream<String, String> source = builder.stream("input-topic");

        // Process stream
        KStream<String, String> processed = source
                .filter((key, value) -> value != null)
                .mapValues(value -> value.toUpperCase())
                .peek((key, value) ->
                        log.info("Processing: key={}, value={}", key, value));

        // Write to output topic
        processed.to("output-topic");

        // Build and start the topology
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```
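While developing, it can help to inspect the wiring of the processor graph before starting it; `Topology#describe()` returns a readable description:

```java
Topology topology = builder.build();
log.info("Topology:\n{}", topology.describe());
KafkaStreams streams = new KafkaStreams(topology, props);
```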
```java
public class WindowedStreamExample {

    public void processWindowedStream() {
        StreamsBuilder builder = new StreamsBuilder();

        // Configure time windows: 5-minute windows advancing every minute (hopping)
        TimeWindows timeWindows = TimeWindows
                .of(Duration.ofMinutes(5))
                .advanceBy(Duration.ofMinutes(1));

        // Create windowed aggregation
        builder.stream("input-topic",
                        Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .windowedBy(timeWindows)
                .aggregate(
                        () -> 0L,                                  // Initializer
                        (key, value, aggregate) -> aggregate + 1L, // Adder
                        Materialized.with(Serdes.String(), Serdes.Long())
                )
                .toStream()
                .map((key, value) ->
                        KeyValue.pair(key.key(), value.toString()))
                .to("output-topic");
    }
}
```
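Note that `TimeWindows.of()` is deprecated in Kafka Streams 3.x; on a recent client, the equivalent factory makes the grace period explicit:

```java
// Same 5-minute hopping window, Kafka Streams 3.x style
TimeWindows timeWindows = TimeWindows
        .ofSizeWithNoGrace(Duration.ofMinutes(5))
        .advanceBy(Duration.ofMinutes(1));
```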
```java
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```
```java
@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message)
                .addCallback(
                        result -> log.info("Message sent successfully"),
                        ex -> log.error("Error sending message", ex)
                );
    }
}
```
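The `addCallback()` style above applies to Spring for Apache Kafka 2.x, where `send()` returns a `ListenableFuture`. In 3.x, `send()` returns a `CompletableFuture`, so the callback is attached with `whenComplete()`:

```java
kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
    if (ex == null) {
        log.info("Message sent successfully");
    } else {
        log.error("Error sending message", ex);
    }
});
```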
```java
@Service
public class KafkaConsumerService {

    @KafkaListener(
            topics = "topic-name",
            groupId = "group_id"
    )
    public void consume(String message) {
        log.info("Received message: {}", message);
        // Process the message
    }

    @KafkaListener(
            topics = "batch-topic",
            groupId = "batch_group",
            containerFactory = "batchListenerFactory" // defined below
    )
    public void consumeBatch(List<String> messages) {
        log.info("Received batch of {} messages", messages.size());
        // Process the batch
    }
}
```
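The batch listener above names a `batchListenerFactory` bean that is not part of `KafkaConfig` as shown; a minimal sketch of such a bean, reusing the same `consumerFactory()`:

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true); // deliver records to the listener as a List
    return factory;
}
```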
```java
@Service
public class ErrorHandlingProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;

    public ErrorHandlingProducer(KafkaTemplate<String, String> kafkaTemplate,
                                 @Value("${kafka.topic.name}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    public void sendWithRetry(String message, int maxRetries) {
        int retries = 0;
        while (retries < maxRetries) {
            try {
                kafkaTemplate.send(topic, message).get();
                return;
            } catch (Exception e) {
                retries++;
                if (retries == maxRetries) {
                    log.error("Failed to send message after {} retries",
                            maxRetries, e);
                    throw new KafkaException("Failed to send message", e);
                }
                log.warn("Retry attempt {} of {}", retries, maxRetries);
                sleep(exponentialBackoff(retries));
            }
        }
    }

    private long exponentialBackoff(int retryCount) {
        return Math.min(1000L * (long) Math.pow(2, retryCount), 60000L);
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KafkaException("Interrupted during retry backoff", e);
        }
    }
}
```
```java
@Configuration
public class ErrorHandlingConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // ... basic configuration (servers, group id, deserializers) ...

        // Error handling configuration
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // Configure error handler (Spring Kafka 2.x API)
        factory.setErrorHandler((exception, data) -> {
            log.error("Error in consumer", exception);
            // Handle the error (e.g., send to DLQ)
        });
        return factory;
    }
}
```
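`setErrorHandler()` is gone in Spring for Apache Kafka 3.x, replaced by a `CommonErrorHandler`; a roughly equivalent sketch for newer versions, which also adds bounded retries:

```java
// Spring Kafka 3.x: DefaultErrorHandler = recoverer + back-off policy
factory.setCommonErrorHandler(new DefaultErrorHandler(
        (record, exception) -> {
            log.error("Error in consumer, record={}", record, exception);
            // Handle the error (e.g., send to DLQ)
        },
        new FixedBackOff(1000L, 2L) // retry twice, 1 s apart, before recovering
));
```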
```java
@Configuration
public class DeadLetterConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            kafkaListenerContainerFactory(KafkaTemplate<String, String> template) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory()); // as defined above

        // Route failed records to a ".DLQ" topic on the same partition.
        // DeadLetterPublishingRecoverer is a recoverer, not an error handler,
        // so it must be wrapped (SeekToCurrentErrorHandler in Spring Kafka 2.x).
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                template,
                (record, exception) -> new TopicPartition(
                        record.topic() + ".DLQ",
                        record.partition()));
        factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer));
        return factory;
    }
}
```
```java
@Service
public class DeadLetterConsumer {

    @KafkaListener(
            topics = "${kafka.topic.name}.DLQ",
            groupId = "dlq-group"
    )
    public void consumeDeadLetter(
            @Payload String message,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
        log.error("Processing dead letter: topic={}, message={}, timestamp={}",
                topic, message, timestamp);
        // Handle dead letter message
    }
}
```
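Spring for Apache Kafka 2.7+ can also wire retries and the dead letter topic declaratively with `@RetryableTopic`; a sketch (topic and group names are placeholders):

```java
@RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 1000, multiplier = 2.0))
@KafkaListener(topics = "orders", groupId = "orders-group")
public void listen(String message) {
    process(message); // hypothetical handler; a throw here routes the record
                      // through the retry topics and finally to the DLT
}

@DltHandler
public void handleDlt(String message) {
    log.error("Dead letter: {}", message);
}
```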
Apache Kafka is a powerful tool for building scalable event-driven applications. By following the patterns and practices outlined in this guide, you can effectively implement Kafka-based solutions in your Java applications.
Remember to monitor your Kafka clusters, handle failures deliberately (retries, dead letter topics, manual offset commits), and tune settings such as acks, batching, and poll intervals against the delivery guarantees your application needs.