Java Project Reactor Guide: Complete Tutorial

1️⃣ Introduction

Project Reactor is a fourth-generation reactive library, based on the Reactive Streams specification, for building non-blocking applications on the JVM. This guide walks through its two core types (Flux and Mono), common operators, error handling, testing with StepVerifier, and backpressure.

Key areas covered:

  • Reactive Streams Basics
  • Flux and Mono
  • Operators
  • Error Handling
  • Testing
  • Best Practices

2️⃣ Reactive Streams Basics

🔹 Flux Creation

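import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorBasics {
    // Emits only "HELLO": "WORLD" is dropped by the filter.
    public Flux<String> createFlux() {
        return Flux.just("Hello", "World")
            .map(String::toUpperCase)
            .filter(s -> s.startsWith("H"));
    }

    // Doubles 1..5 to 2, 4, 6, 8, 10, then keeps the values above 5: 6, 8, 10.
    public Flux<Integer> createNumbersFlux() {
        return Flux.range(1, 5)
            .map(i -> i * 2)
            .filter(i -> i > 5);
    }

    // Emits a single value, "HELLO", then completes.
    public Mono<String> createMono() {
        return Mono.just("Hello")
            .map(String::toUpperCase);
    }

    // Emits "a", "b", "c" from an existing collection.
    public Flux<String> createFromList() {
        List<String> list = Arrays.asList("A", "B", "C");
        return Flux.fromIterable(list)
            .map(String::toLowerCase);
    }
}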
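The methods above only describe pipelines; nothing is emitted until something subscribes. The following is a minimal sketch of that laziness. The class `LazySubscriptionSketch` and its helper methods are hypothetical names introduced for illustration, not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Hypothetical helper class, for illustration only.
class LazySubscriptionSketch {
    // Builds a pipeline that counts every invocation of its map function.
    static Flux<String> pipeline(AtomicInteger mapCalls) {
        return Flux.just("Hello", "World")
            .map(s -> {
                mapCalls.incrementAndGet();
                return s.toUpperCase();
            });
    }

    // Subscribes and collects the emitted values. Flux.just is a synchronous
    // source, so the list is fully populated when subscribe() returns.
    static List<String> collect(Flux<String> flux) {
        List<String> received = new ArrayList<>();
        flux.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        AtomicInteger mapCalls = new AtomicInteger();
        Flux<String> flux = pipeline(mapCalls);
        System.out.println("map calls before subscribe: " + mapCalls.get());
        System.out.println("received: " + collect(flux));
        System.out.println("map calls after subscribe: " + mapCalls.get());
    }
}
```

Assembling the Flux does not run the map function; it runs once per element only after `subscribe` is called.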

🔹 Operators

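import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.Logger;
import reactor.util.Loggers;

public class ReactorOperators {
    // Assumption: the original does not show how "log" is defined. Reactor's own
    // Loggers facade is used here; an SLF4J logger would work the same way.
    private static final Logger log = Loggers.getLogger(ReactorOperators.class);

    // Parses each string, doubles it, and converts it back: "2", "4", "6".
    public Flux<String> transformData() {
        return Flux.just("1", "2", "3")
            .map(Integer::parseInt)
            .map(i -> i * 2)
            .map(String::valueOf)
            .doOnNext(s -> log.info("Value: {}", s))
            .doOnError(e -> log.error("Error: {}", e.getMessage()));
    }

    // Pairs elements by position and sums each pair: 1+4, 2+5, 3+6.
    public Flux<Integer> combineFluxes() {
        Flux<Integer> flux1 = Flux.range(1, 3);
        Flux<Integer> flux2 = Flux.range(4, 3);

        return Flux.zip(flux1, flux2)
            .map(tuple -> tuple.getT1() + tuple.getT2());
    }

    // Falls back to a fixed value on any error; cleanup runs on every terminal
    // signal (complete, error, or cancel).
    public Mono<String> handleErrors() {
        return Mono.just("data")
            .map(this::processData)
            .onErrorResume(e -> Mono.just("fallback"))
            .doFinally(signal -> cleanup());
    }

    private String processData(String data) {
        // Processing logic
        return data.toUpperCase();
    }

    private void cleanup() {
        // Cleanup logic
    }
}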

3️⃣ Error Handling

🔹 Error Handling Strategies

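import java.time.Duration;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.Logger;
import reactor.util.Loggers;

public class ReactorErrorHandling {
    // Assumption: the original does not show how "log" is defined.
    private static final Logger log = Loggers.getLogger(ReactorErrorHandling.class);

    public Flux<String> handleErrors() {
        // The null element surfaces as a NullPointerException after "a" and "b".
        // The error ends the stream, so "C" is never reached.
        return Flux.just("A", "B", null, "C")
            .map(String::toLowerCase)
            // Log before recovering: a doOnError placed after onErrorReturn never fires.
            .doOnError(e -> log.error("Error occurred", e))
            .onErrorReturn("default");
    }

    public Flux<String> retryOnError() {
        // retry(3) resubscribes to the source. "invalid" fails every time, so
        // "1" and "2" are emitted four times before the fallback is used.
        return Flux.just("1", "2", "invalid", "3")
            .map(Integer::parseInt)
            .map(String::valueOf)
            .retry(3)
            .onErrorResume(e -> Flux.just("fallback"));
    }

    public Mono<String> handleTimeout() {
        // Only a TimeoutException triggers the fallback; other errors propagate.
        return callExternalService()
            .timeout(Duration.ofSeconds(5))
            .onErrorResume(TimeoutException.class,
                e -> Mono.just("timeout fallback"));
    }

    private Mono<String> callExternalService() {
        // Simulated external service call; the 2-second delay stays under the
        // 5-second timeout, so the fallback is not used here.
        return Mono.just("response")
            .delayElement(Duration.ofSeconds(2));
    }
}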
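Retrying is most useful when a failure is transient. The sketch below assumes a hypothetical `flakyCall` that fails a fixed number of times before succeeding; the class and method names are illustrative and stand in for a real remote call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

// Hypothetical example class; flakyCall stands in for a real remote call.
class RetrySketch {
    // Fails on the first `failures` subscriptions, then succeeds.
    static Mono<String> flakyCall(AtomicInteger attempts, int failures) {
        return Mono.fromCallable(() -> {
            if (attempts.incrementAndGet() <= failures) {
                throw new IllegalStateException("transient failure");
            }
            return "response";
        });
    }

    static String callWithRetry(AtomicInteger attempts, int failures) {
        return flakyCall(attempts, failures)
            .retry(3)                                  // up to 3 re-subscriptions
            .onErrorResume(e -> Mono.just("fallback")) // used once retries run out
            .block();                                  // blocking only to keep the demo short
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        System.out.println(callWithRetry(attempts, 2) + " after " + attempts.get() + " attempts");
    }
}
```

Because `retry` resubscribes, the callable runs again on each attempt. With `retry(3)` the source is subscribed to at most four times in total: the original attempt plus three retries.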

4️⃣ Testing

🔹 StepVerifier Usage

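import java.time.Duration;
import org.junit.jupiter.api.Test; // assumes JUnit 5
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;  // from the reactor-test module

public class ReactorTesting {
    // Verifies each element in order, then the completion signal.
    @Test
    public void testFlux() {
        Flux<String> flux = Flux.just("A", "B", "C");

        StepVerifier.create(flux)
            .expectNext("A")
            .expectNext("B")
            .expectNext("C")
            .verifyComplete();
    }

    // "invalid" cannot be parsed, so the stream ends with an error after "2".
    @Test
    public void testErrorHandling() {
        Flux<String> flux = Flux.just("1", "2", "invalid")
            .map(Integer::parseInt)
            .map(String::valueOf);

        StepVerifier.create(flux)
            .expectNext("1")
            .expectNext("2")
            .expectError(NumberFormatException.class)
            .verify();
    }

    // Virtual time advances the clock manually, so the test does not wait two
    // real seconds. The Flux must be created inside the supplier.
    @Test
    public void testWithVirtualTime() {
        StepVerifier.withVirtualTime(() ->
            Flux.interval(Duration.ofSeconds(1))
                .take(2))
            .expectSubscription()
            .expectNoEvent(Duration.ofSeconds(1))
            .expectNext(0L)
            .thenAwait(Duration.ofSeconds(1))
            .expectNext(1L)
            .verifyComplete();
    }
}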

5️⃣ Advanced Patterns

🔹 Backpressure Handling

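import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ReactorAdvanced {
    public Flux<String> handleBackpressure() {
        return Flux.range(1, 100)
            // slowOperation blocks the emitting thread; it is here only to
            // simulate a slow stage.
            .map(this::slowOperation)
            // Holds up to 10 items while downstream demand lags; overflowing
            // the buffer signals an error.
            .onBackpressureBuffer(10)
            .map(String::valueOf);
    }

    public Flux<String> parallelProcessing() {
        return Flux.range(1, 1000)
            .parallel()                    // split into rails (one per CPU core by default)
            .runOn(Schedulers.parallel())  // run each rail on a worker thread
            .map(this::processItem)
            .sequential();                 // merge the rails; original order is not guaranteed
    }

    private Integer slowOperation(Integer value) {
        try {
            Thread.sleep(100); // Simulate slow operation
            return value * 2;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
    }

    private String processItem(Integer item) {
        return "Processed: " + item;
    }
}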
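Backpressure is driven by the subscriber's demand. As a rough sketch of that side of the contract, the example below uses Reactor's `BaseSubscriber` to request items in small batches and records each request with `doOnRequest`. The class `BoundedDemandSketch` and its fields are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical example class showing a subscriber that limits its own demand.
class BoundedDemandSketch {
    final List<Integer> received = new ArrayList<>();
    final List<Long> requests = new ArrayList<>();

    void run(int items, int batch) {
        Flux.range(1, items)
            .doOnRequest(n -> requests.add(n)) // record every demand signal sent upstream
            .subscribe(new BaseSubscriber<Integer>() {
                private int inBatch = 0;

                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(batch); // initial demand instead of the unbounded default
                }

                @Override
                protected void hookOnNext(Integer value) {
                    received.add(value);
                    if (++inBatch == batch) { // batch consumed: ask for the next one
                        inBatch = 0;
                        request(batch);
                    }
                }
            });
    }

    public static void main(String[] args) {
        BoundedDemandSketch sketch = new BoundedDemandSketch();
        sketch.run(6, 2);
        System.out.println("received: " + sketch.received);
        System.out.println("requests: " + sketch.requests);
    }
}
```

The source never sees an unbounded request here; it emits only as many items as the subscriber has asked for.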

6️⃣ Q&A / Frequently Asked Questions

Key concepts: (1) Publisher/Subscriber model. (2) Backpressure handling. (3) Cold vs Hot publishers. (4) Operators. (5) Error handling. (6) Threading and Schedulers. (7) Testing with StepVerifier. (8) Reactive streams specification.
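The Publisher/Subscriber model and its demand signalling can be sketched without any dependency, because the JDK mirrors the Reactive Streams interfaces in `java.util.concurrent.Flow`. The example below is a simplified, single-threaded illustration of the protocol, not a specification-complete publisher and not how Reactor implements its own; the class `FlowProtocolSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical, simplified illustration of the Publisher/Subscriber protocol.
class FlowProtocolSketch {
    // A minimal single-threaded publisher of 1..count that emits only on demand.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 1;
            long demand = 0;
            boolean emitting = false;
            boolean done = false;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("non-positive request"));
                    return;
                }
                demand += n;
                if (emitting) return; // re-entrant call from onNext: the outer loop continues
                emitting = true;
                while (demand > 0 && next <= count && !done) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next > count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes with a demand of one item at a time and records every signal.
    static List<String> consumeOneByOne(Flow.Publisher<Integer> publisher) {
        List<String> events = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                events.add("onSubscribe");
                s.request(1);
            }

            @Override
            public void onNext(Integer item) {
                events.add("onNext(" + item + ")");
                subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                events.add("onError");
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(consumeOneByOne(range(3)));
    }
}
```

The signal order is always `onSubscribe`, then zero or more `onNext` calls bounded by the requested amount, then at most one terminal signal (`onComplete` or `onError`).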

Use cases: choose between Mono and Flux by data cardinality. (1) Mono for a single value. (2) Mono for an optional value that may be absent. (3) Mono for a bare completion signal. (4) Flux for multiple values. (5) Flux for streams. (6) Flux for continuous data.
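A short sketch of how cardinality maps onto the two types follows. The class `CardinalitySketch`, the `findName` lookup, and the sample data are hypothetical and exist only for illustration; `block()` is used to keep the demo compact.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical example class; findName stands in for a real lookup.
class CardinalitySketch {
    // Zero or one result: an absent value is an empty Mono, not null.
    static Mono<String> findName(int id) {
        return id == 1 ? Mono.just("Ada") : Mono.empty();
    }

    // defaultIfEmpty supplies a value when the Mono completes without one.
    static String nameOrDefault(int id) {
        return findName(id).defaultIfEmpty("unknown").block();
    }

    // Zero to many results: collectList() gathers a Flux into a Mono<List<T>>.
    static List<Integer> evens(int upTo) {
        return Flux.range(1, upTo)
            .filter(i -> i % 2 == 0)
            .collectList()
            .block();
    }

    // Completion only: then() discards the values and keeps the terminal signal.
    static Mono<Void> completionOnly() {
        return Flux.just("a", "b").then();
    }

    public static void main(String[] args) {
        System.out.println(nameOrDefault(1));
        System.out.println(nameOrDefault(2));
        System.out.println(evens(6));
    }
}
```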

Error handling strategies: (1) onErrorReturn. (2) onErrorResume. (3) onErrorContinue. (4) retry. (5) timeout. (6) doOnError. (7) Error recovery patterns. (8) Circuit breaker pattern.

7️⃣ Best Practices & Pro Tips 🚀

  • Use appropriate operators
  • Handle errors properly
  • Implement backpressure
  • Test reactive streams
  • Use appropriate schedulers
  • Monitor performance
  • Handle cancellation
  • Use debugging tools
  • Implement proper logging
  • Consider cold vs hot publishers
  • Clean up resources (for example with doFinally)
  • Document your pipelines
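
The cold vs hot distinction in the list above can be sketched as follows. A cold source is re-created for each subscriber, while a hot source emits regardless of who is listening, so late subscribers miss earlier values. The example assumes Reactor 3.4 or later for the `Sinks` API; the class `ColdVsHotSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Hypothetical example class contrasting cold and hot sources.
class ColdVsHotSketch {
    // Cold: the deferred supplier runs once per subscriber.
    static int coldSourceCreations() {
        AtomicInteger created = new AtomicInteger();
        Flux<Integer> cold = Flux.defer(() -> {
            created.incrementAndGet();
            return Flux.range(1, 3);
        });
        cold.subscribe();
        cold.subscribe();
        return created.get();
    }

    // Hot: values are pushed through a sink. Returns what the early and the
    // late subscriber each observed, in that order.
    static List<List<String>> hotViews() {
        Sinks.Many<String> sink = Sinks.many().multicast().directBestEffort();
        Flux<String> hot = sink.asFlux();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        hot.subscribe(early::add);
        sink.tryEmitNext("a");    // only the early subscriber is listening
        hot.subscribe(late::add);
        sink.tryEmitNext("b");    // both subscribers are listening
        sink.tryEmitComplete();
        return List.of(early, late);
    }

    public static void main(String[] args) {
        System.out.println("cold source created " + coldSourceCreations() + " times");
        System.out.println("hot views (early, late): " + hotViews());
    }
}
```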

Conclusion

Project Reactor provides powerful reactive programming capabilities for Java applications. By following the patterns and practices outlined in this guide, you can effectively implement reactive systems.

Remember to focus on proper error handling, backpressure management, and testing for reliable reactive applications.