Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Reactive Streams for .NET

License

NotificationsYou must be signed in to change notification settings

reactive-streams/reactive-streams-dotnet

Repository files navigation

The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.

The latest release isavailable on NuGet.

To install Reactive Streams, run the following command in the Package Manager Console

PM> Install-Package Reactive.Streams

Goals, Design and Scope

Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be carefully controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine.

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – think passing elements on to another thread or thread-pool — while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of backpressure were synchronous (see also theReactive Manifesto), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation.

It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application.

It should be noted that the precise nature of stream manipulations (transformation, splitting, merging, etc.) is not covered by this specification. Reactive Streams are only concerned with mediating the stream of data between differentAPI Components. In their development care has been taken to ensure that all basic ways of combining streams can be expressed.

In summary, Reactive Streams .NET is a standard and specification for Stream-oriented libraries for .NET that

  • process a potentially unbounded number of elements
  • in sequence,
  • asynchronously passing elements between components,
  • with mandatory non-blocking backpressure.

The Reactive Streams specification consists of the following parts:

The API specifies the types to implement Reactive Streams and achieve interoperability between different implementations.

The Technology Compatibility Kit (TCK) is a standard test suite for conformance testing of implementations.

Implementations are free to implement additional features not covered by the specification as long as they conform to the API requirements and pass the tests in the TCK.

API Components

The API consists of the following components that are required to be provided by Reactive Stream implementations:

  1. IPublisher
  2. ISubscriber
  3. ISubscription
  4. IProcessor

AnIPublisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its ISubscriber(s).

In response to a call toIPublisher.Subscribe(ISubscriber) the possible invocation sequences for methods on theISubscriber are given by the following protocol:

onSubscribe onNext* (onError | onComplete)?

This means thatonSubscribe is always signalled,followed by a possibly unbounded number ofonNext signals (as requested byISubscriber) followed by anonError signal if there is a failure, or anonComplete signal when no more elements are available—all as long as theISubscription is not cancelled.

NOTES

  • The specifications below use binding words in capital letters fromhttps://www.ietf.org/rfc/rfc2119.txt
  • The termsemit,signal orsend are interchangeable. The specifications below will usesignal.
  • The termssynchronously orsynchronous refer to executing in the callingThread.
  • The term "return normally" means "only throws exceptions that are explicitly allowed by the rule".

SPECIFICATION

1. IPublisher (Code)

publicinterfaceIPublisher<outT>{voidSubscribe(ISubscriber<T>subscriber);}
IDRule
1The total number ofonNext signals sent by anIPublisher to anISubscriber MUST be less than or equal to the total number of elements requested by thatISubscriber´sISubscription at all times.
2AnIPublisher MAY signal lessonNext than requested and terminate theISubscription by callingOnComplete orOnError.
3onSubscribe,onNext,onError andonComplete signaled to anISubscriber MUST be signaled sequentially (no concurrent notifications).
4If anIPublisher fails it MUST signal anonError.
5If anIPublisher terminates successfully (finite stream) it MUST signal anonComplete.
6If anIPublisher signals eitheronError oronComplete on anISubscriber, thatISubscriber’sISubscription MUST be considered cancelled.
7Once a terminal state has been signaled (onError,onComplete) it is REQUIRED that no further signals occur.
8If anISubscription is cancelled itsISubscriber MUST eventually stop being signaled.
9IPublisher.Subscribe MUST callOnSubscribe on the providedISubscriber prior to any other signals to thatISubscriber and MUST return normally, except when the providedISubscriber isnull in which case it MUST throw aSystem.ArgumentNullException to the caller, for all other situations[1] the only legal way to signal failure (or reject theISubscriber) is by callingOnError (after callingOnSubscribe).
10IPublisher.Subscribe MAY be called as many times as wanted but MUST be with a differentISubscriber each time [see2.12].
11AnIPublisher MAY support multipleISubscribers and decides whether eachISubscription is unicast or multicast.

[1] : A stateful IPublisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, shut-down or in a failed state.

2. ISubscriber (Code)

