Event Sourcing in Java: A Practical Guide (2025)


Event Sourcing in Java

Event Sourcing is a powerful pattern for building event-driven applications. This comprehensive guide explores practical implementation patterns and best practices in Java.

Pro Tip: Event Sourcing provides a complete audit trail and enables rebuilding application state from events.

Event Store Implementation

Note: Event stores provide persistence and retrieval of domain events.

Event Store Interface


public interface EventStore {
    void saveEvents(String aggregateId, List events, long expectedVersion);
    List getEvents(String aggregateId);
    List getEvents(String aggregateId, long fromVersion);
    List getAllEvents();
    void saveSnapshot(String aggregateId, AggregateSnapshot snapshot);
    AggregateSnapshot getLatestSnapshot(String aggregateId);
}

@Entity
@Table(name = "events")
public class EventEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    private String aggregateId;
    private String eventType;
    private long version;
    private LocalDateTime timestamp;
    
    @Column(columnDefinition = "TEXT")
    private String eventData;
    
    // Getters and setters
}

@Entity
@Table(name = "snapshots")
public class SnapshotEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    private String aggregateId;
    private long version;
    private LocalDateTime timestamp;
    
    @Column(columnDefinition = "TEXT")
    private String state;
    
    // Getters and setters
}

JPA Event Store Implementation


@Repository
public class JpaEventStore implements EventStore {
    
    @Autowired
    private EventRepository eventRepository;
    
    @Autowired
    private SnapshotRepository snapshotRepository;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    @Override
    @Transactional
    public void saveEvents(String aggregateId, List events, long expectedVersion) {
        List existingEvents = eventRepository.findByAggregateId(aggregateId);
        if (existingEvents.size() != expectedVersion) {
            throw new ConcurrencyException("Concurrent modification detected");
        }
        
        List eventEntities = events.stream()
            .map(event -> {
                EventEntity entity = new EventEntity();
                entity.setAggregateId(aggregateId);
                entity.setEventType(event.getClass().getName());
                entity.setVersion(expectedVersion + 1);
                entity.setTimestamp(LocalDateTime.now());
                entity.setEventData(objectMapper.writeValueAsString(event));
                return entity;
            })
            .collect(Collectors.toList());
            
        eventRepository.saveAll(eventEntities);
    }
    
    @Override
    public List getEvents(String aggregateId) {
        List events = eventRepository.findByAggregateIdOrderByVersionAsc(aggregateId);
        return events.stream()
            .map(this::deserializeEvent)
            .collect(Collectors.toList());
    }
    
    private DomainEvent deserializeEvent(EventEntity entity) {
        try {
            Class eventType = Class.forName(entity.getEventType());
            return (DomainEvent) objectMapper.readValue(entity.getEventData(), eventType);
        } catch (Exception e) {
            throw new EventDeserializationException("Failed to deserialize event", e);
        }
    }
}

Event Handling

Pro Tip: Event handlers process domain events and update read models or trigger side effects.

Event Handler Implementation


@Component
public class OrderEventHandler {
    
    @Autowired
    private OrderReadModelRepository readModelRepository;
    
    @Autowired
    private NotificationService notificationService;
    
    @EventHandler
    public void handleOrderCreated(OrderCreatedEvent event) {
        OrderReadModel readModel = new OrderReadModel();
        readModel.setId(event.getOrderId());
        readModel.setCustomerId(event.getCustomerId());
        readModel.setAmount(event.getAmount());
        readModel.setStatus("CREATED");
        readModel.setCreatedAt(event.getTimestamp());
        
        readModelRepository.save(readModel);
        notificationService.sendOrderConfirmation(event.getOrderId());
    }
    
    @EventHandler
    public void handleOrderUpdated(OrderUpdatedEvent event) {
        OrderReadModel readModel = readModelRepository.findById(event.getOrderId())
            .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));
            
        readModel.setStatus(event.getStatus());
        readModel.setUpdatedAt(event.getTimestamp());
        
        readModelRepository.save(readModel);
        
        if ("SHIPPED".equals(event.getStatus())) {
            notificationService.sendShippingNotification(event.getOrderId());
        }
    }
}

@Component
public class EventDispatcher {
    
    @Autowired
    private List eventHandlers;
    
    @Transactional
    public void dispatch(DomainEvent event) {
        eventHandlers.stream()
            .filter(handler -> canHandle(handler, event))
            .forEach(handler -> {
                try {
                    handler.handle(event);
                } catch (Exception e) {
                    log.error("Error handling event: {}", event, e);
                    // Implement retry logic or dead letter queue
                }
            });
    }
    
