Reactive Programming in Java: Beyond Streams (2025)


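Reactive programming changes how Java applications handle concurrency: instead of blocking a thread while waiting for each result, work is expressed as asynchronous streams of data that are processed as items arrive. This guide goes beyond basic streams and covers the Reactive Streams API, Project Reactor, RxJava, resilience patterns such as circuit breakers, backpressure, testing, and error handling.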

Pro Tip: Reactive code tends to pay off most on I/O-bound workloads with high concurrency, where non-blocking pipelines let a small number of threads serve many in-flight requests.

Reactive Basics

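Note: Reactive programming builds on the Observer pattern: publishers push items to subscribers asynchronously. The Reactive Streams specification adds backpressure, so a subscriber controls how many items it receives.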

Reactive Streams API


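// The four Reactive Streams interfaces (package org.reactivestreams).
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

public interface Subscriber<T> {
    void onSubscribe(Subscription subscription); // called once, before any item
    void onNext(T item);                         // called once per requested item
    void onError(Throwable throwable);           // terminal signal: failure
    void onComplete();                           // terminal signal: success
}

public interface Subscription {
    void request(long n); // signal demand for up to n more items
    void cancel();        // stop receiving items
}

// A Processor consumes items of type T and publishes items of type R.
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}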
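
Since Java 9 the JDK ships the same four interfaces as nested types of java.util.concurrent.Flow, together with a ready-made publisher, SubmissionPublisher. The sketch below uses only those JDK types to show the handshake between onSubscribe, request, onNext, and onComplete. The names FlowDemo, CollectingSubscriber, and publishAndCollect are illustrative and not part of any library.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FlowDemo {

    /** Collects every item it receives, requesting one item at a time. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item only when ready
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given items and returns what the subscriber received. */
    static List<String> publishAndCollect(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once submitted items have been delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("a", "b", "c"))); // prints [a, b, c]
    }
}
```

Reactor's Flux and Mono and RxJava's Flowable implement the same protocol, so the operators in the following sections can be read as pre-built publishers, subscribers, and processors.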

Project Reactor

Pro Tip: Project Reactor is the foundation of Spring's reactive stack and provides powerful operators for reactive programming.

Basic Reactor Types


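import java.time.LocalDateTime;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class UserService {
    private static final Logger log = LoggerFactory.getLogger(UserService.class);

    // Assumed to be a reactive repository whose methods return Mono<User> / Flux<User>.
    private final UserRepository userRepository;

    @Autowired
    public UserService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    // Mono<User>: at most one user, or an error if none is found.
    public Mono<User> getUserById(String id) {
        return userRepository.findById(id)
            // The supplier defers creating the exception until the Mono turns out to be empty.
            .switchIfEmpty(Mono.error(() -> new UserNotFoundException(id)));
    }

    // Flux<User>: zero or more users.
    public Flux<User> getUsersByRole(String role) {
        return userRepository.findByRole(role)
            .filter(User::isActive)
            // doOnNext makes the side effect explicit; map is meant for transformations.
            .doOnNext(user -> user.setLastAccessed(LocalDateTime.now()));
    }

    public Mono<User> createUser(UserRequest request) {
        return Mono.just(request)
            // validateUser (not shown) is assumed to validate the request and return a User.
            .map(this::validateUser)
            .flatMap(userRepository::save)
            .doOnSuccess(user -> log.info("User created: {}", user.getId()));
    }
}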

RxJava

Note: RxJava provides a rich set of operators and schedulers for reactive programming in Java.

RxJava Implementation


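// Imports assume RxJava 3; RxJava 2 uses the io.reactivex package instead.
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    private static final Logger log = LoggerFactory.getLogger(OrderService.class);

    // Assumed to be a blocking repository, hence the fromCallable/fromAction wrappers.
    private final OrderRepository orderRepository;

    @Autowired
    public OrderService(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
    }

    public Single<Order> getOrderById(String id) {
        return Single.fromCallable(() -> orderRepository.findById(id))
            .subscribeOn(Schedulers.io())          // run the blocking lookup on the I/O pool
            .observeOn(Schedulers.computation())   // deliver the result on a computation thread
            .doOnSuccess(order -> log.info("Order retrieved: {}", order.getId()));
    }

    public Observable<Order> getOrdersByStatus(OrderStatus status) {
        return Observable.fromCallable(() -> orderRepository.findByStatus(status))
            .subscribeOn(Schedulers.io())          // keep the blocking query off the caller's thread
            .flatMapIterable(orders -> orders)
            .filter(order -> order.getAmount() > 100)
            // doOnNext makes the side effect explicit; map is meant for transformations.
            .doOnNext(order -> order.setProcessed(true));
    }

    public Completable processOrder(String orderId) {
        return Completable.fromAction(() -> {
            Order order = orderRepository.findById(orderId);
            order.setStatus(OrderStatus.PROCESSING);
            orderRepository.save(order);
        })
        .subscribeOn(Schedulers.io())
        .doOnComplete(() -> log.info("Order processed: {}", orderId));
    }
}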

Reactive Patterns

Pro Tip: Patterns such as circuit breakers, timeouts, and fallbacks keep a reactive pipeline responsive when a downstream dependency is slow or failing.

Circuit Breaker Pattern


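import java.time.Duration;
import java.util.concurrent.TimeoutException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.circuitbreaker.ReactiveCircuitBreaker;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
public class PaymentService {
    // Assumption: Spring Cloud Circuit Breaker's reactive API. A blocking
    // CircuitBreaker.run(Supplier, Function) would only guard the creation of
    // the Mono, not the asynchronous outcome of the payment call.
    private final ReactiveCircuitBreaker circuitBreaker;
    private final PaymentClient paymentClient;

    @Autowired
    public PaymentService(ReactiveCircuitBreaker circuitBreaker, PaymentClient paymentClient) {
        this.circuitBreaker = circuitBreaker;
        this.paymentClient = paymentClient;
    }

    public Mono<PaymentResult> processPayment(PaymentRequest request) {
        return Mono.just(request)
            // paymentClient.processPayment is assumed to return Mono<PaymentResult>.
            .flatMap(req -> circuitBreaker.run(
                paymentClient.processPayment(req),
                throwable -> Mono.just(new PaymentResult(Status.FAILED))
            ))
            // Give up after 5 seconds and report a timeout instead of an error.
            .timeout(Duration.ofSeconds(5))
            .onErrorResume(TimeoutException.class, e ->
                Mono.just(new PaymentResult(Status.TIMEOUT))
            );
    }
}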
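
To make the pattern itself concrete, here is a minimal sketch of the state machine behind a circuit breaker (closed, open, half-open), written against the JDK's CompletableFuture so it runs without extra dependencies. It is not how Resilience4j or Spring Cloud implement their breakers. SimpleCircuitBreaker, its constructor parameters, and the injected clock are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold; // consecutive failures before opening
    private final long openMillis;      // how long to stay open before a trial call
    private final LongSupplier clock;   // injected so tests can control time
    private State state = State.CLOSED;
    private int failures;
    private long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** Returns the current state, moving from OPEN to HALF_OPEN once the wait has elapsed. */
    synchronized State state() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= openMillis) {
            state = State.HALF_OPEN;
        }
        return state;
    }

    /** Runs the call unless the breaker is open; failures are replaced by the fallback value. */
    <T> CompletableFuture<T> run(Supplier<CompletableFuture<T>> call,
                                 Function<Throwable, T> fallback) {
        if (state() == State.OPEN) {
            // Short-circuit: the protected call is not invoked at all.
            return CompletableFuture.completedFuture(
                fallback.apply(new IllegalStateException("circuit open")));
        }
        CompletableFuture<T> attempt;
        try {
            attempt = call.get();
        } catch (RuntimeException e) {
            attempt = CompletableFuture.failedFuture(e);
        }
        return attempt.handle((value, error) -> {
            if (error == null) {
                onSuccess();
                return value;
            }
            onFailure();
            return fallback.apply(error);
        });
    }

    private synchronized void onSuccess() {
        failures = 0;
        state = State.CLOSED;
    }

    private synchronized void onFailure() {
        failures++;
        // A failed trial call in HALF_OPEN reopens the breaker immediately.
        if (state == State.HALF_OPEN || failures >= failureThreshold) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }

    public static void main(String[] args) {
        SimpleCircuitBreaker breaker = new SimpleCircuitBreaker(2, 1000L, System::currentTimeMillis);
        Supplier<CompletableFuture<String>> failing =
            () -> CompletableFuture.failedFuture(new RuntimeException("down"));
        for (int i = 0; i < 3; i++) {
            System.out.println(breaker.run(failing, e -> "fallback").join() + " / " + breaker.state());
        }
    }
}
```

A production breaker typically also limits the number of trial calls in the half-open state and tracks failure rates over a sliding window; this sketch only counts consecutive failures.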

Backpressure Handling

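Note: Backpressure lets a slow consumer tell a fast producer how many items it can handle. When the producer cannot slow down, the pipeline has to buffer, drop, or fail, and that choice should be made explicitly.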

Backpressure Strategies


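import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Service
public class EventProcessor {
    private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);

    public Flux<ProcessedEvent> processEvents(Flux<Event> events) {
        return events
            // Buffer up to 1000 events; when the buffer is full, discard the newly arriving event.
            .onBackpressureBuffer(1000, BufferOverflowStrategy.DROP_LATEST)
            // Process at most 5 events concurrently.
            .flatMap(this::processEvent, 5)
            // If the downstream subscriber lags, drop processed results and log them.
            // ProcessedEvent.getId() is assumed to exist.
            .onBackpressureDrop(processed ->
                log.warn("Dropped event: {}", processed.getId())
            );
    }

    private Mono<ProcessedEvent> processEvent(Event event) {
        // fromCallable defers the work so that it runs on the boundedElastic scheduler.
        return Mono.fromCallable(() -> new ProcessedEvent(event)) // process event
            .subscribeOn(Schedulers.boundedElastic());
    }
}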
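
The operators above sit on top of the request(n) protocol. As a rough illustration of what "demand" means, the sketch below implements a tiny publisher with the JDK's Flow interfaces that never emits more items than were requested, and a subscriber that asks for items in batches. RangePublisher, BatchingSubscriber, and run are hypothetical names, and the code is single-threaded for clarity; it is not a specification-compliant publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureDemo {

    /** Emits 1..count, but never more items than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand += n;
                    if (emitting) return; // re-entrant request from onNext; the loop below drains it
                    emitting = true;
                    while (!done && demand > 0 && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Requests items in fixed-size batches and records every signal. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        private final int batchSize;
        private Flow.Subscription subscription;
        private int receivedInBatch;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            log.add("request " + batchSize);
            subscription.request(batchSize);
        }

        @Override
        public void onNext(Integer item) {
            log.add("item " + item);
            if (++receivedInBatch == batchSize) { // batch handled: ask for the next one
                receivedInBatch = 0;
                log.add("request " + batchSize);
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            log.add("error " + throwable.getMessage());
        }

        @Override
        public void onComplete() {
            log.add("complete");
        }
    }

    static List<String> run(int count, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        new RangePublisher(count).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

With five items and a batch size of two, the log alternates between a request and at most two items, which is the behavior that operators such as flatMap with a concurrency limit rely on.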

Testing Reactive Code

Pro Tip: Reactor's StepVerifier (from the reactor-test module) subscribes to a publisher and asserts on the signals it emits, so tests do not need manual blocking or sleeps.

Reactive Testing


@SpringBootTest
class UserServiceTest {
    @Autowired
    private UserService userService;
    
    @Test
    void testGetUserById() {
        StepVerifier.create(userService.getUserById("123"))
            .expectNextMatches(user -> 
                user.getId().equals("123") &&
                user.isActive()
            )
            .verifyComplete();
    }
    
    @Test
    void testGetUsersByRole() {
        StepVerifier.create(userService.getUsersByRole("ADMIN"))
            .expectNextCount(2)
            .expectNextMatches(user -> 
                user.getRole().equals("ADMIN")
            )
            .verifyComplete();
    }
    
    @Test
    void testCreateUser() {
        UserRequest request = new UserRequest("John", "john@example.com");
        
        StepVerifier.create(userService.createUser(request))
            .expectNextMatches(user -> 
                user.getName().equals("John") &&
                user.getEmail().equals("john@example.com")
            )
            .verifyComplete();
    }
}

Best Practices

Note: A few consistent practices, starting with explicit error handling, make reactive applications easier to reason about, test, and maintain.

Error Handling


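import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
public class ErrorHandlingService {
    private static final Logger log = LoggerFactory.getLogger(ErrorHandlingService.class);

    // validateInput and processInput (not shown) are assumed to return Mono values.
    public Mono<Result> processWithErrorHandling(Input input) {
        return Mono.just(input)
            .flatMap(this::validateInput)
            .flatMap(this::processInput)
            // Log first: once onErrorResume replaces the error, doOnError no longer sees it.
            .doOnError(e ->
                log.error("Error processing input: {}", e.getMessage())
            )
            // Specific fallbacks first, most general last.
            .onErrorResume(ValidationException.class, e ->
                Mono.just(new Result(Status.INVALID))
            )
            .onErrorResume(TimeoutException.class, e ->
                Mono.just(new Result(Status.TIMEOUT))
            )
            .onErrorResume(e ->
                Mono.just(new Result(Status.ERROR))
            );
    }
}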
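
Operator order matters in an error-handling chain: a callback that observes errors only fires if it sits before the step that replaces the error with a fallback. The same ordering concern can be reproduced with the JDK's CompletableFuture, which needs no extra dependencies. This is an analogy rather than Reactor code, and ErrorOrderingDemo, unwrap, recover, and process are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

final class ErrorOrderingDemo {

    /** Dependent stages may see the failure wrapped in a CompletionException. */
    static Throwable unwrap(Throwable error) {
        return (error instanceof CompletionException && error.getCause() != null)
            ? error.getCause()
            : error;
    }

    /** Maps an error type to a fallback status, most specific types first. */
    static String recover(Throwable error) {
        Throwable cause = unwrap(error);
        if (cause instanceof IllegalArgumentException) return "INVALID";
        if (cause instanceof TimeoutException) return "TIMEOUT";
        return "ERROR";
    }

    static String process(CompletableFuture<String> work, List<String> log) {
        return work
            // Observe the failure first, while it is still visible.
            .whenComplete((value, error) -> {
                if (error != null) {
                    log.add("failed: " + unwrap(error).getMessage());
                }
            })
            // Then replace it with a fallback value; later stages see no error.
            .exceptionally(ErrorOrderingDemo::recover)
            .join();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(process(CompletableFuture.failedFuture(new TimeoutException("slow")), log)); // prints TIMEOUT
        System.out.println(log); // prints [failed: slow]
    }
}
```

Swapping the two stages would leave the log empty, because exceptionally has already turned the failure into a normal value by the time whenComplete runs.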

Best Practices Summary

  • Use appropriate reactive types (Mono/Flux)
  • Implement proper error handling
  • Use backpressure strategies
  • Choose appropriate schedulers
  • Implement proper testing
  • Use operators efficiently
  • Handle resource cleanup
  • Monitor reactive streams

Conclusion

Reactive programming gives Java developers a way to build responsive, scalable applications without blocking threads while waiting for results. Project Reactor and RxJava provide the building blocks; explicit backpressure strategies, deliberate error handling, and StepVerifier-based tests keep the resulting pipelines robust.