RabbitMQ is a robust message broker that supports multiple messaging protocols. This guide covers everything you need to know about using RabbitMQ with Java.
Key areas covered:
public class RabbitMQConnection {
private final ConnectionFactory factory;
private Connection connection;
private Channel channel;
public RabbitMQConnection(String host, int port,
String username, String password) {
factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
// Enable automatic recovery
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);
}
public void connect() throws Exception {
connection = factory.newConnection();
channel = connection.createChannel();
}
public Channel getChannel() {
return channel;
}
public void close() {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
log.error("Error closing RabbitMQ connection", e);
}
}
}
public class MessagePublisher {
private final Channel channel;
private final String exchange;
private final String routingKey;
public MessagePublisher(Channel channel,
String exchange, String routingKey) {
this.channel = channel;
this.exchange = exchange;
this.routingKey = routingKey;
try {
// Declare exchange
channel.exchangeDeclare(exchange,
BuiltinExchangeType.DIRECT, true);
} catch (IOException e) {
log.error("Error declaring exchange", e);
}
}
public void publish(String message) throws IOException {
channel.basicPublish(exchange,
routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
}
public void publishWithConfirm(String message)
throws Exception {
channel.confirmSelect();
publish(message);
if (!channel.waitForConfirms()) {
throw new IOException("Message not confirmed");
}
}
}
public class MessageConsumer {
private final Channel channel;
private final String queue;
public MessageConsumer(Channel channel, String queue,
String exchange, String routingKey)
throws IOException {
this.channel = channel;
this.queue = queue;
// Declare queue and bind to exchange
channel.queueDeclare(queue, true, false,
false, null);
channel.queueBind(queue, exchange, routingKey);
}
public void consume(MessageHandler handler)
throws IOException {
// Set QoS
channel.basicQos(1);
DeliverCallback deliverCallback =
(consumerTag, delivery) -> {
String message = new String(
delivery.getBody(), "UTF-8");
try {
handler.handleMessage(message);
channel.basicAck(
delivery.getEnvelope().getDeliveryTag(),
false);
} catch (Exception e) {
channel.basicNack(
delivery.getEnvelope().getDeliveryTag(),
false, true);
log.error("Error processing message", e);
}
};
channel.basicConsume(queue, false,
deliverCallback, consumerTag -> {});
}
public interface MessageHandler {
void handleMessage(String message) throws Exception;
}
}
public class DeadLetterConfig {
private final Channel channel;
public DeadLetterConfig(Channel channel) {
this.channel = channel;
}
public void setupDeadLetterQueue(String originalQueue,
String dlxExchange, String dlxQueue)
throws IOException {
// Declare DLX exchange
channel.exchangeDeclare(dlxExchange,
BuiltinExchangeType.DIRECT, true);
// Declare DLX queue
channel.queueDeclare(dlxQueue, true, false,
false, null);
channel.queueBind(dlxQueue, dlxExchange,
"dead-letter");
// Setup original queue with DLX
Map args = new HashMap<>();
args.put("x-dead-letter-exchange", dlxExchange);
args.put("x-dead-letter-routing-key",
"dead-letter");
channel.queueDeclare(originalQueue, true, false,
false, args);
}
}
public class ConfirmPublisher {
private final Channel channel;
private final ConcurrentNavigableMap
outstandingConfirms = new ConcurrentSkipListMap<>();
public ConfirmPublisher(Channel channel)
throws IOException {
this.channel = channel;
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber,
multiple) -> {
if (multiple) {
ConcurrentNavigableMap
confirmed = outstandingConfirms
.headMap(sequenceNumber, true);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
}, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(
sequenceNumber);
log.error("Message {} was nack-ed. Retrying...",
body);
// Implement retry logic
});
}
public void publish(String exchange, String routingKey,
String message) throws IOException {
outstandingConfirms.put(
channel.getNextPublishSeqNo(), message);
channel.basicPublish(exchange,
routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
}
}
public class RabbitMQMonitor {
private final Connection connection;
private final MetricRegistry metrics;
public RabbitMQMonitor(Connection connection) {
this.connection = connection;
this.metrics = new MetricRegistry();
setupMetrics();
}
private void setupMetrics() {
connection.addBlockedListener(new BlockedListener() {
@Override
public void handleBlocked(String reason) {
metrics.counter("rabbitmq.blocked").inc();
log.warn("RabbitMQ connection blocked: {}",
reason);
}
@Override
public void handleUnblocked() {
metrics.counter("rabbitmq.unblocked").inc();
log.info("RabbitMQ connection unblocked");
}
});
connection.addShutdownListener(cause -> {
metrics.counter("rabbitmq.shutdown").inc();
log.error("RabbitMQ connection shutdown: {}",
cause.getMessage());
});
}
public void reportMetrics() {
// Report metrics to monitoring system
ConsoleReporter reporter = ConsoleReporter
.forRegistry(metrics)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build();
reporter.start(1, TimeUnit.MINUTES);
}
}
RabbitMQ provides powerful messaging capabilities for Java applications. By following the patterns and practices outlined in this guide, you can effectively implement reliable messaging solutions.
Remember to focus on reliability, proper error handling, and monitoring for robust message-based systems.