Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Reactive Streams Specification for the JVM

License

NotificationsYou must be signed in to change notification settings

reactive-streams/reactive-streams-jvm

Repository files navigation

The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.

The latest release is available on Maven Central as

<dependency>  <groupId>org.reactivestreams</groupId>  <artifactId>reactive-streams</artifactId>  <version>1.0.4</version></dependency><dependency>  <groupId>org.reactivestreams</groupId>  <artifactId>reactive-streams-tck</artifactId>  <version>1.0.4</version>  <scope>test</scope></dependency>

Goals, Design and Scope

Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be carefully controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine.

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – think passing elements on to another thread or thread-pool — while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the backpressure signals were synchronous (see also theReactive Manifesto), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation.

It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application.

It should be noted that the precise nature of stream manipulations (transformation, splitting, merging, etc.) is not covered by this specification. Reactive Streams are only concerned with mediating the stream of data between differentAPI Components. In their development care has been taken to ensure that all basic ways of combining streams can be expressed.

In summary, Reactive Streams is a standard and specification for Stream-oriented libraries for the JVM that

  • process a potentially unbounded number of elements
  • in sequence,
  • asynchronously passing elements between components,
  • with mandatory non-blocking backpressure.

The Reactive Streams specification consists of the following parts:

The API specifies the types to implement Reactive Streams and achieve interoperability between different implementations.

The Technology Compatibility Kit (TCK) is a standard test suite for conformance testing of implementations.

Implementations are free to implement additional features not covered by the specification as long as they conform to the API requirements and pass the tests in the TCK.

API Components

The API consists of the following components that are required to be provided by Reactive Stream implementations:

  1. Publisher
  2. Subscriber
  3. Subscription
  4. Processor

APublisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).

In response to a call toPublisher.subscribe(Subscriber) the possible invocation sequences for methods on theSubscriber are given by the following protocol:

onSubscribe onNext* (onError | onComplete)?

This means thatonSubscribe is always signalled,followed by a possibly unbounded number ofonNext signals (as requested bySubscriber) followed by anonError signal if there is a failure, or anonComplete signal when no more elements are available—all as long as theSubscription is not cancelled.

NOTES

Glossary

TermDefinition
SignalAs a noun: one of theonSubscribe,onNext,onComplete,onError,request(n) orcancel methods. As a verb: calling/invoking a signal.
DemandAs a noun, the aggregated number of elements requested by a Subscriber which is yet to be delivered (fulfilled) by the Publisher. As a verb, the act ofrequest-ing more elements.
Synchronous(ly)Executes on the calling Thread.
Return normallyOnly ever returns a value of the declared type to the caller. The only legal way to signal failure to aSubscriber is via theonError method.
ResponsivityReadiness/ability to respond. In this document used to indicate that the different components should not impair each others ability to respond.
Non-obstructingQuality describing a method which is as quick to execute as possible—on the calling thread. This means, for example, avoids heavy computations and other things that would stall the caller´s thread of execution.
Terminal stateFor a Publisher: WhenonComplete oronError has been signalled. For a Subscriber: When anonComplete oronError has been received.
NOPExecution that has no detectable effect to the calling thread, and can as such safely be called any number of times.
Serial(ly)In the context of aSignal, non-overlapping. In the context of the JVM, calls to methods on an object are serial if and only if there is a happens-before relationship between those calls (implying also that the calls do not overlap). When the calls are performed asynchronously, coordination to establish the happens-before relationship is to be implemented using techniques such as, but not limited to, atomics, monitors, or locks.
Thread-safeCan be safely invoked synchronously, or asychronously, without requiring external synchronization to ensure program correctness.

SPECIFICATION

1. Publisher (Code)

