Event-Driven Architecture (EDA) is a software design pattern where the flow of the application is determined by events such as user actions, sensor outputs, or system messages. This guide explores implementing EDA in Java applications, covering patterns, tools, and best practices for building scalable and loosely coupled systems.
Key benefits of Event-Driven Architecture include loose coupling between components, improved scalability, maintainability, and flexibility.
// Domain Event
/**
 * Domain event describing a created order: it holds the order id, the customer id,
 * the order amount and the creation timestamp.
 *
 * <p>All fields are {@code final}; the constructor that assigns them and the getters
 * are elided from this snippet.
 *
 * <p>NOTE(review): other snippets in this file build this event with
 * {@code new OrderCreatedEvent(orderId, lines)} and call {@code getOrderLines()},
 * which the fields shown here do not cover — confirm the intended shape.
 */
public class OrderCreatedEvent implements DomainEvent {
private final String orderId;
private final String customerId;
private final BigDecimal amount;
private final LocalDateTime createdAt;
// Constructor and getters omitted for brevity
}
// Integration Event
/**
 * Integration event holding the order id, the tracking number and the shipping
 * timestamp of a shipped order.
 *
 * <p>All fields are {@code final}; the constructor that assigns them and the getters
 * are elided from this snippet.
 *
 * <p>NOTE(review): this type implements {@code IntegrationEvent}, while
 * {@code EventPublisher} in this file accepts {@code DomainEvent} only — confirm how
 * integration events are meant to be published.
 */
public class OrderShippedEvent implements IntegrationEvent {
private final String orderId;
private final String trackingNumber;
private final LocalDateTime shippedAt;
// Constructor and getters omitted for brevity
}
// System Event
/**
 * System event holding a component name, its health status and a map of metrics.
 *
 * <p>Unlike the other event classes in this file it implements no event marker
 * interface. All fields are {@code final}; the constructor that assigns them and the
 * getters are elided from this snippet.
 */
public class SystemHealthEvent {
private final String component;
private final HealthStatus status;
// Metric values keyed by name; value types are not constrained here.
private final Map<String, Object> metrics;
// Constructor and getters omitted for brevity
}
// Event publisher interface
/**
 * Abstraction over the transport used to publish {@link DomainEvent}s.
 */
public interface EventPublisher {
/**
 * Publishes the given event.
 *
 * @param event the event to publish
 */
void publish(DomainEvent event);
/**
 * Publishes the given event through the asynchronous variant of this API.
 * How the work is scheduled and what happens on failure is left to the
 * implementation.
 *
 * @param event the event to publish
 */
void publishAsync(DomainEvent event);
}
// Event subscriber interface
/**
 * Handler for one concrete kind of {@link DomainEvent}.
 *
 * @param <T> the event type this subscriber handles
 */
public interface EventSubscriber<T extends DomainEvent> {
/**
 * Processes a single event.
 *
 * @param event the event to handle
 */
void handle(T event);
/**
 * Returns the class token of the handled event type.
 *
 * @return the class of {@code T}
 */
Class<T> getEventType();
}
// Implementation with Spring Events
@Service
public class SpringEventPublisher implements EventPublisher {

    /** Spring's in-process event bus that every publication is forwarded to. */
    private final ApplicationEventPublisher springPublisher;

    /**
     * Creates the publisher.
     *
     * @param publisher the Spring application event publisher to forward events to
     */
    public SpringEventPublisher(ApplicationEventPublisher publisher) {
        this.springPublisher = publisher;
    }

    /**
     * Forwards the event to Spring's application event publisher on the calling thread.
     *
     * @param event the event to publish
     */
    @Override
    public void publish(DomainEvent event) {
        springPublisher.publishEvent(event);
    }

