Reactive Programming with Spring WebFlux

1️⃣ Introduction

Spring WebFlux is a reactive web framework that provides non-blocking, event-driven architecture for building high-performance web applications. Based on the Reactive Streams specification and powered by Project Reactor, WebFlux allows developers to create services that can handle a large number of concurrent connections with minimal resources.

Key benefits of reactive programming with Spring WebFlux include:

  • Non-blocking I/O operations
  • Better resource utilization and scalability
  • Resilience through backpressure mechanisms
  • Functional programming paradigm
  • Native support for reactive streams
  • Seamless integration with reactive data repositories

2️⃣ Reactive Programming Fundamentals

🔹 Core Concepts

  • Publisher/Subscriber Model: Publishers produce data that Subscribers consume
  • Backpressure: Flow control mechanism allowing Subscribers to signal how much data they can process
  • Non-blocking: Operations that don't block the calling thread during execution
  • Reactive Types: Mono (0 or 1 item) and Flux (0 to N items)

🔹 Project Reactor

// Add Reactor dependency
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.4</version>
</dependency>

// Creating and using Flux
Flux<String> names = Flux.just("Alice", "Bob", "Charlie", "Dave")
    .filter(name -> name.length() > 3)
    .map(String::toUpperCase);

names.subscribe(
    value -> System.out.println("Value: " + value),
    error -> System.err.println("Error: " + error),
    () -> System.out.println("Completed"));

// Creating and using Mono
Mono<User> user = userRepository.findById("123")
    .doOnNext(u -> log.info("User found: {}", u.getName()))
    .map(this::enrichUser);

3️⃣ Setting Up Spring WebFlux

🔹 Dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

🔹 Basic Application Setup

@SpringBootApplication
public class ReactiveApplication {
    public static void main(String[] args) {
        SpringApplication.run(ReactiveApplication.class, args);
    }
}

4️⃣ Building Reactive APIs

🔹 Annotation-Based Controllers

@RestController
@RequestMapping("/api/users")
public class UserController {
    
    private final UserService userService;
    
    public UserController(UserService userService) {
        this.userService = userService;
    }
    
    @GetMapping
    public Flux<User> getAllUsers() {
        return userService.findAllUsers();
    }
    
    @GetMapping("/{id}")
    public Mono<User> getUserById(@PathVariable String id) {
        return userService.findById(id)
            .switchIfEmpty(Mono.error(new ResponseStatusException(
                HttpStatus.NOT_FOUND, "User not found")));
    }
    
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<User> createUser(@RequestBody User user) {
        return userService.saveUser(user);
    }
    
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> deleteUser(@PathVariable String id) {
        return userService.deleteUser(id);
    }
}

🔹 Functional Endpoints

@Configuration
public class UserRouter {
    
    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler userHandler) {
        return RouterFunctions
            .route(GET("/api/users").and(accept(APPLICATION_JSON)), userHandler::getAllUsers)
            .andRoute(GET("/api/users/{id}").and(accept(APPLICATION_JSON)), userHandler::getUserById)
            .andRoute(POST("/api/users").and(contentType(APPLICATION_JSON)), userHandler::createUser)
            .andRoute(DELETE("/api/users/{id}"), userHandler::deleteUser);
    }
}

@Component
public class UserHandler {
    
    private final UserService userService;
    
    public UserHandler(UserService userService) {
        this.userService = userService;
    }
    
    public Mono<ServerResponse> getAllUsers(ServerRequest request) {
        return ServerResponse.ok()
            .contentType(APPLICATION_JSON)
            .body(userService.findAllUsers(), User.class);
    }
    