publicinterfacePublisher<T> {publicvoidsubscribe(Subscriber<?superT>s);}
IDRule
1The total number ofonNext´s signalled by aPublisher to aSubscriber MUST be less than or equal to the total number of elements requested by thatSubscriber´sSubscription at all times.
💡The intent of this rule is to make it clear that Publishers cannot signal more elements than Subscribers have requested. There’s an implicit, but important, consequence to this rule: Since demand can only be fulfilled after it has been received, there’s a happens-before relationship between requesting elements and receiving elements.
2APublisher MAY signal feweronNext than requested and terminate theSubscription by callingonComplete oronError.
💡The intent of this rule is to make it clear that a Publisher cannot guarantee that it will be able to produce the number of elements requested; it simply might not be able to produce them all; it may be in a failed state; it may be empty or otherwise already completed.
3onSubscribe,onNext,onError andonComplete signaled to aSubscriber MUST be signaledserially.
💡The intent of this rule is to permit the signalling of signals (including from multiple threads) if and only if a happens-before relation between each of the signals is established.
4If aPublisher fails it MUST signal anonError.
💡The intent of this rule is to make it clear that a Publisher is responsible for notifying its Subscribers if it detects that it cannot proceed—Subscribers must be given a chance to clean up resources or otherwise deal with the Publisher´s failures.
5If aPublisher terminates successfully (finite stream) it MUST signal anonComplete.
💡The intent of this rule is to make it clear that a Publisher is responsible for notifying its Subscribers that it has reached aterminal state—Subscribers can then act on this information; clean up resources, etc.
6If aPublisher signals eitheronError oronComplete on aSubscriber, thatSubscriber’sSubscription MUST be considered cancelled.
💡The intent of this rule is to make sure that a Subscription is treated the same no matter if it was cancelled, the Publisher signalled onError or onComplete.
7Once aterminal state has been signaled (onError,onComplete) it is REQUIRED that no further signals occur.
💡The intent of this rule is to make sure that onError and onComplete are the final states of an interaction between a Publisher and Subscriber pair.
8If aSubscription is cancelled itsSubscriber MUST eventually stop being signaled.
💡The intent of this rule is to make sure that Publishers respect a Subscriber’s request to cancel a Subscription when Subscription.cancel() has been called. The reason foreventually is because signals can have propagation delay due to being asynchronous.
9Publisher.subscribe MUST callonSubscribe on the providedSubscriber prior to any other signals to thatSubscriber and MUSTreturn normally, except when the providedSubscriber isnull in which case it MUST throw ajava.lang.NullPointerException to the caller, for all other situations the only legal way to signal failure (or reject theSubscriber) is by callingonError (after callingonSubscribe).
💡The intent of this rule is to make sure thatonSubscribe is always signalled before any of the other signals, so that initialization logic can be executed by the Subscriber when the signal is received. AlsoonSubscribe MUST only be called at most once, [see2.12]. If the suppliedSubscriber isnull, there is nowhere else to signal this but to the caller, which means ajava.lang.NullPointerException must be thrown. Examples of possible situations: A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, or in aterminal state.
10Publisher.subscribe MAY be called as many times as wanted but MUST be with a differentSubscriber each time [see2.12].
💡The intent of this rule is to have callers ofsubscribe be aware that a generic Publisher and a generic Subscriber cannot be assumed to support being attached multiple times. Furthermore, it also mandates that the semantics ofsubscribe must be upheld no matter how many times it is called.
11APublisher MAY support multipleSubscribers and decides whether eachSubscription is unicast or multicast.
💡The intent of this rule is to give Publisher implementations the flexibility to decide how many, if any, Subscribers they will support, and how elements are going to be distributed.

2. Subscriber (Code)