publicinterfaceISubscriber<inT>{publicvoidOnSubscribe(ISubscriptionsubscription);publicvoidOnNext(Telement);publicvoidOnError(Exceptioncause);publicvoidOnComplete();}
IDRule
1AnISubscriber MUST signal demand viaISubscription.Request(long n) to receiveonNext signals.
2If anISubscriber suspects that its processing of signals will negatively impact itsIPublisher's responsivity, it is RECOMMENDED that it asynchronously dispatches its signals.
3ISubscriber.OnComplete() andISubscriber.OnError(Exception cause) MUST NOT call any methods on theISubscription or theIPublisher.
4ISubscriber.OnComplete() andISubscriber.OnError(Exception cause) MUST consider the ISubscription cancelled after having received the signal.
5AnISubscriber MUST callISubscription.Cancel() on the givenISubscription after anonSubscribe signal if it already has an activeISubscription.
6AnISubscriber MUST callISubscription.Cancel() if it is no longer valid to theIPublisher without theIPublisher having signaledonError oronComplete.
7AnISubscriber MUST ensure that all calls on itsISubscription take place from the same thread or provide for respective external synchronization.
8AnISubscriber MUST be prepared to receive one or moreonNext signals after having calledISubscription.Cancel() if there are still requested elements pending [see3.12].ISubscription.Cancel() does not guarantee to perform the underlying cleaning operations immediately.
9AnISubscriber MUST be prepared to receive anonComplete signal with or without a precedingISubscription.Request(long n) call.
10AnISubscriber MUST be prepared to receive anonError signal with or without a precedingISubscription.Request(long n) call.
11AnISubscriber MUST make sure that all calls on itsOnXXX methods happen-before[1] the processing of the respective signals. I.e. the ISubscriber must take care of properly publishing the signal to its processing logic.
12ISubscriber.OnSubscribe MUST be called at most once for a givenISubscriber (based on object equality).
13CallingOnSubscribe,OnNext,OnError orOnComplete MUST return normally except when any provided parameter isnull in which case it MUST throw aSystem.ArgumentNullException to the caller, for all other situations the only legal way for anISubscriber to signal failure is by cancelling itsISubscription. In the case that this rule is violated, any associatedISubscription to theISubscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.

[1] : See JMM definition of Happen-Before in section 17.4.5. onhttp://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html

3. ISubscription (Code)

publicinterfaceISubscription{publicvoidRequest(longn);publicvoidCancel();}
IDRule
1ISubscription.Request andISubscription.Cancel MUST only be called inside of itsISubscriber context. AnISubscription represents the unique relationship between anISubscriber and anIPublisher [see2.12].
2TheISubscription MUST allow theISubscriber to callISubscription.Request synchronously from withinOnNext orOnSubscribe.
3ISubscription.Request MUST place an upper bound on possible synchronous recursion betweenIPublisher andISubscriber[1].
4ISubscription.Request SHOULD respect the responsivity of its caller by returning in a timely manner[2].
5ISubscription.Cancel MUST respect the responsivity of its caller by returning in a timely manner[2], MUST be idempotent and MUST be thread-safe.
6After theISubscription is cancelled, additionalISubscription.Request(long n) MUST be NOPs.
7After theISubscription is cancelled, additionalISubscription.Cancel() MUST be NOPs.
8While theISubscription is not cancelled,ISubscription.Request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.
9While theISubscription is not cancelled,ISubscription.Request(long n) MUST signalonError with aSystem.ArgumentException if the argument is <= 0. The cause message MUST include a reference to this rule and/or quote the full rule.
10While theISubscription is not cancelled,ISubscription.Request(long n) MAY synchronously callOnNext on this (or other) subscriber(s).
11While theISubscription is not cancelled,ISubscription.Request(long n) MAY synchronously callOnComplete orOnError on this (or other) subscriber(s).
12While theISubscription is not cancelled,ISubscription.Cancel() MUST request theIPublisher to eventually stop signaling itsISubscriber. The operation is NOT REQUIRED to affect theISubscription immediately.
13While theISubscription is not cancelled,ISubscription.Cancel() MUST request theIPublisher to eventually drop any references to the corresponding subscriber. Re-subscribing with the sameISubscriber object is discouraged [see2.12], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely.
14While theISubscription is not cancelled, callingISubscription.Cancel MAY cause theIPublisher, if stateful, to transition into theshut-down state if no otherISubscription exists at this point [see1.9].
15CallingISubscription.Cancel MUST return normally. The only legal way to signal failure to anISubscriber is via theOnError method.
16CallingISubscription.Request MUST return normally. The only legal way to signal failure to anISubscriber is via theOnError method.
17AnISubscription MUST support an unbounded number of calls to Request and MUST support a demand (sum requested - sum delivered) up to 2^63-1 (System.Int64.MaxValue). A demand equal or greater than 2^63-1 (System.Int64.MaxValue) MAY be considered by theIPublisher as “effectively unbounded”[3].

[1] : An example for undesirable synchronous, open recursion would beISubscriber.OnNext ->ISubscription.Request ->ISubscriber.OnNext -> …, as it very quickly would result in blowing the calling Thread´s stack.

[2] : Avoid heavy computations and other things that would stall the caller´s thread of execution

[3] : As it is not feasibly reachable with current or foreseen hardware within a reasonable amount of time (1 element per nanosecond would take 292 years) to fulfill a demand of 2^63-1, it is allowed for anIPublisher to stop tracking demand beyond this point.

AnISubscription is shared by exactly oneIPublisher and oneISubscriber for the purpose of mediating the data exchange between this pair. This is the reason why theSubscribe() method does not return the createdISubscription, but instead returnsvoid; theISubscription is only passed to theISubscriber via theOnSubscribe callback.

