RxJava is a powerful library for composing asynchronous and event-based programs using observable sequences. This guide covers everything you need to know about reactive programming with RxJava.
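Key areas covered include the core reactive types (Observable, Single, Maybe, Completable, Flowable), common operators, schedulers and threading, error handling, and testing: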
/**
 * Minimal factory methods, one per RxJava base reactive type
 * ({@code Observable}, {@code Single}, {@code Maybe}, {@code Completable}, {@code Flowable}).
 *
 * <p>All return types carry their element type; raw reactive types lose compile-time
 * checking of every downstream operator.
 */
public class RxBasics {

    /**
     * Builds an observable from {@code "Hello"} and {@code "World"}, upper-cases each value
     * and keeps only those starting with {@code "H"}.
     *
     * @return an observable of the upper-cased values that start with {@code "H"}
     */
    public Observable<String> createObservable() {
        return Observable.just("Hello", "World")
                .map(String::toUpperCase)
                .filter(s -> s.startsWith("H"));
    }

    /**
     * Builds a single-value source from {@code "Hello"} and upper-cases it.
     *
     * @return a single emitting the upper-cased value
     */
    public Single<String> createSingle() {
        return Single.just("Hello")
                .map(String::toUpperCase);
    }

    /**
     * Builds a maybe from {@code "Hello"} and filters on length greater than 10.
     * The literal is 5 characters long, so the predicate rejects it and the result
     * completes without a value.
     *
     * @return a maybe that is empty for the hard-coded input
     */
    public Maybe<String> createMaybe() {
        return Maybe.just("Hello")
                .filter(s -> s.length() > 10);
    }

    /**
     * Wraps a side-effecting action (printing to standard output) in a completable.
     * The action is passed to {@code Completable.fromAction}, not invoked here.
     *
     * @return a completable wrapping the print action
     */
    public Completable createCompletable() {
        return Completable.fromAction(() -> {
            // Perform some operation
            System.out.println("Operation completed");
        });
    }

    /**
     * Builds a flowable over the integers 1..1,000,000 with an unbounded backpressure buffer.
     *
     * <p>NOTE(review): per the RxJava Javadoc, {@code Flowable.range} already honors downstream
     * demand, while {@code onBackpressureBuffer()} requests everything upstream and buffers
     * without bound. With a slow consumer this may hold up to the full range in memory;
     * confirm the buffer is wanted here rather than relying on {@code range} alone.
     *
     * @return a flowable of one million consecutive integers starting at 1
     */
    public Flowable<Integer> createFlowable() {
        return Flowable.range(1, 1000000)
                .onBackpressureBuffer();
    }
}
public class RxOperators {
public Observable transformData() {
return Observable.just("1", "2", "3")
.map(Integer::parseInt)
.map(i -> i * 2)
.map(String::valueOf)
.doOnNext(s -> log.info("Value: {}", s))
.doOnError(e -> log.error("Error: {}", e.getMessage()));
}
public Observable combineObservables() {
Observable obs1 = Observable.range(1, 3);
Observable obs2 = Observable.range(4, 3);
return Observable.zip(obs1, obs2,
(a, b) -> a + b);
}
public Observable> bufferExample() {
return Observable.range(1, 10)
.buffer(3)
.doOnNext(list ->
log.info("Processing batch: {}", list));
}
public Observable debounceExample() {
return Observable.interval(100, TimeUnit.MILLISECONDS)
.debounce(200, TimeUnit.MILLISECONDS);
}
}
/**
 * Examples of moving work between threads with {@code subscribeOn} and {@code observeOn}.
 */
public class RxSchedulers {

    /**
     * Wraps a blocking callable (sleeps 1000 ms, then returns {@code "Result"}), subscribes
     * to it on {@code Schedulers.io()} and delivers the result on
     * {@code AndroidSchedulers.mainThread()}.
     *
     * @return an observable emitting {@code "Result"}
     */
    public Observable<String> subscribeOnExample() {
        return Observable.fromCallable(() -> {
                    // Simulate long running operation
                    Thread.sleep(1000);
                    return "Result";
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
    }

    /**
     * Lower-cases a value, then upper-cases it, switching schedulers between the steps and
     * finally delivering on {@code AndroidSchedulers.mainThread()}.
     *
     * @return an observable emitting the transformed value
     */
    public Observable<String> multipleThreads() {
        return Observable.just("Data")
                .subscribeOn(Schedulers.io())
                .map(s -> {
                    // IO Thread operation
                    return s.toLowerCase();
                })
                .observeOn(Schedulers.computation())
                .map(s -> {
                    // Computation Thread operation
                    return s.toUpperCase();
                })
                .observeOn(AndroidSchedulers.mainThread());
    }

    /**
     * Emits an increasing counter once per second and passes each tick to
     * {@link #updateUI(Long)} after switching to {@code AndroidSchedulers.mainThread()}.
     *
     * @return a flowable of tick counts
     */
    public Flowable<Long> intervalExample() {
        return Flowable.interval(1, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(this::updateUI);
    }

    /**
     * Placeholder for UI update logic; currently does nothing.
     *
     * @param value the tick value received from the interval
     */
    private void updateUI(Long value) {
        // UI update logic
    }
}
public class RxErrorHandling {
public Observable handleErrors() {
return Observable.just("A", "B", null, "C")
.map(String::toLowerCase)
.onErrorReturn(e -> "default")
.doOnError(e -> log.error("Error occurred", e));
}
public Observable retryWithDelay() {
return Observable.just("Data")
.map(this::riskyOperation)
.retryWhen(errors ->
errors.zipWith(
Observable.range(1, 3),
(error, count) -> {
if (count < 3) return count;
else throw error;
})
.flatMap(count ->
Observable.timer(
count * 2, TimeUnit.SECONDS)));
}
public Observable handleTimeout() {
return Observable.fromCallable(() ->
callExternalService())
.timeout(5, TimeUnit.SECONDS)
.onErrorResumeNext(Observable.just(
"Fallback"));
}
private String riskyOperation(String input) {
// Potentially throwing operation
return input.toUpperCase();
}
private String callExternalService() {
// External service call simulation
return "Response";
}
}
public class RxTesting {
private TestScheduler testScheduler;
/** Test fixture hook: replaces {@code testScheduler} with a new {@code TestScheduler} instance. */
@Before
public void setup() {
    this.testScheduler = new TestScheduler();
}
/**
 * Verifies that a value delayed by one second on the test scheduler is not delivered
 * until virtual time is advanced by that second, after which the observer has received
 * the value and completion.
 */
@Test
public void testObservable() {
    // Parameterized observer so the value assertion is type-checked against String.
    TestObserver<String> testObserver =
            Observable.just("test")
                    .delay(1, TimeUnit.SECONDS, testScheduler)
                    .test();

    // Nothing may arrive before virtual time moves.
    testObserver.assertEmpty();

    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    testObserver.assertValue("test")
            .assertComplete();
}
@Test
public void testErrorHandling() {
TestObserver