- Notifications
You must be signed in to change notification settings - Fork7.6k
What's different in 2.0
RxJava 2.0 has been completely rewritten from scratch on top of the Reactive-Streams specification. The specification itself has evolved out of RxJava 1.x and provides a common baseline for reactive systems and libraries.
Because Reactive-Streams has a different architecture, it mandates changes to some well known RxJava types. This wiki page attempts to summarize what has changed and describes how to rewrite 1.x code into 2.x code.
For technical details on how to write operators for 2.x, please visit theWriting Operators wiki page.
- Maven address and base package
- Javadoc
- Nulls
- Observable and Flowable
- Single
- Completable
- Maybe
- Base reactive interfaces
- Subjects and Processors
- Other classes
- Functional interfaces
- Subscriber
- Subscription
- Backpressure
- Reactive-Streams compliance
- Runtime hooks
- Error handling
- Scheduler
- Entering the reactive world
- Leaving the reactive world
- Testing
- Operator differences
- Miscellaneous changes
To allow having RxJava 1.x and RxJava 2.x side-by-side, RxJava 2.x is under the maven coordinatesio.reactivex.rxjava2:rxjava:2.x.y and classes are accessible belowio.reactivex.
Users switching from 1.x to 2.x have to re-organize their imports, but carefully.
The official Javadoc pages for 2.x is hosted athttp://reactivex.io/RxJava/2.x/javadoc/
RxJava 2.x no longer acceptsnull values and the following will yieldNullPointerException immediately or as a signal to downstream:
Observable.just(null);Single.just(null);Observable.fromCallable(() ->null) .subscribe(System.out::println,Throwable::printStackTrace);Observable.just(1).map(v ->null) .subscribe(System.out::println,Throwable::printStackTrace);
This means thatObservable<Void> can no longer emit any values but only terminate normally or with an exception. API designers may instead choose to defineObservable<Object> with no guarantee on whatObject will be (which should be irrelevant anyway). For example, if one needs a signaller-like source, a shared enum can be defined and its solo instanceonNext'd:
enumIrrelevant {INSTANCE; }Observable<Object>source =Observable.create((ObservableEmitter<Object>emitter) -> {System.out.println("Side-effect 1");emitter.onNext(Irrelevant.INSTANCE);System.out.println("Side-effect 2");emitter.onNext(Irrelevant.INSTANCE);System.out.println("Side-effect 3");emitter.onNext(Irrelevant.INSTANCE);});source.subscribe(e -> {/* Ignored. */ },Throwable::printStackTrace);
A small regret about introducing backpressure in RxJava 0.x is that instead of having a separate base reactive class, theObservable itself was retrofitted. The main issue with backpressure is that many hot sources, such as UI events, can't be reasonably backpressured and cause unexpectedMissingBackpressureException (i.e., beginners don't expect them).
We try to remedy this situation in 2.x by havingio.reactivex.Observable non-backpressured and the newio.reactivex.Flowable be the backpressure-enabled base reactive class.
The good news is that operator names remain (mostly) the same. The bad news is that one should be careful when performing 'organize imports' as it may select the non-backpressuredio.reactivex.Observable unintended.
When architecting dataflows (as an end-consumer of RxJava) or deciding upon what type your 2.x compatible library should take and return, you can consider a few factors that should help you avoid problems down the line such asMissingBackpressureException orOutOfMemoryError.
- You have a flow of no more than 1000 elements at its longest: i.e., you have so few elements over time that there is practically no chance for OOME in your application.
- You deal with GUI events such as mouse moves or touch events: these can rarely be backpressured reasonably and aren't that frequent. You may be able to handle an element frequency of 1000 Hz or less with Observable but consider using sampling/debouncing anyway.
- Your flow is essentially synchronous but your platform doesn't support Java Streams or you miss features from it. Using
Observablehas lower overhead in general thanFlowable.(You could also consider IxJava which is optimized for Iterable flows supporting Java 6+).
- Dealing with 10k+ of elements that are generated in some fashion somewhere and thus the chain can tell the source to limit the amount it generates.
- Reading (parsing) files from disk is inherently blocking and pull-based which works well with backpressure as you control, for example, how many lines you read from this for a specified request amount).
- Reading from a database through JDBC is also blocking and pull-based and is controlled by you by calling
ResultSet.next()for likely each downstream request. - Network (Streaming) IO where either the network helps or the protocol used supports requesting some logical amount.
- Many blocking and/or pull-based data sources which may eventually get a non-blocking reactive API/driver in the future.
The 2.xSingle reactive base type, which can emit a singleonSuccess oronError has been redesigned from scratch. Its architecture now derives from the Reactive-Streams design. Its consumer type (rx.Single.SingleSubscriber<T>) has been changed from being a class that acceptsrx.Subscription resources to be an interfaceio.reactivex.SingleObserver<T> that has only 3 methods:
interfaceSingleObserver<T> {voidonSubscribe(Disposabled);voidonSuccess(Tvalue);voidonError(Throwableerror);}
and follows the protocolonSubscribe (onSuccess | onError)?.
TheCompletable type remains largely the same. It was already designed along the Reactive-Streams style for 1.x so no user-level changes there.
Similar to the naming changes,rx.Completable.CompletableSubscriber has becomeio.reactivex.CompletableObserver withonSubscribe(Disposable):
interfaceCompletableObserver<T> {voidonSubscribe(Disposabled);voidonComplete();voidonError(Throwableerror);}
and still follows the protocolonSubscribe (onComplete | onError)?.
RxJava 2.0.0-RC2 introduced a new base reactive type calledMaybe. Conceptually, it is a union ofSingle andCompletable providing the means to capture an emission pattern where there could be 0 or 1 item or an error signalled by some reactive source.
TheMaybe class is accompanied byMaybeSource as its base interface type,MaybeObserver<T> as its signal-receiving interface and follows the protocolonSubscribe (onSuccess | onError | onComplete)?. Because there could be at most 1 element emitted, theMaybe type has no notion of backpressure (because there is no buffer bloat possible as with unknown lengthFlowables orObservables.
This means that an invocation ofonSubscribe(Disposable) is potentially followed by one of the otheronXXX methods. UnlikeFlowable, if there is only a single value to be signalled, onlyonSuccess is called andonComplete is not.
Working with this new base reactive type is practically the same as the others as it offers a modest subset of theFlowable operators that make sense with a 0 or 1 item sequence.
Maybe.just(1).map(v ->v +1).filter(v ->v ==1).defaultIfEmpty(2).test().assertResult(2);
Following the style of extending the Reactive-StreamsPublisher<T> inFlowable, the other base reactive classes now extend similar base interfaces (in packageio.reactivex):
interfaceObservableSource<T> {voidsubscribe(Observer<?superT>observer);}interfaceSingleSource<T> {voidsubscribe(SingleObserver<?superT>observer);}interfaceCompletableSource {voidsubscribe(CompletableObserverobserver);}interfaceMaybeSource<T> {voidsubscribe(MaybeObserver<?superT>observer);}
Therefore, many operators that required some reactive base type from the user now acceptPublisher andXSource:
Flowable<R>flatMap(Function<?superT, ?extendsPublisher<?extendsR>>mapper);Observable<R>flatMap(Function<?superT, ?extendsObservableSource<?extendsR>>mapper);
By havingPublisher as input this way, you can compose with other Reactive-Streams compliant libraries without the need to wrap them or convert them intoFlowable first.
If an operator has to offer a reactive base type, however, the user will receive the full reactive class (as giving out anXSource is practically useless as it doesn't have operators on it):
Flowable<Flowable<Integer>>windows =source.window(5);source.compose((Flowable<T>flowable) ->flowable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()));
In the Reactive-Streams specification, theSubject-like behavior, namely being a consumer and supplier of events at the same time, is done by theorg.reactivestreams.Processor interface. As with theObservable/Flowable split, the backpressure-aware, Reactive-Streams compliant implementations are based on theFlowableProcessor<T> class (which extendsFlowable to give a rich set of instance operators). An important change regardingSubjects (and by extension,FlowableProcessor) that they no longer supportT -> R like conversion (that is, input is of typeT and the output is of typeR). (We never had a use for it in 1.x and the originalSubject<T, R> came from .NET where there is aSubject<T> overload because .NET allows the same class name with a different number of type arguments.)
Theio.reactivex.subjects.AsyncSubject,io.reactivex.subjects.BehaviorSubject,io.reactivex.subjects.PublishSubject,io.reactivex.subjects.ReplaySubject andio.reactivex.subjects.UnicastSubject in 2.x don't support backpressure (as part of the 2.xObservable family).
Theio.reactivex.processors.AsyncProcessor,io.reactivex.processors.BehaviorProcessor,io.reactivex.processors.PublishProcessor,io.reactivex.processors.ReplayProcessor andio.reactivex.processors.UnicastProcessor are backpressure-aware. TheBehaviorProcessor andPublishProcessor don't coordinate requests (useFlowable.publish() for that) of their downstream subscribers and will signal themMissingBackpressureException if the downstream can't keep up. The otherXProcessor types honor backpressure of their downstream subscribers but otherwise, when subscribed to a source (optional), they consume it in an unbounded manner (requestingLong.MAX_VALUE).
The 1.xTestSubject has been dropped. Its functionality can be achieved viaTestScheduler,PublishProcessor/PublishSubject andobserveOn(testScheduler)/scheduler parameter.
TestSchedulerscheduler =newTestScheduler();PublishSubject<Integer>ps =PublishSubject.create();TestObserver<Integer>ts =ps.delay(1000,TimeUnit.MILLISECONDS,scheduler).test();ts.assertEmpty();ps.onNext(1);scheduler.advanceTimeBy(999,TimeUnit.MILLISECONDS);ts.assertEmpty();scheduler.advanceTimeBy(1,TimeUnit.MILLISECONDS);ts.assertValue(1);
TheSerializedSubject is no longer a public class. You have to useSubject.toSerialized() andFlowableProcessor.toSerialized() instead.
Therx.observables.ConnectableObservable is nowio.reactivex.observables.ConnectableObservable<T> andio.reactivex.flowables.ConnectableFlowable<T>.
Therx.observables.GroupedObservable is nowio.reactivex.observables.GroupedObservable<T> andio.reactivex.flowables.GroupedFlowable<T>.
In 1.x, you could create an instance withGroupedObservable.from() which was used internally by 1.x. In 2.x, all use cases now extendGroupedObservable directly thus the factory methods are no longer available; the whole class is now abstract.
You can extend the class and add your own customsubscribeActual behavior to achieve something similar to the 1.x features:
classMyGroup<K,V>extendsGroupedObservable<K,V> {finalKkey;finalSubject<V>subject;publicMyGroup(Kkey) {this.key =key;this.subject =PublishSubject.create(); }@OverridepublicTgetKey() {returnkey; }@OverrideprotectedvoidsubscribeActual(Observer<?superT>observer) {subject.subscribe(observer); }}
(The same approach works withGroupedFlowable as well.)
Because both 1.x and 2.x is aimed at Java 6+, we can't use the Java 8 functional interfaces such asjava.util.function.Function. Instead, we defined our own functional interfaces in 1.x and 2.x follows this tradition.
One notable difference is that all our functional interfaces now definethrows Exception. This is a large convenience for consumers and mappers that otherwise throw and would needtry-catch to transform or suppress a checked exception.
Flowable.just("file.txt").map(name ->Files.readLines(name)).subscribe(lines ->System.out.println(lines.size()),Throwable::printStackTrace);
If the file doesn't exist or can't be read properly, the end consumer will print outIOException directly. Note also theFiles.readLines(name) invoked without try-catch.
As the opportunity to reduce component count, 2.x doesn't defineAction3-Action9 andActionN (these were unused within RxJava itself anyway).
The remaining action interfaces were named according to the Java 8 functional types. The no argumentAction0 is replaced by theio.reactivex.functions.Action for the operators andjava.lang.Runnable for theScheduler methods.Action1 has been renamed toConsumer andAction2 is calledBiConsumer.ActionN is replaced by theConsumer<Object[]> type declaration.
We followed the naming convention of Java 8 by definingio.reactivex.functions.Function andio.reactivex.functions.BiFunction, plus renamingFunc3 -Func9 intoFunction3 -Function9 respectively. TheFuncN is replaced by theFunction<Object[], R> type declaration.
In addition, operators requiring a predicate no longer useFunc1<T, Boolean> but have a separate, primitive-returning type ofPredicate<T> (allows better inlining due to no autoboxing).
The Reactive-Streams specification has its own Subscriber as an interface. This interface is lightweight and combines request management with cancellation into a single interfaceorg.reactivestreams.Subscription instead of havingrx.Producer andrx.Subscription separately. This allows creating stream consumers with less internal state than the quite heavyrx.Subscriber of 1.x.
Flowable.range(1,10).subscribe(newSubscriber<Integer>() {@OverridepublicvoidonSubscribe(Subscriptions) {s.request(Long.MAX_VALUE); }@OverridepublicvoidonNext(Integert) {System.out.println(t); }@OverridepublicvoidonError(Throwablet) {t.printStackTrace(); }@OverridepublicvoidonComplete() {System.out.println("Done"); }});
Due to the name conflict, replacing the package fromrx toorg.reactivestreams is not enough. In addition,org.reactivestreams.Subscriber has no notion of adding resources to it, cancelling it or requesting from the outside.
To bridge the gap we defined abstract classesDefaultSubscriber,ResourceSubscriber andDisposableSubscriber (plus theirXObserver variants) forFlowable (andObservable) respectively that offers resource tracking support (ofDisposables) just likerx.Subscriber and can be cancelled/disposed externally viadispose():
ResourceSubscriber<Integer>subscriber =newResourceSubscriber<Integer>() {@OverridepublicvoidonStart() {request(Long.MAX_VALUE); }@OverridepublicvoidonNext(Integert) {System.out.println(t); }@OverridepublicvoidonError(Throwablet) {t.printStackTrace(); }@OverridepublicvoidonComplete() {System.out.println("Done"); }};Flowable.range(1,10).delay(1,TimeUnit.SECONDS).subscribe(subscriber);subscriber.dispose();
Note also that due to Reactive-Streams compatibility, the methodonCompleted has been renamed toonComplete without the trailingd.
Since 1.xObservable.subscribe(Subscriber) returnedSubscription, users often added theSubscription to aCompositeSubscription for example:
CompositeSubscriptioncomposite =newCompositeSubscription();composite.add(Observable.range(1,5).subscribe(newTestSubscriber<Integer>()));
Due to the Reactive-Streams specification,Publisher.subscribe returns void and the pattern by itself no longer works in 2.0. To remedy this, the methodE subscribeWith(E subscriber) has been added to each base reactive class which returns its input subscriber/observer as is. With the two examples before, the 2.x code can now look like this sinceResourceSubscriber implementsDisposable directly:
CompositeDisposablecomposite2 =newCompositeDisposable();composite2.add(Flowable.range(1,5).subscribeWith(subscriber));
Note that due to how request management works, callingrequest(n) fromSubscriber.onSubscribe orResourceSubscriber.onStart may trigger calls toonNext immediately before therequest() call itself returns to theonSubscribe/onStart method of yours:
Flowable.range(1,3).subscribe(newSubscriber<Integer>() {@OverridepublicvoidonSubscribe(Subscriptions) {System.out.println("OnSubscribe start");s.request(Long.MAX_VALUE);System.out.println("OnSubscribe end"); }@OverridepublicvoidonNext(Integerv) {System.out.println(v); }@OverridepublicvoidonError(Throwablee) {e.printStackTrace(); }@OverridepublicvoidonComplete() {System.out.println("Done"); }});
This will print:
OnSubscribe start123DoneOnSubscribe endThe problem comes when one does some initialization inonSubscribe/onStart after callingrequest there andonNext may or may not see the effects of the initialization. To avoid this situation, make sure you callrequestafter all initialization have been done inonSubscribe/onStart.
This behavior differs from 1.x where arequest call went through a deferred logic that accumulated requests until an upstreamProducer arrived at some time (This nature adds overhead to all operators and consumers in 1.x.) In 2.x, there is always aSubscription coming down first and 90% of the time there is no need to defer requesting.
In RxJava 1.x, the interfacerx.Subscription was responsible for stream and resource lifecycle management, namely unsubscribing a sequence and releasing general resources such as scheduled tasks. The Reactive-Streams specification took this name for specifying an interaction point between a source and a consumer:org.reactivestreams.Subscription allows requesting a positive amount from the upstream and allows cancelling the sequence.
To avoid the name clash, the 1.xrx.Subscription has been renamed intoio.reactivex.Disposable (somewhat resembling .NET's own IDisposable).
Because Reactive-Streams base interface,org.reactivestreams.Publisher defines thesubscribe() method asvoid,Flowable.subscribe(Subscriber) no longer returns anySubscription (orDisposable). The other base reactive types also follow this signature with their respective subscriber types.
The other overloads ofsubscribe now returnDisposable in 2.x.
The originalSubscription container types have been renamed and updated
CompositeSubscriptiontoCompositeDisposableSerialSubscriptionandMultipleAssignmentSubscriptionhave been merged intoSerialDisposable. Theset()method disposes the old value andreplace()method does not.RefCountSubscriptionhas been removed.
The Reactive-Streams specification mandates operators supporting backpressure, specifically via the guarantee that they don't overflow their consumers when those don't request. Operators of the newFlowable base reactive type now consider downstream request amounts properly, however, this doesn't meanMissingBackpressureException is gone. The exception is still there but this time, the operator that can't signal moreonNext will signal this exception instead (allowing better identification of who is not properly backpressured).
As an alternative, the 2.xObservable doesn't do backpressure at all and is available as a choice to switch over.
updated in 2.0.7
TheFlowable-based sources and operators are, as of 2.0.7, fully Reactive-Streams version 1.0.0 specification compliant.
Before 2.0.7, the operatorstrict() had to be applied in order to achieve the same level of compliance. In 2.0.7, the operatorstrict() returnsthis, is deprecated and will be removed completely in 2.1.0.
As one of the primary goals of RxJava 2, the design focuses on performance and in order enable it, RxJava 2.0.7 adds a customio.reactivex.FlowableSubscriber interface (extendsorg.reactivestreams.Subscriber) but adds no new methods to it. The new interface isconstrained to RxJava 2 and represents a consumer toFlowable that is able to work in a mode that relaxes the Reactive-Streams version 1.0.0 specification in rules §1.3, §2.3, §2.12 and §3.9:
- §1.3 relaxation:
onSubscribemay run concurrently withonNextin case theFlowableSubscribercallsrequest()from insideonSubscribeand it is the resposibility ofFlowableSubscriberto ensure thread-safety between the remaining instructions inonSubscribeandonNext. - §2.3 relaxation: calling
Subscription.cancelandSubscription.requestfromFlowableSubscriber.onComplete()orFlowableSubscriber.onError()is considered a no-operation. - §2.12 relaxation: if the same
FlowableSubscriberinstance is subscribed to multiple sources, it must ensure itsonXXXmethods remain thread safe. - §3.9 relaxation: issuing a non-positive
request()will not stop the current stream but signal an error viaRxJavaPlugins.onError.
From a user's perspective, if one was using the thesubscribe methods other thanFlowable.subscribe(Subscriber<? super T>), there is no need to do anything regarding this change and there is no extra penalty for it.
If one was usingFlowable.subscribe(Subscriber<? super T>) with the built-in RxJavaSubscriber implementations such asDisposableSubscriber,TestSubscriber andResourceSubscriber, there is a small runtime overhead (oneinstanceof check) associated when the code is not recompiled against 2.0.7.
If a custom class implementingSubscriber was employed before, subscribing it to aFlowable adds an internal wrapper that ensures observing the Flowable is 100% compliant with the specification at the cost of some per-item overhead.
In order to help lift these extra overheads, a new methodFlowable.subscribe(FlowableSubscriber<? super T>) has been added which exposes the original behavior from before 2.0.7. It is recommended that new custom consumer implementations extendFlowableSubscriber instead of justSubscriber.
The 2.x redesigned theRxJavaPlugins class which now supports changing the hooks at runtime. Tests that want to override the schedulers and the lifecycle of the base reactive types can do it on a case-by-case basis through callback functions.
The class-basedRxJavaObservableHook and friends are now gone andRxJavaHooks functionality is incorporated intoRxJavaPlugins.
One important design requirement for 2.x is that noThrowable errors should be swallowed. This means errors that can't be emitted because the downstream's lifecycle already reached its terminal state or the downstream cancelled a sequence which was about to emit an error.
Such errors are routed to theRxJavaPlugins.onError handler. This handler can be overridden with the methodRxJavaPlugins.setErrorHandler(Consumer<Throwable>). Without a specific handler, RxJava defaults to printing theThrowable's stacktrace to the console and calls the current thread's uncaught exception handler.
On desktop Java, this latter handler does nothing on anExecutorService backedScheduler and the application can keep running. However, Android is more strict and terminates the application in such uncaught exception cases.
If this behavior is desirable can be debated, but in any case, if you want to avoid such calls to the uncaught exception handler, thefinal application that uses RxJava 2 (directly or transitively) should set a no-op handler:
// If Java 8 lambdas are supportedRxJavaPlugins.setErrorHandler(e -> { });// If no Retrolambda or JackRxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer());
It is not advised intermediate libraries change the error handler outside their own testing environment.
Unfortunately, RxJava can't tell which of these out-of-lifecycle, undeliverable exceptions should or shouldn't crash your app. Identifying the source and reason for these exceptions can be tiresome, especially if they originate from a source and get routed toRxJavaPlugins.onError somewhere lower the chain.
Therefore, 2.0.6 introduces specific exception wrappers to help distinguish and track down what was happening the time of the error:
OnErrorNotImplementedException: reintroduced to detect when the user forgot to add error handling tosubscribe().ProtocolViolationException: indicates a bug in an operatorUndeliverableException: wraps the original exception that can't be delivered due to lifecycle restrictions on aSubscriber/Observer. It is automatically applied byRxJavaPlugins.onErrorwith intact stacktrace that may help find which exact operator rerouted the original error.
If an undeliverable exception is an instance/descendant ofNullPointerException,IllegalStateException (UndeliverableException andProtocolViolationException extend this),IllegalArgumentException,CompositeException,MissingBackpressureException orOnErrorNotImplementedException, theUndeliverableException wrapping doesn't happen.
In addition, some 3rd party libraries/code throw when they get interrupted by a cancel/dispose call which leads to an undeliverable exception most of the time. Internal changes in 2.0.6 now consistently cancel or dispose aSubscription/Disposable before cancelling/disposing a task or worker (which causes the interrupt on the target thread).
// in some librarytry {doSomethingBlockingly()}catch (InterruptedExceptionex) {// check if the interrupt is due to cancellation// if so, no need to signal the InterruptedExceptionif (!disposable.isDisposed()) {observer.onError(ex); }}
If the library/code already did this, the undeliverableInterruptedExceptions should stop now. If this pattern was not employed before, we encourage updating the code/library in question.
If one decides to add a non-empty global error consumer, here is an example that manages the typical undeliverable exceptions based on whether they represent a likely bug or an ignorable application/network state:
RxJavaPlugins.setErrorHandler(e -> {if (einstanceofUndeliverableException) {e =e.getCause(); }if ((einstanceofIOException) || (einstanceofSocketException)) {// fine, irrelevant network problem or API that throws on cancellationreturn; }if (einstanceofInterruptedException) {// fine, some blocking code was interrupted by a dispose callreturn; }if ((einstanceofNullPointerException) || (einstanceofIllegalArgumentException)) {// that's likely a bug in the applicationThread.currentThread().getUncaughtExceptionHandler() .handleException(Thread.currentThread(),e);return; }if (einstanceofIllegalStateException) {// that's a bug in RxJava or in a custom operatorThread.currentThread().getUncaughtExceptionHandler() .handleException(Thread.currentThread(),e);return; }Log.warning("Undeliverable exception received, not sure what to do",e);});
The 2.x API still supports the main default scheduler types:computation,io,newThread andtrampoline, accessible throughio.reactivex.schedulers.Schedulers utility class.
Theimmediate scheduler is not present in 2.x. It was frequently misused and didn't implement theScheduler specification correctly anyway; it contained blocking sleep for delayed action and didn't support recursive scheduling at all. UseSchedulers.trampoline() instead.
TheSchedulers.test() has been removed as well to avoid the conceptional difference with the rest of the default schedulers. Those return a "global" scheduler instance whereastest() returned always a new instance of theTestScheduler. Test developers are now encouraged to simplynew TestScheduler() in their code.
Theio.reactivex.Scheduler abstract base class now supports scheduling tasks directly without the need to create and then destroy aWorker (which is often forgotten):
publicabstractclassScheduler {publicDisposablescheduleDirect(Runnabletask) { ... }publicDisposablescheduleDirect(Runnabletask,longdelay,TimeUnitunit) { ... }publicDisposablescheduleDirectPeriodically(Runnabletask,longinitialDelay,longperiod,TimeUnitunit) { ... }publiclongnow(TimeUnitunit) { ... }// ... rest is the same: lifecycle methods, worker creation}
The main purpose is to avoid the tracking overhead of theWorkers for typically one-shot tasks. The methods have a default implementation that reusescreateWorker properly but can be overridden with more efficient implementations if necessary.
The method that returns the scheduler's own notion of current time,now() has been changed to accept aTimeUnit to indicate the unit of measure.
One of the design flaws of RxJava 1.x was the exposure of therx.Observable.create() method that while powerful, not the typical operator you want to use to enter the reactive world. Unfortunately, so many depend on it that we couldn't remove or rename it.
Since 2.x is a fresh start, we won't make that mistake again. Each reactive base typeFlowable,Observable,Single,Maybe andCompletable feature a safecreate operator that does the right thing regarding backpressure (forFlowable) and cancellation (all):
Flowable.create((FlowableEmitter<Integer>emitter) -> {emitter.onNext(1);emitter.onNext(2);emitter.onComplete();},BackpressureStrategy.BUFFER);
Practically, the 1.xfromEmitter (formerlyfromAsync) has been renamed toFlowable.create. The other base reactive types have similarcreate methods (minus the backpressure strategy).
Apart from subscribing to the base types with their respective consumers (Subscriber,Observer,SingleObserver,MaybeObserver andCompletableObserver) and functional-interface based consumers (such assubscribe(Consumer<T>, Consumer<Throwable>, Action)), the formerly separate 1.xBlockingObservable (and similar classes for the others) has been integrated with the main reactive type. Now you can directly block for some results by invoking ablockingX operation directly:
List<Integer>list =Flowable.range(1,100).toList().blockingGet();// toList() returns SingleIntegeri =Flowable.range(100,100).blockingLast();
(The reason for this is twofold: performance and ease of use of the library as a synchronous Java 8 Streams-like processor.)
Another significant difference betweenrx.Subscriber (and co) andorg.reactivestreams.Subscriber (and co) is that in 2.x, yourSubscribers andObservers are not allowed to throw anything but fatal exceptions (seeExceptions.throwIfFatal()). (The Reactive-Streams specification allows throwingNullPointerException if theonSubscribe,onNext oronError receives anull value, but RxJava doesn't letnulls in any way.) This means the following code is no longer legal:
Subscriber<Integer>subscriber =newSubscriber<Integer>() {@OverridepublicvoidonSubscribe(Subscriptions) {s.request(Long.MAX_VALUE); }publicvoidonNext(Integert) {if (t ==1) {thrownewIllegalArgumentException(); } }publicvoidonError(Throwablee) {if (einstanceofIllegalArgumentException) {thrownewUnsupportedOperationException(); } }publicvoidonComplete() {thrownewNoSuchElementException(); }};Flowable.just(1).subscribe(subscriber);
The same applies toObserver,SingleObserver,MaybeObserver andCompletableObserver.
Since many of the existing code targeting 1.x do such things, the methodsafeSubscribe has been introduced that does handle these non-conforming consumers.
Alternatively, you can use thesubscribe(Consumer<T>, Consumer<Throwable>, Action) (and similar) methods to provide a callback/lambda that can throw:
Flowable.just(1).subscribe(subscriber::onNext,subscriber::onError,subscriber::onComplete,subscriber::onSubscribe);
Testing RxJava 2.x works the same way as it does in 1.x.Flowable can be tested withio.reactivex.subscribers.TestSubscriber whereas the non-backpressuredObservable,Single,Maybe andCompletable can be tested withio.reactivex.observers.TestObserver.
To support our internal testing, all base reactive types now featuretest() methods (which is a huge convenience for us) returningTestSubscriber orTestObserver:
TestSubscriber<Integer>ts =Flowable.range(1,5).test();TestObserver<Integer>to =Observable.range(1,5).test();TestObserver<Integer>tso =Single.just(1).test();TestObserver<Integer>tmo =Maybe.just(1).test();TestObserver<Integer>tco =Completable.complete().test();
The second convenience is that mostTestSubscriber/TestObserver methods return the instance itself allowing chaining the variousassertX methods. The third convenience is that you can now fluently test your sources without the need to create or introduceTestSubscriber/TestObserver instance in your code:
Flowable.range(1,5).test().assertResult(1,2,3,4,5);
assertResult(T... items): asserts if subscribed, received exactly the given items in the given order followed byonCompleteand no errorsassertFailure(Class<? extends Throwable> clazz, T... items): asserts if subscribed, received exactly the given items in the given order followed by aThrowableerror of wichclazz.isInstance()returns true.assertFailureAndMessage(Class<? extends Throwable> clazz, String message, T... items): same asassertFailureplus validates thegetMessage()contains the specified messageawaitDone(long time, TimeUnit unit)awaits a terminal event (blockingly) and cancels the sequence if the timeout elapsed.assertOf(Consumer<TestSubscriber<T>> consumer)compose some assertions into the fluent chain (used internally for fusion test as operator fusion is not part of the public API right now).
One of the benefits is that changingFlowable toObservable here the test code part doesn't have to change at all due to the implicit type change of theTestSubscriber toTestObserver.
Thetest() method onTestObserver has atest(boolean cancel) overload which cancels/disposes theTestSubscriber/TestObserver before it even gets subscribed:
PublishSubject<Integer>pp =PublishSubject.create();// nobody subscribed yetassertFalse(pp.hasSubscribers());pp.test(true);// nobody remained subscribedassertFalse(pp.hasSubscribers());
TestSubscriber has thetest(long initialRequest) andtest(long initialRequest, boolean cancel) overloads to specify the initial request amount and whether theTestSubscriber should be also immediately cancelled. If theinitialRequest is given, theTestSubscriber offers therequestMore(long) method to keep requesting in a fluent manner:
Flowable.range(1,5).test(0).assertValues().requestMore(1).assertValues(1).requestMore(2).assertValues(1,2,3).requestMore(2).assertResult(1,2,3,4,5);
or alternatively theTestSubscriber instance has to be captured to gain access to itsrequest() method:
PublishProcessor<Integer>pp =PublishProcessor.create();TestSubscriber<Integer>ts =pp.test(0L);ts.request(1);pp.onNext(1);pp.onNext(2);ts.assertFailure(MissingBackpressureException.class,1);
Given an asynchronous source, fluent blocking for a terminal event is now possible:
Flowable.just(1).subscribeOn(Schedulers.single()).test().awaitDone(5,TimeUnit.SECONDS).assertResult(1);
Those who are using Mockito and mockedObserver in 1.x has to mock theSubscriber.onSubscribe method to issue an initial request, otherwise, the sequence will hang or fail with hot sources:
@SuppressWarnings("unchecked")publicstatic <T>Subscriber<T>mockSubscriber() {Subscriber<T>w =mock(Subscriber.class);Mockito.doAnswer(newAnswer<Object>() {@OverridepublicObjectanswer(InvocationOnMocka)throwsThrowable {Subscriptions =a.getArgumentAt(0,Subscription.class);s.request(Long.MAX_VALUE);returnnull; } }).when(w).onSubscribe((Subscription)any());returnw;}
Most operators are still there in 2.x and practically all of them have the same behavior as they had in 1.x. The following subsections list each base reactive type and the difference between 1.x and 2.x.
Generally, many operators gained overloads that now allow specifying the internal buffer size or prefetch amount they should run their upstream (or inner sources).
Some operator overloads have been renamed with a postfix, such asfromArray,fromIterable etc. The reason for this is that when the library is compiled with Java 8, the javac often can't disambiguate between functional interface types.
Operators marked as@Beta or@Experimental in 1.x are promoted to standard.
| 1.x | 2.x |
|---|---|
amb | addedamb(ObservableSource...) overload, 2-9 argument versions dropped |
| RxRingBuffer.SIZE | bufferSize() |
combineLatest | added varargs overload, added overloads withbufferSize argument,combineLatest(List) dropped |
concat | added overload withprefetch argument, 5-9 source overloads dropped, useconcatArray instead |
| N/A | addedconcatArray andconcatArrayDelayError |
| N/A | addedconcatArrayEager andconcatArrayEagerDelayError |
concatDelayError | added overloads with option to delay till the current ends or till the very end |
concatEagerDelayError | added overloads with option to delay till the current ends or till the very end |
create(SyncOnSubscribe) | replaced withgenerate + overloads (distinct interfaces, you can implement them all at once) |
create(AsnycOnSubscribe) | not present |
create(OnSubscribe) | repurposed with safecreate(FlowableOnSubscribe, BackpressureStrategy), raw support viaunsafeCreate() |
from | disambiguated intofromArray,fromIterable,fromFuture |
| N/A | addedfromPublisher |
fromAsync | renamed tocreate() |
| N/A | addedintervalRange() |
limit | dropped, usetake |
merge | added overloads withprefetch |
mergeDelayError | added overloads withprefetch |
sequenceEqual | added overload withbufferSize |
switchOnNext | added overload withprefetch |
switchOnNextDelayError | added overload withprefetch |
timer | deprecated overloads dropped |
zip | added overloads withbufferSize anddelayErrors capabilities, disambiguated tozipArray andzipIterable |
| 1.x | 2.x |
|---|---|
all | RC3 returnsSingle<Boolean> now |
any | RC3 returnsSingle<Boolean> now |
asObservable | renamed tohide(), hides all identities now |
buffer | overloads with customCollection supplier |
cache(int) | deprecated and dropped |
collect | RC3 returnsSingle<U> |
collect(U, Action2<U, T>) | disambiguated tocollectInto andRC3 returnsSingle<U> |
concatMap | added overloads withprefetch |
concatMapDelayError | added overloads withprefetch, option to delay till the current ends or till the very end |
concatMapEager | added overloads withprefetch |
concatMapEagerDelayError | added overloads withprefetch, option to delay till the current ends or till the very end |
count | RC3 returnsSingle<Long> now |
countLong | dropped, usecount |
distinct | overload with customCollection supplier. |
doOnCompleted | renamed todoOnComplete, note the missingd! |
doOnUnsubscribe | renamed toFlowable.doOnCancel anddoOnDispose for the others,additional info |
| N/A | addeddoOnLifecylce to handleonSubscribe,request andcancel peeking |
elementAt(int) | RC3 no longer signals NoSuchElementException if the source is shorter than the index |
elementAt(Func1, int) | dropped, usefilter(predicate).elementAt(int) |
elementAtOrDefault(int, T) | renamed toelementAt(int, T) andRC3 returnsSingle<T> |
elementAtOrDefault(Func1, int, T) | dropped, usefilter(predicate).elementAt(int, T) |
first() | RC3 renamed tofirstElement and returnsMaybe<T> |
first(Func1) | dropped, usefilter(predicate).first() |
firstOrDefault(T) | renamed tofirst(T) andRC3 returnsSingle<T> |
firstOrDefault(Func1, T) | dropped, usefilter(predicate).first(T) |
flatMap | added overloads withprefetch |
| N/A | addedforEachWhile(Predicate<T>, [Consumer<Throwable>, [Action]]) for conditionally stopping consumption |
groupBy | added overload withbufferSize anddelayError option,the custom internal map version didn't make it into RC1 |
ignoreElements | RC3 returnsCompletable |
isEmpty | RC3 returnsSingle<Boolean> |
last() | RC3 renamed tolastElement and returnsMaybe<T> |
last(Func1) | dropped, usefilter(predicate).last() |
lastOrDefault(T) | renamed tolast(T) andRC3 returnsSingle<T> |
lastOrDefault(Func1, T) | dropped, usefilter(predicate).last(T) |
nest | dropped, use manualjust |
publish(Func1) | added overload withprefetch |
reduce(Func2) | RC3 returnsMaybe<T> |
| N/A | addedreduceWith(Callable, BiFunction) to reduce in a Subscriber-individual manner, returnsSingle<T> |
| N/A | addedrepeatUntil(BooleanSupplier) |
repeatWhen(Func1, Scheduler) | dropped the overload, usesubscribeOn(Scheduler).repeatWhen(Function) instead |
retry | addedretry(Predicate),retry(int, Predicate) |
| N/A | addedretryUntil(BooleanSupplier) |
retryWhen(Func1, Scheduler) | dropped the overload, usesubscribeOn(Scheduler).retryWhen(Function) instead |
sample | doesn't emit the very last item if the upstream completes within the period, added overloads withemitLast parameter |
| N/A | addedscanWith(Callable, BiFunction) to scan in a Subscriber-individual manner |
single() | RC3 renamed tosingleElement and returnsMaybe<T> |
single(Func1) | dropped, usefilter(predicate).single() |
singleOrDefault(T) | renamed tosingle(T) andRC3 returnsSingle<T> |
singleOrDefault(Func1, T) | dropped, usefilter(predicate).single(T) |
skipLast | added overloads withbufferSize anddelayError options |
startWith | 2-9 argument version dropped, usestartWithArray instead |
| N/A | addedstartWithArray to disambiguate |
subscribe | No longer wraps all consumer types (i.e.,Observer) with a safety wrapper, (just like the 1.xunsafeSubscribe no longer available). UsesafeSubscribe to get an explicit safety wrapper around a consumer type. |
| N/A | addedsubscribeWith that returns its input after subscription |
switchMap | added overload withprefetch argument |
switchMapDelayError | added overload withprefetch argument |
takeLastBuffer | dropped |
| N/A | addedtest() (returns TestSubscriber subscribed to this) with overloads to fluently test |
throttleLast | doesn't emit the very last item if the upstream completes within the period, usesample with theemitLast parameter |
timeout(Func0<Observable>, ...) | signature changed totimeout(Publisher, ...) and dropped the function, usedefer(Callable<Publisher>>) if necessary |
toBlocking().y | inlined asblockingY() operators, excepttoFuture |
toCompletable | RC3 dropped, useignoreElements |
toList | RC3 returnsSingle<List<T>> |
toMap | RC3 returnsSingle<Map<K, V>> |
toMultimap | RC3 returnsSingle<Map<K, Collection<V>>> |
| N/A | addedtoFuture |
| N/A | addedtoObservable |
toSingle | RC3 dropped, usesingle(T) |
toSortedList | RC3 returnsSingle<List<T>> |
unsafeSubscribe | Removed as the Reactive Streams specification mandates theonXXX methods don't crash and therefore the default is to not have a safety net insubscribe. The newsafeSubscribe method was introduced to explicitly add the safety wrapper around a consumer type. |
withLatestFrom | 5-9 source overloads dropped |
zipWith | added overloads withprefetch anddelayErrors options |
Some operators that produced exactly one value or an error now returnSingle in 2.x (orMaybe if an empty source is allowed).
(Remark: this is "experimental" in RC2 and RC3 to see how it feels to program with such mixed-type sequences and whether or not there has to be too muchtoObservable/toFlowable back-conversion.)
| Operator | Old return type | New return type | Remark |
|---|---|---|---|
all(Predicate) | Observable<Boolean> | Single<Boolean> | Emits true if all elements match the predicate |
any(Predicate) | Observable<Boolean> | Single<Boolean> | Emits true if any elements match the predicate |
count() | Observable<Long> | Single<Long> | Counts the number of elements in the sequence |
elementAt(int) | Observable<T> | Maybe<T> | Emits the element at the given index or completes |
elementAt(int, T) | Observable<T> | Single<T> | Emits the element at the given index or the default |
elementAtOrError(int) | Observable<T> | Single<T> | Emits the indexth element or aNoSuchElementException |
first(T) | Observable<T> | Single<T> | Emits the very first element orNoSuchElementException |
firstElement() | Observable<T> | Maybe<T> | Emits the very first element or completes |
firstOrError() | Observable<T> | Single<T> | Emits the first element or aNoSuchElementException if the source is empty |
ignoreElements() | Observable<T> | Completable | Ignore all but the terminal events |
isEmpty() | Observable<Boolean> | Single<Boolean> | Emits true if the source is empty |
last(T) | Observable<T> | Single<T> | Emits the very last element or the default item |
lastElement() | Observable<T> | Maybe<T> | Emits the very last element or completes |
lastOrError() | Observable<T> | Single<T> | Emits the lastelement or aNoSuchElementException if the source is empty |
reduce(BiFunction) | Observable<T> | Maybe<T> | Emits the reduced value or completes |
reduce(Callable, BiFunction) | Observable<U> | Single<U> | Emits the reduced value (or the initial value) |
reduceWith(U, BiFunction) | Observable<U> | Single<U> | Emits the reduced value (or the initial value) |
single(T) | Observable<T> | Single<T> | Emits the only element or the default item |
singleElement() | Observable<T> | Maybe<T> | Emits the only element or completes |
singleOrError() | Observable<T> | Single<T> | Emits the one and only element, IndexOutOfBoundsException if the source is longer than 1 item or aNoSuchElementException if the source is empty |
toList() | Observable<List<T>> | Single<List<T>> | collects all elements into aList |
toMap() | Observable<Map<K, V>> | Single<Map<K, V>> | collects all elements into aMap |
toMultimap() | Observable<Map<K, Collection<V>>> | Single<Map<K, Collection<V>>> | collects all elements into aMap with collection |
toSortedList() | Observable<List<T>> | Single<List<T>> | collects all elements into aList and sorts it |
To make sure the final API of 2.0 is clean as possible, we remove methods and other components between release candidates without deprecating them.
| Removed in version | Component | Remark |
|---|---|---|
| RC3 | Flowable.toCompletable() | useFlowable.ignoreElements() |
| RC3 | Flowable.toSingle() | useFlowable.single(T) |
| RC3 | Flowable.toMaybe() | useFlowable.singleElement() |
| RC3 | Observable.toCompletable() | useObservable.ignoreElements() |
| RC3 | Observable.toSingle() | useObservable.single(T) |
| RC3 | Observable.toMaybe() | useObservable.singleElement() |
In 1.x, thedoOnUnsubscribe was always executed on a terminal event because 1.x'SafeSubscriber calledunsubscribe on itself. This was practically unnecessary and the Reactive-Streams specification states that when a terminal event arrives at aSubscriber, the upstreamSubscription should be considered cancelled and thus callingcancel() is a no-op.
For the same reason,unsubscribeOn is not called on the regular termination path but only when there is an actualcancel (ordispose) call on the chain.
Therefore, the following sequence won't calldoOnCancel:
Flowable.just(1,2,3).doOnCancel(() ->System.out.println("Cancelled!")).subscribe(System.out::println);
However, the following will call since thetake operator cancels after the set amount ofonNext events have been delivered:
Flowable.just(1,2,3).doOnCancel(() ->System.out.println("Cancelled!")).take(2).subscribe(System.out::println);
If you need to perform cleanup on both regular termination or cancellation, consider the operatorusing instead.
Alternatively, thedoFinally operator (introduced in 2.0.1 and standardized in 2.1) calls a developer specifiedAction that gets executed after a source completed, failed with an error or got cancelled/disposed:
Flowable.just(1,2,3).doFinally(() ->System.out.println("Finally")).subscribe(System.out::println);Flowable.just(1,2,3).doFinally(() ->System.out.println("Finally")).take(2)// cancels the above after 2 elements.subscribe(System.out::println);
Copyright (c) 2016-present, RxJava Contributors.
Twitter @RxJava |Gitter @RxJava