Message Queues in Java: Beyond JMS (2025)

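Modern message brokers offer features that go well beyond the traditional JMS API. This guide walks through Spring-based Java examples for Kafka producers and consumers, RabbitMQ publishers and consumers, Kafka Streams processing and event sourcing, and the publish-subscribe and request-reply messaging patterns.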
Pro Tip: Understanding modern message queue patterns helps in building scalable and resilient applications.
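Table of Contents
- Kafka Implementation
- RabbitMQ Patterns
- Event Streaming
- Message Patterns
- Best Practices
- Conclusion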
Kafka Implementation
Note: Kafka provides high-throughput, fault-tolerant event streaming capabilities.
Producer Implementation
/**
 * Spring configuration for the Kafka producer side.
 *
 * <p>Keys are written with {@code StringSerializer} and values with
 * {@code JsonSerializer}. The value type is declared as {@code Object} so the
 * same template can publish any JSON-serializable payload.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Builds the producer factory from an explicit configuration map.
     *
     * @return a factory producing String-keyed, JSON-valued producers
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        // Typed map instead of a raw Map: Kafka config keys are strings and
        // the values are heterogeneous (strings, classes, numbers).
        Map<String, Object> config = new HashMap<>();
        // NOTE(review): broker address is hard-coded for a local setup;
        // externalize it before using this outside a demo.
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    /**
     * Exposes a template backed by {@link #producerFactory()}.
     *
     * @return the template used by the services to publish messages
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
@Service
public class OrderProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendOrder(Order order) {
kafkaTemplate.send("orders", order.getId(), order)
.addCallback(
success -> log.info("Order sent successfully: {}", order.getId()),
failure -> log.error("Failed to send order: {}", order.getId(), failure)
);
}
public void sendOrderWithPartition(Order order, int partition) {
kafkaTemplate.send("orders", partition, order.getId(), order)
.addCallback(
success -> log.info("Order sent to partition {}: {}", partition, order.getId()),
failure -> log.error("Failed to send order: {}", order.getId(), failure)
);
}
}
Consumer Implementation
/**
 * Spring configuration for the Kafka consumer side.
 *
 * <p>Keys are read with {@code StringDeserializer} and values with
 * {@code JsonDeserializer}; consumers join the {@code order-group} group.
 */
@Configuration
public class KafkaConsumerConfig {

    /**
     * Builds the consumer factory from an explicit configuration map.
     *
     * @return a factory producing String-keyed, JSON-valued consumers
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        // Typed map instead of a raw Map.
        Map<String, Object> config = new HashMap<>();
        // NOTE(review): broker address is hard-coded for a local setup.
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // NOTE(review): JsonDeserializer normally rejects payload classes outside
        // its trusted packages; the package of the order class probably needs to
        // be added via JsonDeserializer.TRUSTED_PACKAGES — confirm against the
        // Spring Kafka version in use.
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    /**
     * Builds the listener container factory used by {@code @KafkaListener} methods.
     *
     * <p>No ack mode is set here, so the container default applies.
     * NOTE(review): listeners that take an {@code Acknowledgment} parameter
     * need a manual ack mode on their container — verify before relying on it.
     *
     * @return a container factory running three concurrent consumers
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        return factory;
    }
}
/**
 * Kafka listeners for the {@code orders} topic.
 *
 * <p>{@code log} and {@code processOrder} are not declared in this snippet and
 * are expected to come from the surrounding project.
 *
 * <p>NOTE(review): both listeners subscribe to the same topic with the same
 * group id. Kafka assigns each partition to one member of a group, so a given
 * order is expected to reach only one of the two methods — confirm that this
 * is intended rather than two alternative examples.
 */
@Service
public class OrderConsumer {

    /**
     * Processes an order using the container's default acknowledgment handling.
     *
     * @param order the deserialized order payload
     */
    @KafkaListener(topics = "orders", groupId = "order-group")
    public void consumeOrder(Order order) {
        log.info("Received order: {}", order.getId());
        processOrder(order);
    }

    /**
     * Processes an order and acknowledges it only after processing succeeded.
     *
     * <p>On failure the exception is logged and rethrown instead of being
     * swallowed. Swallowing left the record unacknowledged and also never
     * retried, so a later acknowledgment could silently commit past it.
     * Rethrowing hands the record to the listener container's error handler,
     * which is where retry and recovery are configured.
     *
     * <p>NOTE(review): injecting {@code Acknowledgment} requires a manual ack
     * mode on the referenced container factory — verify its configuration.
     *
     * @param order the deserialized order payload
     * @param ack handle used to acknowledge the record
     * @throws IllegalStateException if processing fails; the original exception is the cause
     */
    @KafkaListener(topics = "orders", groupId = "order-group",
            containerFactory = "kafkaListenerContainerFactory")
    public void consumeOrderWithRetry(Order order, Acknowledgment ack) {
        try {
            processOrder(order);
        } catch (Exception e) {
            log.error("Error processing order: {}", order.getId(), e);
            // Propagate so the container can retry instead of skipping the record.
            throw new IllegalStateException("Failed to process order " + order.getId(), e);
        }
        // Acknowledge only after successful processing.
        ack.acknowledge();
    }
}
RabbitMQ Patterns
Pro Tip: RabbitMQ provides flexible messaging patterns with support for various exchange types.
Publisher Implementation
/**
 * Spring configuration for the RabbitMQ connection and the JSON-converting template.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Creates a caching connection factory pointing at a broker on
     * {@code localhost:5672} with the {@code guest}/{@code guest} account.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        final CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
        // Credentials first, then the endpoint; the setters are independent.
        cachingFactory.setUsername("guest");
        cachingFactory.setPassword("guest");
        cachingFactory.setHost("localhost");
        cachingFactory.setPort(5672);
        return cachingFactory;
    }

    /**
     * Creates a template that converts message payloads with Jackson.
     *
     * @param connectionFactory the connection factory the template sends through
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        final RabbitTemplate jsonTemplate = new RabbitTemplate(connectionFactory);
        jsonTemplate.setMessageConverter(jsonConverter);
        return jsonTemplate;
    }
}
@Service
public class OrderPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
public void publishOrder(Order order) {
rabbitTemplate.convertAndSend(
"order-exchange",
"order.routing.key",
order,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setPriority(1);
return message;
}
);
}
public void publishOrderWithConfirmation(Order order) {
rabbitTemplate.convertAndSend(
"order-exchange",
"order.routing.key",
order,
message -> {
message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
return message;
}
);
rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {
if (ack) {
log.info("Order published successfully: {}", order.getId());
} else {
log.error("Failed to publish order: {}", order.getId(), cause);
}
});
}
}
Consumer Implementation
/**
 * Declares the order queue, its exchange, and the dead-letter topology.
 *
 * <p>The order queue names {@code order-dlx} as its dead-letter exchange, so
 * that exchange and a queue bound to it ({@code order-dlq}) are declared here
 * as well. Without them, dead-lettered messages have nowhere to go and a
 * listener on {@code order-dlq} has no queue to consume from.
 *
 * <p>With more than one {@code Queue} and {@code Exchange} bean in the
 * context, {@link #binding(Queue, Exchange)} relies on its parameter names
 * matching the bean names {@code orderQueue} and {@code orderExchange}.
 */
@Configuration
public class RabbitMQConsumerConfig {

    /**
     * Declares the durable order queue with a 60000 ms message TTL and
     * {@code order-dlx} as dead-letter exchange.
     *
     * @return the order queue declaration
     */
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order-queue")
                .withArgument("x-message-ttl", 60000)
                .withArgument("x-dead-letter-exchange", "order-dlx")
                .build();
    }

    /**
     * Declares the durable topic exchange orders are published to.
     *
     * @return the order exchange declaration
     */
    @Bean
    public Exchange orderExchange() {
        return ExchangeBuilder.topicExchange("order-exchange")
                .durable(true)
                .build();
    }

    /**
     * Binds the order queue to the order exchange with routing key
     * {@code order.routing.key}.
     *
     * @param orderQueue the order queue bean
     * @param orderExchange the order exchange bean
     * @return the binding declaration
     */
    @Bean
    public Binding binding(Queue orderQueue, Exchange orderExchange) {
        return BindingBuilder
                .bind(orderQueue)
                .to(orderExchange)
                .with("order.routing.key")
                .noargs();
    }

    /**
     * Declares the dead-letter exchange referenced by the order queue.
     *
     * <p>A fanout exchange is used so dead-lettered messages reach the
     * dead-letter queue whatever routing key they carry.
     *
     * @return the dead-letter exchange declaration
     */
    @Bean
    public Exchange orderDeadLetterExchange() {
        return ExchangeBuilder.fanoutExchange("order-dlx")
                .durable(true)
                .build();
    }

    /**
     * Declares the durable queue that collects dead-lettered orders.
     *
     * @return the dead-letter queue declaration
     */
    @Bean
    public Queue orderDeadLetterQueue() {
        return QueueBuilder.durable("order-dlq").build();
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange.
     *
     * @return the dead-letter binding declaration
     */
    @Bean
    public Binding orderDeadLetterBinding() {
        // Bean methods are called directly to avoid by-type ambiguity between
        // the two Queue beans and the two Exchange beans.
        return BindingBuilder
                .bind(orderDeadLetterQueue())
                .to(orderDeadLetterExchange())
                .with("")
                .noargs();
    }
}
@Service
public class OrderConsumer {
@RabbitListener(queues = "order-queue")
public void consumeOrder(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
processOrder(order);
channel.basicAck(tag, false);
} catch (Exception e) {
try {
channel.basicNack(tag, false, true);
} catch (IOException ex) {
log.error("Error sending NACK", ex);
}
}
}
@RabbitListener(queues = "order-dlq")
public void consumeDeadLetter(Order order) {
log.error("Processing dead letter order: {}", order.getId());
// Implement dead letter handling
}
}
Event Streaming
Note: Event streaming enables real-time processing of continuous data streams.
Stream Processing
/**
 * Declares the Kafka Streams building blocks for counting orders per key.
 *
 * <p>NOTE(review): nothing visible here builds the topology or starts a
 * {@code KafkaStreams} instance from this {@code StreamsBuilder}; confirm that
 * the surrounding application does so.
 */
@Configuration
public class KafkaStreamsConfig {

    /**
     * Provides the builder the stream and table beans are defined on.
     *
     * @return a new streams builder
     */
    @Bean
    public StreamsBuilder streamsBuilder() {
        return new StreamsBuilder();
    }

    /**
     * Creates a stream over the {@code orders} topic.
     *
     * <p>The value type is declared as {@code Object} because this class only
     * counts records per key and never inspects the payload.
     *
     * @param streamsBuilder the builder to register the source on
     * @return the stream of records from {@code orders}
     */
    @Bean
    public KStream<String, Object> orderStream(StreamsBuilder streamsBuilder) {
        return streamsBuilder.stream("orders");
    }

    /**
     * Counts records per key and wraps each count in an {@code OrderCount}.
     *
     * <p>Typed parameters are required here: with a raw {@code KStream} the
     * {@code count} lambda argument would be an {@code Object} rather than the
     * {@code Long} produced by {@code count()}.
     *
     * @param orderStream the stream to aggregate
     * @return a table from record key to its running count
     */
    @Bean
    public KTable<String, OrderCount> orderCounts(KStream<String, Object> orderStream) {
        return orderStream
                .groupByKey()
                .count()
                .mapValues(count -> new OrderCount(count));
    }
}
/**
 * Defines two processing branches over the {@code orders} topic: one that
 * forwards high-value orders and one that counts orders per key in windows.
 */
@Service
public class OrderStreamProcessor {

    @Autowired
    private StreamsBuilder streamsBuilder;

    /**
     * Registers both branches on the injected builder.
     *
     * <p>The stream is typed as {@code KStream<String, Order>}; with a raw
     * {@code KStream} the lambda arguments are {@code Object} and calls such
     * as {@code order.getAmount()} cannot compile.
     */
    public void processOrderStream() {
        KStream<String, Order> orders = streamsBuilder.stream("orders");

        // Branch 1: orders above 1000 are tagged and forwarded.
        orders
                .filter((key, order) -> order.getAmount() > 1000)
                .mapValues(order -> {
                    order.setStatus("HIGH_VALUE");
                    return order;
                })
                .to("high-value-orders");

        // Branch 2: per-key counts over five-minute windows.
        // NOTE(review): after the windowed count the key is a windowed key and
        // the value a count, so writing to the topic likely needs explicit
        // serdes; TimeWindows.of is also deprecated in newer Kafka Streams
        // releases — confirm against the version in use.
        orders
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                .count()
                .toStream()
                .to("order-counts");
    }
}
Event Sourcing
/**
 * Minimal event-sourcing example: order events are appended to the
 * {@code order-events} topic and an order is rebuilt by replaying its events.
 */
@Service
public class OrderEventSourcing {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Appends an event to {@code order-events}, keyed by the order id.
     *
     * @param event the event to store
     */
    public void saveOrderEvent(OrderEvent event) {
        kafkaTemplate.send("order-events", event.getOrderId(), event);
    }

    /**
     * Returns the events recorded for an order.
     *
     * <p>Retrieval is not implemented yet; an empty list is always returned.
     *
     * @param orderId the id of the order whose events are requested
     * @return the events for the order, currently always empty
     */
    public List<OrderEvent> getOrderEvents(String orderId) {
        // Implement event retrieval logic
        return new ArrayList<>();
    }

    /**
     * Rebuilds an order by applying its events, in list order, to a new instance.
     *
     * @param orderId the id of the order to rebuild
     * @return the order after all events returned by {@link #getOrderEvents} were applied
     */
    public Order reconstructOrder(String orderId) {
        // Typed list: with a raw List the elements are Object and cannot be
        // passed to applyEvent(Order, OrderEvent).
        List<OrderEvent> events = getOrderEvents(orderId);
        Order order = new Order();
        for (OrderEvent event : events) {
            applyEvent(order, event);
        }
        return order;
    }

    /**
     * Applies a single event to the order being rebuilt.
     *
     * @param order the order to mutate
     * @param event the event to apply
     */
    private void applyEvent(Order order, OrderEvent event) {
        switch (event.getType()) {
            case "CREATED":
                order.setId(event.getOrderId());
                order.setAmount(event.getAmount());
                break;
            case "UPDATED":
                order.setStatus(event.getStatus());
                break;
            default:
                // Handle other event types; unknown types are currently ignored.
                break;
        }
    }
}
Message Patterns
Pro Tip: Understanding message patterns helps in designing robust messaging systems.
Publish-Subscribe Pattern
/**
 * Publish side of the publish-subscribe example.
 */
@Service
public class OrderPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends the order to {@code order-fanout-exchange} with an empty routing key.
     *
     * @param order the order to broadcast
     */
    public void publishOrder(Order order) {
        // Presumably a fanout exchange, given its name; the exchange
        // declaration is not visible in this snippet.
        final String exchangeName = "order-fanout-exchange";
        final String emptyRoutingKey = "";
        rabbitTemplate.convertAndSend(exchangeName, emptyRoutingKey, order);
    }
}
/**
 * Subscribe side of the publish-subscribe example: one listener per queue.
 * Both handler bodies are placeholders.
 */
@Service
public class OrderSubscriber {

    /**
     * Receives orders from {@code order-notification-queue}.
     *
     * @param order the converted order payload
     */
    @RabbitListener(queues = {"order-notification-queue"})
    public void handleNotification(Order order) {
        // Handle notification
    }

    /**
     * Receives orders from {@code order-processing-queue}.
     *
     * @param order the converted order payload
     */
    @RabbitListener(queues = {"order-processing-queue"})
    public void handleProcessing(Order order) {
        // Handle processing
    }
}
Request-Reply Pattern
/**
 * Request side of the request-reply example.
 */
@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends the order to {@code order-exchange} with routing key
     * {@code order.request} and waits for the reply.
     *
     * <p>{@code convertSendAndReceive} is declared to return {@code Object},
     * so the reply is cast explicitly; returning it directly as
     * {@code OrderResponse} does not compile.
     *
     * <p>NOTE(review): the template manages reply-to and correlation for
     * request-reply itself, so the values set in the post-processor may be
     * overridden. They are kept as in the original; confirm whether they are
     * needed.
     *
     * @param order the order to process remotely
     * @return the reply, or {@code null} if the template returned no reply
     *         (presumably on reply timeout — verify)
     * @throws ClassCastException if the reply is not an {@code OrderResponse}
     */
    public OrderResponse processOrder(Order order) {
        Object reply = rabbitTemplate.convertSendAndReceive(
                "order-exchange",
                "order.request",
                order,
                message -> {
                    message.getMessageProperties().setReplyTo("order-reply-queue");
                    message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                    return message;
                }
        );
        return (OrderResponse) reply;
    }
}
/**
 * Reply side of the request-reply example.
 */
@Service
public class OrderProcessor {

    /**
     * Handles a request from {@code order-request-queue} and returns the reply payload.
     *
     * @param order the converted order payload
     * @return a response carrying the order id and the status {@code PROCESSED}
     */
    @RabbitListener(queues = "order-request-queue")
    public OrderResponse processOrderRequest(Order order) {
        // Process order
        final String processedStatus = "PROCESSED";
        final OrderResponse reply = new OrderResponse(order.getId(), processedStatus);
        return reply;
    }
}
Best Practices
Pro Tip: Following message queue best practices ensures reliable and maintainable systems.
Message Queue Best Practices
- Use appropriate message formats (JSON, Avro, Protocol Buffers)
- Implement proper error handling and retry mechanisms
- Use message compression for large payloads
- Implement proper monitoring and alerting
- Use dead letter queues for failed messages
- Implement proper message validation
- Use message versioning
- Implement proper security measures
- Use appropriate message routing strategies
- Implement proper message persistence
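- Use appropriate message acknowledgment modes (for example, acknowledge manually, and only after processing has succeeded, when a message must not be lost)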
- Implement proper message filtering
- Use appropriate message transformation
- Implement proper message batching
- Follow message queue standards and patterns
Conclusion
Modern message queue implementations provide powerful features for building scalable and resilient applications. By understanding and implementing these patterns and best practices, you can create robust messaging systems that meet your application's needs.