Java Apache Kafka Guide: Complete Tutorial

1️⃣ Introduction

Apache Kafka is a distributed streaming platform that enables building real-time data pipelines and streaming applications. This guide covers everything you need to know about using Kafka with Java.

Key areas covered:

  • Kafka Architecture and Concepts
  • Producer and Consumer APIs
  • Stream Processing
  • Spring Kafka Integration
  • Error Handling
  • Monitoring and Management
  • Security Configuration
  • Best Practices

2️⃣ Kafka Producer

🔹 Basic Producer

public class KafkaProducerExample {
    private final KafkaProducer<String, String> producer;
    private final String topic;
    
    public KafkaProducerExample(String bootstrapServers, 
            String topic) {
        this.topic = topic;
        
        // Producer properties
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class.getName());
        
        // Additional configurations
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        
        this.producer = new KafkaProducer<>(props);
    }
    
    public void sendMessage(String key, String value) {
        ProducerRecord<String, String> record = 
            new ProducerRecord<>(topic, key, value);
            
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                log.info("Message sent successfully: topic={}, partition={}, offset={}", 
                    metadata.topic(), 
                    metadata.partition(), 
                    metadata.offset());
            } else {
                log.error("Error sending message", exception);
            }
        });
    }
    
    public void sendMessageSync(String key, String value) 
            throws Exception {
        ProducerRecord<String, String> record = 
            new ProducerRecord<>(topic, key, value);
            
        try {
            RecordMetadata metadata = producer
                .send(record)
                .get();  // Block for result
                
            log.info("Message sent synchronously: topic={}, partition={}, offset={}", 
                metadata.topic(), 
                metadata.partition(), 
                metadata.offset());
        } catch (Exception e) {
            log.error("Error sending message synchronously", e);
            throw e;
        }
    }
}
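
A quick usage sketch (the broker address and topic name below are placeholders for your environment):

// Hypothetical broker address and topic name
KafkaProducerExample producer = 
    new KafkaProducerExample("localhost:9092", "my-topic");

producer.sendMessage("user-42", "hello");      // async; callback logs the result
producer.sendMessageSync("user-42", "hello");  // blocks until acknowledged, throws on failure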

🔹 Custom Serializer

public class CustomerSerializer implements Serializer<Customer> {
    private final ObjectMapper mapper = new ObjectMapper();
    
    @Override
    public byte[] serialize(String topic, Customer customer) {
        try {
            return mapper.writeValueAsBytes(customer);
        } catch (Exception e) {
            throw new SerializationException(
                "Error serializing Customer", e);
        }
    }
}
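
On the consuming side a matching deserializer is needed. A minimal sketch, again using Jackson (Customer is assumed to be a plain POJO):

public class CustomerDeserializer implements Deserializer<Customer> {
    private final ObjectMapper mapper = new ObjectMapper();
    
    @Override
    public Customer deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;  // Pass tombstones / null payloads through
        }
        try {
            return mapper.readValue(data, Customer.class);
        } catch (Exception e) {
            throw new SerializationException(
                "Error deserializing Customer", e);
        }
    }
}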

public class KafkaCustomerProducer {
    private final KafkaProducer<String, Customer> producer;
    
    public KafkaCustomerProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
            CustomerSerializer.class.getName());
            
        this.producer = new KafkaProducer<>(props);
    }
    
    public void sendCustomer(String key, Customer customer) {
        ProducerRecord<String, Customer> record = 
            new ProducerRecord<>("customers", key, customer);
            
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                log.info("Customer sent successfully");
            } else {
                log.error("Error sending customer", exception);
            }
        });
    }
}

3️⃣ Kafka Consumer

🔹 Basic Consumer

public class KafkaConsumerExample {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private volatile boolean running = true;
    
    public KafkaConsumerExample(String bootstrapServers, 
            String groupId, 
            String topic) {
        this.topic = topic;
        
        // Consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, 
            groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
        
        // Additional configurations
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
            "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
            "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
            500);
        
        this.consumer = new KafkaConsumer<>(props);
    }
    
    public void startConsuming() {
        try {
            consumer.subscribe(Collections.singletonList(topic));
            
            while (running) {
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                }
                
                // Manual commit after processing
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
    
    private void processRecord(
            ConsumerRecord<String, String> record) {
        log.info("Received message: topic={}, partition={}, offset={}, key={}, value={}", 
            record.topic(), 
            record.partition(), 
            record.offset(), 
            record.key(), 
            record.value());
            
        // Process the message
    }
    
    public void stopConsuming() {
        running = false;
    }
}
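
Because poll() blocks, the volatile flag above only works with a short poll timeout. The idiomatic way to stop a consumer from another thread is consumer.wakeup(), which makes a blocked poll() throw WakeupException. A sketch of that pattern:

public void startConsuming() {
    try {
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            // A wakeup() from another thread makes this poll throw
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofSeconds(1));
            records.forEach(this::processRecord);
            if (!records.isEmpty()) {
                consumer.commitSync();
            }
        }
    } catch (WakeupException e) {
        // Expected during shutdown; fall through to close()
    } finally {
        consumer.close();
    }
}

public void stopConsuming() {
    consumer.wakeup();  // Safe to call from any thread
}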

🔹 Batch Processing

public class BatchConsumerExample {
    private final KafkaConsumer<String, String> consumer;
    private final int batchSize;
    private final BatchProcessor batchProcessor;
    
    public void processBatches() {
        List<ConsumerRecord<String, String>> batch = 
            new ArrayList<>();
            
        try {
            while (true) {
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    batch.add(record);
                    
                    if (batch.size() >= batchSize) {
                        processBatch(batch);
                        consumer.commitSync();
                        batch.clear();
                    }
                }
                
                if (!batch.isEmpty()) {
                    processBatch(batch);
                    consumer.commitSync();
                    batch.clear();
                }
            }
        } finally {
            consumer.close();
        }
    }
    
    private void processBatch(
            List<ConsumerRecord<String, String>> batch) {
        try {
            batchProcessor.process(batch);
        } catch (Exception e) {
            log.error("Error processing batch", e);
            // Implement retry or error handling logic
        }
    }
}
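
BatchProcessor above is not a Kafka type; it stands in for whatever downstream logic handles the batch. A minimal sketch of the assumed interface:

public interface BatchProcessor {
    void process(List<ConsumerRecord<String, String>> batch) throws Exception;
}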

4️⃣ Stream Processing

🔹 Kafka Streams

public class KafkaStreamsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
            "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read from input topic
        KStream<String, String> source = 
            builder.stream("input-topic");
        
        // Process stream
        KStream<String, String> processed = source
            .filter((key, value) -> value != null)
            .mapValues(value -> value.toUpperCase())
            .peek((key, value) -> 
                log.info("Processing: key={}, value={}", 
                    key, value));
        
        // Write to output topic
        processed.to("output-topic");
        
        // Build and start the topology
        KafkaStreams streams = 
            new KafkaStreams(builder.build(), props);
        streams.start();
        
        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
        }));
    }
}
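
Topologies like this can be unit-tested without a broker using the kafka-streams-test-utils artifact. A sketch, assuming the stream-building code above has been extracted into a hypothetical buildTopology() method that returns the Topology:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

try (TopologyTestDriver driver = 
        new TopologyTestDriver(buildTopology(), props)) {
    TestInputTopic<String, String> input = driver.createInputTopic(
        "input-topic", new StringSerializer(), new StringSerializer());
    TestOutputTopic<String, String> output = driver.createOutputTopic(
        "output-topic", new StringDeserializer(), new StringDeserializer());
    
    input.pipeInput("k1", "hello");
    assert "HELLO".equals(output.readValue());
}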

🔹 Windowing Operations

public class WindowedStreamExample {
    public void processWindowedStream() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Hopping windows: 5 minutes long, advancing every 1 minute
        TimeWindows timeWindows = TimeWindows
            .of(Duration.ofMinutes(5))
            .advanceBy(Duration.ofMinutes(1));
        
        // Create windowed aggregation
        builder.stream("input-topic", 
                Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(timeWindows)
            .aggregate(
                () -> 0L,  // Initializer
                (key, value, aggregate) -> aggregate + 1L,  // Adder
                Materialized.with(Serdes.String(), 
                    Serdes.Long())
            )
            .toStream()
            .map((key, value) -> 
                KeyValue.pair(key.key(), value.toString()))
            .to("output-topic");
    }
}
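
By default every update to a window is forwarded downstream, so the output topic also sees intermediate counts. If only one final result per window is wanted, the aggregation can be suppressed until the window closes. A sketch, using count() for brevity and a grace period to bound late arrivals:

builder.stream("input-topic", 
        Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5))
        .grace(Duration.ofMinutes(1)))
    .count(Materialized.with(Serdes.String(), Serdes.Long()))
    .suppress(Suppressed.untilWindowCloses(
        Suppressed.BufferConfig.unbounded()))  // Emit one final count per window
    .toStream()
    .map((key, value) -> KeyValue.pair(key.key(), value.toString()))
    .to("output-topic");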

5️⃣ Spring Kafka Integration

🔹 Configuration

@Configuration
@EnableKafka
public class KafkaConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, 
            "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> 
            factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

🔹 Producer and Consumer

@Service
public class KafkaProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String topic, String message) {
        // addCallback works with the ListenableFuture returned by Spring Kafka 2.x;
        // in 3.x, send() returns a CompletableFuture and whenComplete() is used instead
        kafkaTemplate.send(topic, message)
            .addCallback(
                result -> log.info("Message sent successfully"), 
                ex -> log.error("Error sending message", ex)
            );
    }
}

@Service
public class KafkaConsumerService {
    @KafkaListener(
        topics = "topic-name",
        groupId = "group_id"
    )
    public void consume(String message) {
        log.info("Received message: {}", message);
        // Process the message
    }
    
    @KafkaListener(
        topics = "batch-topic",
        groupId = "batch_group",
        containerFactory = "batchListenerFactory"
    )
    public void consumeBatch(List<String> messages) {
        log.info("Received batch of {} messages", 
            messages.size());
        // Process the batch
    }
}
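
The batch listener above references a batchListenerFactory bean that the configuration section does not define. A minimal sketch of that bean, reusing consumerFactory() from KafkaConfig:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
        batchListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> 
        factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // Deliver records as a List per poll
    return factory;
}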

6️⃣ Error Handling

🔹 Producer Error Handling

@Service
public class ErrorHandlingProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;
    
    public void sendWithRetry(String message, int maxRetries) {
        int retries = 0;
        while (retries < maxRetries) {
            try {
                kafkaTemplate.send(topic, message).get();
                return;
            } catch (Exception e) {
                retries++;
                if (retries == maxRetries) {
                    log.error("Failed to send message after {} retries", 
                        maxRetries, e);
                    throw new KafkaException(
                        "Failed to send message", e);
                }
                log.warn("Retry attempt {} of {}", 
                    retries, maxRetries);
                sleep(exponentialBackoff(retries));
            }
        }
    }
    
    private long exponentialBackoff(int retryCount) {
        return Math.min(1000L * (long) Math.pow(2, retryCount), 
            60000L);
    }
    
    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // Preserve interrupt status
        }
    }
}

@Configuration
public class ErrorHandlingConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // ... basic configuration (servers, group id, deserializers) ...
        
        // Error handling configuration
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
            false);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
            300000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
            500);
        
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> 
            factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        // Configure error handler
        factory.setErrorHandler((exception, data) -> {
            log.error("Error in consumer", exception);
            // Handle the error (e.g., send to DLQ)
        });
        
        return factory;
    }
}

🔹 Dead Letter Queue

@Configuration
public class DeadLetterConfig {
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
            kafkaListenerContainerFactory(
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<String, String> 
            factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        // DeadLetterPublishingRecoverer is a recoverer, not an error handler,
        // so it is wrapped in SeekToCurrentErrorHandler (Spring Kafka 2.x API)
        DeadLetterPublishingRecoverer recoverer = 
            new DeadLetterPublishingRecoverer(
                template, 
                (record, exception) -> new TopicPartition(
                    record.topic() + ".DLQ", 
                    record.partition()));
        factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer));
        
        return factory;
    }
}

@Service
public class DeadLetterConsumer {
    @KafkaListener(
        topics = "${kafka.topic.name}.DLQ",
        groupId = "dlq-group"
    )
    public void consumeDeadLetter(
            @Payload String message,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
        log.error("Processing dead letter: topic={}, message={}, timestamp={}", 
            topic, message, timestamp);
        // Handle dead letter message
    }
}

7️⃣ Q&A / Frequently Asked Questions

🔹 How do I make message delivery reliable?

Use an appropriate acks setting (acks=all for the strongest guarantee), enable the idempotent producer, configure retries, implement proper error handling, commit offsets manually on the consumer side, route poison messages to a dead letter queue, monitor delivery metrics, and use transactions when a send must be atomic with other work.
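
A producer configured along those lines might look like this sketch (acks, retries, and idempotence are standard producer configs; the values are illustrative):

Properties props = new Properties();
// Durability: wait for all in-sync replicas, retry transient failures
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
// Idempotence deduplicates retried sends per partition
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);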

🔹 How do I tune Kafka for performance?

Pick a partition count that matches the parallelism you need, optimize batch size and linger time, enable compression, size the producer buffer appropriately, design consumer groups around the partition count, use efficient serialization, monitor throughput and lag, and back it all with adequate hardware, network capacity, and regular maintenance.
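
Batching and compression, for instance, are plain producer settings (the values below are starting points to tune, not universal recommendations):

Properties props = new Properties();
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);    // Larger batches amortize request overhead
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);            // Wait briefly so batches can fill
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // Compress whole batches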

🔹 How do I handle schema evolution?

Use a schema-aware format such as Avro or Protobuf together with a Schema Registry, enforce backward and forward compatibility, manage schema versions deliberately, supply default values for new fields, plan and test migrations, and document and monitor schema changes.
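
With Confluent's Schema Registry, for example, a producer swaps in the Avro serializer and points at the registry (KafkaAvroSerializer ships in Confluent's kafka-avro-serializer artifact, not in Apache Kafka itself; the URL is a placeholder):

Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
    StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
    "io.confluent.kafka.serializers.KafkaAvroSerializer");
// The registry enforces the configured compatibility mode on registration
props.put("schema.registry.url", "http://localhost:8081");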

8️⃣ Best Practices & Pro Tips 🚀

  • Use appropriate partitioning strategy
  • Implement proper error handling
  • Configure message retention
  • Monitor consumer lag (see the sketch after this list)
  • Use compression when needed
  • Implement proper security
  • Regular maintenance
  • Proper logging and monitoring
  • Schema management
  • Performance tuning
  • Backup and recovery
  • Documentation
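
As an example of the lag monitoring point above, consumer lag can be computed with the AdminClient by comparing committed offsets against log end offsets. A sketch (the group id and client setup are placeholders):

public void logConsumerLag(AdminClient admin, String groupId) 
        throws Exception {
    // Offsets the group has committed, per partition
    Map<TopicPartition, OffsetAndMetadata> committed = admin
        .listConsumerGroupOffsets(groupId)
        .partitionsToOffsetAndMetadata()
        .get();
    
    // Current log end offsets for the same partitions
    Map<TopicPartition, OffsetSpec> spec = committed.keySet().stream()
        .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends = 
        admin.listOffsets(spec).all().get();
    
    committed.forEach((tp, offset) -> 
        log.info("{} lag: {}", tp, ends.get(tp).offset() - offset.offset()));
}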

Conclusion

Apache Kafka is a powerful tool for building scalable event-driven applications. By following the patterns and practices outlined in this guide, you can effectively implement Kafka-based solutions in your Java applications.

Remember to regularly monitor your Kafka clusters, implement proper error handling, and follow best practices for optimal performance and reliability.