    /**
     * Hands the publication to {@link CompletableFuture#runAsync(Runnable)} and returns
     * immediately. The returned future is not kept, so a failure inside the task is not
     * reported to the caller.
     *
     * @param event the event to publish
     */
    @Override
    public void publishAsync(DomainEvent event) {
        Runnable dispatch = () -> springPublisher.publishEvent(event);
        CompletableFuture.runAsync(dispatch);
    }
}
// Event subscriber implementation
@Component
public class OrderCreatedEventHandler
        implements EventSubscriber<OrderCreatedEvent> {

    private final NotificationService notificationService;

    /**
     * Creates the handler. The constructor is required because the collaborator is a
     * {@code final} field; it mirrors the constructor injection used by
     * {@code SpringEventPublisher}.
     *
     * @param notificationService service used to notify the customer
     */
    public OrderCreatedEventHandler(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    /**
     * Notifies the customer referenced by the event that the order has been created.
     *
     * @param event the order-created event
     */
    @Override
    public void handle(OrderCreatedEvent event) {
        notificationService.notifyCustomer(
                event.getCustomerId(),
                "Order " + event.getOrderId() + " has been created"
        );
    }

    /**
     * @return {@code OrderCreatedEvent.class}
     */
    @Override
    public Class<OrderCreatedEvent> getEventType() {
        return OrderCreatedEvent.class;
    }
}
// Kafka configuration
@Configuration
public class KafkaConfig {

    /**
     * Builds the producer factory: broker at {@code localhost:9092}, string keys and
     * JSON-serialized values.
     *
     * @return the producer factory for {@code Event} messages
     */
    @Bean
    public ProducerFactory<String, Event> producerFactory() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Builds the template used to send {@code Event} messages, backed by
     * {@link #producerFactory()}.
     *
     * @return the Kafka template
     */
    @Bean
    public KafkaTemplate<String, Event> kafkaTemplate() {
        ProducerFactory<String, Event> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
// Kafka producer service
@Service
public class KafkaEventPublisher implements EventPublisher {

    private final KafkaTemplate<String, Event> kafkaTemplate;

    /**
     * Creates the publisher. The constructor is required because the template is a
     * {@code final} field.
     *
     * @param kafkaTemplate template used to send events to Kafka
     */
    public KafkaEventPublisher(KafkaTemplate<String, Event> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends the event to the {@code events-topic} topic, keyed by the event id.
     *
     * @param event the event to publish
     */
    @Override
    public void publish(DomainEvent event) {
        kafkaTemplate.send("events-topic", event.getEventId(), event);
    }

    /**
     * Publishes the event from a task submitted via
     * {@link CompletableFuture#runAsync(Runnable)}, matching the approach of
     * {@code SpringEventPublisher}. Without this method the class would not satisfy
     * the {@code EventPublisher} interface.
     *
     * @param event the event to publish
     */
    @Override
    public void publishAsync(DomainEvent event) {
        CompletableFuture.runAsync(() -> publish(event));
    }
}
// Kafka consumer
@Service
public class KafkaEventConsumer {

    private final EventProcessor eventProcessor;

    /**
     * Creates the consumer. The constructor is required because the processor is a
     * {@code final} field.
     *
     * @param eventProcessor processor that every received event is handed to
     */
    public KafkaEventConsumer(EventProcessor eventProcessor) {
        this.eventProcessor = eventProcessor;
    }

    /**
     * Listener for the {@code events-topic} topic (consumer group {@code events-group});
     * passes the record's value to the event processor.
     *
     * @param record the received Kafka record
     */
    @KafkaListener(topics = "events-topic", groupId = "events-group")
    public void consume(ConsumerRecord<String, Event> record) {
        Event event = record.value();
        eventProcessor.process(event);
    }
}
// RabbitMQ configuration
@Configuration
public class RabbitConfig {

    private static final String QUEUE_NAME = "orders-queue";
    private static final String EXCHANGE_NAME = "orders-exchange";
    private static final String ROUTING_PATTERN = "order.*";

    /**
     * Declares the {@code orders-queue} queue; the second constructor argument
     * ({@code true}) is the durable flag.
     *
     * @return the queue definition
     */
    @Bean
    public Queue ordersQueue() {
        return new Queue(QUEUE_NAME, true);
    }

    /**
     * Declares the {@code orders-exchange} topic exchange.
     *
     * @return the exchange definition
     */
    @Bean
    public TopicExchange ordersExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /**
     * Binds the given queue to the given exchange for routing keys matching
     * {@code order.*}.
     *
     * @param queue the queue to bind
     * @param exchange the exchange to bind to
     * @return the binding definition
     */
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_PATTERN);
    }
}
// RabbitMQ producer
@Service
public class RabbitEventPublisher implements EventPublisher {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Creates the publisher. The constructor is required because the template is a
     * {@code final} field.
     *
     * @param rabbitTemplate template used to send events to RabbitMQ
     */
    public RabbitEventPublisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Sends the event to the {@code orders-exchange} exchange with the routing key
     * {@code "order."} followed by the event's simple class name.
     *
     * @param event the event to publish
     */
    @Override
    public void publish(DomainEvent event) {
        rabbitTemplate.convertAndSend(
                "orders-exchange",
                "order." + event.getClass().getSimpleName(),
                event
        );
    }

    /**
     * Publishes the event from a task submitted via
     * {@link CompletableFuture#runAsync(Runnable)}, matching the approach of
     * {@code SpringEventPublisher}. Without this method the class would not satisfy
     * the {@code EventPublisher} interface.
     *
     * @param event the event to publish
     */
    @Override
    public void publishAsync(DomainEvent event) {
        CompletableFuture.runAsync(() -> publish(event));
    }
}
// RabbitMQ consumer
@Service
public class RabbitEventConsumer {

    private final EventProcessor eventProcessor;

    /**
     * Creates the consumer. The constructor is required because the processor is a
     * {@code final} field.
     *
     * @param eventProcessor processor that every received event is handed to
     */
    public RabbitEventConsumer(EventProcessor eventProcessor) {
        this.eventProcessor = eventProcessor;
    }

    /**
     * Listener for the {@code orders-queue} queue; passes each received event to the
     * event processor.
     *
     * @param event the received event
     */
    @RabbitListener(queues = "orders-queue")
    public void consume(Event event) {
        eventProcessor.process(event);
    }
}
// Application properties
spring:
  cloud:
    stream:
      bindings:
        orderEvents-out-0:
          destination: orders
          content-type: application/json
        orderEvents-in-0:
          destination: orders
          group: order-processing
      kafka:
        binder:
          brokers: localhost:9092
// Stream configuration
@Configuration
public class StreamConfig {

    /**
     * Exposes the order-processing step as a {@link Function} bean that maps an
     * {@code OrderCreatedEvent} to an {@code OrderProcessedEvent}.
     *
     * <p>NOTE(review): the binding names in the application properties of this file
     * use the prefix {@code orderEvents}, not this bean's name — confirm that they are
     * meant to match.
     *
     * @return the processing function
     */
    @Bean
    public Function<OrderCreatedEvent, OrderProcessedEvent> processOrder() {
        return this::toProcessedEvent;
    }

    /** Builds the processed event from the incoming event's order id. */
    private OrderProcessedEvent toProcessedEvent(OrderCreatedEvent created) {
        // Process order
        return new OrderProcessedEvent(created.getOrderId());
    }
}
// Message channels
/**
 * Declares the output and input message channels for order events.
 *
 * <p>NOTE(review): {@code @Output}/{@code @Input} look like Spring Cloud Stream's
 * annotation-based binding model, which newer releases replace with functional
 * bindings — confirm against the library version in use.
 */
public interface OrderChannels {
// Channel names; they equal the binding names used in the application properties.
String OUTPUT = "orderEvents-out-0";
String INPUT = "orderEvents-in-0";
/** @return the channel bound to {@link #OUTPUT} */
@Output(OUTPUT)
MessageChannel output();
/** @return the channel bound to {@link #INPUT} */
@Input(INPUT)
SubscribableChannel input();
}
@Service
public class OrderEventProcessor {

    /** Orders with an amount strictly greater than this value count as high-value. */
    private static final BigDecimal HIGH_VALUE_THRESHOLD = new BigDecimal("1000");

    private final StreamsBuilder streamsBuilder;

    /**
     * Creates the processor. The constructor is required because the builder is a
     * {@code final} field.
     *
     * @param streamsBuilder builder the stream topology is registered on
     */
    public OrderEventProcessor(StreamsBuilder streamsBuilder) {
        this.streamsBuilder = streamsBuilder;
    }

    /**
     * Defines the topology: reads the {@code orders} topic, keeps orders whose amount
     * exceeds the threshold, enriches them and writes them to
     * {@code high-value-orders}.
     *
     * <p>NOTE(review): {@code @Autowired} on a parameterless method is an unusual way
     * to trigger initialization — confirm this is the intended hook.
     */
    @Autowired
    public void buildPipeline() {
        KStream<String, Order> orderStream = streamsBuilder.stream("orders");

        // Process high-value orders
        KStream<String, Order> highValueOrders = orderStream
                .filter((key, order) -> order.getAmount().compareTo(HIGH_VALUE_THRESHOLD) > 0);

        // Apply business rules
        highValueOrders
                .mapValues(this::enrichOrder)
                .to("high-value-orders");
    }

    /**
     * Enrichment hook; currently returns the order unchanged.
     *
     * @param order the order to enrich
     * @return the same order instance
     */
    private Order enrichOrder(Order order) {
        // Add additional information
        return order;
    }
}
// Event store interface
/**
 * Persistence contract for the events of an aggregate.
 */
public interface EventStore {
/**
 * Stores the given events for an aggregate.
 *
 * @param aggregateId id of the aggregate the events belong to
 * @param events events to store, in order
 * @param expectedVersion version the caller passes as the aggregate's current
 *        version; how it is used is up to the implementation
 */
void saveEvents(String aggregateId,
List<DomainEvent> events,
int expectedVersion);
/**
 * Loads the events stored for an aggregate.
 *
 * @param aggregateId id of the aggregate
 * @return the stored events
 */
List<DomainEvent> getEvents(String aggregateId);
}
// Event store implementation
@Service
public class EventStoreImpl implements EventStore {

    private final JdbcTemplate jdbcTemplate;
    // Not referenced by the code shown here; presumably used by the JSON helpers
    // (toJsonString / deserializeEvent) defined elsewhere — confirm.
    private final ObjectMapper objectMapper;

    /**
     * Creates the store. The constructor is required because both collaborators are
     * {@code final} fields.
     *
     * @param jdbcTemplate JDBC access to the {@code events} table
     * @param objectMapper JSON mapper
     */
    public EventStoreImpl(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
        this.jdbcTemplate = jdbcTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * Appends the events to the aggregate's stream, numbering them
     * {@code expectedVersion + 1}, {@code expectedVersion + 2}, and so on.
     *
     * <p>Before writing, the highest stored version is compared with
     * {@code expectedVersion}; an aggregate without stored events counts as version
     * {@code -1}, matching the initial version of {@code AggregateRoot} in this file.
     *
     * <p>NOTE(review): the check and the inserts are separate statements; a unique
     * constraint on {@code (aggregate_id, version)} and/or a surrounding transaction
     * is needed to close the race between concurrent writers — confirm the schema.
     *
     * @param aggregateId id of the aggregate the events belong to
     * @param events events to append, in order
     * @param expectedVersion version the caller believes the aggregate is at
     * @throws java.util.ConcurrentModificationException if the stored version differs
     *         from {@code expectedVersion}
     */
    @Override
    public void saveEvents(String aggregateId,
                           List<DomainEvent> events,
                           int expectedVersion) {
        Integer stored = jdbcTemplate.queryForObject(
                "SELECT COALESCE(MAX(version), -1) FROM events WHERE aggregate_id = ?",
                Integer.class,
                aggregateId
        );
        int currentVersion = (stored == null) ? -1 : stored;
        if (currentVersion != expectedVersion) {
            throw new java.util.ConcurrentModificationException(
                    "Aggregate " + aggregateId + " is at version " + currentVersion
                            + " but version " + expectedVersion + " was expected");
        }

        int version = expectedVersion;
        for (DomainEvent event : events) {
            version++;
            event.setVersion(version);
            jdbcTemplate.update(
                    "INSERT INTO events (aggregate_id, type, version, data) " +
                    "VALUES (?, ?, ?, ?::jsonb)",
                    aggregateId,
                    event.getClass().getSimpleName(),
                    version,
                    toJsonString(event)
            );
        }
    }

    /**
     * Loads all events of an aggregate ordered by version and deserializes each row
     * from its {@code type} and {@code data} columns.
     *
     * @param aggregateId id of the aggregate
     * @return the stored events in version order
     */
    @Override
    public List<DomainEvent> getEvents(String aggregateId) {
        return jdbcTemplate.query(
                "SELECT * FROM events WHERE aggregate_id = ? ORDER BY version",
                (rs, rowNum) -> {
                    String type = rs.getString("type");
                    String data = rs.getString("data");
                    return deserializeEvent(type, data);
                },
                aggregateId
        );
    }
}
// Base aggregate root
public abstract class AggregateRoot {

    protected String id;
    protected int version = -1;

    /** Events applied as new since the last call to {@link #markChangesAsCommitted()}. */
    private final List<DomainEvent> pendingEvents = new ArrayList<>();

    /**
     * Applies the event to this aggregate and, when it is new, records it as an
     * uncommitted change.
     *
     * @param event the event to apply
     * @param isNew {@code true} to also record the event as uncommitted
     */
    protected void applyChange(DomainEvent event, boolean isNew) {
        apply(event);
        if (!isNew) {
            return;
        }
        pendingEvents.add(event);
    }

    /**
     * Mutates the aggregate's state according to the event.
     *
     * @param event the event to apply
     */
    protected abstract void apply(DomainEvent event);

    /**
     * @return a read-only view of the events recorded as uncommitted
     */
    public List<DomainEvent> getUncommittedChanges() {
        return Collections.unmodifiableList(pendingEvents);
    }

    /** Discards all recorded uncommitted events. */
    public void markChangesAsCommitted() {
        pendingEvents.clear();
    }
}
// Order aggregate
public class Order extends AggregateRoot {

    private OrderStatus status;
    private List<OrderLine> orderLines;

    /**
     * Raises an {@code OrderCreatedEvent} for the given id and lines.
     *
     * @param orderId id of the new order
     * @param lines initial order lines
     */
    public void createOrder(String orderId, List<OrderLine> lines) {
        OrderCreatedEvent created = new OrderCreatedEvent(orderId, lines);
        applyChange(created, true);
    }

    /**
     * Raises an {@code OrderLineAddedEvent} for the given line.
     *
     * @param line the line to add
     * @throws IllegalStateException if the order is not in {@code DRAFT} status
     */
    public void addOrderLine(OrderLine line) {
        boolean isDraft = (status == OrderStatus.DRAFT);
        if (!isDraft) {
            throw new IllegalStateException("Can only add lines to draft orders");
        }
        OrderLineAddedEvent added = new OrderLineAddedEvent(id, line);
        applyChange(added, true);
    }

    /**
     * Routes the event to the matching state transition; other event types are ignored.
     *
     * @param event the event to apply
     */
    @Override
    protected void apply(DomainEvent event) {
        if (event instanceof OrderCreatedEvent) {
            onCreated((OrderCreatedEvent) event);
            return;
        }
        if (event instanceof OrderLineAddedEvent) {
            onLineAdded((OrderLineAddedEvent) event);
        }
    }

    /** Initializes id, a copy of the order lines and the {@code DRAFT} status. */
    private void onCreated(OrderCreatedEvent event) {
        this.id = event.getOrderId();
        this.orderLines = new ArrayList<>(event.getOrderLines());
        this.status = OrderStatus.DRAFT;
    }

    /** Appends the event's order line to the current lines. */
    private void onLineAdded(OrderLineAddedEvent event) {
        this.orderLines.add(event.getOrderLine());
    }
}
/**
 * Creating an order must record exactly one uncommitted {@code OrderCreatedEvent}
 * carrying the order id and both order lines.
 */
@Test
public void whenOrderCreated_thenEventsAreEmitted() {
    // Arrange
    Order order = new Order();
    OrderLine firstLine = new OrderLine("product1", 2, new BigDecimal("10.00"));
    OrderLine secondLine = new OrderLine("product2", 1, new BigDecimal("15.00"));
    List<OrderLine> lines = Arrays.asList(firstLine, secondLine);

    // Act
    order.createOrder("order-123", lines);

    // Assert
    List<DomainEvent> pending = order.getUncommittedChanges();
    assertEquals(1, pending.size());

    DomainEvent onlyEvent = pending.get(0);
    assertTrue(onlyEvent instanceof OrderCreatedEvent);

    OrderCreatedEvent created = (OrderCreatedEvent) onlyEvent;
    assertEquals("order-123", created.getOrderId());
    assertEquals(2, created.getOrderLines().size());
}
/**
 * Integration test: publishes an {@code OrderCreatedEvent} and waits for the order to
 * become visible in the repository with status {@code CREATED}.
 *
 * <p>The Testcontainers JUnit 5 extension annotation is spelled
 * {@code @Testcontainers}; the previous {@code @TestContainers} spelling does not
 * exist.
 *
 * <p>NOTE(review): nothing here passes the container's bootstrap servers to the
 * Spring context, and {@code KafkaConfig} in this file uses {@code localhost:9092} —
 * confirm how the application under test is pointed at the container.
 */
@SpringBootTest
@Testcontainers
public class OrderEventProcessingIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer();

    @Autowired
    private EventPublisher eventPublisher;

    @Autowired
    private OrderRepository orderRepository;

    @Test
    public void whenOrderEventPublished_thenOrderIsProcessed() {
        // Arrange
        OrderCreatedEvent event = new OrderCreatedEvent("order-123",
                Arrays.asList(new OrderLine("product1", 1, new BigDecimal("10.00"))));

        // Act
        eventPublisher.publish(event);

        // Assert: processing is asynchronous, so poll for up to five seconds
        await()
                .atMost(5, TimeUnit.SECONDS)
                .until(() -> {
                    Optional<Order> order = orderRepository.findById("order-123");
                    return order.isPresent() &&
                            order.get().getStatus() == OrderStatus.CREATED;
                });
    }
}
@Configuration
public class EventMonitoringConfig {

    /**
     * Registers a {@code SimpleMeterRegistry} as the application's meter registry.
     *
     * @return the meter registry bean
     */
    @Bean
    public MeterRegistry meterRegistry() {
        MeterRegistry registry = new SimpleMeterRegistry();
        return registry;
    }
}
@Aspect
@Component
public class EventMonitoringAspect {

    private final MeterRegistry meterRegistry;

    /**
     * Creates the aspect. The constructor is required because the registry is a
     * {@code final} field.
     *
     * @param meterRegistry registry the timings are recorded in
     */
    public EventMonitoringAspect(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Times every method matched by the {@code @annotation(EventHandler)} pointcut and
     * records the duration in the {@code event.processing} timer, tagged with the
     * method name ({@code type}) and the outcome ({@code status}).
     *
     * <p>The sample is stopped in a {@code finally} block and the failure branch
     * catches {@link Throwable}, so errors that are not {@link Exception}s are timed
     * as well; {@code proceed()} is declared to throw {@code Throwable}.
     *
     * @param joinPoint the intercepted invocation
     * @return whatever the intercepted method returns
     * @throws Throwable whatever the intercepted method throws, unchanged
     */
    @Around("@annotation(EventHandler)")
    public Object monitorEventProcessing(ProceedingJoinPoint joinPoint)
            throws Throwable {
        Timer.Sample sample = Timer.start(meterRegistry);
        String status = "success";
        try {
            return joinPoint.proceed();
        } catch (Throwable failure) {
            status = "error";
            throw failure;
        } finally {
            sample.stop(meterRegistry.timer("event.processing",
                    "type", joinPoint.getSignature().getName(),
                    "status", status
            ));
        }
    }
}
@Component
public class EventDebugger {
private final Logger log = LoggerFactory.getLogger(EventDebugger.class);
@EventListener
public void onEvent(DomainEvent event) {
log.debug("Event received: {}", event);
if (log.isTraceEnabled()) {
log.trace("Event details: {}",
JsonUtils.toJsonString(event));
}
}
public void enableDebugMode() {
// Set log level to DEBUG
LoggerContext loggerContext =
(LoggerContext) LoggerFactory.getILoggerFactory();
Logger rootLogger = loggerContext.getLogger("com.example.events");
rootLogger.setLevel(Level.DEBUG);
}
}
Event-Driven Architecture provides a powerful approach for building scalable, loosely coupled systems in Java. While it introduces some complexity, the benefits of improved scalability, maintainability, and flexibility make it an excellent choice for modern distributed applications.
Remember to carefully consider your use case and requirements when implementing EDA. Start with simple patterns and evolve your architecture as needed, always keeping in mind the trade-offs between complexity and benefits.