What's different in 3.0
- Introduction
- Behavior changes
- More undeliverable errors
- Connectable source reset
- Flowable.publish pause
- Processor.offer null-check
- MulticastProcessor.offer fusion-check
- Group abandonment in groupBy
- Backpressure in groupBy
- Window abandonment in window
- CompositeException cause generation
- Parameter validation exception change
- From-callbacks upfront cancellation
- Using cleanup order
- API changes
- Interoperation
- Miscellaneous
Welcome to the new major release of RxJava, a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
As with every such release, there have been quite a lot of trivial and non-trivial changes, cleanups and improvements all across the codebase, which warrant some detailed and comprehensive explanations nonetheless.
With each major release, we take the liberty to introduce potential and actual binary and behavioral incompatible changes so that past mistakes can be corrected and technical debt can be repaid.
Please read this guide to its full extent before posting any issue about "why X no longer compiles". Please also take note of sentences marked with
With the release of RxJava 3.0.0, the previous version line, 2.2.x, is in maintenance mode. This means only bugfixes will be accepted and applied; no new operators or documentation changes will be accepted or applied.
ℹ️ 2.x will be supported until February 28, 2021, after which all development on that branch will stop.
RxJava 3 lives in the group `io.reactivex.rxjava3` with artifact ID `rxjava`. Official language/platform adaptors will also be located under the group `io.reactivex.rxjava3`.
The following examples demonstrate the typical import statements. Please consider the latest version and replace `3.0.0` with the numbers from the badge:

```groovy
dependencies {
    implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
}
```

```xml
<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.0.0</version>
</dependency>
```

ℹ️ Further references: PR #6421
The 3.x documentation of the various components can be found at
Sub-version specific documentation is available under a version tag, for example
(replace `3.0.0-RC9` with the numbers from the badge).
The documentation of the current snapshot is under
For a long time, RxJava was limited to Java 6 API due to how Android was lagging behind in its runtime support. This changed with the upcoming Android Studio 4 previews where a process called desugaring is able to turn many Java 7 and 8 features into Java 6 compatible ones transparently.
This allowed us to increase the baseline of RxJava to Java 8 and add official support for many Java 8 constructs:
- Stream: use `java.util.stream.Stream` as a source or expose sequences as blocking `Stream`s.
- Stream `Collector`s: aggregate items into collections specified by standard transformations.
- `Optional`: helps with the non-nullness requirement of RxJava.
- `CompletableFuture`: consume `CompletableFuture`s non-blockingly or expose single results as `CompletableFuture`s.
- Use site non-null annotation: helps with some functional types be able to return null in specific circumstances.

However, some features won't be supported:

- `java.time.Duration`: would add a lot of overloads; can always be decomposed into the `time` + `unit` manually.
- `java.util.function`: these can't throw `Throwable`s, overloads would create bloat and/or ambiguity.
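The manual decomposition of a `Duration` into a time + unit pair could look like the following minimal sketch. It uses only the JDK; the class name `DurationDecomposition` and the `describe` method are hypothetical stand-ins for any API that accepts a `(long, TimeUnit)` pair, and milliseconds are assumed to be precise enough.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class DurationDecomposition {

    /** Hypothetical stand-in for any method that takes a (time, unit) pair. */
    static String describe(long time, TimeUnit unit) {
        return time + " " + unit;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(5);

        // Decompose manually: pick a unit and convert the Duration to it.
        // Sub-millisecond precision is dropped by toMillis().
        System.out.println(describe(timeout.toMillis(), TimeUnit.MILLISECONDS));
        // prints: 5000 MILLISECONDS
    }
}
```

The same pair can then be handed to any operator overload that takes a `long` and a `TimeUnit`.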
Consequently, one has to change the project's compilation target settings to Java 8:
```groovy
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
```

or

```groovy
android {
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}
```

Due to the state of the Android Desugar tooling, as of writing this page, the internals of pre-existing, non-Java 8 related RxJava operators do not use Java 8 constructs or types. This allows using these "older" operators with Android API levels where the desugaring tool doesn't provide automatic Java 8 backports of various constructs.
ℹ️ Further references: Issue #6695, PR #6765, other PRs
RxJava 3 components are located under the `io.reactivex.rxjava3` package (RxJava 1 has `rx` and RxJava 2 is just `io.reactivex`). This allows version 3 to live side by side with the earlier versions. In addition, the core types of RxJava (`Flowable`, `Observer`, etc.) have been moved to `io.reactivex.rxjava3.core`.
| Component | RxJava 2 | RxJava 3 |
|---|---|---|
| Core | io.reactivex | io.reactivex.rxjava3.core |
| Annotations | io.reactivex.annotations | io.reactivex.rxjava3.annotations |
| Disposables | io.reactivex.disposables | io.reactivex.rxjava3.disposables |
| Exceptions | io.reactivex.exceptions | io.reactivex.rxjava3.exceptions |
| Functions | io.reactivex.functions | io.reactivex.rxjava3.functions |
| Flowables | io.reactivex.flowables | io.reactivex.rxjava3.flowables |
| Observables | io.reactivex.observables | io.reactivex.rxjava3.observables |
| Subjects | io.reactivex.subjects | io.reactivex.rxjava3.subjects |
| Processors | io.reactivex.processors | io.reactivex.rxjava3.processors |
| Observers | io.reactivex.observers | io.reactivex.rxjava3.observers |
| Subscribers | io.reactivex.subscribers | io.reactivex.rxjava3.subscribers |
| Parallel | io.reactivex.parallel | io.reactivex.rxjava3.parallel |
| Internal | io.reactivex.internal | io.reactivex.rxjava3.internal |
Due to naming matches, IDEs tend to import `java.util.Observable` instead of picking RxJava's `io.reactivex.rxjava3.core.Observable`. One can usually have the IDE ignore `java.util.Observable` and `java.util.Observer`, or otherwise specify an explicit `import io.reactivex.rxjava3.core.Observable;` in the affected files.
Also, since RxJava 3 now requires a Java 8 runtime, the standard library functional interfaces, such as `java.util.function.Function`, may be picked instead of `io.reactivex.rxjava3.functions.Function`. IDEs tend to give non-descriptive errors such as "Function can't be converted to Function", omitting the fact about the package differences.
ℹ️ Further references: PR #6621
Sometimes, the design of components and operators turns out to be inadequate, too limited or wrong in some circumstances. Major releases such as this allow us to make the necessary changes that would have caused all sorts of problems in a patch release.
With RxJava 2.x, the goal was set to not let any errors slip away in case the sequence is no longer able to deliver them to the consumers for some reason. Despite our best efforts, errors still could have been lost in various race conditions across dozens of operators.
Fixing this in a 2.x patch would have caused too much trouble; therefore, the fix was postponed to the, otherwise already considerably changing, 3.x release. Now, canceling an operator that delays errors internally will signal those errors to the global error handler via `RxJavaPlugins.onError()`.

```java
RxJavaPlugins.setErrorHandler(error -> System.out.println(error));

PublishProcessor<Integer> main = PublishProcessor.create();
PublishProcessor<Integer> inner = PublishProcessor.create();

// switchMapDelayError will delay all errors
TestSubscriber<Integer> ts = main.switchMapDelayError(v -> inner).test();

main.onNext(1);

// the inner fails
inner.onError(new IOException());

// the consumer is still clueless
ts.assertEmpty();

// the consumer cancels
ts.cancel();

// console prints
// io.reactivex.rxjava3.exceptions.UndeliverableException:
// The exception could not be delivered to the consumer because
// it has already canceled/disposed the flow or the exception has
// nowhere to go to begin with. Further reading:
// https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling
// | java.io.IOException
```

ℹ️ Further references: PRs
The purpose of the connectable types (`ConnectableFlowable` and `ConnectableObservable`) is to allow one or more consumers to be prepared before the actual upstream gets streamed to them upon calling `connect()`. This worked correctly for the first time, but had some trouble if the upstream terminated instead of getting disconnected. In this terminating case, depending on whether the connectable was created with `replay()` or `publish()`, fresh consumers would either be unable to receive items from a new connection or would miss items altogether.
With 3.x, connectables have to be reset explicitly when they terminate. This extra step allows consumers to receive cached items or be prepared for a fresh connection.
With `publish`, if the connectable terminates, consumers subscribing later will only receive the terminal event. One has to call `reset()` so that a late consumer will receive items from a fresh connection.

```java
ConnectableFlowable<Integer> connectable = Flowable.range(1, 10).publish();

// prepare consumers, nothing is signaled yet
connectable.subscribe(/* ... */);
connectable.subscribe(/* ... */);

// connect, current consumers will receive items
connectable.connect();

// let it terminate
Thread.sleep(2000);

// late consumers now will receive a terminal event
connectable.subscribe(
    item -> { },
    error -> { },
    () -> System.out.println("Done!"));

// reset the connectable to appear fresh again
connectable.reset();

// fresh consumers, they will also be ready to receive
connectable.subscribe(
    System.out::println,
    error -> { },
    () -> System.out.println("Done!"));

// connect, the fresh consumer now gets the new items
connectable.connect();
```

With `replay`, if the connectable terminates, consumers subscribing later will receive the cached items. One has to call `reset()` to discard this cache so that late consumers can then receive fresh items.

```java
ConnectableFlowable<Integer> connectable = Flowable.range(1, 10).replay();

// prepare consumers, nothing is signaled yet
connectable.subscribe(System.out::println);
connectable.subscribe(System.out::println);

// connect, current consumers will receive items
connectable.connect();

// let it terminate
Thread.sleep(2000);

// late consumers will still receive the cached items
connectable.subscribe(
    System.out::println,
    error -> { },
    () -> System.out.println("Done!"));

// reset the connectable to appear fresh again
connectable.reset();

// fresh consumers, they will also be ready to receive
connectable.subscribe(
    System.out::println,
    error -> { },
    () -> System.out.println("Done!"));

// connect, the fresh consumer now gets the new items
connectable.connect();
```

ℹ️ Further references: Issue #5628, PR #6519
The implementation of `Flowable.publish` hosts an internal queue to support backpressure from its downstream. In 2.x, this queue, and consequently the upstream source, was slowly draining on its own if all of the resulting `ConnectableFlowable`'s consumers have cancelled. This caused unexpected item loss when the lack of consumers was only temporary.
With 3.x, the implementation pauses and items already in the internal queue will be immediately available to consumers subscribing a bit later.
```java
ConnectableFlowable<Integer> connectable = Flowable.range(1, 200).publish();

connectable.connect();

// the first consumer takes only 50 items and cancels
connectable.take(50).test().assertValueCount(50);

// with 3.x, the remaining items will be still available
connectable.test().assertValueCount(150);
```

ℹ️ Further references: Issue #5899, PR #6519
Calling `PublishProcessor.offer()`, `BehaviorProcessor.offer()` or `MulticastProcessor.offer()` with a null argument now throws a `NullPointerException` instead of signaling it via `onError` and thus terminating the processor. This now matches the behavior of the `onNext` method required by the Reactive Streams specification.

```java
PublishProcessor<Integer> pp = PublishProcessor.create();

TestSubscriber<Integer> ts = pp.test();

try {
    pp.offer(null);
} catch (NullPointerException expected) {
}

// no error received
ts.assertEmpty();

pp.offer(1);

// consumers are still there to receive proper items
ts.assertValuesOnly(1);
```

ℹ️ Further references: PR #6799
`MulticastProcessor` was designed to be a processor that coordinates backpressure like the `Flowable.publish` operators do. It includes internal optimizations such as operator-fusion when subscribing it to the right kind of source.
Since users can retain the reference to the processor itself, they could, in concept, call the `onXXX` methods and possibly cause trouble. The same is true for the `offer` method which, when called while the aforementioned fusion is taking place, leads to undefined behavior in 2.x.
With 3.x, the `offer` method will throw an `IllegalStateException` and not disturb the internal state of the processor.
ℹ️ Further references: PR #6799
The `groupBy` operator is one of the peculiar operators that signals reactive sources as its main output where consumers are expected to subscribe to these inner sources as well. Consequently, if the main sequence gets cancelled (i.e., the `Flowable<GroupedFlowable<T>>` itself), the consumers should still keep receiving items on their groups but no new groups should be created. The original source can then only be cancelled if all of such inner consumers have cancelled as well.
However, in 2.x, nothing is forcing the consumption of the inner sources and thus groups may be simply ignored altogether, preventing the cancellation of the original source and possibly leading to resource leaks.
With 3.x, the behavior of `groupBy` has been changed so that when it emits a group, the downstream has to subscribe to it synchronously. Otherwise, the group is considered "abandoned" and terminated. This way, abandoned groups won't prevent the cancellation of the original source. If a late consumer still subscribes to the group, the item that triggered the group creation will be still available.
Synchronous subscription means the following flow setups will cause abandonment and possibly group re-creation:

```java
// observeOn creates a time gap between group emission
// and subscription
source.groupBy(v -> v)
.observeOn(Schedulers.computation())
.flatMap(g -> g)

// subscribeOn creates a time gap too
source.groupBy(v -> v)
.flatMap(g -> g.subscribeOn(Schedulers.computation()))
```

Since the groups are essentially hot sources, one should use `observeOn` to move the processing of the items safely to another thread anyway:

```java
source.groupBy(v -> v)
.flatMap(g ->
    g.observeOn(Schedulers.computation())
    .map(v -> v + 1)
)
```

ℹ️ Further references: Issue #6596, PR #6642
The `Flowable.groupBy` operator is even more peculiar in that it has to coordinate backpressure from the consumers of its inner groups and request from its original `Flowable`. The complication is that such requests can lead to the creation of a new group, a new item for the group that itself requested, or a new item for a completely different group altogether. Therefore, groups can affect each other's ability to receive items and can hang the sequence, especially if some groups don't get to be consumed at all.
The latter can happen when groups are merged via `flatMap` where the number of individual groups is greater than the `flatMap`'s concurrency level (default 128), so fresh groups won't get subscribed to and old ones may not complete to make room. With `concatMap`, the same issue can manifest immediately.
Since RxJava is non-blocking, such silent hangs are difficult to detect and diagnose (i.e., no thread is blocking in `groupBy` or `flatMap`). Therefore, 3.x changed the behavior of `groupBy` so that if the immediate downstream is unable to receive a new group, the sequence terminates with `MissingBackpressureException`:

```java
Flowable.range(1, 1000)
.groupBy(v -> v)
.flatMap(v -> v, 16)
.test()
.assertError(MissingBackpressureException.class);
```

The error message will also indicate the group index:

```
Unable to emit a new group (#16) due to lack of requests. Please make sure the downstream can always accept a new group and each group is consumed for the whole operator to be able to proceed.
```

Increasing the concurrency level to the right amount (or `Integer.MAX_VALUE` if the number of groups is not known upfront) should resolve the problem:

```java
.flatMap(v -> v, 1000)
```

ℹ️ Further references: Issue #6641, PR #6740
Similar to `groupBy`, the `window` operator emits inner reactive sequences that should still keep receiving items when the outer sequence is cancelled (i.e., working with only a limited set of windows). Similarly, when all window consumers cancel, the original source should be cancelled as well.
However, in 2.x, nothing is forcing the consumption of the inner sources and thus windows may be simply ignored altogether, preventing the cancellation of the original source and possibly leading to resource leaks.
With 3.x, the behavior of all `window` operators has been changed so that when one emits a window, the downstream has to subscribe to it synchronously. Otherwise, the window is considered "abandoned" and terminated. This way, abandoned windows won't prevent the cancellation of the original source. If a late consumer still subscribes to the window, the item that triggered the window creation may be still available.
Synchronous subscription means the following flow setups will cause abandonment:

```java
// observeOn creates a time gap between window emission
// and subscription
source.window(10, 5)
.observeOn(Schedulers.computation())
.flatMap(g -> g)

// subscribeOn creates a time gap too
source.window(1, TimeUnit.SECONDS)
.flatMap(g -> g.subscribeOn(Schedulers.computation()))
```

Since the windows are essentially hot sources, one should use `observeOn` to move the processing of the items safely to another thread anyway:

```java
source.window(1, TimeUnit.SECONDS)
.flatMap(g ->
    g.observeOn(Schedulers.computation())
    .map(v -> v + 1)
)
```

ℹ️ Further references: PR #6758, PR #6761, PR #6762
In 1.x and 2.x, calling the `CompositeException.getCause()` method resulted in a generation of a chain of exceptions from the internal list of aggregated exceptions. This was mainly done because Java 6 lacks the suppressed exception feature of Java 7+ exceptions. However, the implementation was possibly mutating exceptions or, sometimes, unable to establish a chain at all. Given the source of the original contribution of the method, it was risky to fix the issues with it in 2.x.
With 3.x, the method constructs a cause exception that, when asked for a stacktrace, generates an output without touching the aggregated exceptions (which is IDE friendly and should be navigable):

```
Multiple exceptions (2)
|-- io.reactivex.rxjava3.exceptions.TestException: ex3
    at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:341)
|-- io.reactivex.rxjava3.exceptions.TestException: ex4
    at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:342)
  |-- io.reactivex.rxjava3.exceptions.CompositeException: 2 exceptions occurred.
      at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:337)
    |-- io.reactivex.rxjava3.exceptions.CompositeException.ExceptionOverview:
        Multiple exceptions (2)
        |-- io.reactivex.rxjava3.exceptions.TestException: ex1
            at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:335)
        |-- io.reactivex.rxjava3.exceptions.TestException: ex2
            at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:336)
```
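For readers unfamiliar with the Java 7+ suppressed exception feature mentioned above, here is a minimal JDK-only sketch. It is not how `CompositeException` is implemented; the class name `SuppressedDemo` and the `aggregate` helper are hypothetical and only illustrate attaching secondary failures without building a cause chain.

```java
import java.io.IOException;

public class SuppressedDemo {

    /** Attaches secondary failures to a primary one via Throwable.addSuppressed. */
    static Exception aggregate(Exception primary, Exception... others) {
        for (Exception other : others) {
            // the cause chains of 'primary' and 'other' are left untouched
            primary.addSuppressed(other);
        }
        return primary;
    }

    public static void main(String[] args) {
        Exception all = aggregate(
                new IllegalStateException("ex1"),
                new IOException("ex2"),
                new IllegalArgumentException("ex3"));

        System.out.println(all.getSuppressed().length); // prints: 2
        System.out.println(all.getCause());             // prints: null

        // the stacktrace lists the suppressed exceptions under "Suppressed:"
        all.printStackTrace();
    }
}
```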
ℹ️ Further references: Issue #6747, PR #6748
Some standard operators in 2.x threw `IndexOutOfBoundsException` when the respective argument was invalid. For consistency with other parameter validation exceptions, the following operators now throw `IllegalArgumentException` instead:

- `skip`
- `skipLast`
- `takeLast`
- `takeLastTimed`

ℹ️ Further references: PR #6831, PR #6835
In 2.x, canceling sequences created via `fromRunnable` and `fromAction` was inconsistent with other `fromX` sequences when the downstream cancelled/disposed the sequence immediately.
In 3.x, such upfront cancellation will not execute the given callback.

```java
Runnable run = mock(Runnable.class);

Completable.fromRunnable(run)
.test(true); // cancel upfront

verify(run, never()).run();
```

ℹ️ Further references: PR #6873
The operator `using` has an `eager` parameter to determine when the resource should be cleaned up: `true` means before-termination and `false` means after-termination. Unfortunately, this setting didn't affect the cleanup order upon downstream cancellation and was always cleaning up the resource before canceling the upstream.
In 3.x, the cleanup order is now consistent when the sequence terminates or gets cancelled: `true` means before-termination or before canceling the upstream, `false` means after-termination or after canceling the upstream.
ℹ️ Further references: Issue #6347, PR #6534
A major release allows cleaning up and improving the API surface by adding, changing or removing elements all across. With 3.x, there are several of such changes that require some explanations.
RxJava 2.x introduced a custom set of functional interfaces in `io.reactivex.functions` so that the use of the library is possible with the same types on Java 6 and Java 8. A secondary reason for such custom types is that the standard Java 8 function types do not support throwing any checked exceptions, which in itself can result in some inconvenience when using RxJava operators.
Despite RxJava 3 being based on Java 8, the issues with the standard Java 8 functional interfaces persist, now with possible desugaring issues on Android and their inability to throw checked exceptions. Therefore, 3.x kept the custom interfaces, but the `@FunctionalInterface` annotation has been applied to them (which is safe/ignored on Android).

```java
@FunctionalInterface
interface Function<@NonNull T, @NonNull R> {
    R apply(T t) throws Throwable;
}
```

In addition, Java 8 allows declaring annotations on type argument and type argument use individually and thus all functional interfaces have received nullability annotations.
ℹ️ Further references: PR #6840, PR #6791, PR #6795
One small drawback with the custom `throws Exception` in the functional interfaces is that some 3rd party APIs may throw a checked exception that is not a descendant of `Exception`, or simply throw `Throwable`.
Therefore, with 3.x, the functional interfaces as well as other support interfaces have been widened and declared with `throws Throwable` in their signature.
This widening should be inconsequential for lambda-based or class-implementations provided to the RxJava methods:

```java
source.map(v -> {
    if (v == 0) {
        throw new Exception();
    }
    return v;
});

source.filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer v) throws Exception {
        throw new IOException();
    }
});
```

I.e., there is no need to change `throws Exception` to `throws Throwable` just for the sake of it.
However, if one uses these functional interfaces outside:

```java
static Integer callFunction(
        Function<Integer, Integer> function, Integer value) throws Exception {
    return function.apply(value);
}
```

the widening of `throws` will have to be propagated:

```java
static Integer callFunction(
        Function<Integer, Integer> function, Integer value) throws Throwable {
    return function.apply(value);
}
```
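The two sides of the widening can be tried out without RxJava on the classpath. The following is a minimal sketch in which `ThrowingFunction` is a local, hypothetical stand-in with the same shape as the widened interfaces; it is not the library type. Implementations keep their narrower `throws` clause, while a caller invoking `apply` directly has to catch or declare `Throwable`.

```java
import java.io.IOException;

public class WidenedThrows {

    /** Local stand-in mirroring the shape of a widened functional interface. */
    @FunctionalInterface
    interface ThrowingFunction<T, R> {
        R apply(T t) throws Throwable;
    }

    /** An implementation may keep a narrower throws clause. */
    static final ThrowingFunction<Integer, Integer> NARROW =
            new ThrowingFunction<Integer, Integer>() {
                @Override
                public Integer apply(Integer v) throws IOException {
                    if (v == 0) {
                        throw new IOException("zero");
                    }
                    return v + 1;
                }
            };

    /** A direct caller must handle (or declare) Throwable. */
    static String callSafely(ThrowingFunction<Integer, Integer> f, Integer value) {
        try {
            return "value: " + f.apply(value);
        } catch (Throwable ex) {
            // catching Throwable is for demonstration only
            return "error: " + ex.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(callSafely(NARROW, 1));        // prints: value: 2
        System.out.println(callSafely(NARROW, 0));        // prints: error: zero
        System.out.println(callSafely(v -> v * 2, 21));   // prints: value: 42
    }
}
```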
ℹ️ Further references: PR #6511, PR #6579
RxJava 2.x already supported the standard `java.util.concurrent.Callable` whose `call` method is declared with `throws Exception` by default. Unfortunately, when our custom functional interfaces were widened to `throws Throwable`, it was impossible to widen `Callable` because in Java, implementations can't widen the `throws` clause, only narrow or abandon it.
Therefore, 3.x introduces the `io.reactivex.rxjava3.functions.Supplier` interface that defines the widest `throws` possible:

```java
interface Supplier<@NonNull R> {
    R get() throws Throwable;
}
```

Due to naming matches, IDEs tend to import `java.util.function.Supplier` instead of picking RxJava's `io.reactivex.rxjava3.functions.Supplier`. Also, IDEs tend to give non-descriptive errors such as "Supplier can't be converted to Supplier", omitting the fact about the package differences.
To comply with the support for wider throws functional interfaces, many operators that used to take `java.util.concurrent.Callable` now take `io.reactivex.rxjava3.functions.Supplier` instead. If the operator was used with a lambda, only recompilation is needed:

```java
Flowable.defer(() -> Flowable.just(Math.random()));
```

However, if explicit implementation was used:

```java
Flowable.defer(new Callable<Double>() {
    @Override
    public Double call() throws Exception {
        return Math.random();
    }
});
```

the interface type (`Callable` -> `Supplier`) and the method name (`call` -> `get`) have to be adjusted:

```java
Flowable.defer(new Supplier<Double>() {
    @Override
    public Double get() throws Exception {
        return Math.random();
    }
});
```
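When an existing `Callable` instance cannot easily be rewritten, adapting it with a method reference is one option. The sketch below is JDK-only: `ThrowingSupplier` is a hypothetical local stand-in with the same shape as the 3.x `Supplier`, and `adapt` is an illustrative helper, not a library method. The adaptation compiles because `call()` throws `Exception`, which fits inside the wider `throws Throwable`.

```java
import java.util.concurrent.Callable;

public class CallableToSupplier {

    /** Local stand-in with the same shape as the 3.x Supplier interface. */
    @FunctionalInterface
    interface ThrowingSupplier<R> {
        R get() throws Throwable;
    }

    /** Wraps a Callable; 'throws Exception' is narrower than 'throws Throwable'. */
    static <R> ThrowingSupplier<R> adapt(Callable<R> callable) {
        return callable::call;
    }

    /** Runs the adapted supplier and converts any failure to an unchecked one. */
    static String demo() {
        Callable<String> legacy = () -> "from callable";
        ThrowingSupplier<String> supplier = adapt(legacy);
        try {
            return supplier.get();
        } catch (Throwable ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: from callable
    }
}
```

The reverse direction does not work with a plain method reference, since a `get()` that may throw any `Throwable` cannot back a `call()` limited to `Exception`.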
See the API signature changes section on which operators are affected.
ℹ️ Further references: PR #6511
In 2.x, the `to()` operator used the generic `Function` to allow assembly-time conversion of flows into arbitrary types. The drawback of this approach was that each base reactive type had the same `Function` interface in their method signature, thus it was impossible to implement multiple converters for different reactive types within the same class. To work around this issue, the `as` operator and `XConverter` interfaces were introduced in 2.x; these interfaces are distinct and can be implemented on the same class. Changing the signature of `to` in 2.x was not possible due to the pledged binary compatibility of the library.
From 3.x, the `as()` methods have been removed and the `to()` methods now each work with their respective `XConverter` interfaces (hosted in package `io.reactivex.rxjava3.core`):

- `Flowable.to(Function<Flowable<T>, R>)` -> `Flowable.to(FlowableConverter<T, R>)`
- `Observable.to(Function<Observable<T>, R>)` -> `Observable.to(ObservableConverter<T, R>)`
- `Maybe.to(Function<Maybe<T>, R>)` -> `Maybe.to(MaybeConverter<T, R>)`
- `Single.to(Function<Single<T>, R>)` -> `Single.to(SingleConverter<T, R>)`
- `Completable.to(Function<Completable, R>)` -> `Completable.to(CompletableConverter<R>)`
- `ParallelFlowable.to(Function<ParallelFlowable<T>, R>)` -> `ParallelFlowable.to(ParallelFlowableConverter<T, R>)`

If one was using these methods with a lambda expression, only a recompilation is needed:

```java
// before
source.to(flowable -> flowable.blockingFirst());

// after
source.to(flowable -> flowable.blockingFirst());
```

If one was implementing a `Function` interface (typically anonymously), the interface type, type arguments and the `throws` clause have to be adjusted:

```java
// before
source.to(new Function<Flowable<Integer>, Integer>() {
    @Override
    public Integer apply(Flowable<Integer> t) throws Exception {
        return t.blockingFirst();
    }
});

// after
source.to(new FlowableConverter<Integer, Integer>() {
    @Override
    public Integer apply(Flowable<Integer> t) {
        return t.blockingFirst();
    }
});
```
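The reason distinct converter interfaces help can be shown without the library. A Java class cannot implement the same generic interface twice with different type arguments, but it can implement two different interfaces whose `apply` methods merely overload each other. In this sketch, `FlowLike`, `ObsLike` and both converter interfaces are hypothetical stand-ins, not RxJava types.

```java
import java.util.Arrays;
import java.util.List;

public class ConverterSketch {

    // Hypothetical stand-ins for two reactive base types.
    static final class FlowLike<T> {
        final List<T> items;
        FlowLike(List<T> items) { this.items = items; }
    }

    static final class ObsLike<T> {
        final List<T> items;
        ObsLike(List<T> items) { this.items = items; }
    }

    // Distinct converter interfaces, one per base type.
    interface FlowLikeConverter<T, R> { R apply(FlowLike<T> upstream); }

    interface ObsLikeConverter<T, R> { R apply(ObsLike<T> upstream); }

    /** One class implements both; the two apply() methods are plain overloads. */
    static final class FirstItem<T>
            implements FlowLikeConverter<T, T>, ObsLikeConverter<T, T> {
        @Override
        public T apply(FlowLike<T> upstream) { return upstream.items.get(0); }

        @Override
        public T apply(ObsLike<T> upstream) { return upstream.items.get(0); }
    }

    public static void main(String[] args) {
        FirstItem<Integer> first = new FirstItem<>();
        System.out.println(first.apply(new FlowLike<>(Arrays.asList(1, 2, 3)))); // prints: 1
        System.out.println(first.apply(new ObsLike<>(Arrays.asList(4, 5, 6))));  // prints: 4
    }
}
```

Declaring `FirstItem` as implementing a single generic function interface for both `FlowLike<T>` and `ObsLike<T>` would be rejected by the compiler, which is the 2.x limitation described above.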
ℹ️ Further references: Issue #5654, PR #6514
Moving to Java 8 and Android's desugaring tooling allows the use of static interface methods instead of separate factory classes. The support class `io.reactivex.disposables.Disposables` was a prime candidate for moving all of its methods into the `Disposable` interface itself (`io.reactivex.rxjava3.disposables.Disposable`).
Uses of the factory methods:

```java
Disposable d = Disposables.empty();
```

should now be turned into:

```java
Disposable d = Disposable.empty();
```
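The underlying Java 8 language feature, a static factory method declared on the interface itself, can be sketched as follows. `Handle` is a hypothetical, simplified interface used only for illustration; unlike a real disposable it is not thread-safe.

```java
public class StaticFactorySketch {

    /** Hypothetical resource handle, loosely modeled on a disposable. */
    interface Handle {
        void dispose();

        boolean isDisposed();

        /** Static interface method replacing a separate factory class. */
        static Handle fromRunnable(Runnable onDispose) {
            return new Handle() {
                private boolean disposed; // not thread-safe, sketch only

                @Override
                public void dispose() {
                    if (!disposed) {
                        disposed = true;
                        onDispose.run();
                    }
                }

                @Override
                public boolean isDisposed() {
                    return disposed;
                }
            };
        }
    }

    public static void main(String[] args) {
        int[] calls = { 0 };
        Handle h = Handle.fromRunnable(() -> calls[0]++);

        h.dispose();
        h.dispose(); // second call has no effect

        System.out.println(h.isDisposed()); // prints: true
        System.out.println(calls[0]);       // prints: 1
    }
}
```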
ℹ️ Further references: PR #6781
Internally, RxJava 2.x uses an abstraction of a disposable container instead of using `CompositeDisposable` everywhere, allowing a more appropriate container type to be used. This is achieved via an internal `DisposableContainer` implemented by `CompositeDisposable` as well as other internal components. Unfortunately, since the public class referenced an internal interface, RxJava was causing warnings in OSGi environments.
In RxJava 3, the `DisposableContainer` is now part of the public API under `io.reactivex.rxjava3.disposables.DisposableContainer` and no longer causes OSGi issues.
ℹ️ Further references: Issue #6742, PR #6745
The RxJava 2.2.x line still has a couple of experimental operators (but no beta operators), which have been promoted to standard with 3.x:
ℹ️ Further references: PR #6537
RxJava 3 received a considerable amount of new operators and methods across its API surface. Brand new operators introduced are marked with
Available in: Flowable, Observable, Maybe, Single, Completable
A limitation with the bounded `replay` operator is that to allow continuous item delivery to slow consumers, a linked list of the cached items has to be maintained. By default, the head node of this list is moved forward when the boundary condition (size, time) mandates it. This setup avoids allocation in exchange for retaining one "invisible" item in the linked list. However, sometimes this retention is unwanted and the allocation overhead of a clean node is acceptable. In 2.x, the `ReplaySubject` and `ReplayProcessor` implementations already allowed for such behavior, but the instance `replay()` operators did not.
With 3.x, the `replay` operators (both connectable and multicasting variants) received overloads, defining an `eagerTruncate` option that performs this type of head node cleanup.

- `replay(int, boolean)`
- `replay(long, TimeUnit, Scheduler, boolean)`
- `replay(int, long, TimeUnit, Scheduler, boolean)`
- `replay(Function, int, boolean)`
- `replay(Function, long, TimeUnit, Scheduler, boolean)`
- `replay(Function, int, long, TimeUnit, Scheduler, boolean)`
ℹ️ Further references: Issue #6475, PR #6532
Available in: Flowable, Observable, Maybe, Single, Completable
A property of the `concatMap` operator is that the `mapper` function may be invoked either on the subscriber's thread or the currently completing inner source's thread. There is no good way to control this thread of invocation from the outside, therefore, new overloads have been added in 3.x with an additional `Scheduler` parameter:
ℹ️ Further references: Issue #6447, PR #6538
By default, `Schedulers.from` executes work on the supplied `Executor` in an eager mode, running as many tasks as available. This can cause some unwanted lack of interleaving between these tasks and external tasks submitted to the same executor. To remedy the situation, a new mode and overload has been added so that the `Scheduler` returned by `Schedulers.from` runs tasks one by one, allowing other external tasks to be interleaved.
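The difference between the eager and the one-by-one mode can be illustrated with a conceptual, JDK-only sketch. This is not RxJava's implementation: `SerialWorker` is a hypothetical, single-threaded toy, and the executor is a manually drained queue so that the resulting order is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

public class FairModeSketch {

    /** Serializes tasks on top of an Executor, eagerly or one by one (not thread-safe). */
    static final class SerialWorker {
        private final Executor executor;
        private final boolean fair;
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private boolean draining;

        SerialWorker(Executor executor, boolean fair) {
            this.executor = executor;
            this.fair = fair;
        }

        void schedule(Runnable task) {
            tasks.add(task);
            if (!draining) {
                draining = true;
                executor.execute(this::drain);
            }
        }

        private void drain() {
            if (fair) {
                // run a single task, then queue up behind other executor work
                tasks.poll().run();
                if (tasks.isEmpty()) {
                    draining = false;
                } else {
                    executor.execute(this::drain);
                }
            } else {
                // eager: run everything that is available in one go
                Runnable next;
                while ((next = tasks.poll()) != null) {
                    next.run();
                }
                draining = false;
            }
        }
    }

    /** Schedules three worker tasks and one external task, then drains the executor. */
    static List<String> run(boolean fair) {
        List<String> log = new ArrayList<>();
        Queue<Runnable> executorQueue = new ArrayDeque<>();
        Executor executor = executorQueue::add;

        SerialWorker worker = new SerialWorker(executor, fair);
        worker.schedule(() -> log.add("A1"));
        worker.schedule(() -> log.add("A2"));
        worker.schedule(() -> log.add("A3"));
        executor.execute(() -> log.add("X")); // external task on the same executor

        Runnable r;
        while ((r = executorQueue.poll()) != null) {
            r.run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints: [A1, A2, A3, X]
        System.out.println(run(true));  // prints: [A1, X, A2, A3]
    }
}
```

In the eager run, the external task `X` has to wait for all worker tasks; in the one-by-one run it gets its turn after the first worker task.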
ℹ️ Further references: Issue #6696, Issue #6697, PR #6744
Available in: Flowable, Observable, Maybe, Single, Completable
The underlying `blockingIterable` operator already had the option to specify the internal buffer size (and prefetch amounts), which is now exposed via new `blockingForEach` overloads.
ℹ️ Further references: Issue #6784, PR #6800
Available in: Flowable, Observable, Maybe, Single, Completable
For API consistency, the callback-based `blockingSubscribe` methods have been introduced to `Maybe`, `Single` and `Completable` respectively.
`Maybe`:

- `blockingSubscribe()`
- `blockingSubscribe(Consumer)`
- `blockingSubscribe(Consumer, Consumer)`
- `blockingSubscribe(Consumer, Consumer, Action)`
- `blockingSubscribe(MaybeObserver)`

`Single`:

- `blockingSubscribe()`
- `blockingSubscribe(Consumer)`
- `blockingSubscribe(Consumer, Consumer)`
- `blockingSubscribe(SingleObserver)`

`Completable`:

- `blockingSubscribe()`
- `blockingSubscribe(Action)`
- `blockingSubscribe(Action, Consumer)`
- `blockingSubscribe(CompletableObserver)`

ℹ️ Further references: Issue #6852, PR #6862
Available in: Flowable, Observable, Maybe, Single, Completable
The option, available in every other reactive type, to delay the errors optionally as well was missing from `Maybe`.
ℹ️ Further references: Issue #6863, PR #6864
Available in: Flowable, Observable, Maybe, Single, Completable
Upon an error, the sequence is completed (conditionally) instead of signaling the error.
ℹ️ Further references: Issue #6852, PR #6867
Available in: Flowable, Observable, Maybe, Single, Completable
This operator (under the name `onErrorResumeNext`, now renamed) was already available everywhere else and was accidentally left out of `Completable`.
ℹ️ Further references: Issue #6852, PR #6868
Available in: Flowable, Observable, Maybe, Single, Completable
The operator was missing from `Single` and `Completable`.
ℹ️ Further references: Issue #6852, PR #6869
Available in: Flowable, Observable, Maybe, Single, Completable
Added the static version of the `switchMap` operator, `switchOnNext` and `switchOnNextDelayError`, to `Maybe`, `Single` and `Completable`.
Maybe.switchOnNext(Function)Single.switchOnNext(Function)Completable.switchOnNext(Function)Maybe.switchOnNextDelayError(Function)Single.switchOnNextDelayError(Function)Completable.switchOnNextDelayError(Function)
ℹ️ Further references: Issue #6852, PR #6870
Available in: Flowable, Observable, Maybe, Single, Completable
The operator was already added to the other reactive types before.
ℹ️ Further references: Issue #6852, PR #6871
Several operators have been added across:
| Operator | F | O | M | S | C |
|---|---|---|---|---|---|
| fromAction | ![]() | ![]() | ![]() | (23) | ![]() |
| fromCompletable | ![]() | ![]() | ![]() | (72) | (73) |
| fromMaybe | ![]() | ![]() | (73) | ![]() | ![]() |
| fromObservable | ![]() | (73) | ![]() | ![]() | ![]() |
| fromPublisher | ![]() | ![]() | ![]() | ![]() | ![]() |
| fromRunnable | ![]() | ![]() | ![]() | (23) | ![]() |
| fromSingle | ![]() | ![]() | ![]() | (73) | ![]() |
Flowable:
- fromAction(Action)
- fromCompletable(CompletableSource)
- fromMaybe(MaybeSource)
- fromObservable(ObservableSource, BackpressureStrategy)
- fromRunnable(Runnable)
- fromSingle(SingleSource)
Observable:
- fromAction(Action)
- fromCompletable(CompletableSource)
- fromMaybe(MaybeSource)
- fromRunnable(Runnable)
- fromSingle(SingleSource)
ℹ️ Further references: Issue #6852, PR #6873
Available in: Flowable, Observable, Maybe, Single, Completable
These operators were already available for Flowable and Observable, now added to Single and Maybe.
Maybe and Single (same overloads in both):
- timeInterval()
- timeInterval(TimeUnit)
- timeInterval(Scheduler)
- timeInterval(TimeUnit, Scheduler)
- timestamp()
- timestamp(TimeUnit)
- timestamp(Scheduler)
- timestamp(TimeUnit, Scheduler)
ℹ️ Further references: Issue #6852, PR #6874
Available in: Flowable, Observable, Maybe, Single, Completable
This operator was already available elsewhere, now added to Maybe and Completable.
ℹ️ Further references: Issue #6852, PR #6875
Available in: Flowable, Observable, Maybe, Single, Completable
This operator was already available in Flowable and Observable, now added to Maybe and Single.
ℹ️ Further references: Issue #6852, PR #6876
Available in: Flowable, Observable, Maybe, Single, Completable
This operator was already available in Flowable and Observable, now added to Maybe, Single and Completable.
ℹ️ Further references: Issue #6852, PR #6877
Available in: Flowable, Observable, Maybe, Single, Completable
Added various concatMap-based transformations between Maybe, Single and Completable for Maybe and Single. These are essentially aliases to the respective flatMap operators for better discoverability.
- Maybe.concatMapCompletable(Function)
- Maybe.concatMapSingle(Function)
- Single.concatMapCompletable(Function)
- Single.concatMapMaybe(Function)
- Single.concatMap(Function)
ℹ️ Further references: Issue #6852, PR #6879
Available in: Flowable, Observable, Maybe, Single, Completable
The delayError variants of the concat operator were missing across the reactive types.
- Single.concatArrayDelayError(Single...)
- Single.concatArrayEagerDelayError
- Single.concatDelayError(Iterable)
- Single.concatDelayError(Publisher)
- Single.concatDelayError(Publisher, int)
- Completable.concatArrayDelayError(Completable...)
- Completable.concatDelayError(Iterable)
- Completable.concatDelayError(Publisher)
- Completable.concatDelayError(Publisher, int)
ℹ️ Further references: Issue #6852, PR #6881
Available in: Flowable, Observable, Maybe, Single, Completable
The operator was already available elsewhere, now added to Single.
ℹ️ Further references: Issue #6852, PR #6882
Available in: Flowable, Observable, Maybe, Single, Completable
The operator was already available elsewhere, now added to Completable.
ℹ️ Further references: Issue #6852, PR #6884
Available in:
| source \ other | F | O | M | S | C |
|---|---|---|---|---|---|
| Flowable | ![]() | ![]() | ![]() | ![]() | ![]() |
| Observable | ![]() | ![]() | ![]() | ![]() | ![]() |
| Maybe | ![]() | ![]() | ![]() | ![]() | ![]() |
| Single | ![]() | ![]() | ![]() | ![]() | ![]() |
| Completable | ![]() | ![]() | ![]() | ![]() | ![]() |
Added overloads for better continuation support between the reactive types.
- startWith(Publisher)
- startWith(ObservableSource)
- startWith(MaybeSource)
- startWith(SingleSource)
- startWith(CompletableSource)
ℹ️ Further references: Issue #6852, PR #6885
Available in: Flowable, Observable, Maybe, Single, Completable
The operators onErrorReturn and onErrorReturnItem were available everywhere else and are now added to Completable.
ℹ️ Further references: Issue #6852, PR #6886
Available in: Flowable, Observable, Maybe, Single, Completable
The method was already available in Flowable and Observable, now added to Maybe, Single and Completable for API consistency.
- Maybe.safeSubscribe(MaybeObserver)
- Single.safeSubscribe(SingleObserver)
- Completable.safeSubscribe(CompletableObserver)
ℹ️ Further references: Issue #6852, PR #6887
Available in: Flowable, Observable, Maybe, Single, Completable
Added two overloads of flatMap to Single: one to transform the success or error signals into a new SingleSource, one to combine the original success value with the success value of the inner sources.
ℹ️ Further references: Issue #6852, PR #6893
Available in: Flowable, Observable, Maybe, Single, Completable
Added various concatEager and concatEagerDelayError overloads across the reactive types.
Flowable and Observable (same overloads in both):
- concatEagerDelayError(Iterable)
- concatEagerDelayError(Iterable, int, int)
- concatEagerDelayError(Publisher)
- concatEagerDelayError(Publisher, int, int)
Maybe and Single (same overloads in both):
- concatEager(Iterable, int)
- concatEager(Publisher, int)
- concatEagerDelayError(Iterable)
- concatEagerDelayError(Iterable, int)
- concatEagerDelayError(Publisher)
- concatEagerDelayError(Publisher, int)
ℹ️ Further references: Issue #6880, PR #6899
Available in: Flowable, Observable, Maybe, Single, Completable
With the new type io.reactivex.rxjava3.functions.Supplier comes a new wrapper operator fromSupplier to complement fromCallable in all the reactive types.
- Flowable.fromSupplier
- Observable.fromSupplier
- Maybe.fromSupplier
- Single.fromSupplier
- Completable.fromSupplier
ℹ️ Further references: PR #6529
Available in: Flowable, Observable, Maybe, Single, Completable, ParallelFlowable
The operator was already available in Flowable and Observable, now added to ParallelFlowable.
    Flowable.range(1, 10)
        .parallel()
        .runOn(Schedulers.computation())
        .flatMapIterable(v -> Arrays.asList(v, v + 1));
ℹ️ Further references: PR #6798
Now that the API baseline is set to Java 8, RxJava can support the new types of Java 8 directly, without the need for an external library (such as the RxJavaJdk8Interop library, now discontinued).
However, Java 8 has language support for a relatively convenient way to convert a Java 8 functional interface to its RxJava 3 equivalent via method references:
    java.util.function.Function<Integer, Integer> f = a -> a + 1;
    Flowable.range(1, 10).map(f::apply);
    // or
    io.reactivex.rxjava3.functions.Function<Integer, Integer> f2 = f::apply;
Unfortunately, the reverse direction is not possible because Java 8's functional interfaces do not allow throwing a checked exception.
In general, the distinction between the two sets of interfaces shouldn't be a practical problem because, unlike Java 8's java.util.stream.Collectors, there is no repository of pre-made functional interface implementations out there that would require more direct support from RxJava.
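The asymmetry between the two conversion directions can be sketched with plain JDK types. In this minimal example, ThrowingFunction is a hypothetical stand-in that only mirrors the shape of an RxJava 3 functional interface (a single method that is allowed to throw), and the helper names toThrowing and toJava8 are illustrative assumptions, not RxJava API:

```java
import java.io.IOException;
import java.util.function.Function;

// Hypothetical stand-in (assumption): same shape as an RxJava 3 style
// functional interface, i.e. one method that may throw anything.
interface ThrowingFunction<T, R> {
    R apply(T t) throws Throwable;
}

class FunctionInteropSketch {

    // Java 8 interface -> throwing interface: a method reference is enough,
    // because a method that throws nothing fits a signature that may throw.
    static <T, R> ThrowingFunction<T, R> toThrowing(Function<T, R> f) {
        return f::apply;
    }

    // Throwing interface -> Java 8 interface: no method reference can do this,
    // the checked exception has to be handled or wrapped by hand.
    static <T, R> Function<T, R> toJava8(ThrowingFunction<T, R> f) {
        return t -> {
            try {
                return f.apply(t);
            } catch (RuntimeException | Error e) {
                throw e;
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        };
    }

    public static void main(String[] args) throws Throwable {
        Function<Integer, Integer> plusOne = a -> a + 1;
        ThrowingFunction<Integer, Integer> converted = toThrowing(plusOne);
        System.out.println(converted.apply(1)); // prints 2

        ThrowingFunction<Integer, Integer> failing = a -> {
            throw new IOException("boom");
        };
        try {
            toJava8(failing).apply(1);
        } catch (RuntimeException ex) {
            // The checked exception only survives as the cause of a wrapper.
            System.out.println(ex.getCause() instanceof IOException); // prints true
        }
    }
}
```

The wrapping in toJava8 is one possible policy; rethrowing as an unchecked exception is a design choice of this sketch, not something RxJava prescribes.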
Available in: Flowable, Observable, Maybe, Single, Completable
Given an existing, constant reference of a java.util.Optional, turn it into a Flowable, Observable or Maybe source, emitting its value immediately or completing immediately.
💡 There is no Single variant because it has to be non-empty. No Completable either because it is always empty.
    Flowable<Integer> zero = Flowable.fromOptional(Optional.empty());
    Observable<Integer> one = Observable.fromOptional(Optional.of(1));
    Maybe<Integer> maybe = Maybe.fromOptional(Optional.ofNullable(valueMaybeNull));
ℹ️ Further references: Issue #6776, PR #6765, PR #6783, PR #6797
Available in: Flowable, Observable, Maybe, Single, Completable
Turns a java.util.stream.Stream into a Flowable or Observable and emits its items to consumers.
💡 Because a Stream is assumed to have zero to N items (possibly infinite), there is no good way to expose it as Maybe, Single or Completable.
Primitive Stream types (such as IntStream and DoubleStream) are not accepted directly. These streams have to be converted into their boxed variants via their boxed() method. Since RxJava uses reference types, there is no way to optimize the interoperation with the primitive streams.
    Flowable<Integer> stream = Flowable.fromStream(IntStream.range(1, 10).boxed());
    Observable<Integer> stream2 = Observable.fromStream(list.stream());
ℹ️ Further references: Issue #6776, PR #6765, PR #6797
Available in: Flowable, Observable, Maybe, Single, Completable
Wrap a java.util.concurrent.CompletionStage instance (such as CompletableFuture) into a reactive type and expose its single value or failure as the appropriate reactive signal.
💡 A CompletionStage is like a hot source that is already executing or has already terminated, thus the wrapper is only there to observe the outcome, not to initiate the computation the stage represents.
The CompletionStage interface doesn't support cancellation, thus canceling the reactive flow will not stop the given CompletionStage.
- Flowable.fromCompletionStage
- Observable.fromCompletionStage
- Maybe.fromCompletionStage
- Single.fromCompletionStage
- Completable.fromCompletionStage
    Flowable<Integer> someAsync = Flowable.fromCompletionStage(operation.getAsync());
    Observable<Integer> otherAsync = Observable.fromCompletionStage(CompletableFuture.completedFuture(1));
    Maybe<Void> failed = Maybe.fromCompletionStage(
        CompletableFuture.completedFuture(0)
            .thenAccept(v -> { throw new RuntimeException(); }));
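The "hot source" and "no cancellation" remarks about CompletionStage can be illustrated without RxJava at all. The following is a minimal JDK-only sketch, assuming dependent CompletableFuture stages as an analogy for reactive observers; the class and method names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class HotStageSketch {

    // Counts how often the computation runs when two observers attach to it.
    static int runsWithTwoObservers() {
        AtomicInteger runs = new AtomicInteger();
        // The work is submitted right here, before anyone observes it.
        CompletableFuture<Integer> stage = CompletableFuture.supplyAsync(() -> {
            runs.incrementAndGet();
            return 42;
        });
        // Attaching observers only registers interest in the outcome.
        int first = stage.thenApply(v -> v + 1).join();
        int second = stage.thenApply(v -> v + 2).join();
        return first == 43 && second == 44 ? runs.get() : -1;
    }

    // Shows that an observer walking away does not stop the original stage.
    static boolean stageSurvivesObserverCancel() {
        CompletableFuture<Integer> stage = new CompletableFuture<>();
        CompletableFuture<Integer> observer = stage.thenApply(v -> v + 1);
        observer.cancel(true);   // only the dependent view is cancelled
        stage.complete(1);       // the producer still finishes normally
        return observer.isCancelled() && !stage.isCancelled() && stage.join() == 1;
    }

    public static void main(String[] args) {
        System.out.println(runsWithTwoObservers());        // prints 1
        System.out.println(stageSurvivesObserverCancel()); // prints true
    }
}
```

If the underlying work must be stoppable, the cancellation has to be wired to whatever produced the stage; this sketch does not attempt that.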
ℹ️ Further references: Issue #6776, PR #6765, PR #6783, PR #6797
Available in: Flowable, Observable, Maybe, Single, Completable, ParallelFlowable
Transform the upstream item(s) into java.util.Optional instances, then emit the non-empty value, or if the Optional is empty, skip to the next upstream value.
💡 Completable has no items to map.
- Flowable.mapOptional
- Observable.mapOptional
- Maybe.mapOptional
- Single.mapOptional
- ParallelFlowable.mapOptional(Function)
- ParallelFlowable.mapOptional(Function, BiFunction)
- ParallelFlowable.mapOptional(Function, ParallelFailureHandling)
    Flowable.range(1, 10)
        .mapOptional(v -> v % 2 == 0 ? Optional.of(v) : Optional.empty());
    Flowable.range(1, 10)
        .parallel()
        .runOn(Schedulers.computation())
        .mapOptional(v -> v % 2 == 0 ? Optional.of(v) : Optional.empty())
        .sequential();
ℹ️ Further references: Issue #6776, PR #6775, PR #6783, PR #6797, PR #6798
Available in: Flowable, Observable, Maybe, Single, Completable, ParallelFlowable
Provides the ability to aggregate items of a Flowable or Observable with the help of Java 8's rich set of Collector implementations. See Collectors for its capabilities.
💡 Because a sequence is assumed to have zero to N items (possibly infinite), there is no good reason to collect a Maybe, Single or Completable.
    Single<List<Integer>> list = Flowable.range(1, 10)
        .collect(Collectors.toList());
    Flowable<List<Integer>> list2 = Flowable.range(1, 10)
        .parallel()
        .runOn(Schedulers.computation())
        .collect(Collectors.toList());
ℹ️ Further references: Issue #6776, PR #6775, PR #6797, PR #6798
Available in: Flowable, Observable, Maybe, Single, Completable
Expose the first, only or very last item of a Flowable or Observable as a java.util.concurrent.CompletionStage.
💡 For Maybe, Single and Completable, the equivalent operator is called toCompletionStage.
💡 Since a sequence can be empty, there are two variants to these methods: one that takes a default value for such an empty source and one that signals a NoSuchElementException via the returned CompletionStage. The latter methods have OrError in their method name.
- Flowable.firstStage(T)
- Flowable.singleStage(T)
- Flowable.lastStage(T)
- Observable.firstStage(T)
- Observable.singleStage(T)
- Observable.lastStage(T)
    // Signals 1
    CompletionStage<Integer> cs = Flowable.range(1, 10)
        .firstStage(11);
    // Signals IllegalArgumentException as the source has too many items
    CompletionStage<Integer> cs1 = Flowable.just(1, 2)
        .singleStage(11);
    // Signals 11
    CompletionStage<Integer> cs2 = Observable.<Integer>empty()
        .lastStage(11);
ℹ️ Further references: Issue #6776, PR #6775, PR #6797
Available in: Flowable, Observable, Maybe, Single, Completable
Expose the first, only or very last item of a Flowable or Observable as a java.util.concurrent.CompletionStage, or signal NoSuchElementException if the source sequence is empty.
💡 For Maybe, Single and Completable, the equivalent operator is called toCompletionStage.
💡 Since a sequence can be empty, there are two variants to these methods: one that takes a default value for such an empty source and one that signals a NoSuchElementException via the returned CompletionStage. The former do not have OrError in their method name.
- Flowable.firstOrErrorStage()
- Flowable.singleOrErrorStage()
- Flowable.lastOrErrorStage()
- Observable.firstOrErrorStage()
- Observable.singleOrErrorStage()
- Observable.lastOrErrorStage()
    // Signals 1
    CompletionStage<Integer> cs = Flowable.range(1, 10)
        .firstOrErrorStage();
    // Signals IllegalArgumentException as the source has too many items
    CompletionStage<Integer> cs1 = Flowable.just(1, 2)
        .singleOrErrorStage();
    // Signals NoSuchElementException
    CompletionStage<Integer> cs2 = Observable.<Integer>empty()
        .lastOrErrorStage();
ℹ️ Further references: Issue #6776, PR #6775, PR #6797
Available in: Flowable, Observable, Maybe, Single, Completable
Expose the single value or termination of a Maybe, Single or Completable as a java.util.concurrent.CompletionStage.
💡 The equivalent operators in Flowable and Observable are called firstStage, singleStage, lastStage, firstOrErrorStage, singleOrErrorStage and lastOrErrorStage.
💡 The Maybe and Completable operators allow defining a default completion value in case the source turns out to be empty.
- Maybe.toCompletionStage()
- Maybe.toCompletionStage(T)
- Single.toCompletionStage()
- Completable.toCompletionStage(T)
    // Signals 1
    CompletionStage<Integer> cs = Maybe.just(1).toCompletionStage();
    // Signals NoSuchElementException
    CompletionStage<Integer> cs1 = Maybe.<Integer>empty().toCompletionStage();
    // Signals 10
    CompletionStage<Integer> cs2 = Maybe.<Integer>empty().toCompletionStage(10);
    // Signals 10
    CompletionStage<Integer> cs3 = Completable.complete().toCompletionStage(10);
ℹ️ Further references: Issue #6776, PR #6783
Available in: Flowable, Observable, Maybe, Single, Completable
Exposes a Flowable or an Observable as a blocking java.util.stream.Stream.
💡 Streams are expected to have zero to N (possibly infinite) elements and thus there is no good reason to stream a Maybe, Single or Completable. Use the appropriate blockingGet and blockingAwait methods instead.
Abandoning the Stream without closing it may cause leaks or computations running indefinitely. It is recommended one closes the Stream manually or via the try-with-resources construct of Java 7+.
- Flowable.blockingStream()
- Flowable.blockingStream(int)
- Observable.blockingStream()
- Observable.blockingStream(int)
    try (Stream<Integer> stream = Flowable.range(1, 10)
            .subscribeOn(Schedulers.computation())
            .blockingStream()) {
        stream.limit(5)
            .forEach(System.out::println);
    }
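Why closing matters can be seen with a plain JDK Stream. In this minimal sketch, the onClose handler is an assumed stand-in for whatever cleanup a blocking Stream needs (for example, cancelling its source); the class and method names are hypothetical and no RxJava is involved:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

class StreamCloseSketch {

    // Partial consumption inside try-with-resources: cleanup still runs.
    static boolean closedWithTryWithResources() {
        AtomicBoolean released = new AtomicBoolean();
        try (Stream<Integer> stream = Stream.iterate(1, v -> v + 1)
                .onClose(() -> released.set(true))) {
            stream.limit(5).forEach(v -> { });
        }
        return released.get();
    }

    // Same consumption without closing: a terminal operation alone
    // does not trigger the close handler.
    static boolean closedWithoutClose() {
        AtomicBoolean released = new AtomicBoolean();
        Stream<Integer> stream = Stream.iterate(1, v -> v + 1)
                .onClose(() -> released.set(true));
        stream.limit(5).forEach(v -> { });
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println(closedWithTryWithResources()); // prints true
        System.out.println(closedWithoutClose());         // prints false
    }
}
```

The second method is the case to avoid: the stream was only partially consumed and nothing ever told the source to stop.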
ℹ️ Further references: Issue #6776, PR #6779, PR #6797
Available in: Flowable, Observable, Maybe, Single, Completable, ParallelFlowable
Maps each upstream item to a java.util.stream.Stream and emits those inner items to the downstream, in sequence and without overlapping.
💡 flatMapStream and concatMapStream are essentially the same operators because consuming a Stream is a sequential (and perhaps blocking) operation, thus there is no way two or more distinct Streams could get interleaved.
💡 Since a Stream can be exposed as either a backpressure-supporting Flowable or a backpressure-unsupporting Observable, the equivalent operators for Maybe and Single are called flattenStreamAsFlowable and flattenStreamAsObservable.
Primitive Stream types (such as IntStream and DoubleStream) are not accepted directly. These streams have to be converted into their boxed variants via their boxed() method. Since RxJava uses reference types, there is no way to optimize the interoperation with the primitive streams.
- Flowable.concatMapStream(Function)
- Flowable.concatMapStream(Function, int)
- Flowable.flatMapStream(Function)
- Flowable.flatMapStream(Function, int)
- Observable.concatMapStream(Function)
- Observable.flatMapStream(Function)
- ParallelFlowable.flatMapStream(Function)
- ParallelFlowable.flatMapStream(Function, int)
    Flowable.range(1, 10)
        .flatMapStream(v -> Arrays.asList(v, v + 1).stream());
    Observable.range(1, 10)
        .concatMapStream(v -> Arrays.asList(v, v + 1).stream());
    Flowable.range(1, 10)
        .parallel()
        .runOn(Schedulers.computation())
        .flatMapStream(v -> Arrays.asList(v, v + 1).stream());
ℹ️ Further references: Issue #6776, PR #6779, PR #6797, PR #6798
Available in: Flowable, Observable, Maybe, Single, Completable
Maps the success item into a java.util.stream.Stream and emits those inner items.
💡 The equivalent operator is called flatMapStream in Flowable, Observable and ParallelFlowable.
💡 There are no flattenStreamAs methods in Completable because it is always empty and has no item to map.
Primitive Stream types (such as IntStream and DoubleStream) are not accepted directly. These streams have to be converted into their boxed variants via their boxed() method. Since RxJava uses reference types, there is no way to optimize the interoperation with the primitive streams.
- Maybe.flattenStreamAsFlowable
- Maybe.flattenStreamAsObservable
- Single.flattenStreamAsFlowable
- Single.flattenStreamAsObservable
    Flowable<Integer> f = Maybe.just(1)
        .flattenStreamAsFlowable(v -> Arrays.asList(v, v + 1).stream());
    Observable<Integer> o = Single.just(2)
        .flattenStreamAsObservable(v -> IntStream.range(v, v + 10).boxed());
ℹ️ Further references: Issue #6776, PR #6805
The method was ambiguous and/or invited wrong usage in other languages. Its overloads have now been renamed to startWithArray, startWithIterable and startWithItem.
ℹ️ Further references: Issue #6122, PR #6530
The method was ambiguous and/or invited wrong usage in other languages. It has now been renamed to onErrorResumeWith across all types.
- Flowable.onErrorResumeWith()
- Observable.onErrorResumeWith()
- Maybe.onErrorResumeWith()
- Single.onErrorResumeWith()
- Completable.onErrorResumeWith()
ℹ️ Further references: Issue #6551, PR #6556
Renamed to be plain zip to match the naming convention with other operators (i.e., Iterable/Source versions are named plainly, array-versions receive an Array postfix).
ℹ️ Further references: Issue #6610, PR #6638
Renamed to be plain combineLatestArray and combineLatestArrayDelayError to match the naming convention with other operators (i.e., Iterable/Source versions are named plainly, array-versions receive an Array postfix).
- Flowable.combineLatestArray()
- Flowable.combineLatestArrayDelayError()
- Observable.combineLatestArray()
- Observable.combineLatestArrayDelayError()
ℹ️ Further references: Issue #6820, PR #6640, PR #6838
Renamed to sequenceEqual to match the naming in the other reactive classes.
ℹ️ Further references: Issue #6854, PR #6856
The operator was confusing and has been renamed to flatMapSingle, replacing the original Maybe.flatMapSingle. The original behavior (i.e., signaling NoSuchElementException if the Maybe is empty) can be emulated via toSingle().
    source.flatMapSingle(item -> singleSource).toSingle()
ℹ️ Further references: Issue #6878, PR #6891
Operators accepting a java.util.concurrent.Callable have been changed to accept io.reactivex.rxjava3.functions.Supplier instead to enable the callbacks to throw any kind of exceptions.
If the operator was used with a lambda, only a recompilation is needed:
    Flowable.defer(() -> Flowable.just(Math.random()));
However, if an explicit implementation was used:
    Flowable.defer(new Callable<Publisher<Double>>() {
        @Override
        public Publisher<Double> call() throws Exception {
            return Flowable.just(Math.random());
        }
    });
the interface type (Callable -> Supplier) and the method name (call -> get) have to be adjusted:
    Flowable.defer(new Supplier<Publisher<Double>>() {
        @Override
        public Publisher<Double> get() throws Exception {
            return Flowable.just(Math.random());
        }
    });
(Across all reactive types, multiple overloads.)
| defer | error | using |
| generate | buffer | collect |
| distinct | reduceWith | scanWith |
| toMap | toMultimap |
ℹ️ Further references: PR #6511
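When an existing Callable instance (rather than a lambda or an anonymous class) has to be handed to a Supplier-accepting operator, a method reference is usually the least invasive bridge. The following JDK-only sketch assumes a hypothetical ThrowingSupplier interface that mirrors the shape described above (a get() method that may throw), and a hypothetical invoke method standing in for any such operator:

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in (assumption): same shape as a Supplier whose
// get() method is allowed to throw any kind of exception.
interface ThrowingSupplier<T> {
    T get() throws Throwable;
}

class SupplierMigrationSketch {

    // Stand-in for an operator that accepts the supplier and calls it.
    static <T> T invoke(ThrowingSupplier<T> supplier) {
        try {
            return supplier.get();
        } catch (Throwable ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        // Lambdas need no source change, only a recompilation.
        Integer fromLambda = invoke(() -> 1);

        // An existing Callable instance is adapted via a method reference,
        // since call() throws Exception, which fits "throws Throwable".
        Callable<Integer> legacy = () -> 2;
        Integer fromCallable = invoke(legacy::call);

        System.out.println(fromLambda + " " + fromCallable); // prints 1 2
    }
}
```

The same legacy::call pattern should carry over to the real Supplier type, because only the method shape matters to the compiler.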
Corrected the return type to Single as now it is guaranteed to have a success item or an error.
ℹ️ Further references: PR #6511
Changed the order of the tillTheEnd argument in concatMapDelayError and concatMapEagerDelayError to be consistent with other operators taking a boolean parameter before prefetch/maxConcurrency.
- Flowable.concatMapDelayError()
- Flowable.concatMapEagerDelayError()
- Observable.concatMapDelayError()
- Observable.concatMapEagerDelayError()
ℹ️ Further references: Issue #6610, PR #6638
The signature was wrongly declared with a Flowable instead of a more general Publisher.
ℹ️ Further references: PR #6858
The operator was not in line with how flatMaps are expected to operate (i.e., it signaled NoSuchElementException if the Maybe was empty). The flatMapSingleElement operator has been renamed to be the flatMapSingle operator.
The original error behavior can be emulated via toSingle():
    source.flatMapSingle(item -> singleSource).toSingle()
ℹ️ Further references: Issue #6878, PR #6891
The getValues() and getValues(T[]) methods were a remnant from a time when Subject and FlowableProcessor were unifying all state peeking methods for every kind of subject/processor. These have been marked as @Deprecated in 2.x and are now removed from 3.x. They can be trivially replaced with getValue() if necessary, for example:
    Object value = subject.getValue();
    if (value == null) {
        return new Object[1];
    }
    return new Object[] { value };
ℹ️ Further references: Issue #5622, PR #6516
Use Maybe.defaultIfEmpty(T) instead.
ℹ️ Further references: PR #6517
Removed from Flowable and Observable. The 4th argument, the Subscription/Disposable callback, was more or less useless. Use Flowable.doOnSubscribe() and Observable.doOnSubscribe() instead.
ℹ️ Further references: PR #6517
Replaced by the better-named ignoreElement().
ℹ️ Further references: PR #6517
The behavior and signature were confusing (i.e., returning null or a Throwable). Use blockingAwait() instead.
ℹ️ Further references: PR #6517
Based on user feedback, the following methods have been removed from TestSubscriber and TestObserver respectively due to being less useful outside testing RxJava itself:
| assertErrorMessage | assertFailure(Predicate, T...) | assertFailureAndMessage |
| assertNever(Predicate) | assertNever(T) | assertNoTimeout |
| assertNotSubscribed | assertNotTerminated | assertSubscribed |
| assertTerminated | assertTimeout | assertValueSequenceOnly |
| assertValueSet | assertValueSetOnly | awaitCount(int, Runnable) |
| awaitCount(int, Runnable, long) | awaitTerminalEvent | awaitTerminalEvent(long, TimeUnit) |
| clearTimeout | completions | errorCount |
| errors | getEvents | isTerminated |
| isTimeout | lastThread | valueCount |
| assertOf |
ℹ️ Further references: Issue #6153, PR #6526
The replay(Scheduler) and other overloads were carried over from the original Rx.NET API set but appear to be unused. Most use cases capture the connectable anyway so there is not much benefit from inlining an observeOn into a connectable:
    ConnectableFlowable<Integer> connectable = source.replay();
    Flowable<Integer> flowable = connectable.observeOn(Schedulers.io());
    // hand flowable to consumers
    flowable.subscribe();
    connectable.connect();
ℹ️ Further references: PR #6539
The Flowable.dematerialize() and Observable.dematerialize() were inherently type-unsafe and have been removed. In Rx.NET, the extension methods allowed dematerialize() to be applied to Observable<Notification<T>> only, but there is no way of doing it in Java as it has no extension methods and one can't restrict a method to appear only with a certain type argument scheme.
Use dematerialize(Function) instead.
    Observable<Notification<Integer>> source = ...
    Observable<Integer> result = source.dematerialize(v -> v);
ℹ️ Further references: PR #6539
The operator was apparently not used anywhere and has been removed from all types. Its function can be emulated via onErrorResumeNext:
    source.onErrorResumeNext(error ->
        error instanceof Exception ? fallback : Observable.error(error))
ℹ️ Further references: Issue #6554, PR #6564, PR #6844
This operator did not see much use and has been removed from Flowable and Observable. It can be emulated with the plain sourced version:
    source.buffer(Observable.defer(supplier).take(1).repeat())
ℹ️ Further references: Issue #6555, PR #6564
Both the vararg overloaded versions of combineLatest and combineLatestDelayError were awkward to use from other JVM languages and have been removed. Use combineLatestArray and combineLatestArrayDelayError instead.
ℹ️ Further references: Issue #6634, PR #6635
Zip requires a known number of sources to work with. These overloads were just collecting up the inner sources for another overload. Removed from both Flowable and Observable. They can be emulated via composition:
    Observable<Observable<Integer>> sources = ...
    sources.toList().flatMapObservable(list -> Observable.zip(list, zipper));
ℹ️ Further references: PR #6638
These were just convenience overloads for fromFuture().subscribeOn() all across. Apply subscribeOn explicitly from now on.
    Flowable.fromFuture(future).subscribeOn(Schedulers.io());
    Flowable.fromFuture(future, 5, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io());
ℹ️ Further references: Issue #6811, PR #6814
This overload had no effect because there is no buffering happening inside the operator (unlike in the Flowable variant). Use the Observable.concatMapIterable(Function) overload instead.
ℹ️ Further references: Issue #6828, PR #6837
RxJava 3 still follows the Reactive Streams specification and as such, the io.reactivex.rxjava3.core.Flowable is a compatible source for any 3rd party solution accepting an org.reactivestreams.Publisher as input.
Flowable can also wrap any such org.reactivestreams.Publisher in return.
ℹ️ Note that it is possible to interface RxJava's 2.x Flowable and 3.x Flowable this way, however, due to the specification requirements, this involves extra overhead. Instead, one should use a dedicated interoperation library.
RxJava is more than 7 years old at this moment and many users are still stuck with 3rd party tools or libraries only supporting the RxJava 1 line.
To help with the situation, and also help with a gradual migration from 1.x to 3.x, an external interop library is provided:
https://github.com/akarnokd/RxJavaInterop#rxjavainterop
Migration from 2.x to 3.x could also be cumbersome and difficult because the 2.x line has also amassed an ecosystem of tools and libraries of its own, which may take time to provide native 3.x versions.
RxJava 3.x was structured, both in code and in Maven coordinates, to allow the existence of both 2.x and 3.x code side by side (or even have all 3 major versions at once).
There is limited interoperation between the Flowables through the Reactive Streams Publisher interface (although not recommended due to extra overheads), however, there is no direct way for a 2.x Observable to talk to a 3.x Observable as they are completely separate types.
To help with the situation, and also help with a gradual migration from 2.x to 3.x, an external interop library is provided:
https://github.com/akarnokd/RxJavaBridge#rxjavabridge
The move to a Java 8 baseline was enabled by Android's improved (and upcoming) desugaring toolset.
Unfortunately, there was no indication if and when this tooling would support Java 9, more specifically, the java.util.concurrent.Flow interfaces imported and standardized from the 3rd party Reactive Streams specification.
There is a semi-hidden org.reactivestreams.FlowAdapters class in the Reactive Streams library that could be used for conversion in-between, but yet again, it adds some extra overhead.
Therefore, an external, native interoperation library is provided to convert between java.util.concurrent.Flow.Publisher and io.reactivex.rxjava3.core.Flowable as well as java.util.concurrent.Flow.Processor and io.reactivex.rxjava3.processors.FlowableProcessor.
https://github.com/akarnokd/RxJavaJdk9Interop#rxjavajdk9interop
💡 Conversion to other RxJava 3 reactive types is not supported and the user is expected to apply the appropriate RxJava 3 conversion method.
Since the Graphical User Interface library Swing is not part of the Android platform, the desktop users of the JDK have to resort to an external library to work with GUI components and their event sources:
https://github.com/akarnokd/RxJavaSwing#rxjavaswing
There is a project in the works at Oracle trying to solve the asynchronous programming problems in a different way than RxJava and reactive programming have been doing for more than a decade.
The idea is to have the user code appear to be imperative and blocking, but the JVM will make it so that actual, and resource-limited, native OS threads don't get blocked.
It could be years till the design and implementation of Project Loom becomes mainstream in the JDK, and perhaps even more time until Android picks it up. Therefore, to allow early experimentation, an external library is provided to allow working with generators written in an imperative and (virtually) blocking fashion:
https://github.com/akarnokd/RxJavaFiberInterop#rxjavafiberinterop
💡 Since Project Loom offers a transparent way of turning blocking operations (such as CountDownLatch.await()) into economic virtual thread suspensions, there is no need to provide any specific conversion method from RxJava 3 to Project Loom; executing any of the standard RxJava blockingXXX methods in a virtual thread automatically benefits from this transparent suspension.
Copyright (c) 2016-present, RxJava Contributors.

