Java RxJava Guide: Complete Tutorial

1️⃣ Introduction

RxJava is a powerful library for composing asynchronous and event-based programs using observable sequences. This guide covers everything you need to know about reactive programming with RxJava.

Key areas covered:

  • Observable Types
  • Operators
  • Schedulers
  • Error Handling
  • Testing
  • Best Practices

2️⃣ Observable Types

🔹 Observable Creation

public class RxBasics {
    public Observable createObservable() {
        return Observable.just("Hello", "World")
            .map(String::toUpperCase)
            .filter(s -> s.startsWith("H"));
    }
    
    public Single createSingle() {
        return Single.just("Hello")
            .map(String::toUpperCase);
    }
    
    public Maybe createMaybe() {
        return Maybe.just("Hello")
            .filter(s -> s.length() > 10);
    }
    
    public Completable createCompletable() {
        return Completable.fromAction(() -> {
            // Perform some operation
            System.out.println("Operation completed");
        });
    }
    
    public Flowable createFlowable() {
        return Flowable.range(1, 1000000)
            .onBackpressureBuffer();
    }
}

🔹 Operators

public class RxOperators {
    public Observable transformData() {
        return Observable.just("1", "2", "3")
            .map(Integer::parseInt)
            .map(i -> i * 2)
            .map(String::valueOf)
            .doOnNext(s -> log.info("Value: {}", s))
            .doOnError(e -> log.error("Error: {}", e.getMessage()));
    }
    
    public Observable combineObservables() {
        Observable obs1 = Observable.range(1, 3);
        Observable obs2 = Observable.range(4, 3);
        
        return Observable.zip(obs1, obs2,
            (a, b) -> a + b);
    }
    
    public Observable> bufferExample() {
        return Observable.range(1, 10)
            .buffer(3)
            .doOnNext(list -> 
                log.info("Processing batch: {}", list));
    }
    
    public Observable debounceExample() {
        return Observable.interval(100, TimeUnit.MILLISECONDS)
            .debounce(200, TimeUnit.MILLISECONDS);
    }
}

3️⃣ Schedulers

🔹 Threading Control

public class RxSchedulers {
    public Observable subscribeOnExample() {
        return Observable.fromCallable(() -> {
            // Simulate long running operation
            Thread.sleep(1000);
            return "Result";
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
    }
    
    public Observable multipleThreads() {
        return Observable.just("Data")
            .subscribeOn(Schedulers.io())
            .map(s -> {
                // IO Thread operation
                return s.toLowerCase();
            })
            .observeOn(Schedulers.computation())
            .map(s -> {
                // Computation Thread operation
                return s.toUpperCase();
            })
            .observeOn(AndroidSchedulers.mainThread());
    }
    
    public Flowable intervalExample() {
        return Flowable.interval(1, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(l -> updateUI(l));
    }
    
    private void updateUI(Long value) {
        // UI update logic
    }
}

4️⃣ Error Handling

🔹 Error Handling Strategies

public class RxErrorHandling {
    public Observable handleErrors() {
        return Observable.just("A", "B", null, "C")
            .map(String::toLowerCase)
            .onErrorReturn(e -> "default")
            .doOnError(e -> log.error("Error occurred", e));
    }
    
    public Observable retryWithDelay() {
        return Observable.just("Data")
            .map(this::riskyOperation)
            .retryWhen(errors ->
                errors.zipWith(
                    Observable.range(1, 3),
                    (error, count) -> {
                        if (count < 3) return count;
                        else throw error;
                    })
                .flatMap(count ->
                    Observable.timer(
                        count * 2, TimeUnit.SECONDS)));
    }
    
    public Observable handleTimeout() {
        return Observable.fromCallable(() -> 
            callExternalService())
            .timeout(5, TimeUnit.SECONDS)
            .onErrorResumeNext(Observable.just(
                "Fallback"));
    }
    
    private String riskyOperation(String input) {
        // Potentially throwing operation
        return input.toUpperCase();
    }
    
    private String callExternalService() {
        // External service call simulation
        return "Response";
    }
}

5️⃣ Testing

🔹 TestScheduler Usage

public class RxTesting {
    private TestScheduler testScheduler;
    
    @Before
    public void setup() {
        testScheduler = new TestScheduler();
    }
    
    @Test
    public void testObservable() {
        TestObserver testObserver = 
            Observable.just("test")
                .delay(1, TimeUnit.SECONDS, testScheduler)
                .test();
        
        testObserver.assertEmpty();
        testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        testObserver.assertValue("test")
            .assertComplete();
    }
    
    @Test
    public void testErrorHandling() {
        TestObserver testObserver =
            Observable.error(new RuntimeException())
                .onErrorResumeNext(
                    Observable.just("fallback"))
                .test();
        
        testObserver.assertValue("fallback")
            .assertComplete();
    }
    
    @Test
    public void testInterval() {
        TestObserver testObserver =
            Observable.interval(
                1, TimeUnit.SECONDS, testScheduler)
                .take(3)
                .test();
        
        testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
        testObserver.assertValues(0L, 1L);
        
        testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        testObserver.assertValues(0L, 1L, 2L)
            .assertComplete();
    }
}



6️⃣ Q&A / Frequently Asked Questions

Observable types: (1) Observable - standard observable sequence. (2) Single - emits one value or error. (3) Maybe - emits one value, no value, or error. (4) Completable - only completes or errors. (5) Flowable - backpressure-aware Observable.

Scheduler usage: (1) IO - network/disk operations. (2) Computation - CPU-intensive work. (3) NewThread - dedicated thread. (4) Single - sequential operations. (5) Trampoline - current thread queue. (6) MainThread - UI operations.

Backpressure strategies: (1) Use Flowable. (2) Buffer overflow strategy. (3) Latest value strategy. (4) Drop oldest strategy. (5) Error strategy. (6) Missing backpressure handler.

7️⃣ Best Practices & Pro Tips 🚀

  • Choose appropriate Observable type
  • Handle errors properly
  • Use appropriate Schedulers
  • Implement proper disposal
  • Consider backpressure
  • Use appropriate operators
  • Test thoroughly
  • Monitor memory usage
  • Handle lifecycle
  • Use CompositeDisposable
  • Implement proper logging
  • Documentation

Read Next 📖

Conclusion

RxJava provides powerful reactive programming capabilities for Java applications. By following the patterns and practices outlined in this guide, you can effectively implement reactive and event-driven systems.

Remember to focus on proper error handling, threading management, and testing for reliable reactive applications.