publicinterfaceSubscriber<T> {publicvoidonSubscribe(Subscriptions);publicvoidonNext(Tt);publicvoidonError(Throwablet);publicvoidonComplete();}
IDRule
1ASubscriber MUST signal demand viaSubscription.request(long n) to receiveonNext signals.
💡The intent of this rule is to establish that it is the responsibility of the Subscriber to decide when and how many elements it is able and willing to receive. To avoid signal reordering caused by reentrant Subscription methods, it is strongly RECOMMENDED for synchronous Subscriber implementations to invoke Subscription methods at the very end of any signal processing. It is RECOMMENDED that Subscribers request the upper limit of what they are able to process, as requesting only one element at a time results in an inherently inefficient "stop-and-wait" protocol.
2If aSubscriber suspects that its processing of signals will negatively impact itsPublisher´s responsivity, it is RECOMMENDED that it asynchronously dispatches its signals.
💡The intent of this rule is that a Subscriber shouldnot obstruct the progress of the Publisher from an execution point-of-view. In other words, the Subscriber should not starve the Publisher from receiving CPU cycles.
3Subscriber.onComplete() andSubscriber.onError(Throwable t) MUST NOT call any methods on theSubscription or thePublisher.
💡The intent of this rule is to prevent cycles and race-conditions—between Publisher, Subscription and Subscriber—during the processing of completion signals.
4Subscriber.onComplete() andSubscriber.onError(Throwable t) MUST consider the Subscription cancelled after having received the signal.
💡The intent of this rule is to make sure that Subscribers respect a Publisher’sterminal state signals. A Subscription is simply not valid anymore after an onComplete or onError signal has been received.
5ASubscriber MUST callSubscription.cancel() on the givenSubscription after anonSubscribe signal if it already has an activeSubscription.
💡The intent of this rule is to prevent that two, or more, separate Publishers from trying to interact with the same Subscriber. Enforcing this rule means that resource leaks are prevented since extra Subscriptions will be cancelled. Failure to conform to this rule may lead to violations of Publisher rule 1, amongst others. Such violations can lead to hard-to-diagnose bugs.
6ASubscriber MUST callSubscription.cancel() if theSubscription is no longer needed.
💡The intent of this rule is to establish that Subscribers cannot just throw Subscriptions away when they are no longer needed, they have to callcancel so that resources held by that Subscription can be safely, and timely, reclaimed. An example of this would be a Subscriber which is only interested in a specific element, which would then cancel its Subscription to signal its completion to the Publisher.
7A Subscriber MUST ensure that all calls on its Subscription's request and cancel methods are performedserially.
💡The intent of this rule is to permit the calling of the request and cancel methods (including from multiple threads) if and only if aserial relation between each of the calls is established.
8ASubscriber MUST be prepared to receive one or moreonNext signals after having calledSubscription.cancel() if there are still requested elements pending [see3.12].Subscription.cancel() does not guarantee to perform the underlying cleaning operations immediately.
💡The intent of this rule is to highlight that there may be a delay between callingcancel and the Publisher observing that cancellation.
9ASubscriber MUST be prepared to receive anonComplete signal with or without a precedingSubscription.request(long n) call.
💡The intent of this rule is to establish that completion is unrelated to the demand flow—this allows for streams which complete early, and obviates the need topoll for completion.
10ASubscriber MUST be prepared to receive anonError signal with or without a precedingSubscription.request(long n) call.
💡The intent of this rule is to establish that Publisher failures may be completely unrelated to signalled demand. This means that Subscribers do not need to poll to find out if the Publisher will not be able to fulfill its requests.
11ASubscriber MUST make sure that all calls on itssignal methods happen-before the processing of the respective signals. I.e. the Subscriber must take care of properly publishing the signal to its processing logic.
💡The intent of this rule is to establish that it is the responsibility of the Subscriber implementation to make sure that asynchronous processing of its signals are thread safe. SeeJMM definition of Happens-Before in section 17.4.5.
12Subscriber.onSubscribe MUST be called at most once for a givenSubscriber (based on object equality).
💡The intent of this rule is to establish that it MUST be assumed that the same Subscriber can only be subscribed at most once. Note thatobject equality isa.equals(b).
13CallingonSubscribe,onNext,onError oronComplete MUSTreturn normally except when any provided parameter isnull in which case it MUST throw ajava.lang.NullPointerException to the caller, for all other situations the only legal way for aSubscriber to signal failure is by cancelling itsSubscription. In the case that this rule is violated, any associatedSubscription to theSubscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.
💡The intent of this rule is to establish the semantics for the methods of Subscriber and what the Publisher is allowed to do in which case this rule is violated. «Raise this error condition in a fashion that is adequate for the runtime environment» could mean logging the error—or otherwise make someone or something aware of the situation—as the error cannot be signalled to the faulty Subscriber.

3. Subscription (Code)

