What's different in 2.0

akarnokd edited this page Oct 9, 2018 · 68 revisions

RxJava 2.0 has been completely rewritten from scratch on top of the Reactive-Streams specification. The specification itself has evolved out of RxJava 1.x and provides a common baseline for reactive systems and libraries.

Because Reactive-Streams has a different architecture, it mandates changes to some well known RxJava types. This wiki page attempts to summarize what has changed and describes how to rewrite 1.x code into 2.x code.

For technical details on how to write operators for 2.x, please visit the Writing Operators wiki page.


Maven address and base package

To allow having RxJava 1.x and RxJava 2.x side-by-side, RxJava 2.x is under the Maven coordinates io.reactivex.rxjava2:rxjava:2.x.y and classes are accessible below io.reactivex.

Users switching from 1.x to 2.x have to re-organize their imports, but carefully.

Javadoc

The official Javadoc pages for 2.x are hosted at http://reactivex.io/RxJava/2.x/javadoc/

Nulls

RxJava 2.x no longer accepts null values and the following will yield NullPointerException immediately or as a signal to downstream:

    Observable.just(null);

    Single.just(null);

    Observable.fromCallable(() -> null)
        .subscribe(System.out::println, Throwable::printStackTrace);

    Observable.just(1).map(v -> null)
        .subscribe(System.out::println, Throwable::printStackTrace);

This means that Observable<Void> can no longer emit any values but only terminate normally or with an exception. API designers may instead choose to define Observable<Object> with no guarantee on what Object will be (which should be irrelevant anyway). For example, if one needs a signaller-like source, a shared enum can be defined and its solo instance onNext'd:

    enum Irrelevant { INSTANCE; }

    Observable<Object> source = Observable.create((ObservableEmitter<Object> emitter) -> {
        System.out.println("Side-effect 1");
        emitter.onNext(Irrelevant.INSTANCE);

        System.out.println("Side-effect 2");
        emitter.onNext(Irrelevant.INSTANCE);

        System.out.println("Side-effect 3");
        emitter.onNext(Irrelevant.INSTANCE);
    });

    source.subscribe(e -> { /* Ignored. */ }, Throwable::printStackTrace);

Observable and Flowable

A small regret about introducing backpressure in RxJava 0.x is that instead of having a separate base reactive class, the Observable itself was retrofitted. The main issue with backpressure is that many hot sources, such as UI events, can't be reasonably backpressured and cause unexpected MissingBackpressureException (i.e., beginners don't expect them).

We try to remedy this situation in 2.x by having io.reactivex.Observable non-backpressured and the new io.reactivex.Flowable be the backpressure-enabled base reactive class.

The good news is that operator names remain (mostly) the same. The bad news is that one should be careful when performing 'organize imports' as it may unintentionally select the non-backpressured io.reactivex.Observable.

Which type to use?

When architecting dataflows (as an end-consumer of RxJava) or deciding upon what type your 2.x compatible library should take and return, you can consider a few factors that should help you avoid problems down the line such as MissingBackpressureException or OutOfMemoryError.

When to use Observable

  • You have a flow of no more than 1000 elements at its longest: i.e., you have so few elements over time that there is practically no chance for OOME in your application.
  • You deal with GUI events such as mouse moves or touch events: these can rarely be backpressured reasonably and aren't that frequent. You may be able to handle an element frequency of 1000 Hz or less with Observable but consider using sampling/debouncing anyway.
  • Your flow is essentially synchronous but your platform doesn't support Java Streams or you miss features from it. Using Observable has lower overhead in general than Flowable. (You could also consider IxJava, which is optimized for Iterable flows supporting Java 6+.)

When to use Flowable

  • Dealing with 10k+ of elements that are generated in some fashion somewhere and thus the chain can tell the source to limit the amount it generates.
  • Reading (parsing) files from disk is inherently blocking and pull-based, which works well with backpressure as you control, for example, how many lines you read for a specified request amount.
  • Reading from a database through JDBC is also blocking and pull-based and is controlled by you by calling ResultSet.next() for likely each downstream request.
  • Network (Streaming) IO where either the network helps or the protocol used supports requesting some logical amount.
  • Many blocking and/or pull-based data sources which may eventually get a non-blocking reactive API/driver in the future.
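The pull-based sources in the list above share one idea: the source produces only as many items as the consumer has requested. The following is a minimal sketch of that idea, not RxJava code. It assumes Java 9+ and uses the JDK's java.util.concurrent.Flow interfaces (which mirror the Reactive-Streams ones) so that it runs without any dependency. The names PullBasedLines, LineSubscription and Collector are made up for this illustration, and the sketch ignores threading, reentrancy and request validation, all of which a real Flowable source has to handle.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class PullBasedLines {

    /** Hypothetical subscription: reads one line per requested item, never more. */
    static final class LineSubscription implements Flow.Subscription {
        private final BufferedReader reader;
        private final Flow.Subscriber<? super String> downstream;
        private boolean done;

        LineSubscription(BufferedReader reader, Flow.Subscriber<? super String> downstream) {
            this.reader = reader;
            this.downstream = downstream;
        }

        @Override
        public void request(long n) {
            // Simplified: single-threaded, no reentrancy handling, no n <= 0 check
            try {
                for (long i = 0; i < n && !done; i++) {
                    String line = reader.readLine();
                    if (line == null) {
                        done = true;
                        downstream.onComplete();
                    } else {
                        downstream.onNext(line);
                    }
                }
            } catch (IOException ex) {
                done = true;
                downstream.onError(ex);
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    /** Collects received lines so the effect of each request can be inspected. */
    static final class Collector implements Flow.Subscriber<String> {
        final List<String> lines = new ArrayList<>();
        boolean completed;
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(String line) { lines.add(line); }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        // An in-memory reader stands in for a file on disk
        BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc\n"));
        Collector collector = new Collector();
        collector.onSubscribe(new LineSubscription(reader, collector));

        collector.subscription.request(2);
        System.out.println(collector.lines); // [a, b]

        collector.subscription.request(5);
        System.out.println(collector.lines + " completed=" + collector.completed);
        // [a, b, c] completed=true
    }
}
```

Only two lines are read after request(2); the third line and the completion signal arrive with the second request. A hot source such as a stream of mouse events has no equivalent of "read the next line only when asked", which is why the list above steers such sources towards Observable.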

Single

The 2.x Single reactive base type, which can emit a single onSuccess or onError, has been redesigned from scratch. Its architecture now derives from the Reactive-Streams design. Its consumer type (rx.Single.SingleSubscriber<T>) has been changed from being a class that accepts rx.Subscription resources to be an interface io.reactivex.SingleObserver<T> that has only 3 methods:

    interface SingleObserver<T> {
        void onSubscribe(Disposable d);
        void onSuccess(T value);
        void onError(Throwable error);
    }

and follows the protocol onSubscribe (onSuccess | onError)?.

Completable

The Completable type remains largely the same. It was already designed along the Reactive-Streams style for 1.x so no user-level changes there.

Similar to the naming changes, rx.Completable.CompletableSubscriber has become io.reactivex.CompletableObserver with onSubscribe(Disposable):

    interface CompletableObserver {
        void onSubscribe(Disposable d);
        void onComplete();
        void onError(Throwable error);
    }

and still follows the protocol onSubscribe (onComplete | onError)?.

Maybe

RxJava 2.0.0-RC2 introduced a new base reactive type called Maybe. Conceptually, it is a union of Single and Completable providing the means to capture an emission pattern where there could be 0 or 1 item or an error signalled by some reactive source.

The Maybe class is accompanied by MaybeSource as its base interface type, MaybeObserver<T> as its signal-receiving interface and follows the protocol onSubscribe (onSuccess | onError | onComplete)?. Because there could be at most 1 element emitted, the Maybe type has no notion of backpressure (because there is no buffer bloat possible as with unknown length Flowables or Observables).

This means that an invocation of onSubscribe(Disposable) is potentially followed by one of the other onXXX methods. Unlike Flowable, if there is only a single value to be signalled, only onSuccess is called and onComplete is not.

Working with this new base reactive type is practically the same as the others as it offers a modest subset of the Flowable operators that make sense with a 0 or 1 item sequence.

    Maybe.just(1)
        .map(v -> v + 1)
        .filter(v -> v == 1)
        .defaultIfEmpty(2)
        .test()
        .assertResult(2);

Base reactive interfaces

Following the style of extending the Reactive-Streams Publisher<T> in Flowable, the other base reactive classes now extend similar base interfaces (in package io.reactivex):

    interface ObservableSource<T> {
        void subscribe(Observer<? super T> observer);
    }

    interface SingleSource<T> {
        void subscribe(SingleObserver<? super T> observer);
    }

    interface CompletableSource {
        void subscribe(CompletableObserver observer);
    }

    interface MaybeSource<T> {
        void subscribe(MaybeObserver<? super T> observer);
    }

Therefore, many operators that required some reactive base type from the user now accept Publisher and XSource:

    Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

    Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

By having Publisher as input this way, you can compose with other Reactive-Streams compliant libraries without the need to wrap them or convert them into Flowable first.

If an operator has to offer a reactive base type, however, the user will receive the full reactive class (as giving out an XSource is practically useless as it doesn't have operators on it):

    Flowable<Flowable<Integer>> windows = source.window(5);

    source.compose((Flowable<T> flowable) ->
        flowable
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()));

Subjects and Processors

In the Reactive-Streams specification, the Subject-like behavior, namely being a consumer and supplier of events at the same time, is done by the org.reactivestreams.Processor interface. As with the Observable/Flowable split, the backpressure-aware, Reactive-Streams compliant implementations are based on the FlowableProcessor<T> class (which extends Flowable to give a rich set of instance operators). An important change regarding Subjects (and by extension, FlowableProcessor) is that they no longer support T -> R like conversion (that is, input is of type T and the output is of type R). (We never had a use for it in 1.x and the original Subject<T, R> came from .NET where there is a Subject<T> overload because .NET allows the same class name with a different number of type arguments.)

The io.reactivex.subjects.AsyncSubject, io.reactivex.subjects.BehaviorSubject, io.reactivex.subjects.PublishSubject, io.reactivex.subjects.ReplaySubject and io.reactivex.subjects.UnicastSubject in 2.x don't support backpressure (as part of the 2.x Observable family).

The io.reactivex.processors.AsyncProcessor, io.reactivex.processors.BehaviorProcessor, io.reactivex.processors.PublishProcessor, io.reactivex.processors.ReplayProcessor and io.reactivex.processors.UnicastProcessor are backpressure-aware. The BehaviorProcessor and PublishProcessor don't coordinate requests (use Flowable.publish() for that) of their downstream subscribers and will signal them MissingBackpressureException if the downstream can't keep up. The other XProcessor types honor backpressure of their downstream subscribers but otherwise, when subscribed to a source (optional), they consume it in an unbounded manner (requesting Long.MAX_VALUE).

TestSubject

The 1.x TestSubject has been dropped. Its functionality can be achieved via TestScheduler, PublishProcessor/PublishSubject and observeOn(testScheduler)/scheduler parameter.

    TestScheduler scheduler = new TestScheduler();
    PublishSubject<Integer> ps = PublishSubject.create();

    TestObserver<Integer> ts = ps.delay(1000, TimeUnit.MILLISECONDS, scheduler)
        .test();

    ts.assertEmpty();

    ps.onNext(1);

    scheduler.advanceTimeBy(999, TimeUnit.MILLISECONDS);

    ts.assertEmpty();

    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    ts.assertValue(1);

SerializedSubject

The SerializedSubject is no longer a public class. You have to use Subject.toSerialized() and FlowableProcessor.toSerialized() instead.

Other classes

The rx.observables.ConnectableObservable is now io.reactivex.observables.ConnectableObservable<T> and io.reactivex.flowables.ConnectableFlowable<T>.

GroupedObservable

The rx.observables.GroupedObservable is now io.reactivex.observables.GroupedObservable<T> and io.reactivex.flowables.GroupedFlowable<T>.

In 1.x, you could create an instance with GroupedObservable.from() which was used internally by 1.x. In 2.x, all use cases now extend GroupedObservable directly thus the factory methods are no longer available; the whole class is now abstract.

You can extend the class and add your own custom subscribeActual behavior to achieve something similar to the 1.x features:

    class MyGroup<K, V> extends GroupedObservable<K, V> {
        final Subject<V> subject;

        public MyGroup(K key) {
            // the base class stores the key and exposes it through getKey()
            super(key);
            this.subject = PublishSubject.create();
        }

        @Override
        protected void subscribeActual(Observer<? super V> observer) {
            subject.subscribe(observer);
        }
    }

(The same approach works with GroupedFlowable as well.)

Functional interfaces

Because both 1.x and 2.x are aimed at Java 6+, we can't use the Java 8 functional interfaces such as java.util.function.Function. Instead, we defined our own functional interfaces in 1.x and 2.x follows this tradition.

One notable difference is that all our functional interfaces now define throws Exception. This is a large convenience for consumers and mappers that otherwise throw and would need try-catch to transform or suppress a checked exception.

    Flowable.just("file.txt")
        .map(name -> Files.readAllLines(Paths.get(name)))
        .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);

If the file doesn't exist or can't be read properly, the end consumer will print out the IOException directly. Note also that java.nio.file.Files.readAllLines(...) is invoked without try-catch.
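To make the effect of the throws Exception clause concrete without depending on RxJava, here is a small JDK-only sketch. ThrowingFunction is a hypothetical interface that only copies the shape of io.reactivex.functions.Function (a single apply method declared with throws Exception), and mapOrReport is a made-up helper that plays the role of an operator routing a thrown exception to an error callback. Neither is part of RxJava.

```java
import java.io.IOException;
import java.util.function.Consumer;

public class ThrowingFunctionDemo {

    /** Hypothetical stand-in with the same shape as io.reactivex.functions.Function. */
    @FunctionalInterface
    interface ThrowingFunction<T, R> {
        R apply(T t) throws Exception;
    }

    /** A method that declares a checked exception, like most I/O APIs do. */
    static int parseLength(String name) throws IOException {
        if (name.isEmpty()) {
            throw new IOException("empty file name");
        }
        return name.length();
    }

    /**
     * Made-up helper playing the role of an operator: applies the mapper and
     * routes any exception to onError instead of letting it propagate.
     */
    static <T, R> void mapOrReport(T value, ThrowingFunction<T, R> mapper,
            Consumer<R> onNext, Consumer<Throwable> onError) {
        R result;
        try {
            result = mapper.apply(value);
        } catch (Exception ex) {
            onError.accept(ex);
            return;
        }
        onNext.accept(result);
    }

    public static void main(String[] args) {
        // No try-catch at the call site even though parseLength throws IOException
        mapOrReport("file.txt", ThrowingFunctionDemo::parseLength,
                v -> System.out.println("length: " + v),
                e -> System.out.println("error: " + e));

        mapOrReport("", ThrowingFunctionDemo::parseLength,
                v -> System.out.println("length: " + v),
                e -> System.out.println("error: " + e));
    }
}
```

With java.util.function.Function in place of ThrowingFunction, the method reference to parseLength would not compile, and the caller would have to wrap the call in try-catch inside the lambda.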

Actions

As an opportunity to reduce component count, 2.x doesn't define Action3-Action9 and ActionN (these were unused within RxJava itself anyway).

The remaining action interfaces were named according to the Java 8 functional types. The no argument Action0 is replaced by the io.reactivex.functions.Action for the operators and java.lang.Runnable for the Scheduler methods. Action1 has been renamed to Consumer and Action2 is called BiConsumer. ActionN is replaced by the Consumer<Object[]> type declaration.

Functions

We followed the naming convention of Java 8 by defining io.reactivex.functions.Function and io.reactivex.functions.BiFunction, plus renaming Func3 - Func9 into Function3 - Function9 respectively. The FuncN is replaced by the Function<Object[], R> type declaration.

In addition, operators requiring a predicate no longer use Func1<T, Boolean> but have a separate, primitive-returning type of Predicate<T> (allows better inlining due to no autoboxing).

Subscriber

The Reactive-Streams specification has its own Subscriber as an interface. This interface is lightweight and combines request management with cancellation into a single interface org.reactivestreams.Subscription instead of having rx.Producer and rx.Subscription separately. This allows creating stream consumers with less internal state than the quite heavy rx.Subscriber of 1.x.

    Flowable.range(1, 10).subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer t) {
            System.out.println(t);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    });

Due to the name conflict, replacing the package from rx to org.reactivestreams is not enough. In addition, org.reactivestreams.Subscriber has no notion of adding resources to it, cancelling it or requesting from the outside.

To bridge the gap, we defined abstract classes DefaultSubscriber, ResourceSubscriber and DisposableSubscriber (plus their XObserver variants) for Flowable (and Observable) respectively that offer resource tracking support (of Disposables) just like rx.Subscriber and can be cancelled/disposed externally via dispose():

    ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
        @Override
        public void onStart() {
            request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer t) {
            System.out.println(t);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    };

    Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);

    subscriber.dispose();

Note also that due to Reactive-Streams compatibility, the method onCompleted has been renamed to onComplete without the trailing d.

Since 1.x Observable.subscribe(Subscriber) returned Subscription, users often added the Subscription to a CompositeSubscription for example:

    CompositeSubscription composite = new CompositeSubscription();

    composite.add(Observable.range(1, 5).subscribe(new TestSubscriber<Integer>()));

Due to the Reactive-Streams specification, Publisher.subscribe returns void and the pattern by itself no longer works in 2.0. To remedy this, the method E subscribeWith(E subscriber) has been added to each base reactive class which returns its input subscriber/observer as is. With the two examples before, the 2.x code can now look like this since ResourceSubscriber implements Disposable directly:

    CompositeDisposable composite2 = new CompositeDisposable();

    composite2.add(Flowable.range(1, 5).subscribeWith(subscriber));

Calling request from onSubscribe/onStart

Note that due to how request management works, calling request(n) from Subscriber.onSubscribe or ResourceSubscriber.onStart may trigger calls to onNext immediately before the request() call itself returns to the onSubscribe/onStart method of yours:

    Flowable.range(1, 3).subscribe(new Subscriber<Integer>() {

        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("OnSubscribe start");
            s.request(Long.MAX_VALUE);
            System.out.println("OnSubscribe end");
        }

        @Override
        public void onNext(Integer v) {
            System.out.println(v);
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    });

This will print:

    OnSubscribe start
    1
    2
    3
    Done
    OnSubscribe end

The problem comes when one does some initialization in onSubscribe/onStart after calling request there and onNext may or may not see the effects of the initialization. To avoid this situation, make sure you call request after all initialization has been done in onSubscribe/onStart.

This behavior differs from 1.x where a request call went through a deferred logic that accumulated requests until an upstream Producer arrived at some time. (This nature adds overhead to all operators and consumers in 1.x.) In 2.x, there is always a Subscription coming down first and 90% of the time there is no need to defer requesting.
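The initialization pitfall can be reproduced without RxJava. The sketch below assumes Java 9+ and uses java.util.concurrent.Flow as a stand-in for the Reactive-Streams interfaces. SyncRange is a hypothetical synchronous source that, like Flowable.range in the example above, emits from inside request(). Recorder is a made-up subscriber whose onNext depends on a field that is set in onSubscribe. The sketch assumes a single unbounded request and is not how RxJava implements range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class RequestAfterInit {

    /** Hypothetical synchronous source: emits 1..count from inside request(). */
    static final class SyncRange implements Flow.Publisher<Integer> {
        final int count;

        SyncRange(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> s) {
            s.onSubscribe(new Flow.Subscription() {
                int next = 1;
                boolean finished;

                @Override
                public void request(long n) {
                    // Simplified: no reentrancy handling, no n <= 0 check
                    for (long i = 0; i < n && next <= count && !finished; i++) {
                        s.onNext(next++);
                    }
                    if (next > count && !finished) {
                        finished = true;
                        s.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        }
    }

    /** Records what onNext observed; prefix is the state it depends on. */
    static final class Recorder implements Flow.Subscriber<Integer> {
        final boolean requestFirst;
        final List<String> log = new ArrayList<>();
        String prefix;

        Recorder(boolean requestFirst) {
            this.requestFirst = requestFirst;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            if (requestFirst) {
                s.request(Long.MAX_VALUE); // onNext runs before the next line
                prefix = "item-";
            } else {
                prefix = "item-";
                s.request(Long.MAX_VALUE); // state is ready before items flow
            }
        }

        @Override public void onNext(Integer v) { log.add(prefix + v); }
        @Override public void onError(Throwable t) { log.add("error"); }
        @Override public void onComplete() { log.add("done"); }
    }

    public static void main(String[] args) {
        Recorder wrong = new Recorder(true);
        new SyncRange(3).subscribe(wrong);
        System.out.println(wrong.log); // [null1, null2, null3, done]

        Recorder right = new Recorder(false);
        new SyncRange(3).subscribe(right);
        System.out.println(right.log); // [item-1, item-2, item-3, done]
    }
}
```

In the first run every item is processed while prefix is still null, because the whole sequence is delivered inside the request() call. Moving the request to the last statement of onSubscribe removes the problem.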

Subscription

In RxJava 1.x, the interface rx.Subscription was responsible for stream and resource lifecycle management, namely unsubscribing a sequence and releasing general resources such as scheduled tasks. The Reactive-Streams specification took this name for specifying an interaction point between a source and a consumer: org.reactivestreams.Subscription allows requesting a positive amount from the upstream and allows cancelling the sequence.

To avoid the name clash, the 1.x rx.Subscription has been renamed into io.reactivex.Disposable (somewhat resembling .NET's own IDisposable).

Because the Reactive-Streams base interface, org.reactivestreams.Publisher, defines the subscribe() method as void, Flowable.subscribe(Subscriber) no longer returns any Subscription (or Disposable). The other base reactive types also follow this signature with their respective subscriber types.

The other overloads of subscribe now return Disposable in 2.x.

The original Subscription container types have been renamed and updated:

  • CompositeSubscription to CompositeDisposable
  • SerialSubscription and MultipleAssignmentSubscription have been merged into SerialDisposable. The set() method disposes the old value and the replace() method does not.
  • RefCountSubscription has been removed.
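The difference between set() and replace() described in the list above can be modelled in a few lines. This is a JDK-only sketch of the semantics and not the SerialDisposable implementation. Resource and SerialContainer are hypothetical names, and the sketch deliberately leaves out disposing the container itself.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SerialContainerDemo {

    /** Hypothetical minimal resource; stands in for a Disposable. */
    static final class Resource {
        final String name;
        volatile boolean disposed;

        Resource(String name) {
            this.name = name;
        }

        void dispose() {
            disposed = true;
        }
    }

    /** Simplified container modelling only the set()/replace() difference. */
    static final class SerialContainer {
        private final AtomicReference<Resource> current = new AtomicReference<>();

        /** Swaps in the new resource and disposes the previous one, if any. */
        void set(Resource next) {
            Resource old = current.getAndSet(next);
            if (old != null) {
                old.dispose();
            }
        }

        /** Swaps in the new resource but leaves the previous one untouched. */
        void replace(Resource next) {
            current.set(next);
        }
    }

    public static void main(String[] args) {
        SerialContainer container = new SerialContainer();
        Resource a = new Resource("a");
        Resource b = new Resource("b");
        Resource c = new Resource("c");

        container.set(a);
        container.set(b);     // a is disposed by this call
        container.replace(c); // b is left alone

        System.out.println("a disposed: " + a.disposed); // a disposed: true
        System.out.println("b disposed: " + b.disposed); // b disposed: false
    }
}
```

In this model, set() corresponds to the behavior of the 1.x SerialSubscription and replace() to that of MultipleAssignmentSubscription, which is why the two 1.x classes could be merged into one.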

Backpressure

The Reactive-Streams specification mandates operators supporting backpressure, specifically via the guarantee that they don't overflow their consumers when those don't request. Operators of the new Flowable base reactive type now consider downstream request amounts properly; however, this doesn't mean MissingBackpressureException is gone. The exception is still there but this time, the operator that can't signal more onNext will signal this exception instead (allowing better identification of who is not properly backpressured).

As an alternative, the 2.x Observable doesn't do backpressure at all and is available as a choice to switch over.

Reactive-Streams compliance

updated in 2.0.7

The Flowable-based sources and operators are, as of 2.0.7, fully Reactive-Streams version 1.0.0 specification compliant.

Before 2.0.7, the operator strict() had to be applied in order to achieve the same level of compliance. In 2.0.7, the operator strict() returns this, is deprecated and will be removed completely in 2.1.0.

As one of the primary goals of RxJava 2, the design focuses on performance and, in order to enable it, RxJava 2.0.7 adds a custom io.reactivex.FlowableSubscriber interface (extends org.reactivestreams.Subscriber) but adds no new methods to it. The new interface is constrained to RxJava 2 and represents a consumer to Flowable that is able to work in a mode that relaxes the Reactive-Streams version 1.0.0 specification in rules §1.3, §2.3, §2.12 and §3.9:

  • §1.3 relaxation: onSubscribe may run concurrently with onNext in case the FlowableSubscriber calls request() from inside onSubscribe and it is the responsibility of FlowableSubscriber to ensure thread-safety between the remaining instructions in onSubscribe and onNext.
  • §2.3 relaxation: calling Subscription.cancel and Subscription.request from FlowableSubscriber.onComplete() or FlowableSubscriber.onError() is considered a no-operation.
  • §2.12 relaxation: if the same FlowableSubscriber instance is subscribed to multiple sources, it must ensure its onXXX methods remain thread safe.
  • §3.9 relaxation: issuing a non-positive request() will not stop the current stream but signal an error via RxJavaPlugins.onError.

From a user's perspective, if one was using the subscribe methods other than Flowable.subscribe(Subscriber<? super T>), there is no need to do anything regarding this change and there is no extra penalty for it.

If one was using Flowable.subscribe(Subscriber<? super T>) with the built-in RxJava Subscriber implementations such as DisposableSubscriber, TestSubscriber and ResourceSubscriber, there is a small runtime overhead (one instanceof check) associated when the code is not recompiled against 2.0.7.

If a custom class implementing Subscriber was employed before, subscribing it to a Flowable adds an internal wrapper that ensures observing the Flowable is 100% compliant with the specification at the cost of some per-item overhead.

In order to help lift these extra overheads, a new method Flowable.subscribe(FlowableSubscriber<? super T>) has been added which exposes the original behavior from before 2.0.7. It is recommended that new custom consumer implementations extend FlowableSubscriber instead of just Subscriber.

Runtime hooks

2.x redesigned the RxJavaPlugins class, which now supports changing the hooks at runtime. Tests that want to override the schedulers and the lifecycle of the base reactive types can do it on a case-by-case basis through callback functions.

The class-based RxJavaObservableHook and friends are now gone and RxJavaHooks functionality is incorporated into RxJavaPlugins.

Error handling

One important design requirement for 2.x is that no Throwable errors should be swallowed. This means errors that can't be emitted because the downstream's lifecycle already reached its terminal state or the downstream cancelled a sequence which was about to emit an error.

Such errors are routed to the RxJavaPlugins.onError handler. This handler can be overridden with the method RxJavaPlugins.setErrorHandler(Consumer<Throwable>). Without a specific handler, RxJava defaults to printing the Throwable's stacktrace to the console and calls the current thread's uncaught exception handler.

On desktop Java, this latter handler does nothing on an ExecutorService backed Scheduler and the application can keep running. However, Android is more strict and terminates the application in such uncaught exception cases.

Whether this behavior is desirable can be debated, but in any case, if you want to avoid such calls to the uncaught exception handler, the final application that uses RxJava 2 (directly or transitively) should set a no-op handler:

    // If Java 8 lambdas are supported
    RxJavaPlugins.setErrorHandler(e -> { });

    // If no Retrolambda or Jack
    RxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer());

It is not advised that intermediate libraries change the error handler outside their own testing environment.

Unfortunately, RxJava can't tell which of these out-of-lifecycle, undeliverable exceptions should or shouldn't crash your app. Identifying the source and reason for these exceptions can be tiresome, especially if they originate from a source and get routed to RxJavaPlugins.onError somewhere lower in the chain.

Therefore, 2.0.6 introduces specific exception wrappers to help distinguish and track down what was happening at the time of the error:

  • OnErrorNotImplementedException: reintroduced to detect when the user forgot to add error handling to subscribe().
  • ProtocolViolationException: indicates a bug in an operator.
  • UndeliverableException: wraps the original exception that can't be delivered due to lifecycle restrictions on a Subscriber/Observer. It is automatically applied by RxJavaPlugins.onError with intact stacktrace that may help find which exact operator rerouted the original error.

If an undeliverable exception is an instance/descendant of NullPointerException, IllegalStateException (UndeliverableException and ProtocolViolationException extend this), IllegalArgumentException, CompositeException, MissingBackpressureException or OnErrorNotImplementedException, the UndeliverableException wrapping doesn't happen.

In addition, some 3rd party libraries/code throw when they get interrupted by a cancel/dispose call which leads to an undeliverable exception most of the time. Internal changes in 2.0.6 now consistently cancel or dispose a Subscription/Disposable before cancelling/disposing a task or worker (which causes the interrupt on the target thread).

    // in some library
    try {
        doSomethingBlockingly();
    } catch (InterruptedException ex) {
        // check if the interrupt is due to cancellation
        // if so, no need to signal the InterruptedException
        if (!disposable.isDisposed()) {
            observer.onError(ex);
        }
    }

If the library/code already did this, the undeliverable InterruptedExceptions should stop now. If this pattern was not employed before, we encourage updating the code/library in question.

If one decides to add a non-empty global error consumer, here is an example that manages the typical undeliverable exceptions based on whether they represent a likely bug or an ignorable application/network state:

    RxJavaPlugins.setErrorHandler(e -> {
        if (e instanceof UndeliverableException) {
            e = e.getCause();
        }
        if ((e instanceof IOException) || (e instanceof SocketException)) {
            // fine, irrelevant network problem or API that throws on cancellation
            return;
        }
        if (e instanceof InterruptedException) {
            // fine, some blocking code was interrupted by a dispose call
            return;
        }
        if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
            // that's likely a bug in the application
            Thread.currentThread().getUncaughtExceptionHandler()
                .uncaughtException(Thread.currentThread(), e);
            return;
        }
        if (e instanceof IllegalStateException) {
            // that's a bug in RxJava or in a custom operator
            Thread.currentThread().getUncaughtExceptionHandler()
                .uncaughtException(Thread.currentThread(), e);
            return;
        }
        Log.warning("Undeliverable exception received, not sure what to do", e);
    });
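One way to keep such a handler easy to unit test is to separate the decision from the side effects. The sketch below is a JDK-only illustration of that idea and not part of RxJava. Verdict, triage and UndeliverableStandIn are hypothetical names, and UndeliverableStandIn merely imitates the fact stated above that UndeliverableException extends IllegalStateException and carries the original error as its cause.

```java
import java.io.IOException;
import java.net.SocketException;

public class UndeliverableTriage {

    /** What a global error handler could do with an undeliverable error. */
    enum Verdict { IGNORE, CRASH, LOG }

    /** Hypothetical stand-in for RxJava's UndeliverableException wrapper. */
    static final class UndeliverableStandIn extends IllegalStateException {
        UndeliverableStandIn(Throwable cause) {
            super(cause);
        }
    }

    /** Pure decision function mirroring the branches of the handler above. */
    static Verdict triage(Throwable e) {
        if (e instanceof UndeliverableStandIn && e.getCause() != null) {
            e = e.getCause(); // look at what actually failed
        }
        if (e instanceof IOException) {
            // SocketException is a subclass of IOException, so it lands here too
            return Verdict.IGNORE;
        }
        if (e instanceof InterruptedException) {
            return Verdict.IGNORE;
        }
        if (e instanceof NullPointerException
                || e instanceof IllegalArgumentException
                || e instanceof IllegalStateException) {
            return Verdict.CRASH; // likely a bug in the application or an operator
        }
        return Verdict.LOG;
    }

    public static void main(String[] args) {
        System.out.println(triage(new UndeliverableStandIn(new SocketException("reset")))); // IGNORE
        System.out.println(triage(new InterruptedException()));                             // IGNORE
        System.out.println(triage(new NullPointerException()));                             // CRASH
        System.out.println(triage(new UndeliverableStandIn(new RuntimeException("x"))));    // LOG
    }
}
```

A real handler registered through RxJavaPlugins.setErrorHandler could then switch on the returned value to return silently, forward to the uncaught exception handler, or log.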

Schedulers

The 2.x API still supports the main default scheduler types: computation, io, newThread and trampoline, accessible through the io.reactivex.schedulers.Schedulers utility class.

The immediate scheduler is not present in 2.x. It was frequently misused and didn't implement the Scheduler specification correctly anyway; it contained blocking sleep for delayed action and didn't support recursive scheduling at all. Use Schedulers.trampoline() instead.

The Schedulers.test() has been removed as well to avoid the conceptual difference with the rest of the default schedulers. Those return a "global" scheduler instance whereas test() always returned a new instance of the TestScheduler. Test developers are now encouraged to simply new TestScheduler() in their code.

The io.reactivex.Scheduler abstract base class now supports scheduling tasks directly without the need to create and then destroy a Worker (which is often forgotten):

    public abstract class Scheduler {

        public Disposable scheduleDirect(Runnable task) { ... }

        public Disposable scheduleDirect(Runnable task, long delay, TimeUnit unit) { ... }

        public Disposable schedulePeriodicallyDirect(Runnable task, long initialDelay,
            long period, TimeUnit unit) { ... }

        public long now(TimeUnit unit) { ... }

        // ... rest is the same: lifecycle methods, worker creation
    }

The main purpose is to avoid the tracking overhead of the Workers for typically one-shot tasks. The methods have a default implementation that reuses createWorker properly but can be overridden with more efficient implementations if necessary.

The method that returns the scheduler's own notion of current time, now(), has been changed to accept a TimeUnit to indicate the unit of measure.

Entering the reactive world

One of the design flaws of RxJava 1.x was the exposure of the rx.Observable.create() method that, while powerful, is not the typical operator you want to use to enter the reactive world. Unfortunately, so many depend on it that we couldn't remove or rename it.

Since 2.x is a fresh start, we won't make that mistake again. Each reactive base type (Flowable, Observable, Single, Maybe and Completable) features a safe create operator that does the right thing regarding backpressure (for Flowable) and cancellation (all):

    Flowable.create((FlowableEmitter<Integer> emitter) -> {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }, BackpressureStrategy.BUFFER);

Practically, the 1.x fromEmitter (formerly fromAsync) has been renamed to Flowable.create. The other base reactive types have similar create methods (minus the backpressure strategy).

Leaving the reactive world

Apart from subscribing to the base types with their respective consumers (Subscriber, Observer, SingleObserver, MaybeObserver and CompletableObserver) and functional-interface based consumers (such as subscribe(Consumer<T>, Consumer<Throwable>, Action)), the formerly separate 1.x BlockingObservable (and similar classes for the others) has been integrated with the main reactive type. Now you can directly block for some results by invoking a blockingX operation directly:

    List<Integer> list = Flowable.range(1, 100).toList().blockingGet(); // toList() returns Single

    Integer i = Flowable.range(100, 100).blockingLast();

(The reason for this is twofold: performance and ease of use of the library as a synchronous Java 8 Streams-like processor.)

Another significant difference between rx.Subscriber (and co) and org.reactivestreams.Subscriber (and co) is that in 2.x, your Subscribers and Observers are not allowed to throw anything but fatal exceptions (see Exceptions.throwIfFatal()). (The Reactive-Streams specification allows throwing NullPointerException if the onSubscribe, onNext or onError receives a null value, but RxJava doesn't let nulls in any way.) This means the following code is no longer legal:

    Subscriber<Integer> subscriber = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
        }

        public void onNext(Integer t) {
            if (t == 1) {
                throw new IllegalArgumentException();
            }
        }

        public void onError(Throwable e) {
            if (e instanceof IllegalArgumentException) {
                throw new UnsupportedOperationException();
            }
        }

        public void onComplete() {
            throw new NoSuchElementException();
        }
    };

    Flowable.just(1).subscribe(subscriber);

The same applies to Observer, SingleObserver, MaybeObserver and CompletableObserver.

Since much of the existing code targeting 1.x does such things, the method safeSubscribe has been introduced that does handle these non-conforming consumers.

Alternatively, you can use the subscribe(Consumer<T>, Consumer<Throwable>, Action) (and similar) methods to provide a callback/lambda that can throw:

    Flowable.just(1)
        .subscribe(
            subscriber::onNext,
            subscriber::onError,
            subscriber::onComplete,
            subscriber::onSubscribe
        );

Testing

Testing RxJava 2.x works the same way as it does in 1.x. Flowable can be tested with io.reactivex.subscribers.TestSubscriber whereas the non-backpressured Observable, Single, Maybe and Completable can be tested with io.reactivex.observers.TestObserver.

test() "operator"

To support our internal testing, all base reactive types now feature test() methods (which is a huge convenience for us) returning TestSubscriber or TestObserver:

    TestSubscriber<Integer> ts = Flowable.range(1, 5).test();

    TestObserver<Integer> to = Observable.range(1, 5).test();

    TestObserver<Integer> tso = Single.just(1).test();

    TestObserver<Integer> tmo = Maybe.just(1).test();

    // Completable emits no items, so its test() returns TestObserver<Void>
    TestObserver<Void> tco = Completable.complete().test();

The second convenience is that most TestSubscriber/TestObserver methods return the instance itself allowing chaining the various assertX methods. The third convenience is that you can now fluently test your sources without the need to create or introduce a TestSubscriber/TestObserver instance in your code:

    Flowable.range(1, 5)
        .test()
        .assertResult(1, 2, 3, 4, 5);

Notable new assert methods

  • assertResult(T... items): asserts that, if subscribed, the consumer received exactly the given items in the given order followed by onComplete and no errors.
  • assertFailure(Class<? extends Throwable> clazz, T... items): asserts that, if subscribed, the consumer received exactly the given items in the given order followed by a Throwable error for which clazz.isInstance() returns true.
  • assertFailureAndMessage(Class<? extends Throwable> clazz, String message, T... items): same as assertFailure, plus validates that getMessage() equals the specified message.
  • awaitDone(long time, TimeUnit unit): awaits a terminal event (blockingly) and cancels the sequence if the timeout elapsed.
  • assertOf(Consumer<TestSubscriber<T>> consumer): composes some assertions into the fluent chain (used internally for fusion tests, as operator fusion is not part of the public API right now).

One of the benefits is that when changing Flowable to Observable here, the test code doesn't have to change at all, due to the implicit type change from TestSubscriber to TestObserver.
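The assert methods listed above can be sketched together as follows. This is an illustration only: the class name FluentAssertSketch and its run method are hypothetical, and RxJava 2.x is assumed to be on the classpath. Each chain throws an AssertionError if the expectation does not hold.

```java
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of RxJava.
final class FluentAssertSketch {

    /** Returns true if none of the fluent assertions throws. */
    static boolean run() {
        // Values followed by onComplete
        Flowable.range(1, 3).test().assertResult(1, 2, 3);

        // The same chain on the non-backpressured type
        Observable.range(1, 3).test().assertResult(1, 2, 3);

        // One value, then an error of the given class
        Flowable.just(1)
            .concatWith(Flowable.<Integer>error(new IllegalStateException()))
            .test()
            .assertFailure(IllegalStateException.class, 1);

        // Error class plus message, no values
        Flowable.<Integer>error(new IOException("disk full"))
            .test()
            .assertFailureAndMessage(IOException.class, "disk full");

        // Block until the asynchronous source terminates (or 5 seconds pass)
        Flowable.range(1, 3)
            .subscribeOn(Schedulers.single())
            .test()
            .awaitDone(5, TimeUnit.SECONDS)
            .assertResult(1, 2, 3);

        // assertOf composes a custom check into the chain
        Flowable.just(42)
            .test()
            .assertOf(ts -> ts.assertValueCount(1))
            .assertResult(42);

        return true;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```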

cancel and request upfront

The test() method of the sources that return a TestObserver has a test(boolean cancel) overload, which cancels/disposes the TestObserver before it even gets subscribed:

PublishSubject<Integer> pp = PublishSubject.create();

// nobody subscribed yet
assertFalse(pp.hasObservers());

pp.test(true);

// nobody remained subscribed
assertFalse(pp.hasObservers());

For TestSubscriber, Flowable has the test(long initialRequest) and test(long initialRequest, boolean cancel) overloads to specify the initial request amount and whether the TestSubscriber should also be cancelled immediately. If the initialRequest is given, the TestSubscriber offers the requestMore(long) method to keep requesting in a fluent manner:

Flowable.range(1, 5)
    .test(0)
    .assertValues()
    .requestMore(1)
    .assertValues(1)
    .requestMore(2)
    .assertValues(1, 2, 3)
    .requestMore(2)
    .assertResult(1, 2, 3, 4, 5);

or alternatively the TestSubscriber instance has to be captured to gain access to its request() method:

PublishProcessor<Integer> pp = PublishProcessor.create();

TestSubscriber<Integer> ts = pp.test(0L);

ts.request(1);

pp.onNext(1);
pp.onNext(2);

ts.assertFailure(MissingBackpressureException.class, 1);

Testing an async source

Given an asynchronous source, fluent blocking for a terminal event is now possible:

Flowable.just(1)
    .subscribeOn(Schedulers.single())
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1);

Mockito & TestSubscriber

Those who used Mockito to mock Observer in 1.x have to mock the Subscriber.onSubscribe method to issue an initial request; otherwise, the sequence will hang or fail with hot sources:

@SuppressWarnings("unchecked")
public static <T> Subscriber<T> mockSubscriber() {
    Subscriber<T> w = mock(Subscriber.class);

    // Request everything as soon as the mock is subscribed
    Mockito.doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock a) throws Throwable {
            Subscription s = a.getArgumentAt(0, Subscription.class);
            s.request(Long.MAX_VALUE);
            return null;
        }
    }).when(w).onSubscribe((Subscription)any());

    return w;
}

Operator differences

Most operators are still there in 2.x and practically all of them have the same behavior as they had in 1.x. The following subsections list each base reactive type and the difference between 1.x and 2.x.

Generally, many operators gained overloads that now allow specifying the internal buffer size or prefetch amount they should run their upstream (or inner sources) with.

Some operator overloads have been renamed with a postfix, such as fromArray, fromIterable etc. The reason for this is that when the library is compiled with Java 8, javac often can't disambiguate between functional interface types.

Operators marked as @Beta or @Experimental in 1.x are promoted to standard.
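To illustrate the renamed overloads and the explicit buffer-size overloads mentioned above, here is a small sketch. The class RenamedFactorySketch and its methods are hypothetical names, RxJava 2.x is assumed on the classpath, and the buffer size of 16 is an arbitrary value chosen for the example.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class, not part of RxJava.
final class RenamedFactorySketch {

    /** 1.x from(T[]) became fromArray. */
    static List<Integer> fromArrayValues() {
        return Flowable.fromArray(1, 2, 3).toList().blockingGet();
    }

    /** 1.x from(Iterable) became fromIterable. */
    static List<Integer> fromIterableValues() {
        return Flowable.fromIterable(Arrays.asList(4, 5, 6)).toList().blockingGet();
    }

    /** 1.x from(Future) became fromFuture. */
    static int fromFutureValue() {
        return Flowable.fromFuture(CompletableFuture.completedFuture(7)).blockingFirst();
    }

    /** observeOn with an explicit buffer size instead of the default Flowable.bufferSize(). */
    static List<Integer> withSmallBuffer() {
        return Flowable.range(1, 100)
            .observeOn(Schedulers.computation(), false, 16)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(fromArrayValues());
        System.out.println(fromIterableValues());
        System.out.println(fromFutureValue());
        System.out.println(withSmallBuffer().size());
        System.out.println(Flowable.bufferSize());
    }
}
```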

1.x Observable to 2.x Flowable

Factory methods:

| 1.x | 2.x |
|---|---|
| amb | added amb(ObservableSource...) overload, 2-9 argument versions dropped |
| RxRingBuffer.SIZE | bufferSize() |
| combineLatest | added varargs overload, added overloads with bufferSize argument, combineLatest(List) dropped |
| concat | added overload with prefetch argument, 5-9 source overloads dropped, use concatArray instead |
| N/A | added concatArray and concatArrayDelayError |
| N/A | added concatArrayEager and concatArrayEagerDelayError |
| concatDelayError | added overloads with option to delay till the current ends or till the very end |
| concatEagerDelayError | added overloads with option to delay till the current ends or till the very end |
| create(SyncOnSubscribe) | replaced with generate + overloads (distinct interfaces, you can implement them all at once) |
| create(AsyncOnSubscribe) | not present |
| create(OnSubscribe) | repurposed with safe create(FlowableOnSubscribe, BackpressureStrategy), raw support via unsafeCreate() |
| from | disambiguated into fromArray, fromIterable, fromFuture |
| N/A | added fromPublisher |
| fromAsync | renamed to create() |
| N/A | added intervalRange() |
| limit | dropped, use take |
| merge | added overloads with prefetch |
| mergeDelayError | added overloads with prefetch |
| sequenceEqual | added overload with bufferSize |
| switchOnNext | added overload with prefetch |
| switchOnNextDelayError | added overload with prefetch |
| timer | deprecated overloads dropped |
| zip | added overloads with bufferSize and delayErrors capabilities, disambiguated to zipArray and zipIterable |
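A few of the factory methods from the table above can be sketched like this. The class FactoryMethodSketch and its method names are hypothetical, RxJava 2.x is assumed on the classpath, and the concrete values and the BUFFER strategy are arbitrary choices made for the example.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of RxJava.
final class FactoryMethodSketch {

    /** generate: at most one onNext per invocation, state is passed via the return value. */
    static List<Integer> squares() {
        return Flowable.<Integer, Integer>generate(
                () -> 1,
                (state, emitter) -> {
                    emitter.onNext(state * state);
                    if (state == 4) {
                        emitter.onComplete();
                    }
                    return state + 1;
                })
            .toList()
            .blockingGet();
    }

    /** create: the backpressure-aware replacement for 1.x create(OnSubscribe) and fromAsync. */
    static List<String> created() {
        return Flowable.<String>create(emitter -> {
                emitter.onNext("a");
                emitter.onNext("b");
                emitter.onComplete();
            }, BackpressureStrategy.BUFFER)
            .toList()
            .blockingGet();
    }

    /** concatArray: takes over from the dropped multi-argument concat overloads. */
    static List<Integer> concatenated() {
        return Flowable.concatArray(Flowable.just(1, 2), Flowable.just(3), Flowable.just(4, 5))
            .toList()
            .blockingGet();
    }

    /** intervalRange: a timed sequence of longs with a fixed start and count. */
    static List<Long> ticks() {
        return Flowable.intervalRange(10, 3, 1, 1, TimeUnit.MILLISECONDS)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(squares());
        System.out.println(created());
        System.out.println(concatenated());
        System.out.println(ticks());
    }
}
```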

Instance methods:

| 1.x | 2.x |
|---|---|
| all | RC3 returns Single<Boolean> now |
| any | RC3 returns Single<Boolean> now |
| asObservable | renamed to hide(), hides all identities now |
| buffer | overloads with custom Collection supplier |
| cache(int) | deprecated and dropped |
| collect | RC3 returns Single<U> |
| collect(U, Action2<U, T>) | disambiguated to collectInto and RC3 returns Single<U> |
| concatMap | added overloads with prefetch |
| concatMapDelayError | added overloads with prefetch, option to delay till the current ends or till the very end |
| concatMapEager | added overloads with prefetch |
| concatMapEagerDelayError | added overloads with prefetch, option to delay till the current ends or till the very end |
| count | RC3 returns Single<Long> now |
| countLong | dropped, use count |
| distinct | overload with custom Collection supplier |
| doOnCompleted | renamed to doOnComplete, note the missing d! |
| doOnUnsubscribe | renamed to Flowable.doOnCancel and doOnDispose for the others (see the doOnCancel/doOnDispose/unsubscribeOn section) |
| N/A | added doOnLifecycle to handle onSubscribe, request and cancel peeking |
| elementAt(int) | RC3 no longer signals NoSuchElementException if the source is shorter than the index |
| elementAt(Func1, int) | dropped, use filter(predicate).elementAt(int) |
| elementAtOrDefault(int, T) | renamed to elementAt(int, T) and RC3 returns Single<T> |
| elementAtOrDefault(Func1, int, T) | dropped, use filter(predicate).elementAt(int, T) |
| first() | RC3 renamed to firstElement and returns Maybe<T> |
| first(Func1) | dropped, use filter(predicate).first() |
| firstOrDefault(T) | renamed to first(T) and RC3 returns Single<T> |
| firstOrDefault(Func1, T) | dropped, use filter(predicate).first(T) |
| flatMap | added overloads with prefetch |
| N/A | added forEachWhile(Predicate<T>, [Consumer<Throwable>, [Action]]) for conditionally stopping consumption |
| groupBy | added overload with bufferSize and delayError option, the custom internal map version didn't make it into RC1 |
| ignoreElements | RC3 returns Completable |
| isEmpty | RC3 returns Single<Boolean> |
| last() | RC3 renamed to lastElement and returns Maybe<T> |
| last(Func1) | dropped, use filter(predicate).last() |
| lastOrDefault(T) | renamed to last(T) and RC3 returns Single<T> |
| lastOrDefault(Func1, T) | dropped, use filter(predicate).last(T) |
| nest | dropped, use manual just |
| publish(Func1) | added overload with prefetch |
| reduce(Func2) | RC3 returns Maybe<T> |
| N/A | added reduceWith(Callable, BiFunction) to reduce in a Subscriber-individual manner, returns Single<T> |
| N/A | added repeatUntil(BooleanSupplier) |
| repeatWhen(Func1, Scheduler) | dropped the overload, use subscribeOn(Scheduler).repeatWhen(Function) instead |
| retry | added retry(Predicate), retry(int, Predicate) |
| N/A | added retryUntil(BooleanSupplier) |
| retryWhen(Func1, Scheduler) | dropped the overload, use subscribeOn(Scheduler).retryWhen(Function) instead |
| sample | doesn't emit the very last item if the upstream completes within the period, added overloads with emitLast parameter |
| N/A | added scanWith(Callable, BiFunction) to scan in a Subscriber-individual manner |
| single() | RC3 renamed to singleElement and returns Maybe<T> |
| single(Func1) | dropped, use filter(predicate).single() |
| singleOrDefault(T) | renamed to single(T) and RC3 returns Single<T> |
| singleOrDefault(Func1, T) | dropped, use filter(predicate).single(T) |
| skipLast | added overloads with bufferSize and delayError options |
| startWith | 2-9 argument version dropped, use startWithArray instead |
| N/A | added startWithArray to disambiguate |
| subscribe | No longer wraps all consumer types (i.e., Observer) with a safety wrapper (just like the 1.x unsafeSubscribe, which is no longer available). Use safeSubscribe to get an explicit safety wrapper around a consumer type. |
| N/A | added subscribeWith that returns its input after subscription |
| switchMap | added overload with prefetch argument |
| switchMapDelayError | added overload with prefetch argument |
| takeLastBuffer | dropped |
| N/A | added test() (returns TestSubscriber subscribed to this) with overloads to fluently test |
| throttleLast | doesn't emit the very last item if the upstream completes within the period, use sample with the emitLast parameter |
| timeout(Func0<Observable>, ...) | signature changed to timeout(Publisher, ...) and dropped the function, use defer(Callable<Publisher>) if necessary |
| toBlocking().y | inlined as blockingY() operators, except toFuture |
| toCompletable | RC3 dropped, use ignoreElements |
| toList | RC3 returns Single<List<T>> |
| toMap | RC3 returns Single<Map<K, V>> |
| toMultimap | RC3 returns Single<Map<K, Collection<V>>> |
| N/A | added toFuture |
| N/A | added toObservable |
| toSingle | RC3 dropped, use single(T) |
| toSortedList | RC3 returns Single<List<T>> |
| unsafeSubscribe | Removed as the Reactive Streams specification mandates the onXXX methods don't crash and therefore the default is to not have a safety net in subscribe. The new safeSubscribe method was introduced to explicitly add the safety wrapper around a consumer type. |
| withLatestFrom | 5-9 source overloads dropped |
| zipWith | added overloads with prefetch and delayErrors options |
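Several of the renamed instance methods from the table above are combined in the following sketch. The class InstanceMethodSketch is a hypothetical name and RxJava 2.x is assumed on the classpath; the sequence is synchronous, so the log is complete once subscribeWith returns.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.DisposableSubscriber;

import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
final class InstanceMethodSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        DisposableSubscriber<Integer> consumer = Flowable.range(1, 2)
            .startWithArray(-1, 0)                       // instead of the 2-9 argument startWith
            .doOnComplete(() -> log.add("doOnComplete")) // 1.x: doOnCompleted
            .hide()                                      // 1.x: asObservable
            .subscribeWith(new DisposableSubscriber<Integer>() {
                @Override
                public void onNext(Integer v) {
                    log.add("next:" + v);
                }

                @Override
                public void onError(Throwable e) {
                    log.add("error");
                }

                @Override
                public void onComplete() {
                    log.add("complete");
                }
            });

        // subscribeWith returned the consumer itself, so it can be disposed directly
        consumer.dispose();
        return log;
    }

    /** 1.x: toBlocking().last() */
    static int last() {
        return Flowable.range(1, 3).blockingLast();
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println(last());
    }
}
```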

Different return types

Some operators that produced exactly one value or an error now return Single in 2.x (or Maybe if an empty source is allowed).

(Remark: this is "experimental" in RC2 and RC3 to see how it feels to program with such mixed-type sequences and whether or not there has to be too much toObservable/toFlowable back-conversion.)

| Operator | Old return type | New return type | Remark |
|---|---|---|---|
| all(Predicate) | Observable<Boolean> | Single<Boolean> | Emits true if all elements match the predicate |
| any(Predicate) | Observable<Boolean> | Single<Boolean> | Emits true if any elements match the predicate |
| count() | Observable<Long> | Single<Long> | Counts the number of elements in the sequence |
| elementAt(int) | Observable<T> | Maybe<T> | Emits the element at the given index or completes |
| elementAt(int, T) | Observable<T> | Single<T> | Emits the element at the given index or the default |
| elementAtOrError(int) | Observable<T> | Single<T> | Emits the element at the given index or a NoSuchElementException |
| first(T) | Observable<T> | Single<T> | Emits the very first element or the default item |
| firstElement() | Observable<T> | Maybe<T> | Emits the very first element or completes |
| firstOrError() | Observable<T> | Single<T> | Emits the first element or a NoSuchElementException if the source is empty |
| ignoreElements() | Observable<T> | Completable | Ignores all but the terminal events |
| isEmpty() | Observable<Boolean> | Single<Boolean> | Emits true if the source is empty |
| last(T) | Observable<T> | Single<T> | Emits the very last element or the default item |
| lastElement() | Observable<T> | Maybe<T> | Emits the very last element or completes |
| lastOrError() | Observable<T> | Single<T> | Emits the last element or a NoSuchElementException if the source is empty |
| reduce(BiFunction) | Observable<T> | Maybe<T> | Emits the reduced value or completes |
| reduce(U, BiFunction) | Observable<U> | Single<U> | Emits the reduced value (or the initial value) |
| reduceWith(Callable, BiFunction) | Observable<U> | Single<U> | Emits the reduced value (or the initial value) |
| single(T) | Observable<T> | Single<T> | Emits the only element or the default item |
| singleElement() | Observable<T> | Maybe<T> | Emits the only element or completes |
| singleOrError() | Observable<T> | Single<T> | Emits the one and only element, an IllegalArgumentException if the source is longer than 1 item or a NoSuchElementException if the source is empty |
| toList() | Observable<List<T>> | Single<List<T>> | Collects all elements into a List |
| toMap() | Observable<Map<K, V>> | Single<Map<K, V>> | Collects all elements into a Map |
| toMultimap() | Observable<Map<K, Collection<V>>> | Single<Map<K, Collection<V>>> | Collects all elements into a Map with collection |
| toSortedList() | Observable<List<T>> | Single<List<T>> | Collects all elements into a List and sorts it |
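The new return types in the table above look like this in code. This is a sketch under the assumption that RxJava 2.x is on the classpath; the class ReturnTypeSketch and its method names are hypothetical.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;

// Hypothetical illustration class, not part of RxJava.
final class ReturnTypeSketch {

    /** count() now yields a Single<Long>. */
    static long count() {
        Single<Long> count = Flowable.range(1, 5).count();
        return count.blockingGet();
    }

    /** firstElement() yields a Maybe<T>; blockingGet() returns null for an empty Maybe. */
    static Integer firstOfEmpty() {
        Maybe<Integer> first = Flowable.<Integer>empty().firstElement();
        return first.blockingGet();
    }

    /** first(T) yields a Single<T> that falls back to the default item. */
    static int firstOrDefault() {
        Single<Integer> first = Flowable.<Integer>empty().first(-1);
        return first.blockingGet();
    }

    /** reduce(BiFunction) yields a Maybe<T> because the source may be empty. */
    static int sum() {
        Maybe<Integer> sum = Flowable.range(1, 5).reduce((a, b) -> a + b);
        return sum.blockingGet();
    }

    /** ignoreElements() yields a Completable that only relays the terminal event. */
    static boolean completes() {
        Completable done = Flowable.range(1, 5).ignoreElements();
        done.blockingAwait();
        return true;
    }

    /** Back-conversion to Flowable when further multi-valued operators are needed. */
    static long doubledCount() {
        return Flowable.range(1, 5).count().toFlowable().map(n -> n * 2).blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(count());
        System.out.println(firstOfEmpty());
        System.out.println(firstOrDefault());
        System.out.println(sum());
        System.out.println(completes());
        System.out.println(doubledCount());
    }
}
```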

Removals

To make sure the final API of 2.0 is as clean as possible, we remove methods and other components between release candidates without deprecating them.

| Removed in version | Component | Remark |
|---|---|---|
| RC3 | Flowable.toCompletable() | use Flowable.ignoreElements() |
| RC3 | Flowable.toSingle() | use Flowable.single(T) |
| RC3 | Flowable.toMaybe() | use Flowable.singleElement() |
| RC3 | Observable.toCompletable() | use Observable.ignoreElements() |
| RC3 | Observable.toSingle() | use Observable.single(T) |
| RC3 | Observable.toMaybe() | use Observable.singleElement() |

Miscellaneous changes

doOnCancel/doOnDispose/unsubscribeOn

In 1.x, doOnUnsubscribe was always executed on a terminal event because 1.x's SafeSubscriber called unsubscribe on itself. This was practically unnecessary, and the Reactive-Streams specification states that when a terminal event arrives at a Subscriber, the upstream Subscription should be considered cancelled and thus calling cancel() is a no-op.

For the same reason, unsubscribeOn is not called on the regular termination path but only when there is an actual cancel (or dispose) call on the chain.

Therefore, the following sequence won't call doOnCancel:

Flowable.just(1, 2, 3)
    .doOnCancel(() -> System.out.println("Cancelled!"))
    .subscribe(System.out::println);

However, the following will call it, since the take operator cancels after the set amount of onNext events have been delivered:

Flowable.just(1, 2, 3)
    .doOnCancel(() -> System.out.println("Cancelled!"))
    .take(2)
    .subscribe(System.out::println);

If you need to perform cleanup on both regular termination and cancellation, consider the operator using instead.

Alternatively, the doFinally operator (introduced in 2.0.1 and standardized in 2.1) calls a developer-specified Action that gets executed after a source completed, failed with an error or got cancelled/disposed:

Flowable.just(1, 2, 3)
    .doFinally(() -> System.out.println("Finally"))
    .subscribe(System.out::println);

Flowable.just(1, 2, 3)
    .doFinally(() -> System.out.println("Finally"))
    .take(2) // cancels the above after 2 elements
    .subscribe(System.out::println);
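To see the difference between doOnCancel and doFinally side by side, a sketch like the following records which callbacks run with and without an early cancellation. The class CancelVersusFinallySketch and the cancelEarly flag are hypothetical, and RxJava 2.0.1 or later is assumed on the classpath.

```java
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
final class CancelVersusFinallySketch {

    /** Runs the chain and returns the callbacks that fired, in order. */
    static List<String> run(boolean cancelEarly) {
        List<String> log = new ArrayList<>();

        Flowable<Integer> source = Flowable.just(1, 2, 3)
            .doOnCancel(() -> log.add("doOnCancel"))
            .doFinally(() -> log.add("doFinally"));

        if (cancelEarly) {
            // take cancels the upstream after two items
            source = source.take(2);
        }

        source.subscribe(v -> log.add("next:" + v));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

On the regular termination path only doFinally should appear in the log; with take(2) both callbacks should appear and the third item should not be delivered.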

Copyright (c) 2016-present, RxJava Contributors.