4.IProcessor (Code)

publicinterfaceIProcessor<inT1,outT2>:ISubscriber<T1>,IPublisher<T2>{}
IDRule
1AnIProcessor represents a processing stage—which is both anISubscriber and anIPublisher and MUST obey the contracts of both.
2AnIProcessor MAY choose to recover anonError signal. If it chooses to do so, it MUST consider theISubscription cancelled, otherwise it MUST propagate theonError signal to its ISubscribers immediately.

While not mandated, it can be a good idea to cancel anIProcessors upstreamISubscription when/if its lastISubscriber cancels theirISubscription,to let the cancellation signal propagate upstream.

Asynchronous vs Synchronous Processing

The Reactive Streams API prescribes that all processing of elements (onNext) or termination signals (onError,onComplete) MUST NOTblock theIPublisher. However, each of theOn* handlers can process the events synchronously or asynchronously.

Take this example:

nioSelectorThreadOrigin map(f) filter(p) consumeTo(toNioSelectorOutput)

It has an async origin and an async destination. Let's assume that both origin and destination are selector event loops. TheISubscription.Request(n) must be chained from the destination to the origin. This is now where each implementation can choose how to do this.

The following uses the pipe| character to signal async boundaries (queue and schedule) andR# to represent resources (possibly threads).

nioSelectorThreadOrigin | map(f) | filter(p) | consumeTo(toNioSelectorOutput)-------------- R1 ----  | - R2 - | -- R3 --- | ---------- R4 ----------------

In this example each of the 3 consumers,map,filter andconsumeTo asynchronously schedule the work. It could be on the same event loop (trampoline), separate threads, whatever.

nioSelectorThreadOrigin map(f) filter(p) | consumeTo(toNioSelectorOutput)------------------- R1 ----------------- | ---------- R2 ----------------

Here it is only the final step that asynchronously schedules, by adding work to the NioSelectorOutput event loop. Themap andfilter steps are synchronously performed on the origin thread.

Or another implementation could fuse the operations to the final consumer:

nioSelectorThreadOrigin | map(f) filter(p) consumeTo(toNioSelectorOutput)--------- R1 ---------- | ------------------ R2 -------------------------

All of these variants are "asynchronous streams". They all have their place and each has different tradeoffs including performance and implementation complexity.

The Reactive Streams contract allows implementations the flexibility to manage resources and scheduling and mix asynchronous and synchronous processing within the bounds of a non-blocking, asynchronous, dynamic push-pull stream.

In order to allow fully asynchronous implementations of all participating API elements—IPublisher/ISubscription/ISubscriber/IProcessor—all methods defined by these interfaces returnvoid.

ISubscriber controlled queue bounds

One of the underlying design principles is that all buffer sizes are to be bounded and these bounds must beknown andcontrolled by the subscribers. These bounds are expressed in terms ofelement count (which in turn translates to the invocation count of onNext). Any implementation that aims to support infinite streams (especially high output rate streams) needs to enforce bounds all along the way to avoid out-of-memory errors and constrain resource usage in general.

Since back-pressure is mandatory the use of unbounded buffers can be avoided. In general, the only time when a queue might grow without bounds is when the publisher side maintains a higher rate than the subscriber for an extended period of time, but this scenario is handled by backpressure instead.

Queue bounds can be controlled by a subscriber signaling demand for the appropriate number of elements. At any point in time the subscriber knows:

  • the total number of elements requested:P
  • the number of elements that have been processed:N

Then the maximum number of elements that may arrive—until more demand is signaled to the IPublisher—isP - N. In the case that the subscriber also knows the number of elements B in its input buffer then this bound can be refined toP - B - N.

These bounds must be respected by a publisher independent of whether the source it represents can be backpressured or not. In the case of sources whose production rate cannot be influenced—for example clock ticks or mouse movement—the publisher must choose to either buffer or drop elements to obey the imposed bounds.

ISubscribers signaling a demand for one element after the reception of an element effectively implement a Stop-and-Wait protocol where the demand signal is equivalent to acknowledgement. By providing demand for multiple elements the cost of acknowledgement is amortized. It is worth noting that the subscriber is allowed to signal demand at any point in time, allowing it to avoid unnecessary delays between the publisher and the subscriber (i.e. keeping its input buffer filled without having to wait for full round-trips).

Legal

This project is a collaboration between engineers from Kaazing, Lightbend, Netflix, Pivotal, Red Hat, Twitter and many others. This project is licensed under MIT No Attribution (SPDX: MIT-0).

About

Reactive Streams for .NET

Resources

License

Contributing

Stars

Watchers

Forks

Packages

No packages published

Contributors7

Languages


[8]ページ先頭

©2009-2025 Movatter.jp