publicinterfaceSubscription {publicvoidrequest(longn);publicvoidcancel();}
IDRule
1Subscription.request andSubscription.cancel MUST only be called inside of itsSubscriber context.
💡The intent of this rule is to establish that a Subscription represents the unique relationship between a Subscriber and a Publisher [see2.12]. The Subscriber is in control over when elements are requested and when more elements are no longer needed.
2TheSubscription MUST allow theSubscriber to callSubscription.request synchronously from withinonNext oronSubscribe.
💡The intent of this rule is to make it clear that implementations ofrequest must be reentrant, to avoid stack overflows in the case of mutual recursion betweenrequest andonNext (and eventuallyonComplete /onError). This implies that Publishers can besynchronous, i.e. signallingonNext´s on the thread which callsrequest.
3Subscription.request MUST place an upper bound on possible synchronous recursion betweenPublisher andSubscriber.
💡The intent of this rule is to complement [see3.2] by placing an upper limit on the mutual recursion betweenrequest andonNext (and eventuallyonComplete /onError). Implementations are RECOMMENDED to limit this mutual recursion to a depth of1 (ONE)—for the sake of conserving stack space. An example for undesirable synchronous, open recursion would be Subscriber.onNext -> Subscription.request -> Subscriber.onNext -> …, as it otherwise will result in blowing the calling thread´s stack.
4Subscription.request SHOULD respect the responsivity of its caller by returning in a timely manner.
💡The intent of this rule is to establish thatrequest is intended to be anon-obstructing method, and should be as quick to execute as possible on the calling thread, so avoid heavy computations and other things that would stall the caller´s thread of execution.
5Subscription.cancel MUST respect the responsivity of its caller by returning in a timely manner, MUST be idempotent and MUST bethread-safe.
💡The intent of this rule is to establish thatcancel is intended to be anon-obstructing method, and should be as quick to execute as possible on the calling thread, so avoid heavy computations and other things that would stall the caller´s thread of execution. Furthermore, it is also important that it is possible to call it multiple times without any adverse effects.
6After theSubscription is cancelled, additionalSubscription.request(long n) MUST beNOPs.
💡The intent of this rule is to establish a causal relationship between cancellation of a subscription and the subsequent non-operation of requesting more elements.
7After theSubscription is cancelled, additionalSubscription.cancel() MUST beNOPs.
💡The intent of this rule is superseded by3.5.
8While theSubscription is not cancelled,Subscription.request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.
💡The intent of this rule is to make sure thatrequest-ing is an additive operation, as well as ensuring that a request for elements is delivered to the Publisher.
9While theSubscription is not cancelled,Subscription.request(long n) MUST signalonError with ajava.lang.IllegalArgumentException if the argument is <= 0. The cause message SHOULD explain that non-positive request signals are illegal.
💡The intent of this rule is to prevent faulty implementations to proceed operation without any exceptions being raised. Requesting a negative or 0 number of elements, since requests are additive, most likely to be the result of an erroneous calculation on the behalf of the Subscriber.
10While theSubscription is not cancelled,Subscription.request(long n) MAY synchronously callonNext on this (or other) subscriber(s).
💡The intent of this rule is to establish that it is allowed to create synchronous Publishers, i.e. Publishers who execute their logic on the calling thread.
11While theSubscription is not cancelled,Subscription.request(long n) MAY synchronously callonComplete oronError on this (or other) subscriber(s).
💡The intent of this rule is to establish that it is allowed to create synchronous Publishers, i.e. Publishers who execute their logic on the calling thread.
12While theSubscription is not cancelled,Subscription.cancel() MUST request thePublisher to eventually stop signaling itsSubscriber. The operation is NOT REQUIRED to affect theSubscription immediately.
💡The intent of this rule is to establish that the desire to cancel a Subscription is eventually respected by the Publisher, acknowledging that it may take some time before the signal is received.
13While theSubscription is not cancelled,Subscription.cancel() MUST request thePublisher to eventually drop any references to the corresponding subscriber.
💡The intent of this rule is to make sure that Subscribers can be properly garbage-collected after their subscription no longer being valid. Re-subscribing with the same Subscriber object is discouraged [see2.12], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely.
14While theSubscription is not cancelled, callingSubscription.cancel MAY cause thePublisher, if stateful, to transition into theshut-down state if no otherSubscription exists at this point [see1.9].
💡The intent of this rule is to allow for Publishers to signalonComplete oronError followingonSubscribe for new Subscribers in response to a cancellation signal from an existing Subscriber.
15CallingSubscription.cancel MUSTreturn normally.
💡The intent of this rule is to disallow implementations to throw exceptions in response tocancel being called.
16CallingSubscription.request MUSTreturn normally.
💡The intent of this rule is to disallow implementations to throw exceptions in response torequest being called.
17ASubscription MUST support an unbounded number of calls torequest and MUST support a demand up to 2^63-1 (java.lang.Long.MAX_VALUE). A demand equal or greater than 2^63-1 (java.lang.Long.MAX_VALUE) MAY be considered by thePublisher as “effectively unbounded”.
💡The intent of this rule is to establish that the Subscriber can request an unbounded number of elements, in any increment above 0 [see3.9], in any number of invocations ofrequest. As it is not feasibly reachable with current or foreseen hardware within a reasonable amount of time (1 element per nanosecond would take 292 years) to fulfill a demand of 2^63-1, it is allowed for a Publisher to stop tracking demand beyond this point.

ASubscription is shared by exactly onePublisher and oneSubscriber for the purpose of mediating the data exchange between this pair. This is the reason why thesubscribe() method does not return the createdSubscription, but instead returnsvoid; theSubscription is only passed to theSubscriber via theonSubscribe callback.

4.Processor (Code)

