Writing operators for 2.0


Introduction

Writing operators, source-like (fromEmitter) or intermediate-like (flatMap), has always been a hard task in RxJava. There are many rules to obey and many cases to consider, but at the same time many (legal) shortcuts to take to build well-performing code. Now, writing an operator specifically for 2.x is 10 times harder than for 1.x. If you want to exploit all the advanced, 4th generation features, that's even 2-3 times harder on top (so 30 times harder in total).

(If you have been following my blog about RxJava internals, writing operators is maybe only 2 times harder than in 1.x; some things have moved around, some tools popped up while others have been dropped, but there is a relatively straight mapping from 1.x concepts and approaches to 2.x concepts and approaches.)

In this article, I'll describe the how-tos from the perspective of a developer who skipped the 1.x knowledge base and basically wants to write operators that conform to the Reactive-Streams specification as well as to RxJava 2.x's own extensions and additional expectations/requirements.

Since Reactor 3 has the same architecture as RxJava 2 (no accident, I architected and contributed 80% of Reactor 3 as well), the same principles outlined on this page apply to writing operators for Reactor 3. Note, however, that they chose different naming and locations for their utility and support classes, so you may have to search for the equivalent components.

Warning on internal components

RxJava has several hundred public classes implementing various operators and helper facilities. Since there is no way to hide these in Java 6-8, the general contract is that anything below io.reactivex.internal is considered private and subject to change without warning. It is not recommended to reference these in your code (unless you contribute to RxJava itself) and you must be prepared that even a patch change may shuffle/rename things around in them. That being said, they usually contain valuable tools for operator builders and as such are quite attractive to use in your custom code.

Atomics, serialization, deferred actions

As RxJava itself has building blocks for creating reactive dataflows, its components have building blocks as well in the form of concurrency primitives and algorithms. Many refer to the book Java Concurrency in Practice for learning the fundamentals needed. Unfortunately, other than some explanation of the Java Memory Model, the book lacks the techniques required for developing operators for RxJava 1.x and 2.x.

Field updaters and Android

If you looked at the source code of RxJava and then at Reactor 3, you might have noticed that RxJava doesn't use the AtomicXFieldUpdater classes. The reason for this is that on certain Android devices, the runtime "messes up" the field names and the reflection logic behind the field updaters fails to locate those fields in the operators. To avoid this, we decided to use only the full AtomicX classes (as fields or by extending them).

If you target the RxJava library with your custom operator (or target Android), you are encouraged to do the same. If you plan to have operators running only on desktop Java, feel free to use the field updaters instead.
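For illustration, here is a minimal sketch of the two styles (the class names are made up for this example): the reflection-based field updater that is fine on desktop Java, and the full AtomicLong field that RxJava itself uses to stay safe on Android.

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Desktop-only style: a volatile field plus a static, reflection-based field updater.
final class UpdaterStyle {
    volatile long counter;
    static final AtomicLongFieldUpdater<UpdaterStyle> COUNTER =
            AtomicLongFieldUpdater.newUpdater(UpdaterStyle.class, "counter");

    long increment() {
        return COUNTER.incrementAndGet(this);
    }
}

// Android-safe style used throughout RxJava: hold (or extend) the full AtomicLong.
final class AtomicStyle {
    final AtomicLong counter = new AtomicLong();

    long increment() {
        return counter.incrementAndGet();
    }
}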

Request accounting

When dealing with backpressure in Flowable operators, one needs a way to account for the downstream requests and the emissions in response to those requests. For this we use a single AtomicLong. Accounting must be atomic because requesting more and emitting items to fulfill an earlier request may happen at the same time.

The naive approach for accounting would be to simply call AtomicLong.getAndAdd() with new requests and AtomicLong.addAndGet() for decrementing based on how many elements were emitted.

The problem with this is that the Reactive-Streams specification declares Long.MAX_VALUE as the upper bound for outstanding requests (and interprets it as unbounded mode), but adding two large longs may overflow the long into a negative value. In addition, if for some reason there are more values emitted than were requested, the subtraction may yield a negative current request value, causing crashes or hangs.

Therefore, both the addition and the subtraction have to be capped at Long.MAX_VALUE and 0 respectively. Since there is no dedicated AtomicLong method for this, we have to use a compare-and-set loop. (Usually, requesting happens relatively rarely compared to emissions, thus the lack of a dedicated machine-code instruction is not a performance bottleneck.)

public static long add(AtomicLong requested, long n) {
    for (;;) {
        long current = requested.get();
        if (current == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long update = current + n;
        if (update < 0L) {
            update = Long.MAX_VALUE;
        }
        if (requested.compareAndSet(current, update)) {
            return current;
        }
    }
}

public static long produced(AtomicLong requested, long n) {
    for (;;) {
        long current = requested.get();
        if (current == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long update = current - n;
        if (update < 0L) {
            update = 0;
        }
        if (requested.compareAndSet(current, update)) {
            return update;
        }
    }
}

In fact, these operations are so common in RxJava's operators that the algorithms are available as utility methods on the internal BackpressureHelper class under the same names.

Sometimes, instead of having a separate AtomicLong field, your operator can extend AtomicLong, saving on the indirection and class size. The practice in RxJava 2.x operators is that unless the operator needs another atomic counter (such as a work-in-progress counter, see the later subsection) or a different base class, it extends AtomicLong directly.

The BackpressureHelper class also features special versions of add and produced which treat Long.MIN_VALUE as a cancellation indication and won't change the AtomicLong's value if they see it.
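As a hedged sketch of the extend-AtomicLong practice (the class and drain method names are illustrative, and it assumes the BackpressureHelper methods shown above), the subscriber itself serves as the requested counter:

// The operator's subscriber is its own requested counter: 'this' is the AtomicLong.
abstract class RequestTrackingSubscriber<T> extends AtomicLong implements Subscription {

    @Override
    public void request(long n) {
        // capped accumulation; Long.MAX_VALUE means unbounded
        BackpressureHelper.add(this, n);
        drain();
    }

    // the emission loop would call BackpressureHelper.produced(this, emittedCount)
    // after delivering emittedCount items to the downstream
    abstract void drain();
}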

Once

RxJava 2 expanded the single-event reactive types to include Maybe (called the reactive Optional by some). The common property of Single, Completable and Maybe is that they can only call one of the 3 kinds of methods on their consumers: (onSuccess | onError | onComplete). Since they also participate in concurrent scenarios, an operator needs a way to ensure that only one of them is called, even though the input sources may call several of them at once.

To ensure this, operators may use AtomicBoolean.compareAndSet to atomically choose the event to relay (and thus the other events to drop).

final AtomicBoolean once = new AtomicBoolean();

final MaybeObserver<? super T> child = ...;

void emitSuccess(T value) {
    if (once.compareAndSet(false, true)) {
        child.onSuccess(value);
    }
}

void emitFailure(Throwable e) {
    if (once.compareAndSet(false, true)) {
        child.onError(e);
    } else {
        RxJavaPlugins.onError(e);
    }
}

Note that the same sequential requirement applies to these 0-1 reactive sources as to Flowable/Observable; therefore, if your operator doesn't have to deal with events from multiple sources (and pick one of them), you don't need this construct.

Serialization

With more complicated sources, several things may happen at once that can trigger emission towards the downstream, such as the upstream becoming available while the downstream requests more data while the sequence gets cancelled by a timeout.

Instead of working out the often very complicated state transitions via atomics, perhaps the easiest way is to serialize the events, actions or tasks and have one thread perform the necessary steps after that. This is what I call the queue-drain approach (called trampolining by some).

(The other approach, the emitter-loop, is no longer recommended with 2.x due to its potentially blocking synchronized constructs, which look performant in the single-threaded case but destroy performance in a truly concurrent case.)

The concept is relatively simple: have a concurrent queue and a work-in-progress atomic counter, enqueue the item, increment the counter and if the counter transitioned from 0 to 1, keep draining the queue, work with the element and decrement the counter until it reaches zero again:

final ConcurrentLinkedQueue<Runnable> queue = ...;

final AtomicInteger wip = ...;

void execute(Runnable r) {
    queue.offer(r);
    if (wip.getAndIncrement() == 0) {
        do {
            queue.poll().run();
        } while (wip.decrementAndGet() != 0);
    }
}

The same pattern applies when one has to emit onNext values to a downstream consumer:

final ConcurrentLinkedQueue<T> queue = ...;

final AtomicInteger wip = ...;

final Subscriber<? super T> child = ...;

void emit(T r) {
    queue.offer(r);
    if (wip.getAndIncrement() == 0) {
        do {
            child.onNext(queue.poll());
        } while (wip.decrementAndGet() != 0);
    }
}

Queues

Using ConcurrentLinkedQueue is reliable although mostly overkill for such situations because it allocates on each call to offer() and is unbounded. It can be replaced with more optimized queues (see JCTools), and RxJava itself has some customized queues available (internal!):

  • SpscArrayQueue: used when the queue is known to be fed by a single thread but the serialization has to look at other things (request, cancellation, termination) that can be read from other fields. Example: observeOn has a fixed request pattern which fits this type of queue, plus extra fields for passing an error, completion or downstream requests into the drain logic.
  • SpscLinkedArrayQueue: used when the queue is known to be fed by a single thread but there is no bound on the element count. Example: UnicastProcessor, almost all buffering Observable operators. Some operators use it with multiple event sources by synchronizing on the offer side - a tradeoff between allocation and potential blocking:

SpscLinkedArrayQueue<T> q = ...

synchronized (q) {
    q.offer(value);
}

  • MpscLinkedQueue: used where there could be many feeders and an unknown number of elements. Example: buffer with a reactive boundary.

The RxJava 2.x implementations of these queue types have a different class hierarchy from the JDK/JCTools versions. Our classes don't implement the java.util.Queue interface but rather a custom, simplified interface:

interface SimpleQueue<T> {
    boolean offer(T t);

    boolean offer(T t1, T t2);

    T poll() throws Exception;

    boolean isEmpty();

    void clear();
}

interface SimplePlainQueue<T> extends SimpleQueue<T> {
    @Override
    T poll();
}

public final class SpscArrayQueue<T> implements SimplePlainQueue<T> {
    // ...
}

This simplified queue API gets rid of the unused parts (iterator, collections API remnants) and adds a bi-offer method (currently only implemented atomically in SpscLinkedArrayQueue). The second interface, SimplePlainQueue, is defined to suppress the throws Exception on poll() for queue types that won't throw that exception, so there is no need for a try-catch around them.
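For example, a drain loop working against the general SimpleQueue has to handle the checked exception of poll(); a sketch, assuming the usual child and upstream fields of an operator:

T v;

try {
    v = queue.poll();
} catch (Throwable ex) {
    Exceptions.throwIfFatal(ex);
    s.cancel();               // stop the upstream
    child.onError(ex);        // and report the failure downstream
    return;
}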

Deferred actions

Reactive-Streams has a strict requirement that calling onSubscribe() must happen before any calls to the rest of the onXXX methods and, by nature, before any calls to Subscription.request() and Subscription.cancel(). The same logic applies to the design of Observable, Single, Completable and Maybe with their connection type, Disposable.

Often, though, such a call to onSubscribe may happen later than the respective cancel() needs to happen. For example, the user may want to call cancel() before the respective Subscription actually becomes available in subscribeOn. Other operators may need to call onSubscribe before they connect to other sources, but at that time there is no direct way of relaying a cancel call to the not-yet-available upstream Subscription.

The solution is deferred cancellation and deferred requesting in general.

Deferred cancellation

This approach affects all 5 reactive types and works the same way for all of them. First, have an AtomicReference that will hold the respective connection type (or any other type whose method call has to happen later). Two methods are needed for handling the AtomicReference: one that sets the actual instance and one that calls the cancel/dispose method on it.

static final Disposable DISPOSED;
static {
    DISPOSED = Disposables.empty();
    DISPOSED.dispose();
}

static boolean set(AtomicReference<Disposable> target, Disposable value) {
    for (;;) {
        Disposable current = target.get();
        if (current == DISPOSED) {
            if (value != null) {
                value.dispose();
            }
            return false;
        }
        if (target.compareAndSet(current, value)) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
}

static boolean dispose(AtomicReference<Disposable> target) {
    Disposable current = target.getAndSet(DISPOSED);
    if (current != DISPOSED) {
        if (current != null) {
            current.dispose();
        }
        return true;
    }
    return false;
}

The approach uses a unique sentinel value, DISPOSED - which should not appear elsewhere in your code - to indicate that once a late actual Disposable arrives, it should be disposed immediately. Both methods return true if the operation succeeded or false if the target was already disposed.

Sometimes, only one call to set is permitted (i.e., setOnce) and other times, the previous non-null value needs no call to dispose because it is known to be disposed already (i.e., replace).
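Sketches of these two variants, following the same pattern as set() and dispose() above (simplified compared to the real helpers):

// setOnce: only one successful set is allowed; a second attempt disposes the newcomer.
static boolean setOnce(AtomicReference<Disposable> target, Disposable value) {
    if (!target.compareAndSet(null, value)) {
        value.dispose();       // either already set or already disposed
        return false;
    }
    return true;
}

// replace: like set(), but the previous value is known to be disposed already
// and therefore doesn't get dispose() called on it.
static boolean replace(AtomicReference<Disposable> target, Disposable value) {
    for (;;) {
        Disposable current = target.get();
        if (current == DISPOSED) {
            if (value != null) {
                value.dispose();
            }
            return false;
        }
        if (target.compareAndSet(current, value)) {
            return true;
        }
    }
}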

As with the request management, there are utility classes and methods for these operations:

  • (internal) SequentialDisposable that uses update, replace and dispose but leaks the API of AtomicReference
  • SerialDisposable that has a safe API with set, replace and dispose, among other things
  • (internal) DisposableHelper that features the methods shown above and the global disposed sentinel used by RxJava. It may come in handy when one uses AtomicReference<Disposable> as a base class.

The same pattern applies to Subscription with its cancel() method, via the helper (internal) class SubscriptionHelper (but there is no SequentialSubscription or SerialSubscription; see the next subsection).

Deferred requesting

With Flowables (and Reactive-Streams Publishers) the request() calls need to be deferred as well. In one form (the simpler one), the respective late Subscription will eventually arrive and we need to relay all previous and all subsequent request amounts to its request() method.

In 1.x, this behavior was implicitly provided by rx.Subscriber, but at a high cost that had to be paid by all instances whether or not they needed this feature.

The solution works by having an AtomicReference for the Subscription and an AtomicLong to store and accumulate the requests until the actual Subscription arrives, then atomically requesting the accumulated deferred amount at once.

static boolean deferredSetOnce(AtomicReference<Subscription> subscription,
        AtomicLong requested, Subscription newSubscription) {
    if (subscription.compareAndSet(null, newSubscription)) {
        long r = requested.getAndSet(0L);
        if (r != 0) {
            newSubscription.request(r);
        }
        return true;
    }
    newSubscription.cancel();
    if (subscription.get() != SubscriptionHelper.CANCELLED) {
        RxJavaPlugins.onError(new IllegalStateException("Subscription already set!"));
    }
    return false;
}

static void deferredRequest(AtomicReference<Subscription> subscription,
        AtomicLong requested, long n) {
    Subscription current = subscription.get();
    if (current != null) {
        current.request(n);
    } else {
        BackpressureHelper.add(requested, n);

        current = subscription.get();
        if (current != null) {
            long r = requested.getAndSet(0L);
            if (r != 0L) {
                current.request(r);
            }
        }
    }
}

In deferredSetOnce, if the CAS from null to the newSubscription succeeds, we atomically exchange the request amount to 0L and, if the original value was nonzero, we request it from newSubscription. In deferredRequest, if there is a Subscription we simply request from it directly. Otherwise, we accumulate the requests via the helper method, then check again whether the Subscription has arrived. If it arrived in the meantime, we atomically exchange the accumulated request value and, if nonzero, request it from the newly retrieved Subscription. This non-blocking logic makes sure that in case of concurrent invocations of the two methods, no accumulated request is left behind.

This logic, along with other safeguards, is available in the (internal) SubscriptionHelper utility class and can be used like this:

final class Operator<T> implements Subscriber<T>, Subscription {
    final Subscriber<? super T> child;

    final AtomicReference<Subscription> ref = new AtomicReference<>();

    final AtomicLong requested = new AtomicLong();

    public Operator(Subscriber<? super T> child) {
        this.child = child;
    }

    @Override
    public void onSubscribe(Subscription s) {
        SubscriptionHelper.deferredSetOnce(ref, requested, s);
    }

    @Override
    public void onNext(T t) { ... }

    @Override
    public void onError(Throwable t) { ... }

    @Override
    public void onComplete() { ... }

    @Override
    public void cancel() {
        SubscriptionHelper.cancel(ref);
    }

    @Override
    public void request(long n) {
        SubscriptionHelper.deferredRequest(ref, requested, n);
    }
}

Operator<T> parent = new Operator<T>(child);

child.onSubscribe(parent);

source.subscribe(parent);

The second form is when multiple Subscriptions replace each other and we not only need to hold onto request amounts while none is present but also make sure a newer Subscription is requested only as much as the previous Subscription's upstream didn't deliver. This is called Subscription arbitration; the relevant algorithms are quite verbose and will be omitted here. There is, however, a utility class that manages this: the (internal) SubscriptionArbiter.

You can extend it (to save on object headers) or have it as a field. Its main use is to send it to the downstream via onSubscribe and update its current Subscription in the current operator. Note that even though its methods are thread-safe, it is intended for swapping Subscriptions when the current one has finished emitting events. This makes sure that any newer Subscription is requested the right amount and not more due to a production/switch race.

final SubscriptionArbiter arbiter = ...

// ...

child.onSubscribe(arbiter);

// ...

long produced;

@Override
public void onSubscribe(Subscription s) {
    arbiter.setSubscription(s);
}

@Override
public void onNext(T value) {
    produced++;
    child.onNext(value);
}

@Override
public void onComplete() {
    long p = produced;
    if (p != 0L) {
        arbiter.produced(p);
    }
    subscribeNext();
}

For better performance, most operators can count the produced element amount and issue a single SubscriptionArbiter.produced() call just before switching to the next Subscription.

Atomic error management

In some cases, multiple sources may signal a Throwable at the same time, but the contract forbids calling onError multiple times. One can, of course, use the once approach with an AtomicReference<Throwable> and drop every Throwable but the first one set on it.

The alternative is to collect these Throwables into a CompositeException as long as possible and at some point lock out the others. This works by doing a copy-on-write scheme or by linking CompositeExceptions atomically and having a terminal sentinel to indicate that all further errors should be dropped.

static final Throwable TERMINATED = new Throwable();

static boolean addThrowable(AtomicReference<Throwable> ref, Throwable e) {
    for (;;) {
        Throwable current = ref.get();
        if (current == TERMINATED) {
            return false;
        }
        Throwable next;
        if (current == null) {
            next = e;
        } else {
            next = new CompositeException(current, e);
        }
        if (ref.compareAndSet(current, next)) {
            return true;
        }
    }
}

static Throwable terminate(AtomicReference<Throwable> ref) {
    return ref.getAndSet(TERMINATED);
}

As with most common logic, this is supported by the (internal) ExceptionHelper utility class and the (internal) AtomicThrowable class.

The usage pattern looks as follows:

final AtomicThrowable errors = ...;

@Override
public void onError(Throwable e) {
    if (errors.addThrowable(e)) {
        drain();
    } else {
        RxJavaPlugins.onError(e);
    }
}

void drain() {
    // ...
    if (errors.get() != null) {
        child.onError(errors.terminate());
        return;
    }
    // ...
}

Half-serialization

Sometimes having the queue-drain, a SerializedSubscriber or a SerializedObserver is a bit of overkill. Such cases include when only one thread calls onNext but other threads may call onError or onComplete concurrently. Example operators include takeUntil, where the other source may "interrupt" the main sequence and inject an onComplete into it before the main source itself would complete some time later. This is what I call half-serialization.

The approach uses the concepts of the deferred actions and the atomic error management discussed above and has 3 methods for the onNext, onError and onComplete management:

public static <T> void onNext(Subscriber<? super T> subscriber, T value,
        AtomicInteger wip, AtomicThrowable error) {
    if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
        subscriber.onNext(value);
        if (wip.decrementAndGet() != 0) {
            Throwable ex = error.terminate();
            if (ex != null) {
                subscriber.onError(ex);
            } else {
                subscriber.onComplete();
            }
        }
    }
}

public static void onError(Subscriber<?> subscriber, Throwable ex,
        AtomicInteger wip, AtomicThrowable error) {
    if (error.addThrowable(ex)) {
        if (wip.getAndIncrement() == 0) {
            subscriber.onError(error.terminate());
        }
    } else {
        RxJavaPlugins.onError(ex);
    }
}

public static void onComplete(Subscriber<?> subscriber,
        AtomicInteger wip, AtomicThrowable error) {
    if (wip.getAndIncrement() == 0) {
        Throwable ex = error.terminate();
        if (ex != null) {
            subscriber.onError(ex);
        } else {
            subscriber.onComplete();
        }
    }
}

Here, the wip counter indicates that there is an active emission happening; if it is found non-zero when trying to leave onNext, that is taken as an indication that there was a concurrent onError or onComplete() call and the child must be notified. All subsequent calls to any of these methods are ignored. In this case, the wip counter is never decremented back to zero.

RxJava 2.x, again, supports these with the (internal) utility class HalfSerializer and allows targeting Subscribers and Observers with it.
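A usage sketch (assuming the method signatures shown above) for a takeUntil-like operator where only the main source calls onNext but the "other" source may terminate the downstream concurrently:

final Subscriber<? super T> downstream = ...;

final AtomicInteger wip = new AtomicInteger();

final AtomicThrowable error = new AtomicThrowable();

// main source: the only caller of onNext
@Override
public void onNext(T t) {
    HalfSerializer.onNext(downstream, t, wip, error);
}

// signals coming from the "other" source, possibly on another thread
void otherError(Throwable ex) {
    HalfSerializer.onError(downstream, ex, wip, error);
}

void otherComplete() {
    HalfSerializer.onComplete(downstream, wip, error);
}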

Fast-path queue-drain

In some operators, it is unlikely that concurrent threads try to enter the drain loop at the same time, and having to play the full enqueue-increment-dequeue game adds unnecessary overhead.

Luckily, such situations can be detected by a simple compare-and-set attempt on the work-in-progress counter, trying to change the amount from 0 to 1. If it fails, there is a concurrent drain in progress and we revert to the classical queue-drain logic. If it succeeds, we don't enqueue anything but emit the value / perform the action right there and then try to leave the serialized section.

public void onNext(T v) {
    if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
        child.onNext(v);
        if (wip.decrementAndGet() == 0) {
            return;
        }
    } else {
        queue.offer(v);
        if (wip.getAndIncrement() != 0) {
            return;
        }
    }
    drainLoop();
}

void drain() {
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}

void drainLoop() {
    // the usual drain loop part after the classical getAndIncrement()
}

In this pattern, the classical drain is split into drain and drainLoop. The new drain does the increment-check and calls drainLoop; drainLoop contains the remaining logic with the loop, emission and wip management as usual.

On the fast path, when we try to leave it, it is possible that a concurrent call to onNext or drain incremented the wip counter further and the decrement didn't return it to zero. This is an indication of further work and we call drainLoop to process it.

FlowableSubscriber

Version 2.0.7 introduced a new interface, FlowableSubscriber, that extends Subscriber from Reactive-Streams. It has the same methods with the same parameter types but different textual rules attached to it: a set of relaxations of the Reactive-Streams specification to enable better-performing RxJava internals while still honoring the specification to the letter for non-RxJava consumers of Flowables.

The rule relaxations are as follows:

  • §1.3 relaxation: onSubscribe may run concurrently with onNext in case the FlowableSubscriber calls request() from inside onSubscribe, and it is the responsibility of the FlowableSubscriber to ensure thread-safety between the remaining instructions in onSubscribe and onNext.
  • §2.3 relaxation: calling Subscription.cancel and Subscription.request from FlowableSubscriber.onComplete() or FlowableSubscriber.onError() is considered a no-operation.
  • §2.12 relaxation: if the same FlowableSubscriber instance is subscribed to multiple sources, it must ensure its onXXX methods remain thread-safe.
  • §3.9 relaxation: issuing a non-positive request() will not stop the current stream but signals an error via RxJavaPlugins.onError.

When a Flowable gets subscribed by a Subscriber, an instanceof check will detect FlowableSubscriber and not apply the StrictSubscriber wrapper that makes sure the relaxations don't happen. In practice, ensuring rule §3.9 has the most overhead because a bad request may happen concurrently with the emission of any normal event and thus has to be serialized with one of the methods described in the previous sections.

In fact, 2.x was always implemented in this relaxed manner, thus looking at the existing code and style is the way to go.

Therefore, it is strongly recommended that one implements custom intermediate and end consumers via FlowableSubscriber.
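For example, a minimal end consumer implemented against FlowableSubscriber might look like this (a sketch; subscribing with a FlowableSubscriber skips the StrictSubscriber wrapping mentioned above):

Flowable.range(1, 10).subscribe(new FlowableSubscriber<Integer>() {
    Subscription upstream;

    @Override
    public void onSubscribe(Subscription s) {
        upstream = s;
        s.request(Long.MAX_VALUE);   // requesting from within onSubscribe is allowed (§1.3 relaxation)
    }

    @Override
    public void onNext(Integer t) {
        System.out.println(t);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
});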

From a source operator's perspective, when extending the Flowable class and implementing subscribeActual there is no need for dispatching over the type of the Subscriber; the backing infrastructure already applies the wrapping if necessary, thus one can be sure that in subscribeActual(Subscriber<? super T> s) the parameter is a FlowableSubscriber. (The signature couldn't be changed for compatibility reasons.) Since the two interfaces are identical on the Java level, no real preferential treatment is necessary within sources (i.e., don't cast into FlowableSubscriber).

The other base reactive consumers, Observer, SingleObserver, MaybeObserver and CompletableObserver, don't need such relaxations, because

  • they are only defined and used in RxJava (i.e., no other library implements them),
  • they were conceptually always derived from the relaxed Subscriber RxJava had,
  • they don't have backpressure, thus no request() call that would introduce another source of concurrency to think about,
  • there is no way to trigger emission from the upstream before onSubscribe(Disposable) returns in standard operators (again, no request() method).

Backpressure and cancellation

Backpressure (or flow control) in Reactive-Streams is the means to tell the upstream how many elements to produce or to tell it to stop producing elements altogether. Unlike what the name suggests, there is no physical pressure preventing the upstream from calling onNext, only the protocol obligation to honor the requested amount.

Replenishing

When dealing with basic transformations in a flow, there are often cases when the number of items the upstream sends should be different from what the downstream receives. Some operators may want to filter out certain items, others would batch up items before sending one item to the downstream.

However, when an item is not forwarded by an operator, the downstream has no way of knowing that its request(1) triggered an item that got dropped/buffered. Therefore, it can't know to (nor should it) repeat request(1) to "nudge" some source further upstream into producing another item which now hopefully results in an item being received by the downstream. Unlike, say, ACK-NACK based protocols, the requests specified by Reactive-Streams are to be treated as cumulative; an impatient downstream repeating its request(1) would end up with 2 outstanding requests.

Therefore, if an operator is not guaranteed to relay every upstream item to the downstream (and thus keep a 1:1 ratio), it has the duty to keep requesting items from the upstream until it ends up in a position to supply an item to the downstream.

This may sound a bit complicated, but perhaps a demonstration with a filter operator can help:

final class FilterOddSubscriber implements FlowableSubscriber<Integer>, Subscription {

    final Subscriber<? super Integer> downstream;

    Subscription upstream;

    // ...

    @Override
    public void onSubscribe(Subscription s) {
        if (upstream != null) {
            s.cancel();
        } else {
            upstream = s;
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(Integer item) {
        if (item % 2 != 0) {
            downstream.onNext(item);
        } else {
            upstream.request(1);
        }
    }

    @Override
    public void request(long n) {
        upstream.request(n);
    }

    // the rest omitted for brevity
}

In such operators, the downstream's request calls are forwarded to the upstream as is, and onNext will be invoked at most n times (unless the sequence completes earlier). In this operator, we look for the odd numbers in the flow. If we find one, the downstream is notified. If the incoming item is even, we don't forward it to the downstream. However, the downstream is still expecting at least 1 item, and since the upstream and downstream practically talk to each other directly, the upstream considers its duty to generate items fulfilled. This misalignment is resolved by requesting 1 more item from the upstream for the previously ignored item. If more items arrive that get ignored, more will be requested as replenishment.

Given that backpressure involves some overhead in the form of one or more atomic operations, requesting one by one can add a lot of overhead if the number of items filtered out is significantly larger than the number that pass through. If necessary, this situation can be solved either by decoupling the upstream's and the downstream's request management or by using an RxJava-specific type and protocol extension in the form of ConditionalSubscribers (sketched below).
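A hedged sketch of the latter idea (the ConditionalSubscriber type lives in RxJava's internal fuseable package; the class below is illustrative and simplified): tryOnNext returns false when the item was dropped, so a ConditionalSubscriber-aware upstream can immediately try the next item instead of waiting for a request(1) replenishment.

final class FilterOddConditional implements ConditionalSubscriber<Integer>, Subscription {

    final Subscriber<? super Integer> downstream;

    Subscription upstream;

    FilterOddConditional(Subscriber<? super Integer> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onSubscribe(Subscription s) {
        this.upstream = s;
        downstream.onSubscribe(this);
    }

    @Override
    public boolean tryOnNext(Integer item) {
        if (item % 2 != 0) {
            downstream.onNext(item);
            return true;    // consumed: counts against the downstream's request
        }
        return false;       // dropped: the caller may produce the next item without a new request
    }

    @Override
    public void onNext(Integer item) {
        // plain Subscriber path: fall back to the replenishing pattern shown above
        if (!tryOnNext(item)) {
            upstream.request(1);
        }
    }

    @Override
    public void onError(Throwable t) {
        downstream.onError(t);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    @Override
    public void request(long n) {
        upstream.request(n);
    }

    @Override
    public void cancel() {
        upstream.cancel();
    }
}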

Stable prefetching

In a previous section, we saw primitives to deal with request accounting and delayed Subscriptions, but often, operators have to react to request amount changes as well. This comes up when the operator has to decouple the downstream request amount from the amount it requests from the upstream, such as in observeOn.

Such logic can get quite complicated in operators, but one of the simplest manifestations is a rebatchRequests-style operator that combines request management with serialization to ensure that the upstream is requested with a predictable pattern no matter how the downstream requests (less, more or even unbounded):

final class RebatchRequests<T> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {

    final Subscriber<? super T> child;

    final AtomicLong requested;

    final SpscArrayQueue<T> queue;

    final int batchSize;

    final int limit;

    Subscription s;

    volatile boolean done;
    Throwable error;

    volatile boolean cancelled;

    long emitted;

    public RebatchRequests(Subscriber<? super T> child, int batchSize) {
        this.child = child;
        this.batchSize = batchSize;
        this.limit = batchSize - (batchSize >> 2); // 75% of batchSize
        this.requested = new AtomicLong();
        this.queue = new SpscArrayQueue<T>(batchSize);
    }

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        child.onSubscribe(this);
        s.request(batchSize);
    }

    @Override
    public void onNext(T t) {
        queue.offer(t);
        drain();
    }

    @Override
    public void onError(Throwable t) {
        error = t;
        done = true;
        drain();
    }

    @Override
    public void onComplete() {
        done = true;
        drain();
    }

    @Override
    public void request(long n) {
        BackpressureHelper.add(requested, n);
        drain();
    }

    @Override
    public void cancel() {
        cancelled = true;
        s.cancel();
    }

    void drain() {
        // see next code example
    }
}

Here we extend AtomicInteger since the work-in-progress counting happens more often and avoiding the extra indirection is worth it. The class implements Subscription and hands itself to the child Subscriber to capture its request() (and cancel()) calls and route them to the main drain logic. Some operators need only this; other operators (such as observeOn) not only route the downstream request but also perform extra cancellation (cancelling the asynchrony-providing Worker as well) in their cancel() method.

Important: when implementing operators for Flowable and Observable in RxJava 2.x, you are not allowed to pass along the upstream Subscription or Disposable to the child Subscriber/Observer, even when the operator logic itself doesn't require hooking the request/cancel/dispose calls. The reason for this is how operator-fusion is implemented on top of the Subscription and Disposable passing through onSubscribe in RxJava 2.x (and in Reactor 3). See the next section about operator-fusion. There is no fusion in Single, Completable or Maybe (because there is no requesting or unbounded buffering with them) and their operators can pass the upstream Disposable along as is.

Next comes the drain method, whose pattern appears in many operators (with slight variations on how and what the emission does).

void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;

    for (;;) {

        long r = requested.get();
        long e = 0L;
        long f = emitted;

        while (e != r) {
            if (cancelled) {
                return;
            }

            boolean d = done;

            if (d) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                    return;
                }
            }

            T v = queue.poll();
            boolean empty = v == null;

            if (d && empty) {
                child.onComplete();
                return;
            }

            if (empty) {
                break;
            }

            child.onNext(v);

            e++;
            if (++f == limit) {
                s.request(f);
                f = 0L;
            }
        }

        if (e == r) {
            if (cancelled) {
                return;
            }

            if (done) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                    return;
                }
                if (queue.isEmpty()) {
                    child.onComplete();
                    return;
                }
            }
        }

        if (e != 0L) {
            BackpressureHelper.produced(requested, e);
        }

        emitted = f;

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

This particular pattern is called the stable-request queue-drain. Another variation doesn't care about request amount stability towards the upstream and simply requests the amount it delivered to the child:

void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;

    for (;;) {

        long r = requested.get();
        long e = 0L;

        while (e != r) {
            if (cancelled) {
                return;
            }

            boolean d = done;

            if (d) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                    return;
                }
            }

            T v = queue.poll();
            boolean empty = v == null;

            if (d && empty) {
                child.onComplete();
                return;
            }

            if (empty) {
                break;
            }

            child.onNext(v);

            e++;
        }

        if (e == r) {
            if (cancelled) {
                return;
            }

            if (done) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                    return;
                }
                if (queue.isEmpty()) {
                    child.onComplete();
                    return;
                }
            }
        }

        if (e != 0L) {
            BackpressureHelper.produced(requested, e);
            s.request(e);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

The third variation allows delaying a potential error until the upstream has terminated and all normal elements have been delivered to the child:

final boolean delayError;

void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;

    for (;;) {

        long r = requested.get();
        long e = 0L;

        while (e != r) {
            if (cancelled) {
                return;
            }

            boolean d = done;

            if (d && !delayError) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                    return;
                }
            }

            T v = queue.poll();
            boolean empty = v == null;

            if (d && empty) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                } else {
                    child.onComplete();
                }
                return;
            }

            if (empty) {
                break;
            }

            child.onNext(v);

            e++;
        }

        if (e == r) {
            if (cancelled) {
                return;
            }

            if (done) {
                if (delayError) {
                    if (queue.isEmpty()) {
                        Throwable ex = error;
                        if (ex != null) {
                            child.onError(ex);
                        } else {
                            child.onComplete();
                        }
                        return;
                    }
                } else {
                    Throwable ex = error;
                    if (ex != null) {
                        child.onError(ex);
                        return;
                    }
                    if (queue.isEmpty()) {
                        child.onComplete();
                        return;
                    }
                }
            }
        }

        if (e != 0L) {
            BackpressureHelper.produced(requested, e);
            s.request(e);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

If the downstream cancels the operator, the queue may still hold elements, which may thus be referenced longer than expected if the operator chain itself is referenced in some way. On the user level, applying onTerminateDetach will forget all references going upstream and downstream and can help with this situation. On the operator level, RxJava 2.x usually calls clear() on the queue when the sequence is cancelled or ends before the queue is drained naturally. This requires a slight change to the drain loop:

final boolean delayError;

@Override
public void cancel() {
    cancelled = true;
    s.cancel();
    if (getAndIncrement() == 0) {
        queue.clear();                 // <----------------------------
    }
}

void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;

    for (;;) {

        long r = requested.get();
        long e = 0L;

        while (e != r) {
            if (cancelled) {
                queue.clear();         // <----------------------------
                return;
            }

            boolean d = done;

            if (d && !delayError) {
                Throwable ex = error;
                if (ex != null) {
                    queue.clear();     // <----------------------------
                    child.onError(ex);
                    return;
                }
            }

            T v = queue.poll();
            boolean empty = v == null;

            if (d && empty) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                } else {
                    child.onComplete();
                }
                return;
            }

            if (empty) {
                break;
            }

            child.onNext(v);

            e++;
        }

        if (e == r) {
            if (cancelled) {
                queue.clear();         // <----------------------------
                return;
            }

            if (done) {
                if (delayError) {
                    if (queue.isEmpty()) {
                        Throwable ex = error;
                        if (ex != null) {
                            child.onError(ex);
                        } else {
                            child.onComplete();
                        }
                        return;
                    }
                } else {
                    Throwable ex = error;
                    if (ex != null) {
                        queue.clear(); // <----------------------------
                        child.onError(ex);
                        return;
                    }
                    if (queue.isEmpty()) {
                        child.onComplete();
                        return;
                    }
                }
            }
        }

        if (e != 0L) {
            BackpressureHelper.produced(requested, e);
            s.request(e);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

Since the queue is single-producer-single-consumer, its clear() must be called from a single thread - which is provided by the serialization loop and is enabled by the getAndIncrement() == 0 "half-loop" inside cancel().

An important note on the order of reading done and the queue's state:

boolean d = done;
T v = queue.poll();

and

boolean d = done;
boolean empty = queue.isEmpty();

These must happen in the order specified. If they were swapped, then when the drain runs asynchronously to an onNext/onComplete(), the queue might appear empty at first, then receive elements followed by done = true, and a late done check in the drain loop could complete the sequence thinking it had delivered all the values there were.

Single valued results

Sometimes an operator emits only a single value at some point instead of relaying some or all values of its source. Such operators include fromCallable, reduce, any, all, first, etc.

The classical queue-drain works here, but it is a bit of overkill to allocate objects for the work-in-progress counter, the request accounting and the queue itself. These elements can be reduced to a single state machine with one state-counter object - often inlined by extending AtomicInteger - and a plain field for storing the single value to be emitted.

The state machine handling the possible concurrent downstream requests and the normal completion path is a bit too involved to show here and is quite easy to get wrong.

RxJava 2.x supports this kind of behavior through the (internal) DeferredScalarSubscription for operators without an upstream source (fromCallable) and the (internal) DeferredScalarSubscriber for reduce-like operators with an upstream source.

Using the DeferredScalarSubscription is straightforward: one creates it, sends it to the downstream via onSubscribe and later on calls complete(T) to signal the end with a single value:

DeferredScalarSubscription<Integer> dss = new DeferredScalarSubscription<>(child);

child.onSubscribe(dss);

dss.complete(1);

Using the DeferredScalarSubscriber requires more coding and extending the class itself:

final class Counter extends DeferredScalarSubscriber<Object, Integer> {
    public Counter(Subscriber<? super Integer> child) {
        super(child);
        value = 0;
        hasValue = true;
    }

    @Override
    public void onNext(Object t) {
        value++;
    }
}

By default, DeferredScalarSubscriber.onSubscribe() requests Long.MAX_VALUE from the upstream (but the method can be overridden in subclasses).

Single-element post-complete

Some operators have to modulate a sequence of elements in a 1:1 fashion, but when the upstream terminates, they need to produce a final element followed by a terminal event (usually onComplete).

final class OnCompleteEndWith<T> implements Subscriber<T>, Subscription {
    final Subscriber<? super T> child;

    final T finalElement;

    Subscription s;

    public OnCompleteEndWith(Subscriber<? super T> child, T finalElement) {
        this.child = child;
        this.finalElement = finalElement;
    }

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        child.onSubscribe(this);
    }

    @Override
    public void onNext(T t) {
        child.onNext(t);
    }

    @Override
    public void onError(Throwable t) {
        child.onError(t);
    }

    @Override
    public void onComplete() {
        child.onNext(finalElement);
        child.onComplete();
    }

    @Override
    public void request(long n) {
        s.request(n);
    }

    @Override
    public void cancel() {
        s.cancel();
    }
}

This works if the downstream requests more than the upstream produces + 1; otherwise, the onNext(finalElement) call inside onComplete may overflow the child Subscriber.

Heavyweight solutions such as a queue-drain or a SubscriptionArbiter with a ScalarSubscriber could be used here; however, there is a more elegant solution to the problem.

The idea is that request amounts occupy only 63 bits of a 64-bit (atomic) long. If we mask out the lower 63 bits when working with the amount, we can use the most significant bit to indicate that the upstream sequence has finished; from then on, any 0-to-n request amount change can trigger the emission of the finalElement. Since a downstream request() can race with an upstream onComplete, marking the bit atomically via compare-and-set ensures a correct state transition.

For this, OnCompleteEndWith has to be changed by adding an AtomicLong (here, by extending it) for accounting requests and a long field for counting the production, then updating the request() and onComplete() methods:

final class OnCompleteEndWith<T> extends AtomicLong
implements FlowableSubscriber<T>, Subscription {

    final Subscriber<? super T> child;

    final T finalElement;

    Subscription s;

    long produced;

    static final long REQUEST_MASK = Long.MAX_VALUE;   // 0b01111...111L
    static final long COMPLETE_MASK = Long.MIN_VALUE;  // 0b10000...000L

    public OnCompleteEndWith(Subscriber<? super T> child, T finalElement) {
        this.child = child;
        this.finalElement = finalElement;
    }

    @Override
    public void onSubscribe(Subscription s) { ... }

    @Override
    public void onNext(T t) {
        produced++;                    // <------------------------
        child.onNext(t);
    }

    @Override
    public void onError(Throwable t) { ... }

    @Override
    public void onComplete() {
        long p = produced;
        if (p != 0L) {
            produced = 0L;
            BackpressureHelper.produced(this, p);
        }

        for (;;) {
            long current = get();
            if ((current & COMPLETE_MASK) != 0) {
                break;
            }
            if ((current & REQUEST_MASK) != 0) {
                lazySet(Long.MIN_VALUE + 1);
                child.onNext(finalElement);
                child.onComplete();
                return;
            }
            if (compareAndSet(current, COMPLETE_MASK)) {
                break;
            }
        }
    }

    @Override
    public void request(long n) {
        for (;;) {
            long current = get();
            if ((current & COMPLETE_MASK) != 0) {
                if (compareAndSet(current, COMPLETE_MASK + 1)) {
                    child.onNext(finalElement);
                    child.onComplete();
                }
                break;
            }
            long u = BackpressureHelper.addCap(current, n);
            if (compareAndSet(current, u)) {
                s.request(n);
                break;
            }
        }
    }

    @Override
    public void cancel() { ... }
}

RxJava 2 has a couple of operators (materialize, mapNotification, onErrorReturn) that require this type of behavior, and for them the (internal) SinglePostCompleteSubscriber class captures the algorithm above:

final class OnCompleteEndWith<T> extends SinglePostCompleteSubscriber<T, T> {
    final Subscriber<? super T> child;

    public OnCompleteEndWith(Subscriber<? super T> child, T finalElement) {
        super(child);
        this.child = child;
        this.value = finalElement;
    }

    @Override
    public void onNext(T t) {
        produced++;                    // <------------------------
        child.onNext(t);
    }

    @Override
    public void onError(Throwable t) { ... }

    @Override
    public void onComplete() {
        complete(value);
    }
}

Multi-element post-complete

Certain operators may need to emit multiple elements after the main sequence completes, and they may or may not relay elements from the live upstream before its termination. An example operator is buffer(int, int) when skip < size, yielding overlapping buffers. In this operator, it is possible that when the upstream completes, several overlapping buffers are still waiting to be emitted to the child, but that has to happen only when the child actually requests more buffers.

The state machine for this case is complicated, but RxJava has two (internal) utility methods on QueueDrainHelper for dealing with the situation:

<T> void postComplete(Subscriber<? super T> actual,
        Queue<T> queue,
        AtomicLong state,
        BooleanSupplier isCancelled);

<T> boolean postCompleteRequest(long n,
        Subscriber<? super T> actual,
        Queue<T> queue,
        AtomicLong state,
        BooleanSupplier isCancelled);

They take the child Subscriber, the queue to drain from, the state holding the current request amount and a callback to see if the downstream has cancelled the sequence.

Usage of these methods is as follows:

final class EmitTwice<T> extends AtomicLong
implements FlowableSubscriber<T>, Subscription, BooleanSupplier {

    final Subscriber<? super T> child;

    final ArrayDeque<T> buffer;

    volatile boolean cancelled;

    Subscription s;

    long produced;

    public EmitTwice(Subscriber<? super T> child) {
        this.child = child;
        this.buffer = new ArrayDeque<>();
    }

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        child.onSubscribe(this);
    }

    @Override
    public void onNext(T t) {
        produced++;
        buffer.offer(t);
        child.onNext(t);
    }

    @Override
    public void onError(Throwable t) {
        buffer.clear();
        child.onError(t);
    }

    @Override
    public void onComplete() {
        long p = produced;
        if (p != 0L) {
            produced = 0L;
            BackpressureHelper.produced(this, p);
        }
        QueueDrainHelper.postComplete(child, buffer, this, this);
    }

    @Override
    public boolean getAsBoolean() {
        return cancelled;
    }

    @Override
    public void cancel() {
        cancelled = true;
        s.cancel();
    }

    @Override
    public void request(long n) {
        if (!QueueDrainHelper.postCompleteRequest(n, child, buffer, this, this)) {
            s.request(n);
        }
    }
}

Creating operator classes

Creating operator implementations in 2.x is simpler than in 1.x and incurs fewer allocations as well. You have the choice of implementing your operator as a Subscriber-transformer to be used via lift or as a fully-fledged base reactive class.

Operator by extending a base reactive class

In 1.x, extending Observable was possible but convoluted because you had to implement the OnSubscribe interface separately and pass it to Observable.create() or to the Observable(OnSubscribe) protected constructor.

In 2.x, all base reactive classes are abstract and you can extend them directly without any additional indirection:

public final class FlowableMyOperator extends Flowable<Integer> {
    final Publisher<Integer> source;

    public FlowableMyOperator(Publisher<Integer> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> s) {
        Flowable.fromPublisher(source).map(v -> v + 1).subscribe(s);
    }
}

When taking other reactive types as inputs in these operators, it is recommended to declare them with the base reactive interfaces instead of the abstract classes, allowing better interoperability between libraries (especially for Flowable operators and other Reactive-Streams Publishers). To recap, these are the class-interface pairs:

  • Flowable - Publisher - FlowableSubscriber/Subscriber
  • Observable - ObservableSource - Observer
  • Single - SingleSource - SingleObserver
  • Completable - CompletableSource - CompletableObserver
  • Maybe - MaybeSource - MaybeObserver

RxJava 2.x locks down Flowable.subscribe (and the same method in the other types) in order to provide runtime hooks into the various flows; therefore, implementors are given subscribeActual() to override. When it is invoked, all relevant hooks and wrappers have already been applied. Implementors should avoid throwing unchecked exceptions, as the library generally can't deliver them to the respective Subscriber due to the lifecycle restrictions of the Reactive-Streams specification and instead sends them to the global error consumer via RxJavaPlugins.onError.

Unlike in 1.x, in the example above the incoming Subscriber is simply used directly for subscribing again (but still at most once) without any kind of wrapping. In 1.x, one needed to call Subscribers.wrap to avoid double calls to onStart and the resulting unexpected double initialization or double-requesting.

Unless one contributes a new operator to RxJava, working with such classes may become tedious, especially if they are intermediate operators:

new FlowableThenSome(
    new FlowableOther(
        new FlowableMyOperator(Flowable.range(1, 10).map(v -> v * v))
    )
)

This is an unfortunate effect of Java lacking extension-method support. A possible way to ease this burden is to use compose for fluent, inline application of the custom operator:

Flowable.range(1, 10).map(v -> v * v)
.compose(f -> new FlowableOperatorWithParameter(f, 10));

Flowable.range(1, 10).map(v -> v * v)
.compose(FlowableMyOperator::new);

Operator targeting lift()

The alternative to the fluent-application problem is to implement a Subscriber-transformer instead of extending the whole reactive base class, and use the respective type's lift() operator to get it into the sequence.

First, one has to implement the respective XOperator interface:

public final class MyOperator implements FlowableOperator<Integer, Integer> {

    @Override
    public Subscriber<? super Integer> apply(Subscriber<? super Integer> child) {
        return new Op(child);
    }

    static final class Op implements FlowableSubscriber<Integer>, Subscription {
        final Subscriber<? super Integer> child;

        Subscription s;

        public Op(Subscriber<? super Integer> child) {
            this.child = child;
        }

        @Override
        public void onSubscribe(Subscription s) {
            this.s = s;
            child.onSubscribe(this);
        }

        @Override
        public void onNext(Integer v) {
            child.onNext(v * v);
        }

        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }

        @Override
        public void onComplete() {
            child.onComplete();
        }

        @Override
        public void cancel() {
            s.cancel();
        }

        @Override
        public void request(long n) {
            s.request(n);
        }
    }
}

You may recognize that implementing operators via extension or via lifting looks quite similar. In both cases, one usually implements a FlowableSubscriber (Observer, etc.) that takes a downstream Subscriber, implements the business logic in the onXXX methods and somehow (manually or as part of lift()'s lifecycle) gets subscribed to an upstream source.

The benefit of applying the Reactive-Streams design to all base reactive types is that each consumer type is now an interface and can be implemented by operators that have to extend some class. This was a pain in 1.x because Subscriber and SingleSubscriber are classes themselves, plus Subscriber.request() is a protected final method, so an operator's Subscriber can't implement the Producer interface at the same time. In 2.x there is no such problem and one can have Subscriber, Subscription or even Observer together in the same consumer type.

Operator fusion

Operator fusion has the premise that certain operators can be combined into one single operator (macro-fusion) or that their internal data structures can be shared between each other (micro-fusion), allowing fewer allocations, lower overhead and better performance.

This advanced concept was invented, worked out and studied in the Reactive-Streams-Commons research project manned by the leads of RxJava and Project Reactor. Both libraries use the results in their implementations, which look the same but are incompatible due to the different classes and packages involved. In addition, RxJava 2.x's approach is a more polished version of the invention due to delays between the two projects' development.

Since operator-fusion is optional, you may choose not to bother making your operator fusion-enabled. The DeferredScalarSubscription is fusion-enabled, though, and needs no additional development in this regard.

If you choose to ignore operator-fusion, you still have to follow the requirement of never forwarding the Subscription/Disposable coming through onSubscribe to the child Subscriber/Observer, as this may break the fusion protocol and may skip your operator's business logic entirely:

final class SomeOp<T> implements Subscriber<T>, Subscription {
    // ...
    Subscription s;

    public void onSubscribe(Subscription s) {
        this.s = s;
        child.onSubscribe(this);   // <---------------------------
    }

    @Override
    public void cancel() {
        s.cancel();
    }

    @Override
    public void request(long n) {
        s.request(n);
    }

    // ...
}

Yes, this adds one more indirection between operators, but it is still cheap (and would be necessary for the operator anyway) and enables huge performance gains with the right chain of operators.

Generations

Given this novel approach, a generation number can be assigned to various implementation styles of reactive architectures:

Generation 0

These are the classical libraries that either use java.util.Observable or are listener based (Java Swing's ActionListener). Their common property is that they don't support composition (of events and cancellation). See also Google Agera.

Generation 1

This is the level of the Rx.NET library (even up to 3.x) that supports composition, but it has no notion of backpressure and doesn't properly support synchronous cancellation. Many JavaScript libraries such as RxJS 5 are still on this level. See also Google gRPC.

Generation 2

This is where RxJava 1.x is categorized: it supports composition, backpressure and synchronous cancellation, along with the ability to lift an operator into a sequence.

Generation 3

This is the level of the Reactive-Streams based libraries such as Reactor 2 and Akka-Stream. They are based upon a specification that evolved out of RxJava but left behind its drawbacks (such as the need to return something from subscribe()). The specification is incompatible with RxJava 1.x, and thus 2.x had to be rewritten from scratch.

Generation 4

This level expands upon the Reactive-Streams interfaces with operator-fusion (in a compatible fashion, that is, op-fusion is optional between two stages and everything works without it). Reactor 3 and RxJava 2 are at this level. The material around Akka-Stream mentions operator-fusion as well; however, Akka-Stream is not a native Reactive-Streams implementation (it requires a materializer to get a Publisher out) and as such is only Gen 3.

There are discussions among the 4th generation library providers about having the elements of operator-fusion standardized in a Reactive-Streams 2.0 specification (or in a neighboring extension) and having RxJava 3 and Reactor 4 work together on that aspect as well.

Components

Callable and ScalarCallable

Certain Flowable sources, similar to Single or Completable, are known to only ever emit zero or one item, and that single item is known to be constant or computed synchronously. Well-known examples of this are just(), empty() and fromCallable. Subscribing to these sources, like to any other source, adds the same infrastructure overhead, which can often be avoided if the consumer could just pick the item or have it calculated on the spot.

For example, just and empty frequently appear as the mapping result of a flatMap operation:

source.flatMap(v -> {
    if (v % 2 == 0) {
        return just(v);
    }
    return empty();
})

Here, if we could somehow recognize that empty() won't emit a value, only onComplete, we could simply avoid subscribing to it inside flatMap, saving on the overhead. Similarly, recognizing that just emits exactly one item, we can route it differently inside flatMap, again avoiding the creation of a lot of objects just to get to that same single item.

At other times, knowing the emission property allows simplifying or choosing a different operator instead of the applied one. For example, applying flatMap to an empty() source is of no use since there won't be any item to flatten into a sequence; the whole flattened sequence is going to be empty. Knowing that the source of a flatMap is just, there is no need for the complicated inner mechanisms, as there is going to be only one mapped inner source and one can subscribe the downstream's Subscriber to it directly.

Flowable.just(1).flatMap(v -> Flowable.range(v, 5)).subscribe(...);

// in some specialized operator:

T value; // from just()

@Override
public void subscribeActual(Subscriber<? super T> s) {
    mapper.apply(value).subscribe(s);
}

There could be other sources with these properties; therefore, RxJava 2 uses the io.reactivex.internal.fuseable.ScalarCallable and java.util.Callable interfaces to indicate that a source is constant or synchronously computable. When a source Flowable or Observable is marked with one of these interfaces, many fusion-enabled operators will perform special actions to avoid the overhead of a normal, general source.

We use Java's own, preexisting java.util.Callable interface to indicate a synchronously computable source. ScalarCallable is an extension of this interface that suppresses the throws Exception of Callable.call():

interface Callable<T> {
    T call() throws Exception;
}

interface ScalarCallable<T> extends Callable<T> {
    @Override
    T call();
}

The reason for the two separate interfaces is that if a source is constant, like just, one can perform assembly-time optimizations with it, knowing that each regular subscribe invocation would have resulted in the same single value.

Callable denotes sources, such as fromCallable, whose single value has to be calculated at runtime of the flow. By this logic, you can see that ScalarCallable is a Callable in its own right because the constant can be "calculated" as late as the runtime phase of the flow.

Since Reactive-Streams forbids using nulls as emission values, we can use null in (Scalar)Callable-marked sources to indicate that there is no value to be emitted; thus one can't mistake a user's null for the empty-indicator null. For example, this is how empty() is implemented:

final class FlowableEmpty extends Flowable<Object> implements ScalarCallable<Object> {
    @Override
    public void subscribeActual(Subscriber<? super Object> s) {
        EmptySubscription.complete(s);
    }

    @Override
    public Object call() {
        return null; // interpreted as no value available
    }
}

Sources implementing Callable may throw checked exceptions from call(), which is handled by the consuming operators as an indication to signal onError in an operator-specific manner (such as delayed).

final class FlowableIOException extends Flowable<Object> implements Callable<Object> {
    @Override
    public void subscribeActual(Subscriber<? super Object> s) {
        EmptySubscription.error(new IOException(), s);
    }

    @Override
    public Object call() throws Exception {
        throw new IOException();
    }
}

However, implementors of ScalarCallable should avoid throwing any exception and should limit the code in call() to a constant or a simple computation that can be legally executed during assembly time.

As a consumer of sources, one may want to deal with these kinds of special Flowables or Observables. For example, if you create an operator that can leverage the knowledge that its main input is a single-element source, you can check the type and extract the value of a ScalarCallable at assembly time right in the operator:

// Flowable.java

public final Flowable<Integer> plusOne() {
    if (this instanceof ScalarCallable) {
        Integer value = ((ScalarCallable<Integer>) this).call();
        if (value == null) {
            return empty();
        }
        return just(value + 1);
    }
    return cast(Integer.class).map(v -> v + 1);
}

or as a FlowableTransformer:

FlowableTransformer<Integer, Integer> plusOneTransformer = source -> {
    if (source instanceof ScalarCallable) {
        Integer value = ((ScalarCallable<Integer>) source).call();
        if (value == null) {
            return empty();
        }
        return just(value + 1);
    }
    return source.map(v -> v + 1);
};

However, it is not mandatory to handle ScalarCallables and Callables separately. Since the former extends the latter, the type check can be deferred until subscription time and handled with the same code path:

final class FlowablePlusOne extends Flowable<Integer> {
    final Publisher<Integer> source;

    FlowablePlusOne(Publisher<Integer> source) {
        this.source = source;
    }

    @Override
    public void subscribeActual(Subscriber<? super Integer> s) {
        if (source instanceof Callable) {
            Integer value;

            try {
                value = ((Callable<Integer>) source).call();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                EmptySubscription.error(ex, s);
                return;
            }

            s.onSubscribe(new ScalarSubscription<Integer>(s, value + 1));
        } else {
            new FlowableMap<>(source, v -> v + 1).subscribe(s);
        }
    }
}

ConditionalSubscriber

TBD

QueueSubscription and QueueDisposable

TBD

Example implementations

TBD

map + filter hybrid

TBD

Ordered merge

TBD

Copyright (c) 2016-present, RxJava Contributors.
Twitter @RxJava | Gitter @RxJava