    public Mono<ServerResponse> getUserById(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok()
                .contentType(APPLICATION_JSON)
                .bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
    
    public Mono<ServerResponse> createUser(ServerRequest request) {
        return request.bodyToMono(User.class)
            .flatMap(userService::saveUser)
            .flatMap(savedUser -> ServerResponse.created(
                    UriComponentsBuilder
                        .fromPath("/api/users/{id}")
                        .buildAndExpand(savedUser.getId())
                        .toUri())
                .contentType(APPLICATION_JSON)
                .bodyValue(savedUser));
    }
    
    public Mono<ServerResponse> deleteUser(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.deleteUser(id)
            .then(ServerResponse.noContent().build());
    }
}

5️⃣ Reactive WebClient

WebClient is a non-blocking, reactive HTTP client for consuming external APIs.

// Creating WebClient instance
WebClient webClient = WebClient.builder()
    .baseUrl("https://api.example.com")
    .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
    .build();

// GET request
Mono<User> user = webClient.get()
    .uri("/users/{id}", userId)
    .retrieve()
    .bodyToMono(User.class);

// POST request
Mono<User> createdUser = webClient.post()
    .uri("/users")
    .contentType(MediaType.APPLICATION_JSON)
    .bodyValue(newUser)
    .retrieve()
    .bodyToMono(User.class);

// Error handling
Mono<User> userWithErrorHandling = webClient.get()
    .uri("/users/{id}", userId)
    .retrieve()
    .onStatus(
        HttpStatus::is4xxClientError,
        response -> Mono.error(new UserNotFoundException("User not found"))
    )
    .onStatus(
        HttpStatus::is5xxServerError,
        response -> Mono.error(new ServerException("Server error"))
    )
    .bodyToMono(User.class);

6️⃣ Reactive Data Access

🔹 Spring Data Reactive Repositories

// Add Spring Data Reactive MongoDB dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

// Define a reactive repository
public interface UserRepository extends ReactiveMongoRepository<User, String> {
    Flux<User> findByLastName(String lastName);
    Mono<User> findByEmail(String email);
}

// Using the repository
@Service
public class UserServiceImpl implements UserService {
    
    private final UserRepository userRepository;
    
    public UserServiceImpl(UserRepository userRepository) {
        this.userRepository = userRepository;
    }
    
    @Override
    public Flux<User> findAllUsers() {
        return userRepository.findAll();
    }
    
    @Override
    public Mono<User> findById(String id) {
        return userRepository.findById(id);
    }
    
    @Override
    public Mono<User> saveUser(User user) {
        return userRepository.save(user);
    }
    
    @Override
    public Mono<Void> deleteUser(String id) {
        return userRepository.deleteById(id);
    }
}

🔹 Reactive R2DBC for SQL Databases

// Add R2DBC dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
</dependency>

// Define a reactive R2DBC repository
public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {
    Flux<Product> findByCategory(String category);
    Mono<Product> findByName(String name);
}

7️⃣ Error Handling and Resilience

🔹 Exception Handling

@RestControllerAdvice
public class GlobalErrorHandler {
    
    @ExceptionHandler(UserNotFoundException.class)
    public Mono<ResponseEntity<Map<String, String>>> handleUserNotFound(UserNotFoundException ex) {
        Map<String, String> error = Map.of(
            "error", "User not found",
            "message", ex.getMessage()
        );
        return Mono.just(ResponseEntity.status(HttpStatus.NOT_FOUND).body(error));
    }
    
    @ExceptionHandler(ValidationException.class)
    public Mono<ResponseEntity<Map<String, String>>> handleValidation(ValidationException ex) {
        Map<String, String> error = Map.of(
            "error", "Validation error",
            "message", ex.getMessage()
        );
        return Mono.just(ResponseEntity.status(HttpStatus.BAD_REQUEST).body(error));
    }
}

🔹 Circuit Breaker with Resilience4j

// Add Resilience4j dependency
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
</dependency>

// Configure circuit breaker
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
    .failureRateThreshold(50)
    .waitDurationInOpenState(Duration.ofMillis(1000))
    .slidingWindowSize(10)
    .build();
CircuitBreaker circuitBreaker = CircuitBreaker.of("externalServiceA", config);

// Apply circuit breaker to reactive method
public Mono<User> findUser(String id) {
    return Mono.fromCallable(() -> externalService.findUser(id))
        .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
        .onErrorResume(e -> Mono.just(getFallbackUser(id)));
}

// Using Resilience4j with WebClient
public Mono<ExternalData> getExternalData(String id) {
    return webClient.get()
        .uri("/data/{id}", id)
        .retrieve()
        .bodyToMono(ExternalData.class)
        .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
        .onErrorResume(e -> Mono.just(new ExternalData(id, "fallback")));
}

8️⃣ Testing Reactive Applications

🔹 Unit Testing Reactive Streams

// Add testing dependencies
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

// Unit test for service
@ExtendWith(MockitoExtension.class)
class UserServiceTest {
    
    @Mock
    private UserRepository userRepository;
    
    @InjectMocks
    private UserServiceImpl userService;
    
    @Test
    void findByIdShouldReturnUser() {
        // Given
        User expected = new User("1", "John", "Doe", "john@example.com");
        when(userRepository.findById("1")).thenReturn(Mono.just(expected));
        
        // When
        Mono<User> result = userService.findById("1");
        
        // Then
        StepVerifier.create(result)
            .expectNext(expected)
            .verifyComplete();
    }
    