    private boolean canHandle(EventHandler handler, DomainEvent event) {
        return handler.getClass().getMethods().stream()
            .filter(method -> method.isAnnotationPresent(EventHandler.class))
            .anyMatch(method -> method.getParameterTypes()[0].isInstance(event));
    }
}

Aggregates and Event Sourcing

Note: Aggregates maintain consistency boundaries and apply events to their state.

Event-Sourced Aggregate


public abstract class EventSourcedAggregate {
    private String id;
    private long version;
    private List uncommittedEvents = new ArrayList<>();
    
    protected EventSourcedAggregate(String id) {
        this.id = id;
        this.version = 0;
    }
    
    public void loadFromHistory(List history) {
        history.forEach(this::apply);
        this.version = history.size();
    }
    
    protected void apply(DomainEvent event) {
        try {
            Method method = this.getClass().getDeclaredMethod("apply", event.getClass());
            method.setAccessible(true);
            method.invoke(this, event);
        } catch (Exception e) {
            throw new EventApplicationException("Failed to apply event", e);
        }
    }
    
    protected void raiseEvent(DomainEvent event) {
        apply(event);
        uncommittedEvents.add(event);
    }
    
    public List getUncommittedEvents() {
        return new ArrayList<>(uncommittedEvents);
    }
    
    public void clearUncommittedEvents() {
        uncommittedEvents.clear();
    }
}

public class Order extends EventSourcedAggregate {
    private String customerId;
    private BigDecimal amount;
    private String status;
    
    public Order(String id, String customerId, BigDecimal amount) {
        super(id);
        raiseEvent(new OrderCreatedEvent(id, customerId, amount));
    }
    
    public void updateStatus(String newStatus) {
        raiseEvent(new OrderUpdatedEvent(id, newStatus));
    }
    
    private void apply(OrderCreatedEvent event) {
        this.customerId = event.getCustomerId();
        this.amount = event.getAmount();
        this.status = "CREATED";
    }
    
    private void apply(OrderUpdatedEvent event) {
        this.status = event.getStatus();
    }
}

Event Snapshots

Pro Tip: Snapshots improve performance by reducing the number of events that need to be replayed.

Snapshot Implementation


public class SnapshotManager {
    
    @Autowired
    private EventStore eventStore;
    
    private static final int SNAPSHOT_THRESHOLD = 100;
    
    public void saveSnapshotIfNeeded(String aggregateId, EventSourcedAggregate aggregate) {
        if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
            AggregateSnapshot snapshot = new AggregateSnapshot(
                aggregateId,
                aggregate.getVersion(),
                aggregate.getState()
            );
            eventStore.saveSnapshot(aggregateId, snapshot);
        }
    }
    
    public EventSourcedAggregate reconstructAggregate(String aggregateId, Class aggregateType) {
        AggregateSnapshot snapshot = eventStore.getLatestSnapshot(aggregateId);
        EventSourcedAggregate aggregate;
        
        try {
            aggregate = aggregateType.getDeclaredConstructor(String.class)
                .newInstance(aggregateId);
        } catch (Exception e) {
            throw new AggregateCreationException("Failed to create aggregate", e);
        }
        
        if (snapshot != null) {
            aggregate.loadFromSnapshot(snapshot);
            List events = eventStore.getEvents(aggregateId, snapshot.getVersion());
            events.forEach(aggregate::apply);
        } else {
            List events = eventStore.getEvents(aggregateId);
            aggregate.loadFromHistory(events);
        }
        
        return aggregate;
    }
}

Best Practices

Pro Tip: Following event sourcing best practices ensures maintainable and scalable systems.

Event Sourcing Best Practices

  • Use immutable events with clear naming
  • Implement proper event versioning
  • Use snapshots for performance optimization
  • Implement proper error handling
  • Use event handlers for side effects
  • Implement proper event validation
  • Use event sourcing for audit trails
  • Implement proper event serialization
  • Use event sourcing for complex domains
  • Implement proper event migration strategies
  • Use event sourcing for temporal queries
  • Implement proper event replay mechanisms
  • Use event sourcing for scalability
  • Implement proper event monitoring
  • Follow event sourcing patterns and standards

Conclusion

Event Sourcing provides a powerful pattern for building event-driven applications. By understanding and implementing these patterns and best practices, you can create robust and scalable systems that maintain a complete audit trail of all changes.