publicinterfaceProcessor<T,R>extendsSubscriber<T>,Publisher<R> {}
IDRule
1AProcessor represents a processing stage—which is both aSubscriber and aPublisher and MUST obey the contracts of both.
💡The intent of this rule is to establish that Processors behave, and are bound by, both the Publisher and Subscriber specifications.
2AProcessor MAY choose to recover anonError signal. If it chooses to do so, it MUST consider theSubscription cancelled, otherwise it MUST propagate theonError signal to its Subscribers immediately.
💡The intent of this rule is to inform that it’s possible for implementations to be more than simple transformations.

While not mandated, it can be a good idea to cancel aProcessor´s upstreamSubscription when/if its lastSubscriber cancels theirSubscription,to let the cancellation signal propagate upstream.

Asynchronous vs Synchronous Processing

The Reactive Streams API prescribes that all processing of elements (onNext) or termination signals (onError,onComplete) MUST NOTblock thePublisher. However, each of theon* handlers can process the events synchronously or asynchronously.

Take this example:

nioSelectorThreadOrigin map(f) filter(p) consumeTo(toNioSelectorOutput)

It has an async origin and an async destination. Let’s assume that both origin and destination are selector event loops. TheSubscription.request(n) must be chained from the destination to the origin. This is now where each implementation can choose how to do this.

The following uses the pipe| character to signal async boundaries (queue and schedule) andR# to represent resources (possibly threads).

nioSelectorThreadOrigin | map(f) | filter(p) | consumeTo(toNioSelectorOutput)-------------- R1 ----  | - R2 - | -- R3 --- | ---------- R4 ----------------

In this example each of the 3 consumers,map,filter andconsumeTo asynchronously schedule the work. It could be on the same event loop (trampoline), separate threads, whatever.

nioSelectorThreadOrigin map(f) filter(p) | consumeTo(toNioSelectorOutput)------------------- R1 ----------------- | ---------- R2 ----------------

Here it is only the final step that asynchronously schedules, by adding work to the NioSelectorOutput event loop. Themap andfilter steps are synchronously performed on the origin thread.

Or another implementation could fuse the operations to the final consumer:

nioSelectorThreadOrigin | map(f) filter(p) consumeTo(toNioSelectorOutput)--------- R1 ---------- | ------------------ R2 -------------------------

All of these variants are "asynchronous streams". They all have their place and each has different tradeoffs including performance and implementation complexity.

The Reactive Streams contract allows implementations the flexibility to manage resources and scheduling and mix asynchronous and synchronous processing within the bounds of a non-blocking, asynchronous, dynamic push-pull stream.

In order to allow fully asynchronous implementations of all participating API elements—Publisher/Subscription/Subscriber/Processor—all methods defined by these interfaces returnvoid.

Subscriber controlled queue bounds

One of the underlying design principles is that all buffer sizes are to be bounded and these bounds must beknown andcontrolled by the subscribers. These bounds are expressed in terms ofelement count (which in turn translates to the invocation count of onNext). Any implementation that aims to support infinite streams (especially high output rate streams) needs to enforce bounds all along the way to avoid out-of-memory errors and constrain resource usage in general.

Since back-pressure is mandatory the use of unbounded buffers can be avoided. In general, the only time when a queue might grow without bounds is when the publisher side maintains a higher rate than the subscriber for an extended period of time, but this scenario is handled by backpressure instead.

Queue bounds can be controlled by a subscriber signaling demand for the appropriate number of elements. At any point in time the subscriber knows:

  • the total number of elements requested:P
  • the number of elements that have been processed:N

Then the maximum number of elements that may arrive—until more demand is signaled to the Publisher—isP - N. In the case that the subscriber also knows the number of elements B in its input buffer then this bound can be refined toP - B - N.

These bounds must be respected by a publisher independent of whether the source it represents can be backpressured or not. In the case of sources whose production rate cannot be influenced—for example clock ticks or mouse movement—the publisher must choose to either buffer or drop elements to obey the imposed bounds.

Subscribers signaling a demand for one element after the reception of an element effectively implement a Stop-and-Wait protocol where the demand signal is equivalent to acknowledgement. By providing demand for multiple elements the cost of acknowledgement is amortized. It is worth noting that the subscriber is allowed to signal demand at any point in time, allowing it to avoid unnecessary delays between the publisher and the subscriber (i.e. keeping its input buffer filled without having to wait for full round-trips).

Legal

This project is a collaboration between engineers from Kaazing, Lightbend, Netflix, Pivotal, Red Hat, Twitter and many others. This project is licensed under MIT No Attribution (SPDX: MIT-0).

About

Reactive Streams Specification for the JVM

Resources

License

Contributing

Stars

Watchers

Forks

Packages

No packages published

Contributors33

Languages


[8]ページ先頭

©2009-2025 Movatter.jp