Project Reactor is a fourth-generation reactive library for building non-blocking applications on the JVM. This guide covers everything you need to know about reactive programming with Project Reactor.
Key areas covered:
public class ReactorBasics {
public Flux createFlux() {
return Flux.just("Hello", "World")
.map(String::toUpperCase)
.filter(s -> s.startsWith("H"));
}
public Flux createNumbersFlux() {
return Flux.range(1, 5)
.map(i -> i * 2)
.filter(i -> i > 5);
}
public Mono createMono() {
return Mono.just("Hello")
.map(String::toUpperCase);
}
public Flux createFromList() {
List list = Arrays.asList("A", "B", "C");
return Flux.fromIterable(list)
.map(String::toLowerCase);
}
}
public class ReactorOperators {
public Flux transformData() {
return Flux.just("1", "2", "3")
.map(Integer::parseInt)
.map(i -> i * 2)
.map(String::valueOf)
.doOnNext(s -> log.info("Value: {}", s))
.doOnError(e -> log.error("Error: {}", e.getMessage()));
}
public Flux combineFluxes() {
Flux flux1 = Flux.range(1, 3);
Flux flux2 = Flux.range(4, 3);
return Flux.zip(flux1, flux2)
.map(tuple -> tuple.getT1() + tuple.getT2());
}
public Mono handleErrors() {
return Mono.just("data")
.map(this::processData)
.onErrorResume(e -> Mono.just("fallback"))
.doFinally(signal -> cleanup());
}
private String processData(String data) {
// Processing logic
return data.toUpperCase();
}
private void cleanup() {
// Cleanup logic
}
}
public class ReactorErrorHandling {
public Flux handleErrors() {
return Flux.just("A", "B", null, "C")
.map(String::toLowerCase)
.onErrorReturn("default")
.doOnError(e -> log.error("Error occurred", e));
}
public Flux retryOnError() {
return Flux.just("1", "2", "invalid", "3")
.map(Integer::parseInt)
.map(String::valueOf)
.retry(3)
.onErrorResume(e -> Flux.just("fallback"));
}
public Mono handleTimeout() {
return callExternalService()
.timeout(Duration.ofSeconds(5))
.onErrorResume(TimeoutException.class,
e -> Mono.just("timeout fallback"));
}
private Mono callExternalService() {
// Simulated external service call
return Mono.just("response")
.delayElement(Duration.ofSeconds(2));
}
}
public class ReactorTesting {
@Test
public void testFlux() {
Flux flux = Flux.just("A", "B", "C");
StepVerifier.create(flux)
.expectNext("A")
.expectNext("B")
.expectNext("C")
.verifyComplete();
}
@Test
public void testErrorHandling() {
Flux flux = Flux.just("1", "2", "invalid")
.map(Integer::parseInt)
.map(String::valueOf);
StepVerifier.create(flux)
.expectNext("1")
.expectNext("2")
.expectError(NumberFormatException.class)
.verify();
}
@Test
public void testWithVirtualTime() {
StepVerifier.withVirtualTime(() ->
Flux.interval(Duration.ofSeconds(1))
.take(2))
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(0L)
.thenAwait(Duration.ofSeconds(1))
.expectNext(1L)
.verifyComplete();
}
}
public class ReactorAdvanced {
public Flux handleBackpressure() {
return Flux.range(1, 100)
.map(this::slowOperation)
.onBackpressureBuffer(10)
.map(String::valueOf);
}
public Flux parallelProcessing() {
return Flux.range(1, 1000)
.parallel()
.runOn(Schedulers.parallel())
.map(this::processItem)
.sequential();
}
private Integer slowOperation(Integer value) {
try {
Thread.sleep(100); // Simulate slow operation
return value * 2;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private String processItem(Integer item) {
return "Processed: " + item;
}
}
Project Reactor provides powerful reactive programming capabilities for Java applications. By following the patterns and practices outlined in this guide, you can effectively implement reactive systems.
Remember to focus on proper error handling, backpressure management, and testing for reliable reactive applications.