T - the value type
public abstract class ResourceSubscriber<T> extends Object implements FlowableSubscriber<T>, Disposable
All pre-implemented final methods are thread-safe.
To release the associated resources, one has to call dispose() in onError() and onComplete() explicitly.
Use add(Disposable) to associate resources (as Disposables) with this ResourceSubscriber that will be cleaned up when dispose() is called. Removing previously associated resources is not possible, but one can create a CompositeDisposable, associate it with this ResourceSubscriber and then add/remove resources to/from the CompositeDisposable freely.
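The CompositeDisposable workaround described above can be sketched as follows. This is a minimal illustration, assuming RxJava 2.x (the io.reactivex packages) is on the classpath; the class name CompositeResourceSketch and the variables bag, kept and detached are hypothetical names chosen for the example.

```java
import io.reactivex.Flowable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.subscribers.ResourceSubscriber;

// Hypothetical sketch class, not part of RxJava.
final class CompositeResourceSketch {

    /** Returns { kept.isDisposed(), detached.isDisposed(), bag.isDisposed() }. */
    static boolean[] run() {
        CompositeDisposable bag = new CompositeDisposable();
        Disposable kept = Disposables.empty();
        Disposable detached = Disposables.empty();

        ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
            @Override public void onNext(Integer t) { }
            @Override public void onError(Throwable t) { dispose(); }
            @Override public void onComplete() { dispose(); }
        };

        // Associate the container once; it cannot be removed from the subscriber again.
        subscriber.add(bag);

        // Resources inside the container can be added and removed freely.
        bag.add(kept);
        bag.add(detached);
        bag.delete(detached); // delete() detaches without disposing

        Flowable.<Integer>never().subscribe(subscriber);

        // Disposing the subscriber disposes the container and everything still in it.
        subscriber.dispose();

        return new boolean[] { kept.isDisposed(), detached.isDisposed(), bag.isDisposed() };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

CompositeDisposable.delete(Disposable) is used here because it only detaches the resource; CompositeDisposable.remove(Disposable) would also dispose it.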
The default onStart() requests Long.MAX_VALUE. Override the method to request a custom positive amount. Use the protected request(long) to request more items and dispose() to cancel the sequence from within an onNext implementation.
Note that calling request(long) from onStart() may trigger an immediate, asynchronous emission of data to Subscriber.onNext(Object). Make sure all initialization happens before the call to request() in onStart(). Calling request(long) inside Subscriber.onNext(Object) can happen at any time because, by design, onNext calls from upstream are non-reentrant and non-overlapping.
Like all other consumers, ResourceSubscriber can be subscribed only once. Any subsequent attempt to subscribe it to a new source will yield an IllegalStateException with the message "It is not allowed to subscribe with a(n) <class name> multiple times.".
Implementations of onStart(), Subscriber.onNext(Object), Subscriber.onError(Throwable) and Subscriber.onComplete() are not allowed to throw any unchecked exceptions. If for some reason this can't be avoided, use Flowable.safeSubscribe(org.reactivestreams.Subscriber) instead of the standard subscribe() method.
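As an illustration of the safeSubscribe() fallback, the sketch below lets onNext throw on purpose and records what the subscriber observes. It assumes RxJava 2.x on the classpath; SafeSubscribeSketch and the events list are hypothetical names used only for this example.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.ResourceSubscriber;

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch class, not part of RxJava.
final class SafeSubscribeSketch {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        Flowable.range(1, 5)
            // safeSubscribe() wraps the subscriber so that an exception thrown
            // from onNext cancels the upstream and is routed to onError.
            .safeSubscribe(new ResourceSubscriber<Integer>() {
                @Override
                public void onNext(Integer t) {
                    if (t == 3) {
                        throw new IllegalStateException("boom");
                    }
                    events.add("next:" + t);
                }

                @Override
                public void onError(Throwable t) {
                    events.add("error:" + t.getMessage());
                    dispose();
                }

                @Override
                public void onComplete() {
                    events.add("complete");
                    dispose();
                }
            });

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:1, next:2, error:boom]
    }
}
```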
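Example

    import io.reactivex.Flowable;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.schedulers.Schedulers;
    import io.reactivex.subscribers.ResourceSubscriber;
    import java.util.concurrent.TimeUnit;

    Disposable d = Flowable.range(1, 5)
        .subscribeWith(new ResourceSubscriber<Integer>() {
            @Override
            public void onStart() {
                // Register the scheduled task so that dispose() also cancels it.
                add(Schedulers.single()
                    .scheduleDirect(() -> System.out.println("Time!"),
                        2, TimeUnit.SECONDS));
                // Request the first item only after initialization is done.
                request(1);
            }

            @Override
            public void onNext(Integer t) {
                if (t == 3) {
                    // Cancel the sequence from within onNext.
                    dispose();
                }
                System.out.println(t);
                request(1);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
                dispose();
            }

            @Override
            public void onComplete() {
                System.out.println("Done!");
                dispose();
            }
        });
    // ...
    d.dispose();

| Constructor and Description |
|---|
| ResourceSubscriber() |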

| Modifier and Type | Method and Description |
|---|---|
| void | add(Disposable resource) Adds a resource to this ResourceSubscriber. |
| void | dispose() Cancels the subscription (if any) and disposes the resources associated with this ResourceSubscriber (if any). |
| boolean | isDisposed() Returns true if this ResourceSubscriber has been disposed/cancelled. |
| protected void | onStart() Called once the upstream sets a Subscription on this ResourceSubscriber. |
| void | onSubscribe(Subscription s) Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long). |
| protected void | request(long n) Request the specified amount of elements from upstream. |

Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface Subscriber: onComplete, onError, onNext
public final void add(Disposable resource)
resource - the resource to add
NullPointerException - if resource is null
public final void onSubscribe(Subscription s)
Description copied from interface: FlowableSubscriber
Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long). In practice this means no initialization should happen after the request() call, and additional behavior is thread-safe with respect to onNext.
onSubscribe in interface FlowableSubscriber<T>
onSubscribe in interface Subscriber<T>
protected void onStart()
You can perform initialization at this moment. The default implementation requests Long.MAX_VALUE from upstream.
protected final void request(long n)
This method can be called before the upstream calls onSubscribe(). When the subscription happens, all missed requests are requested.
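The deferred-request behavior can be observed with a small sketch. It assumes RxJava 2.x on the classpath; MissedRequestSketch is a hypothetical class name, and onStart() is overridden to request nothing so that only the early request takes effect.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.ResourceSubscriber;

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch class, not part of RxJava.
final class MissedRequestSketch {

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();

        ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
            {
                // No upstream exists yet; the amount is remembered as a missed request.
                request(2);
            }

            @Override
            protected void onStart() {
                // Deliberately request nothing here (the default would request Long.MAX_VALUE).
            }

            @Override public void onNext(Integer t) { received.add(t); }
            @Override public void onError(Throwable t) { dispose(); }
            @Override public void onComplete() { dispose(); }
        };

        // On subscription, the remembered amount of 2 is requested from upstream.
        Flowable.range(1, 5).subscribe(subscriber);
        subscriber.dispose();

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2]
    }
}
```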
n - the request amount, must be positive
public final void dispose()
This method can be called before the upstream calls onSubscribe(), in which case the Subscription will be cancelled immediately.
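A minimal sketch of disposing before subscription, assuming RxJava 2.x on the classpath (EarlyDisposeSketch and the events list are hypothetical names): because the subscriber is already disposed, the incoming Subscription is cancelled and nothing is recorded.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.ResourceSubscriber;

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch class, not part of RxJava.
final class EarlyDisposeSketch {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
            @Override
            protected void onStart() {
                events.add("start");
                request(Long.MAX_VALUE);
            }

            @Override public void onNext(Integer t) { events.add("next:" + t); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        };

        // Dispose first: no upstream has called onSubscribe() yet.
        subscriber.dispose();

        // The Subscription handed over here is cancelled right away.
        Flowable.range(1, 5).subscribe(subscriber);

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints []
    }
}
```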
dispose in interface Disposable
public final boolean isDisposed()
isDisposed in interface Disposable