    @Test
    void findByIdShouldReturnEmptyWhenUserNotFound() {
        // Given
        when(userRepository.findById("999")).thenReturn(Mono.empty());
        
        // When
        Mono<User> result = userService.findById("999");
        
        // Then
        StepVerifier.create(result)
            .verifyComplete();
    }
}

🔹 Integration Testing WebFlux Endpoints

@WebFluxTest(UserController.class)
class UserControllerTest {
    
    @Autowired
    private WebTestClient webTestClient;
    
    @MockBean
    private UserService userService;
    
    @Test
    void getUserByIdShouldReturnUser() {
        // Given
        User user = new User("1", "John", "Doe", "john@example.com");
        when(userService.findById("1")).thenReturn(Mono.just(user));
        
        // When/Then
        webTestClient.get().uri("/api/users/1")
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .expectStatus().isOk()
            .expectHeader().contentType(MediaType.APPLICATION_JSON)
            .expectBody(User.class)
            .isEqualTo(user);
    }
    
    @Test
    void getUserByIdShouldReturn404WhenUserNotFound() {
        // Given
        when(userService.findById("999")).thenReturn(Mono.empty());
        
        // When/Then
        webTestClient.get().uri("/api/users/999")
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .expectStatus().isNotFound();
    }
}

9️⃣ Q&A / Frequently Asked Questions

Use Spring WebFlux when: (1) You have high-concurrency requirements with many connections (10K+) but limited threads. (2) You're developing microservices that make calls to other services and would benefit from non-blocking I/O. (3) You're working with streaming data or server-sent events. (4) You have a reactive stack from database to API. Stick with Spring MVC when: (1) Your application has primarily CRUD operations with low concurrency. (2) You rely on blocking libraries without reactive alternatives. (3) Your team is more familiar with imperative programming. WebFlux offers better resource utilization for I/O-intensive operations, while MVC may be simpler for CPU-bound workloads.

Backpressure is a flow control mechanism that allows subscribers to signal how much data they can process, preventing fast publishers from overwhelming slow consumers. In Reactor, backpressure is implemented through subscription requests. When a subscriber subscribes to a publisher, it can request a specific number of items (demand) using request(n). The publisher will then produce at most that number of items before waiting for more requests. Strategies for handling backpressure include: (1) Buffer - store excess items until they can be processed, (2) Drop - discard items that can't be processed, (3) Latest - keep only the most recent items, (4) Error - signal an error when backpressure occurs. This ensures resource efficiency and system stability.

Mono and Flux are the two core reactive types in Project Reactor. Mono represents a stream of 0 or 1 item, making it suitable for operations that return at most one result, like finding a record by ID or creating a resource. Mono completes after emitting its single item or immediately if empty. Flux represents a stream of 0 to N items, ideal for operations that return multiple results, like retrieving collections or streaming data. Flux can emit any number of items before completing. Choose Mono when dealing with single-value results (similar to Optional or Future) and Flux when handling multiple values (similar to Stream or Collection). Both types provide the same rich set of operators for transformation, filtering, and error handling.

🔟 Best Practices & Pro Tips 🚀

  • Avoid blocking operations in reactive code
  • Use appropriate schedulers for CPU-intensive tasks (Schedulers.parallel())
  • Handle errors with onErrorResume instead of try/catch
  • Prefer functional endpoints for complex routing logic
  • Use WebClient instead of RestTemplate for reactive applications
  • Implement proper backpressure handling
  • Keep reactive chains declarative and readable
  • Test reactive streams with StepVerifier
  • Monitor and tune thread pool configurations
  • Consider using Spring Boot Actuator for reactive metrics
  • Implement circuit breakers for external service calls
  • Use connection pooling for database connections

Read Next 📖

Conclusion

Spring WebFlux provides a powerful reactive programming model for building high-performance, non-blocking Java applications. By leveraging the Reactive Streams specification and Project Reactor, WebFlux enables developers to create scalable services that efficiently handle a large number of concurrent connections with limited resources.

While reactive programming introduces new concepts and challenges, the benefits in terms of resource utilization, resilience, and scalability make it a compelling choice for modern applications, especially those operating in cloud environments or dealing with high concurrency. By following the practices outlined in this guide, you can build robust reactive systems that deliver superior performance under load.