Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

RxJava 2.x & 3.x extra sources, operators and components and ports of many 1.x companion libraries.

License

NotificationsYou must be signed in to change notification settings

akarnokd/RxJavaExtensions

Repository files navigation

codecov.ioMaven CentralMaven Central

RxJava 3.x implementation of extra sources, operators and components and ports of many 1.x companion libraries.

Releases

gradle

dependencies {    implementation "com.github.akarnokd:rxjava3-extensions:3.1.1"}

Javadoc:https://akarnokd.github.io/RxJavaExtensions/javadoc/index.html

Maven search:

http://search.maven.org

Features

Extra functional interfaces

Support the join-patterns and async-util with functional interfaces of consumers with 3-9 type argumentsand have functional interfaces of functions without thethrows Exception.

  • SimpleCallable<T> -Callable<T> withoutthrows Exception
  • Consumer3 - 3 argumentConsumer
  • Consumer4 - 4 argumentConsumer
  • Consumer5 - 5 argumentConsumer
  • Consumer6 - 6 argumentConsumer
  • Consumer7 - 7 argumentConsumer
  • Consumer8 - 8 argumentConsumer
  • Consumer9 - 9 argumentConsumer
  • PlainFunction -Function withoutthrows Exception
  • PlainBiFunction -BiFunction withoutthrows Exception
  • PlainFunction3 -Function3 withoutthrows Exception
  • PlainFunction4 -Function4 withoutthrows Exception
  • PlainFunction5 -Function5 withoutthrows Exception
  • PlainFunction6 -Function6 withoutthrows Exception
  • PlainFunction7 -Function7 withoutthrows Exception
  • PlainFunction8 -Function8 withoutthrows Exception
  • PlainFunction9 -Function9 withoutthrows Exception

Utility functions supporting these can be found inFunctionsEx class.

Mathematical operations over numerical sequences

Although most of the operations can be performed withreduce, these operators have lower overheadas they cut out the reboxing of primitive intermediate values.

The following operations are available inMathFlowable forFlowable sequences andMathObservable inObservablesequences:

  • averageDouble()
  • averageFloat()
  • max()
  • min()
  • sumDouble()
  • sumFloat()
  • sumInt()
  • sumLong()

Example

MathFlowable.averageDouble(Flowable.range(1,10)).test().assertResult(5.5);Flowable.just(5,1,3,2,4).to(MathFlowable::min).test().assertResult(1);

String operations

characters

TheStringFlowable andStringObservable support streaming the characters of aCharSequence:

StringFlowable.characters("Hello world").map(v ->Characters.toLower((char)v)).subscribe(System.out::print,Throwable::printStackTrace,System.out::println);

split

Splits an incoming sequence of Strings based on a Regex pattern within and between subsequent elements if necessary.

Flowable.just("abqw","ercdqw","eref").compose(StringFlowable.split("qwer")).test().assertResult("ab","cd","ef");Flowable.just("ab",":cde:""fg").compose(StringFlowable.split(":")).test().assertResult("ab","cde","fg");

Asynchronous jumpstarting a sequence

Wrap functions and consumers into Flowables and Observables or into another layer of Functions.Most of these can now be achieved viafromCallable and some function composition in plain RxJava.

start

Run a function or action once on a background thread and cache its result.

AtomicIntegercounter =newAtomicInteger();Flowable<Integer>source =AsyncFlowable.start(() ->counter.incrementAndGet());source.test()    .awaitDone(5,TimeUnit.SECONDS)    .assertResult(1);source.test()    .awaitDone(5,TimeUnit.SECONDS)    .assertResult(1);

toAsync

Call a function (with parameters) to call a function inside aFlowable/Observable with the sameparameter and have the result emitted by thatFlowable/Observable from a background thread.

Function<Integer,Flowable<String>>func =AsyncFlowable.toAsync(param ->"[" +param +"]");func.apply(1)    .test()    .awaitDone(5,TimeUnit.SECONDS)    .assertResult("[1]");

startFuture

Run a Supplier that returns a Future to call blocking get() on to get the solo value or exception.

ExecutorServiceexec =Executors.newSingleThreadedScheduler();AsyncFlowable.startFuture(() ->exec.submit(() ->1))    .test()    .awaitDone(5,TimeUnit.SECONDS)    .assertResult(1);exec.shutdown();

deferFuture

Run a Supplier that returns a Future to call blocking get() on to get aPublisher to stream back.

ExecutorServiceexec =Executors.newSingleThreadedScheduler();AsyncFlowable.startFuture(() ->exec.submit(() ->Flowable.range(1,5)))    .test()    .awaitDone(5,TimeUnit.SECONDS)    .assertResult(1,2,3,4,5);exec.shutdown();

forEachFuture

Consume aPublisher and haveFuture that completes when the consumption ends withonComplete oronError.

Future<Object>f =AsyncFlowable.forEachFuture(Flowable.range(1,100),System.out::println);f.get();

runAsync

Allows emitting multiple values through a Processor mediator from a background thread and allows disposingthe sequence externally.

AsyncFlowable.runAsync(Schedulers.single(),UnicastProcessor.<Object>create(),newBiConsumer<Subscriber<Object>,Disposable>() {@Overridepublicvoidaccept(Subscriber<?superObject>s,Disposabled)throwsException {s.onNext(1);s.onNext(2);s.onNext(3);Thread.sleep(200);s.onNext(4);s.onNext(5);s.onComplete();            }        }).test().awaitDone(5,TimeUnit.SECONDS).assertResult(1,2,3,4,5);

Computational expressions

The operators onStatementFlowable andStatementObservable allow taking different branches at subscription time:

ifThen

Conditionally chose a source to subscribe to. This is similar to the imperativeif statement but with reactive flows:

if ((System.currentTimeMillis() &1) !=0) {System.out.println("An odd millisecond");}else {System.out.println("An even millisecond");}
Flowable<String>source =StatementFlowable.ifThen(    () -> (System.currentTimeMillis() &1) !=0,Flowable.just("An odd millisecond"),Flowable.just("An even millisecond"));source.delay(1,TimeUnit.MILLISECONDS).repeat(1000).subscribe(System.out::println);

switchCase

Calculate a key and pick a source from a Map. This is similar to the imperativeswitch statement:

switch ((int)(System.currentTimeMillis() &7)) {case1:System.out.println("one");break;case2:System.out.println("two");break;case3:System.out.println("three");break;default:System.out.println("Something else");}
Map<Integer,Flowable<String>>map =newHashMap<>();map.put(1,Flowable.just("one"));map.put(2,Flowable.just("two"));map.put(3,Flowable.just("three"));Flowable<String>source =StatementFlowable.switchCase(    () -> (int)(System.currentTimeMillis() &7),map,Flowable.just("Something else"));source.delay(1,TimeUnit.MILLISECONDS).repeat(1000).subscribe(System.out::println);

doWhile

Resubscribe if a condition is true after the last subscription completed normally. This is similar to the imperativedo-while loop (executing the loop body at least once):

longstart =System.currentTimeMillis();do {Thread.sleep(1);System.out.println("Working...");while (start +100 >System.currentTimeMillis());
longstart =System.currentTimeMillis();Flowable<String>source =StatementFlowable.doWhile(Flowable.just("Working...").delay(1,TimeUnit.MILLISECONDS),    () ->start +100 >System.currentTimeMillis());source.subscribe(System.out::println);

whileDo

Subscribe and resubscribe if a condition is true. This is similar to the imperativewhile loop (where the loop body may not execute if the condition is falseto begin with):

while ((System.currentTimeMillis() &1) !=0) {System.out.println("What an odd millisecond!");}
Flowable<String>source =StatementFlowable.whileDo(Flowable.just("What an odd millisecond!"),    () -> (System.currentTimeMillis() &1) !=0);source.subscribe(System.out::println);

Join patterns

(Conversion done)

TBD: examples

Debug support

By default, RxJava 3's RxJavaPlugins only offers the ability to hook into the assembly process (i.e., when you apply an operator on a sequence or create one) unlike 1.x where there is anRxJavaHooks.enableAssemblyTracking() method. Since the standard format is of discussion there, 3.x doesn't have such feature built in but onlyin this extension library.

Usage

You enable tracking via:

RxJavaAssemblyTracking.enable();

and disable via:

RxJavaAssemblyTracking.disable();

Note that this doesn't save or preserve the old hooks (namedAssembly) you may have set as of now.

Output

In debug mode, you can walk through the reference graph of Disposables and Subscriptions to find anFlowableOnAssemblyX named nodes (similar in the other base types) where there is anassembled field of typeRxJavaAssemblyException. This has also a field namedstacktrace that contains a pretty printed stacktrace string pointing to the assembly location:

RxJavaAssemblyException: assembledat io.reactivex.Completable.error(Completable.java:280)at hu.akarnokd.rxjava3.debug.RxJava3AssemblyTrackingTest.createCompletable(RxJava3AssemblyTrackingTest.java:78)at hu.akarnokd.rxjava3.debug.RxJava3AssemblyTrackingTest.completable(RxJava3AssemblyTrackingTest.java:185)

This is a filtered list of stacktrace elements (skipping threading, unit test and self-related entries). Most modern IDEs should allow you to navigate to the locations when printed on (or pasted into) its console.

To avoid interference, theRxJavaAssemblyException is attached as the last cause to potential chain of the original exception that travels through each operator to the end consumer.

You can programmatically find this via:

RxJavaAssemblyExceptionassembled =RxJavaAssemblyException.find(someThrowable);if (assembled !=null) {System.err.println(assembled.stacktrace());}

Function tagging

Often, when a function throws or returns null, there is not enough information tolocate said function in the codebase. TheFunctionTagging utility class offersstatic wrappers for RxJava function types that when fail or return null, a customstring tag is added or appended to the exception and allows locating that functionin your codebase. Since added logic has overhead, the tagging process has to beenabled and can be disabled as necessary.

FunctionTagging.enable();Function<Integer,Integer>tagged =FunctionTagging.tagFunction(v ->null,"F1");FunctionTagging.disable();Function<Integer,Integer>notTagged =FunctionTagging.tagFunction(v ->null,"F2");assertNull(notTagged.apply(1));try {tagged.apply(1);fail("Should have thrown");}catch (NullPointerExceptionex) {assertTrue(ex.getMessage().contains("F1"));}

To avoid lambda ambiguity, the methods are namedtagX whereX is the functional type name such asBiFunction,Function3 etc.

The wrappers check fornull parameters and if the wrapped function returns anull and throw aNullPointerException containing the parametername (t1 .. t9) and the tag provided.

Protocol validation

Custom operators and sources sometimes contain bugs that manifest themselves in odd sequence behavior or crashesfrom within the standard operators. Since the revealing stacktraces is often missing or incomplete, diagnosingsuch failures can be tiresome. Therefore, thehu.akarnokd.rxjava3.debug.validator.RxJavaProtocolValidatorclass offers assembly hooks for the standard reactive base types.

The validation hooks can be enabled viaRxJavaProtocolValidator.enable() and disabled viaRxJavaProtocolValidator.disable().The validator also supports chaining with existing hooks viaenableAndChain() which returns aSavedHooks instanceto restore the original hooks specifically:

SavedHookshooks =RxJavaProtocolValidator.enableAndChain();// assemble and run flows// ...hooks.restore();

By default, the violations, subclasses ofProtocolNonConformanceException, are reported to theRxJavaPlugins.onErrorhandler but can be overridden viaRxJavaProtocolValidator.setOnViolationHandler.

RxJavaProtocolValidator.setOnViolationHandler(e ->e.printStackTrace());RxJavaProtocolValidator.enable();// ...

The following error violations are detected:

ExceptionViolation description
MultipleTerminationsExceptionWhen multiple calls toonError oronComplete happened.
MultipleOnSubscribeCallsExceptionWhen multiple calls toonSubscribe happened
NullOnErrorParameterExceptionWhen theonError was called with anullThrowable.
NullOnNextParameterExceptionWhen theonNext was called with anull value.
NullOnSubscribeParameterExceptionWhen theonSubscribe was called with anullDisposable orSubscription.
NullOnSuccessParameterExceptionWhen theonSuccess was called with anull value.
OnNextAfterTerminationExceptionWen theonNext was called afteronError oronComplete.
OnSubscribeNotCalledExceptionWhen any of theonNext,onSuccess,onError oronComplete is invoked without invokingonSubscribe first.
OnSuccessAfterTerminationExceptionWen theonSuccess was called afteronError oronComplete.

Multi-hook handlers

The standardRxJavaPlugins allows only one hook to be associated with each main intercept option.If multiple hooks should be invoked, that option is not directly supported byRxJavaPlugins butcan be built upon the single hook scheme.

Thehu.akarnokd.rxjava3.debug.multihook package offers hook managers that can work with multiple hooksthemselves.

The various multi-hook managers are built upon the genericMultiHandlerManager<H> class. The class offers theregistermethod to register a particular hook for which aDisposable is returned. This allows removing a particularhook without the need to remember the hook instance or the manager class.

OnScheduleMultiHookManager

Offers multi-hook management for theRxJavaPlugins.setScheduleHandler andonSchedule hooks.

Theenable() method will install the instance as the main hook, thedisable() will restore the default no-hook.The convenienceappend() will take any existing hook, register it with the manager and install the manager as the main hook.

SoloProcessor, PerhapsProcessor and NonoProcessor

These are the backpressure-aware, Reactive-Streams Processor-based implementations of theSingleSubject,MaybeSubject and CompletableSubject respectively. Their usage is quite similar.

PerhapsProcessor<Integer>ms =PerhapsProcessor.create();TestSubscriber<Integer>to =ms.test();ms.onNext(1);ms.onComplete();to.assertResult(1);

Similarly with NonoProcessor, although callingonNext(null) will throw aNullPointerException to the caller.

NonoProcessorcs =NonoProcessor.create();TestSubscriber<Void>to2 =cs.test();cs.onComplete();to2.assertResult();

Finally

SoloProcessor<Integer>ss =SoloProcessor.create();TestSubscriber<Integer>to3 =ss.test();ss.onNext(1);ss.onComplete();to3.assertResult(1);

Note that callingonComplete afteronNext is optional withSoloProcessor but callingonComplete without callingonNext terminates theSoloProcessor with aNoSuchElementException.

MulticastProcessor

Moved to RxJava as standard processor:io.reactivex.rxjava3.processors.MulticastProcessor.

UnicastWorkSubject

ASubject variant that buffers items and allows oneObserver to consume it at a time, but unlikeUnicastSubject,once the previousObserver disposes, a newObserver can subscribe and resume consuming the items.

UnicastWorkSubject<Integer>uws =UnicastWorkSubject.create();uws.onNext(1);uws.onNext(2);uws.onNext(3);uws.onNext(4);uws.take(2).test().assertResult(1,2);uws.take(2).test().assertResult(3,4);uws.onComplete();uws.test().assertResult();

DispatchWorkSubject

ASubject variant that buffers items and allows one or moreObservers to exclusively consume one of the items in the bufferasynchronously. If there are noObservers (or they all disposed), theDispatchWorkSubject will keep buffering and laterObservers can resume the consumption of the buffer.

DispatchWorkSubject<Integer>dws =DispatchWorkSubject.create(Schedulers.computation());Single<List<Integer>>asList =dws.toList();TestObserver<List<Integer>>to =Single    .zip(asList,asList, (a,b) ->a.addAll(b))    .test();Observable.range(1,1000000).subscribe(dws);to.awaitDone(5,TimeUnit.SECONDS).assertValueCount(1).assertComplete().assertNoErrors();assertEquals(1000000,to.values().get().size());

DispatchWorkProcessor

AFlowableProcessor variant that buffers items and allows one or moreSubscribers to exclusively consume one of the items in the bufferasynchronously. If there are noSubscribers (or they all canceled), theDispatchWorkProcessor will keep buffering and laterSubscribers can resume the consumption of the buffer.

DispatchWorkProcessor<Integer>dwp =DispatchWorkProcessor.create(Schedulers.computation());Single<List<Integer>>asList =dwp.toList();TestObserver<List<Integer>>to =Single    .zip(asList,asList, (a,b) ->a.addAll(b))    .test();Flowable.range(1,1000000).subscribe(dwp);to.awaitDone(5,TimeUnit.SECONDS).assertValueCount(1).assertComplete().assertNoErrors();assertEquals(1000000,to.values().get().size());

FlowableProcessor utils

An utility class that helps working with Reactive-StreamsProcessor,FlowableProcessorSubject instances via static methods.

wrap

Wraps an arbitraryProcessor into aFlowableProcessor

refCount

Wraps aFlowableProcessor/Subject and makes sure if all subscribers/observer cancel/disposetheir subscriptions, the upstream'sSubscription/Disposable gets cancelled/disposed as well.

Custom Schedulers

SharedScheduler

ThisScheduler implementation takes aWorker directly or from anotherScheduler and shares it across its ownWorkers whilemaking sure disposing one of its ownSharedWorker doesn't dispose any otherSharedWorker or the underlying sharedWorker.

This type of scheduler may help solve the problem when one has to return to the same thread/scheduler at different stages of the pipelinebut one doesn't want to or isn't able to useSingleScheduler or some other single-threaded thread-pool wrapped viaSchedulers.from().

SharedSchedulershared =newSharedScheduler(Schedulers.io());Flowable.just(1).subscribeOn(shared).map(v ->Thread.currentThread().getName()).observeOn(Schedulers.computation()).map(v ->v.toLowerCase()).observeOn(shared).map(v ->v.equals(Thread.currentThread().getName().toLowerCase())).blockingForEach(System.out::println);

ParallelScheduler

It is similar toSchedulers.computation() but you can control the number of threads, the thread name prefix, the thread priority and to track each task submitted to its worker.

Tracking a task means that if one callsWorker.dispose(), all outstanding tasks is cancelled. However, certain use cases can get away with just preventing the execution of the task body and just run through all outstanding tasks yielding lower overhead.

TheParallelScheduler supportsstart andshutdown to start and stop the backing thread-pools. The non-ThreadFactory constructors create a daemon-thread backed set of single-threaded thread-pools.

Schedulers =newParallelScheduler(3);try {Flowable.range(1,10)    .flatMap(v ->Flowable.just(1).subscribeOn(s).map(v ->v +1))    .test()    .awaitDone(5,TimeUnit.SECONDS)    .assertValueSet(Arrays.asList(2,3,4,5,6,7,8,9,10,11))    .assertComplete()    .assertNoErrors();}finally {s.shutdown();}

BlockingScheduler

This type of scheduler runs its execution loop on the "current thread", more specifically, the thread which invoked itsexecute() method. The method blocks until theshutdown() is invoked. This type of scheduler allows returning to the "main" thread from other threads.

publicstaticvoidmain(String[]args) {BlockingSchedulerscheduler =newBlockingScheduler();scheduler.execute(() -> {Flowable.range(1,10)        .subscribeOn(Schedulers.io())        .observeOn(scheduler)        .doAfterTerminate(() ->scheduler.shutdown())        .subscribe(v ->System.out.println(v +" on " +Thread.currentThread()));    });System.out.println("BlockingScheduler finished");}

Custom operators and transformers

The custom transformers (to be applied withFlowable.compose for example), can be found inhu.akarnokd.rxjava3.operators.FlowableTransformers class. The custom source-like operators can be found inhu.akarnokd.rxjava3.operators.Flowables class. The operators and transformers for the other basereactive classes (will) follow the usual naming scheme.

FlowableTransformers.valve()

Pauses and resumes a main flow if the secondary flow signals false and true respectively.

Also available asObservableTransformers.valve().

PublishProcessor<Boolean>valveSource =PublishProcessor.create();Flowable.intervalRange(1,20,1,1,TimeUnit.SECONDS).compose(FlowableTransformers.<Long>valve(valveSource)).subscribe(System.out::println,Throwable::printStackTrace);Thread.sleep(3100);valveSource.onNext(false);Thread.sleep(5000);valveSource.onNext(true);Thread.sleep(3000);valveSource.onNext(false);Thread.sleep(6000);valveSource.onNext(true);Thread.sleep(3000);

Flowables.orderedMerge()

Given a fixed number of input sources (which can be self-comparable or given aComparator) merges theminto a single stream by repeatedly picking the smallest one from each source until all of them completes.

Flowables.orderedMerge(Flowable.just(1,3,5),Flowable.just(2,4,6)).test().assertResult(1,2,3,4,5,6);

FlowableTransformers.bufferWhile()

Buffers into a list/collection while the given predicate returns true forthe current item, otherwise starts a new list/collection containing the given item (i.e., the "separator" ends up in the next list/collection).

Flowable.just("1","2","#","3","#","4","#").compose(FlowableTransformers.bufferWhile(v -> !"#".equals(v))).test().assertResult(Arrays.asList("1","2"),Arrays.asList("#","3"),Arrays.asList("#","4"),Arrays.asList("#"));

FlowableTransformers.bufferUntil()

Buffers into a list/collection until the given predicate returns true forthe current item and starts an new empty list/collection (i.e., the "separator" ends up in the same list/collection).

Flowable.just("1","2","#","3","#","4","#").compose(FlowableTransformers.bufferUntil(v ->"#".equals(v))).test().assertResult(Arrays.asList("1","2","#"),Arrays.asList("3","#"),Arrays.asList("4","#"));

FlowableTransformers.bufferSplit()

Buffers into a list/collection while the predicate returns false. When it returns true,a new buffer is started and the particular item won't be in any of the buffers.

Flowable.just("1","2","#","3","#","4","#").compose(FlowableTransformers.bufferSplit(v ->"#".equals(v))).test().assertResult(Arrays.asList("1","2"),Arrays.asList("3"),Arrays.asList("4"));

FlowableTransformers.spanout()

Inserts a time delay between emissions from the upstream. For example, if the upstream emits 1, 2, 3 in a quick succession, a spanout(1, TimeUnit.SECONDS) will emit 1 immediately, 2 after a second and 3 after a second after 2. You can specify the initial delay, a custom scheduler and if an upstream error should be delayed after the normal items or not.

Flowable.range(1,10).compose(FlowableTransformers.spanout(1,1,TimeUnit.SECONDS)).doOnNext(v ->System.out.println(System.currentTimeMillis() +": " +v)).test().awaitDone(20,TimeUnit.SECONDS).assertResult(1,2,3,4,5,6,7,8,9,10);

FlowableTransformers.mapFilter()

A callbackConsumer is called with the current upstream value and aBasicEmitter on which doXXX methods can be calledto transform a value, signal an error or stop a sequence. If none of thedoXXX methods is called, the current value is dropped and another is requested from upstream. The operator is a pass-through for downstream requests otherwise.

Flowable.range(1,10).compose(FlowableTransformers.mapFilter((v,e) -> {if (v %2 ==0) {e.doNext(v *2);    }if (v ==5) {e.doComplete();    }})).test().assertResult(4,8);

FlowableTransformers.onBackpressureTimeout()

Consumes the upstream in an unbounded manner and buffers elements until the downstream requests but each buffered element has an associated timeout after which it becomes unavailable. Note that this may create discontinuities in the stream. In addition, an overload allows specifying the maximum buffer size and an eviction action which gets triggered when the buffer reaches itscapacity or elements time out.

Flowable.intervalRange(1,5,100,100,TimeUnit.MILLISECONDS)        .compose(FlowableTransformers            .onBackpressureTimeout(2,100,TimeUnit.MILLISECONDS,Schedulers.single(),System.out::println))        .test(0)        .awaitDone(5,TimeUnit.SECONDS)        .assertResult();

Flowables.repeat()

Repeats a scalar value indefinitely (until the downstream actually cancels), honoring backpressure and supporting synchronous fusion and/or conditional fusion.

Flowable.repeat("doesn't matter").map(v ->ThreadLocalRandom.current().nextDouble()).take(100).all(v ->v <1d).test().assertResult(true);

Flowables.repeatSupplier()

Repeatedly calls a supplier, indefinitely (until the downstream actually cancels) or if the supplier throws or returns null (when it signalsNullPointerException), honoring backpressure and supporting synchronous fusion and/or conditional fusion.

Flowable.repeatSupplier(() ->ThreadLocalRandom.current().nextDouble()).take(100).all(v ->v <1d).test().assertResult(true);

FlowableTransformers.every()

Relays every Nth item from upstream (skipping the in-between items).

Flowable.range(1,5).compose(FlowableTransformers.<Integer>every(2)).test().assertResult(2,4)

Flowables.intervalBackpressure()

Emit an ever increasing series of long values, starting from 0L and "buffer"emissions in case the downstream can't keep up. The "buffering" is virtual and isn't accompanied by increased memory usage if it happens for a longerperiod of time.

Flowables.intervalBackpressure(1,TimeUnit.MILLISECONDS).observeOn(Schedulers.single()).take(1000).test().awaitDone(5,TimeUnit.SECONDS).assertValueCount(1000).assertNoErrors().assertComplete();

FlowableTransformers.cacheLast()

Caches the very last value of the upstream source and relays/replays it to Subscribers. The difference fromreplay(1) is that this operator is guaranteedto hold onto exactly one value whereasreplay(1) may keep a reference to the one before too due to continuity reasons.

Flowable<Integer>f =Flowable.range(1,5).doOnSubscribe(s ->System.out.println("Subscribed!")).compose(FlowableTransformers.cacheLast());// prints "Subscribed!"f.test().assertResult(5);// doesn't print anything elsef.test().assertResult(5);f.test().assertResult(5);

FlowableTransformers.timeoutLast() & timeoutLastAbsolute()

The operator consumes the upstream to get to the last value but completes if thesequence doesn't complete within the specified timeout. A use case is when the upstream generates estimates, each better than the previous but we'd like to receive the last of it and not wait for a potentially infinite series.

There are two variants: relative timeout and absolute timeout.

With relative timeout, the operator restarts the timeout after each upstream item, cancels the upstream and emits that latest item if the timeout happens:

Flowable.just(0,50,100,400).flatMap(v ->Flowable.timer(v,TimeUnit.MILLISECONDS).map(w ->v)).compose(FlowableTransformers.timeoutLast(200,TimeUnit.MILLISECONDS)).test().awaitDone(5,TimeUnit.SECONDS).assertResult(100);

With absolute timeout, the upstream operator is expected to complete within thespecified amount of time and if it doesn't, the upstream gets cancelled and the latest item emitted.

Flowable.just(0,50,100,150,400).flatMap(v ->Flowable.timer(v,TimeUnit.MILLISECONDS).map(w ->v)).compose(FlowableTransformers.timeoutLastAbsolute(200,TimeUnit.MILLISECONDS)).test().awaitDone(5,TimeUnit.SECONDS).assertResult(150);

FlowableTransformers.debounceFirst()

Debounces the upstream by taking an item and dropping subsequent items until the specified amount of time elapses after the last item, after which the process repeats.

Flowable.just(0,50,100,150,400,500,550,1000).flatMap(v ->Flowable.timer(v,TimeUnit.MILLISECONDS).map(w ->v)).compose(FlowableTransformers.debounceFirst(200,TimeUnit.MILLISECONDS)).test().awaitDone(5,TimeUnit.SECONDS).assertResult(0,400,1000);

FlowableTransformers.switchFlatMap()

This is a combination of switchMap and a limited flatMap. It merges a maximum number of Publishers at once but if a new inner Publisher gets mapped in and the active count is at max, the oldest active Publisher is cancelled and the new inner Publisher gets flattened as well. Running withmaxActive == 1 is equivalent to the plainswitchMap.

Flowable.just(100,300,500).flatMap(v ->Flowable.timer(v,TimeUnit.MILLISECONDS).map(w ->v)).compose(FlowableTransformers.switchFlatMap(v -> {if (v ==100) {returnFlowable.intervalRange(1,3,75,100,TimeUnit.MILLISECONDS)           .map(w ->"A" +w);    }elseif (v ==300) {returnFlowable.intervalRange(1,3,10,100,TimeUnit.MILLISECONDS)           .map(w ->"B" +w);    }returnFlowable.intervalRange(1,3,20,100,TimeUnit.MILLISECONDS)        .map(w ->"C" +w);},2).test().awaitDone(5,TimeUnit.SECONDS).assertResult("A1","A2","B1","A3","B2","C1",B3","C2", "C3);

FlowableTransformers.flatMapSync()

A bounded-concurrencyflatMap implementation optimized for mostly non-trivial, largely synchronous sources in mind and using different tracking method and configurable merging strategy: depth-first consumes each inner source as much as possible before switching to the next; breadth-first consumes one element from each source in a round-robin fashion. Overloads allow specifying the concurrency level (32 default), inner-prefetch (Flowable.bufferSize() default) and the merge strategy (depth-first default).

Flowable.range(1,1000).compose(FlowableTransformers.flatMapSync(v ->Flowable.range(1,1000))).test().assertValueCount(1_000_000).assertNoErrors().assertComplete();

FlowableTransformers.flatMapAsync()

A bounded-concurrencyflatMap implementation taking a scheduler which is used for collecting and emitting items from the active sources and freeing up the inner sources to keep producing. It also uses a different tracking method and configurable merging strategy: depth-first consumes each inner source as much as possible before switching to the next; breadth-first consumes one element from each source in a round-robin fashion. Overloads allow specifying the concurrency level (32 default), inner-prefetch (Flowable.bufferSize() default) and the merge strategy (depth-first default).

Flowable.range(1,1000).compose(FlowableTransformers.flatMapAsync(v ->Flowable.range(1,1000),Schedulers.single())).test().awaitDone(5,TimeUnit.SECONDS).assertValueCount(1_000_000).assertNoErrors().assertComplete();

FlowableTransformers.switchIfEmpty() & switchIfEmptyArray()

Switches to the alternatives, one after the other if the main source or the previous alternative turns out to be empty.

Flowable.empty().compose(FlowableTransformers.switchIfEmpty(Arrays.asList(Flowable.empty(),Flowable.range(1,5)))).test().assertResult(1,2,3,4,5);Flowable.empty().compose(FlowableTransformers.switchIfEmptyArray(Flowable.empty(),Flowable.range(1,5))).test().assertResult(1,2,3,4,5);

FlowableTransformers.expand()

Streams values from the main source, maps each of them onto another Publisher and recursively streams those Publisher values until all Publishers terminate.Two recursing mode is available: breadth-first will stream the main source (level 1), then the Publishers generated by its items (level 2), then the Publishers generated by the level 2and so on; depth-first will take an item from the main source, maps it to a Publisher then takes an item from this Publisher and maps it further.

Flowable.just(10).compose(FlowableTransformers.expand(v ->v ==0 ?Flowable.empty() :Flowable.just(v -1))).test().assertResult(10,9,8,7,6,5,4,3,2,1,0);

Depth-first example:

Flowable.just(newFile(".")).compose(FlowableTransformers.expand(file -> {if (file.isDirectory()) {File[]files =file.listFiles();if (files !=null) {returnFlowable.fromArray(files);        }    }returnFlowable.empty();},ExpandStrategy.DEPTH_FIRST)).subscribe(System.out::println);// prints something like// ~/git/RxJavaExtensions// ~/git/RxJavaExtensions/src// ~/git/RxJavaExtensions/src/main// ~/git/RxJavaExtensions/src/main/java// ~/git/RxJavaExtensions/src/main/java/hu// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd/rxjava3// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd/rxjava3/operators// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd/rxjava3/operators/FlowableExpand.java// ...// ~/git/RxJavaExtensions/src/test// ~/git/RxJavaExtensions/src/test/java

Breadth-first example:

Flowable.just(newFile(".")).compose(FlowableTransformers.expand(file -> {if (file.isDirectory()) {File[]files =file.listFiles();if (files !=null) {returnFlowable.fromArray(files);        }    }returnFlowable.empty();},ExpandStrategy.BREADTH_FIRST)).subscribe(System.out::println);// prints something like// ~/git/RxJavaExtensions// ~/git/RxJavaExtensions/src// ~/git/RxJavaExtensions/build// ~/git/RxJavaExtensions/gradle// ~/git/RxJavaExtensions/HEADER// ~/git/RxJavaExtensions/README.md// ...// ~/git/RxJavaExtensions/src/main// ~/git/RxJavaExtensions/src/test// ~/git/RxJavaExtensions/src/jmh// ~/git/RxJavaExtensions/src/main/java// ~/git/RxJavaExtensions/src/main/java/hu// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd/rxjava3// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd/rxjava3/operators// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd/rxjava3/math// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd/rxjava3/async// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd/rxjava3/debug// ...// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd/rxjava3/operators/FlowableExpand.java// ~/git/RxJavaExtensions/src/main/java/hu/akarnokd/rxjava3/operators/FlowableTransformers.java

FlowableTransformers.mapAsync()

Also available asObservableTransformers.mapAsync().

This is an "asynchronous" version of the regularmap() operator where an upstream value is mapped to aPublisher whichis expected to emit a single value to be the result itself or through a combiner function become the result. Onlyone suchPublisher is executed at once and the source order is kept. If thePublisher is empty, no value is emittedand the sequence continues with the next upstream value. If thePublisher has more than one element, only the firstelement is considered and the inner sequence gets cancelled after that first element.

Flowable.range(1,5).compose(FlowableTransformers.mapAsync(v ->Flowable.just(v +1).delay(1,TimeUnit.SECONDS))).test().awaitDone(10,TimeUnit.SECONDS).assertResult(2,3,4,5,6);

Example when using a combiner function to combine the original and the generated values:

Flowable.range(1,5).compose(FlowableTransformers.mapAsync(v ->Flowable.just(v +1).delay(1,TimeUnit.SECONDS)), (v,w) ->v +"-" +w).test().awaitDone(10,TimeUnit.SECONDS).assertResult("1-2","2-3","3-4","4-5","5-6");

FlowableTransformers.filterAsync()

Also available asObservableTransformers.filterAsync().

This is an "asynchronous" version of the regularfilter() operator where an upstream value is mapped to aPublisherwhich is expected to emit a singletrue orfalse that indicates the original value should go through. An emptyPublisheris considered to be afalse response. If thePublisher has more than one element, only the firstelement is considered and the inner sequence gets cancelled after that first element.

Flowable.range(1,10).compose(FlowableTransformers.filterAsync(v ->Flowable.just(v).delay(1,TimeUnit.SECONDS).filter(v %2 ==0)).test().awaitDone(15,TimeUnit.SECONDS).assertResult(2,4,6,8,10);

FlowableTransformers.refCount()

*Moved to RxJava as standard operators:ConnectableObservable.refCount,ConnectableFlowable.refCount.

Flowables.zipLatest()

Zips the latest values from multiple sources and calls a combiner function for them.If one of the sources is faster then the others, its unconsumed values will be overwritten by newervalues.UnlikecombineLatest, source items are participating in the combination at most once; i.e., theoperator emits only if all sources have produced an item.The emission speed of this operator is determined by the slowest emitting source and the speed of the downstream consumer.There are several overloads available: methods taking 2-4 sources and the respective combiner functions, a method taking avarargs of sources and a method taking anIterable of sourcePublishers.The operator supports combining and scheduling the emission of the result via a customScheduler, thusallows avoiding the buffering effects of theobserveOn operator.The operator terminates if any of the sources runs out of items and terminates by itself.The operator works with asynchronous sources the best; synchronous sources may get consumed fullyin order they appear among the parameters and possibly never emit more than one combined result evenif the last source has more than one item.

TestSchedulerscheduler =newTestScheduler();TestSubscriber<String>ts =Flowables.zipLatest(toString,Flowable.intervalRange(1,6,99,100,TimeUnit.MILLISECONDS,scheduler),Flowable.intervalRange(4,3,200,200,TimeUnit.MILLISECONDS,scheduler)).test();scheduler.advanceTimeBy(200,TimeUnit.MILLISECONDS);ts.assertValue("[2, 4]");scheduler.advanceTimeBy(200,TimeUnit.MILLISECONDS);ts.assertValues("[2, 4]","[4, 5]");scheduler.advanceTimeBy(200,TimeUnit.MILLISECONDS);ts.assertResult("[2, 4]","[4, 5]","[6, 6]");

FlowableTransformers.coalesce()

Coalesces items from upstream into a container via a consumer and emits the container ifthere is a downstream demand, otherwise it keeps coalescing into the same container. Notethat the operator keeps an internal unbounded buffer to collect up upstream values beforethe coalescing happens and thus a computational heavy downstream hogging the emission threadmay lead to excessive memory usage. It is recommended to useobserveOn in this case.

Flowable.range(1,5).compose(FlowableTransformers.coalesce(ArrayList::new, (a,b) ->a.add(b)).test(1).assertValue(Arrays.asList(1)).requestMore(1).assertResult(Arrays.asList(1),Arrays.asList(2,3,4,5));

FlowableTransformers.windowWhile

Emits elements into a Flowable window while the given predicate returns true.If the predicate returns false, a new Flowable window is emitted.

Flowable.just("1","2","#","3","#","4","#").compose(FlowableTransformers.windowWhile(v -> !"#".equals(v))).flatMapSingle(v ->v.toList()).test().assertResult(Arrays.asList("1","2"),Arrays.asList("#","3"),Arrays.asList("#","4"),Arrays.asList("#"));

FlowableTransformers.windowUntil

Emits elements into a Flowable window until the given predicate returns trueat which point a new Flowable window is emitted.

Flowable.just("1","2","#","3","#","4","#").compose(FlowableTransformers.windowUntil(v ->"#".equals(v))).flatMapSingle(v ->v.toList()).test().assertResult(Arrays.asList("1","2","#"),Arrays.asList("3","#"),Arrays.asList("4","#"));

FlowableTransformers.windowSplit

Emits elements into a Flowable window until the given predicate returns true at whichpoint a new Flowable window is emitted; the particular item will be dropped.

Flowable.just("1","2","#","3","#","4","#").compose(FlowableTransformers.windowSplit(v ->"#".equals(v))).flatMapSingle(v ->v.toList()).test().assertResult(Arrays.asList("1","2"),Arrays.asList("3"),Arrays.asList("4"));

FlowableTransformers.indexOf

Returns the first index of an element that matches a predicate or -1L if no elements match.(Also available forObservables asObservableTransformers.indexOf().)

Flowable.range(1,5).compose(FlowableTransformers.indexOf(v ->v ==5)).test().assertResult(4);

FlowableTransformers.requestObserveOn

Requests items one-by-one from the upstream on the specifiedScheduler and emits the received items from thegivenScheduler as well in a fashion that allows tasks to be interleaved on the targetScheduler (aka "fair" use)on a much more granular basis thanFlowable.observeOn.

Flowable.range(1,5).compose(FlowableTransformers.requestObserveOn(Schedulers.single())).test().awaitDone(5,TimeUnit.SECONDS).assertResult(1,2,3,4,5);

FlowableTransformers.requestSample

Periodically (and after an optional initial delay) issues a singlerequest(1) to the upstream and forwards theitems to a downstream that must be ready to receive them.

Flowables.repeatCallable(() ->1).compose(FlowableTransformers.requestSample(1,TimeUnit.SECONDS,Schedulers.single())).take(5).test().awaitDone(7,TimeUnit.SECONDS).assertResult(1,1,1,1,1);

The sampling can be of a more complex pattern by using anotherPublisher as the indicator when to request:

Flowables.repeatCallable(() ->1).compose(FlowableTransformers.requestSample(Flowable.fromArray(100,500,1000,2000,5000)    .concatMap(delay ->Flowable.timer(delay,TimeUnit.MILLISECONDS)))).take(5).test().awaitDone(10,TimeUnit.SECONDS).assertResult(1,1,1,1,1);

FlowableTransformers.switchOnFirst

Depending on the very first item of the source sequence, see if that item matches a predicate and if so, switch toa generated alternative sequence.

Flowable.fromCallable(() ->Math.random()).compose(FlowableTransformers.switchOnFirst(v ->v <0.5,v ->Flowable.just(v *6))).repeat(20).subscribe(System.out::println);

Note that if one wishes to switch always based on the first item, one can usev -> true for the predicate and return the resumptionsequence conditionally in the selector property.

Note that the initial value is not emitted if the predicate returns true. If one wants to keep that in the sequence, concatenateit to the sequence returned from the selector:v -> Flowable.just(v).concatWith(theNewSequence).

ObservableTransformers.observeOnDrop

Drop upstream items while the downstream is working on an item in itsonNext method on anScheduler.This is similar toFlowable.onBackpressureDrop but instead dropping on a lack of requests, itemsfrom upstream are dropped while a work indicator is active during the execution of the downstream'sonNext method.

Observable.range(1,1000000).compose(ObservableTransformers.observeOnDrop(Schedulers.io())).doOnNext(v ->Thread.sleep(1)).test().awaitDone(5,TimeUnit.SECONDS).assertOf(to -> {assertTrue(to.getValueCount() >=1 &&to.getValueCount() <=1000000); });

ObservableTransformers.observeOnLatest

Keeps the latest item from the upstream while the downstream working on the current item in itsonNextmethdo on aScheduler, so that when it finishes with the current item, it can continue immediatelywith the latest item from the upstream.This is similar toFlowable.onBackpressureLatest except that the latest item is always picked up, not just whenthere is also a downstream demand for items.

Observable.range(1,1000000).compose(ObservableTransformers.observeOnLatest(Schedulers.io())).doOnNext(v ->Thread.sleep(1)).test().awaitDone(5,TimeUnit.SECONDS).assertOf(to -> {assertTrue(to.getValueCount() >=1 &&to.getValueCount() <=1000000); });

Flowables.generateAsync

A source operator to bridge async APIs that can be repeatedly called to produce the next item(or terminate in some way) asynchronously and only call the API again once the result hasbeen received and delivered to the downstream, while honoring the backpressure of the downstream.This means if the downstream stops requesting, the API won't be called until the latest result hasbeen requested and consumed by the downstream.

Example APIs could beAsyncEnumerablestyle, coroutine style orasync-await.

Let's assume there is an async API with the following interface definition:

interfaceAsyncAPI<T>extendsAutoCloseable {CompletableFuture<Void>nextValue(Consumer<?superT>onValue);}

When the call succeeds, theonValue is invoked with it. If there are no more items, the CompletableFuture returned by the last nextValue is completed (with null ).If there is an error, the same CompletableFuture is completed exceptionally. Each nextValue invocation creates a fresh CompletableFuture which can be cancelledif necessary. nextValue should not be invoked again until the onValue callbackhas been notified.

An instance of this API can be obtained on demand, thus the state of this operator consists of the AsyncAPI instance supplied for each individual {@code Subscriber}. The API can be transformed intoa Flowable as follows:

Flowable<Integer>source =Flowables.<Integer,AsyncAPI<Integer>>generateAsync(// create a fresh API instance for each individual Subscriber    () ->newAsyncAPIImpl<Integer>(),// this BiFunction will be called once the operator is ready to receive the next item// and will invoke it again only when that item is delivered via emitter.onNext()    (state,emitter) -> {// issue the async API callCompletableFuture<Void>f =state.nextValue(// handle the value receivedvalue -> {// we have the option to signal that itemif (value %2 ==0) {emitter.onNext(value);                }elseif (value ==101) {// or stop altogether, which will also trigger a cleanupemitter.onComplete();                }else {// or drop it and have the operator start a new callemitter.onNothing();                }            }        );// This API call may not produce further items or failf.whenComplete((done,error) -> {// As per the CompletableFuture API, error != null is the error outcome,// done is always null due to the Void typeif (error !=null) {emitter.onError(error);            }else {emitter.onComplete();            }        });// In case the downstream cancels, the current API call// should be cancelled as wellemitter.replaceCancellable(() ->f.cancel(true));// some sources may want to create a fresh state object// after each invocation of this generatorreturnstate;    },// cleanup the state objectstate -> {state.close(); });

FlowableTransformers.partialCollect

Allows converting upstream items into output objects where an upstream itemmay represent such output objects partially or may represent more than oneoutput object.

For example, given a stream of {@code byte[]} where each array could contain partof a larger object, and thus more than one subsequent arrays are required to constructthe output object. The same array could also contain more than one output items, therefore,it should be kept around in case the output is backpressured.

This example shows, given a flow ofStrings with embedded separator|, how onecan split them along the separator and have individual items returned, even whenthey span multiple subsequent items (cdefgh) or more than one is present in a source item (mno||pqr|s).

Flowable.just("ab|cdef","gh|ijkl|","mno||pqr|s","|","tuv|xy","|z").compose(FlowableTransformers.partialCollect(newConsumer<PartialCollectEmitter<String,Integer,StringBuilder,String>>() {@Overridepublicvoidaccept(PartialCollectEmitter<String,Integer,StringBuilder,String>emitter)throwsException {Integeridx =emitter.getIndex();if (idx ==null) {idx =0;        }StringBuildersb =emitter.getAccumulator();if (sb ==null) {sb =newStringBuilder();emitter.setAccumulator(sb);        }if (emitter.demand() !=0) {booleand =emitter.isComplete();if (emitter.size() !=0) {Stringstr =emitter.getItem(0);intj =str.indexOf('|',idx);if (j >=0) {sb.append(str.substring(idx,j));emitter.next(sb.toString());sb.setLength(0);idx =j +1;                }else {sb.append(str.substring(idx));emitter.dropItems(1);idx =0;                }            }elseif (d) {if (sb.length() !=0) {emitter.next(sb.toString());                }emitter.complete();return;            }        }emitter.setIndex(idx);    }},Functions.emptyConsumer(),128)).test().assertResult("ab","cdefgh","ijkl","mno","","pqr","s","tuv","xy","z");

Note that the resulting sequence completes only when the handler callsemitter.complete() becausethe upstream's termination may not mean all output has been generated.

The operator allows generating more than one item per invocation of the handler, but the handler shouldalways checkemitter.demand() if the downstream is ready to receive andemitter.size() tosee if there are more upstream items to produce. The operator will call the handler again if itdetects an item has been produced, therefore, there is no need to exhaustively process all sourceitems on one call (which may not be possible if only partial data is available).

The cleanup callback is called to release the source items if necessary (such as pooledByteBuffers)when it is dropped viaemitter.dropItems() or when the operator gets cancelled.

Theemitter.dropItems() has an additional function, indicate that the upstream can send more itemsas the previous ones have been consumed by the handler. Note though that the operator uses a lowwatermark algorithm to replenish items from upstream (also called stable prefetch), that is, whenmore than 75% of theprefetch parameter has been consumed, that many items are requested fromthe upstream. This reduces an overhead the one-by-one requesting would have.

ObservableTransformers.flatMapDrop

FlatMap only oneObservableSource at a time and ignore upstream values until it terminates.In the following example, click events are practically ignored while therequestDatais active.

Observable<ClickEvent>clicks = ...clicks.compose(ObservableTransformers.flatMapDrop(click ->service.requestData()        .subscribeOn(Schedulers.io())    )).observeOn(mainThread()).subscribe(data -> {/* ... */ });

ObservableTransformers.flatMapLatest

FlatMap only oneObservableSource at a time and keep the latest upstream value until it terminatesand resume with theObservableSource mapped for that latest upstream value.

UnlikeflatMapDrop, this operator will resume with the latest upstream value and not wait for the upstreamto signal a fresh item.

Observable<ClickEvent>clicks = ...clicks.compose(ObservableTransformers.flatMapLatest(click ->service.requestData()        .subscribeOn(Schedulers.io())    )).observeOn(mainThread()).subscribe(data -> {/* ... */ });

FlowableTransformers.errorJump

Allows an upstream error to jump over an inner transformation and isthen re-applied once the inner transformation's returnedFlowable terminates.

Flowable.range(1,5)    .concatWith(Flowable.<Integer>error(newTestException()))    .compose(FlowableTransformers.errorJump(newFlowableTransformer<Integer,List<Integer>>() {@OverridepublicPublisher<List<Integer>>apply(Flowable<Integer>v) {returnv.buffer(3);        }    }))    .test()    .assertFailure(TestException.class,Arrays.asList(1,2,3),Arrays.asList(4,5));

Available also:ObservableTransformers.errorJump()

flatMap signal

Map the upstream signals onto some reactive type and relay its events to the downstream.

Availability:

  • Single
    • SingleTransformers.flatMap (use withSingle.compose())
    • Singles.flatMapCompletable (use withSingle.to())
    • Singles.flatMapMaybe (use withSingle.to())
    • Singles.flatMapObservable (use withSingle.to())
    • Singles.flatMapFlowable (use withSingle.to())
  • Maybe
    • MaybeTransformers.flatMap (use withMaybe.compose())
    • Maybes.flatMapCompletable (use withMaybe.to())
    • Maybes.flatMapSingle (use withMaybe.to())
    • Maybes.flatMapObservable (use withMaybe.to())
    • Maybes.flatMapFlowable (use withMaybe.to())
  • Completable
    • CompletableTransformers.flatMap (use withCompletable.compose())
    • Completables.flatMapSingle (use withCompletable.to())
    • Completables.flatMapMaybe (use withCompletable.to())
    • Completables.flatMapObservable (use withCompletable.to())
    • Completables.flatMapFlowable (use withCompletable.to())

Note: same-type transformations forFlowable.flatMap,Observable.flatMap already exist in RxJava.

Custom parallel operators and transformers

ParallelTransformers.sumX()

Sums the numerical values on each rail as integer, long or double.

Flowable.range(1,5).parallel(1).compose(ParallelTransformers.<Integer>sumInteger()).sequential().test().assertResult(15);Flowable.range(1,5).parallel(1).compose(ParallelTransformers.<Integer>sumLong()).sequential().test().assertResult(15L);Flowable.range(1,5).parallel(1).compose(ParallelTransformers.<Integer>sumDouble()).sequential().test().assertResult(15d);

ParallelTransformers.orderedMerge()

Merges the sourceParallelFlowable rails in an ordered fashion picking the smallest of the available value fromthem (determined by their natural order or via aComparator). The operator supports delaying error and settingthe internal prefetch amount.

ParallelFlowable.fromArray(Flowable.just(1,3,5,7),Flowable.just(0,2,4,6,8,10)).to(p ->ParallelTransformers.orderedMerge(p, (a,b) ->a.compareTo(b))).test().assertResult(0,1,2,3,4,5,6,7,8,10);

Special Publisher implementations

Nono - 0-error publisher

ThePublisher-based sibling of theCompletable type. The usage is practically the same asCompletable with the exception that becauseNono implements the Reactive-StreamsPublisher, you can use it directly with operators ofFlowable that acceptPublisher in some form.

Examples:

Nono.fromAction(() ->System.out.println("Hello world!"))    .subscribe();Nono.fromAction(() ->System.out.println("Hello world!"))    .delay(1,TimeUnit.SECONDS)    .blockingSubscribe();Nono.complete()    .test()    .assertResult();Nono.error(newIOException())    .test()    .assertFailure(IOException.class);Flowable.range(1,10)    .to(Nono::fromPublisher)    .test()    .assertResult();

NonoProcessor

A hot, Reactive-StreamsProcessor implementation ofNono.

NonoProcessornp =NonoProcessor.create();TestSubscriber<Void>ts =np.test();np.onComplete();ts.assertResult();

Solo - 1-error publisher

ThePublisher-based sibling of theSingle type. The usage is practically the same asSingle with the exception that becauseSolo implements the Reactive-StreamsPublisher, you can use it directly with operators ofFlowable that acceptPublisher in some form.

Solo's emission protocol is a restriction over the generalPublisher protocol: one either callsonNext followed byonComplete or justonError. Operators will and should never callonNext followed byonError oronComplete on its own. Note that some operators may react toonNext immediately not waiting for anonComplete but on their emission side,onComplete is always called after anonNext.

Examples:

Solo.fromCallable(() -> {System.out.println("Hello world!");return1;}).subscribe();Solo.fromCallable(() ->"Hello world!")    .delay(1,TimeUnit.SECONDS)    .blockingSubscribe(System.out::println);Flowable.concat(Solo.just(1),Solo.just(2)).test().assertResult(1,2);

SoloProcessor

A hot, Reactive-StreamsProcessor implementation ofSolo.

SoloProcessor<Integer>sp =SoloProcessor.create();TestSubscriber<Integer>ts =sp.test();sp.onNext(1);sp.onComplete();ts.assertResult(1);

Perhaps - 0-1-error publisher

ThePublisher-based sibling of theMaybe type. The usage is practically the same asMaybe with the exception that becausePerhaps implements the Reactive-StreamsPublisher, you can use it directly with operators ofFlowable that acceptPublisher in some form.

Perhaps's emission protocol is a restriction over the generalPublisher protocol: one either callsonNext followed byonComplete,onComplete only or justonError. Operators will and should never callonNext followed byonError on its own. Note that some operators may react toonNext immediately not waiting for anonComplete but on their emission side,onComplete is always called after anonNext.

Examples:

Perhaps.fromCallable(() -> {System.out.println("Hello world!");return1;}).subscribe();Perhaps.fromCallable(() ->"Hello world!")    .delay(1,TimeUnit.SECONDS)    .blockingSubscribe(System.out::println);Flowable.concat(Perhaps.just(1),Perhaps.just(2)).test().assertResult(1,2);Perhaps.fromCallable(() -> {System.out.println("Hello world!");returnnull;// null is considered to indicate an empty Perhaps}).test().assertResult();

PerhapsProcessor

A hot, Reactive-StreamsProcessor implementation ofPerhaps.

PerhapsProcessor<Integer>ph =PerhapsProcessor.create();TestSubscriber<Integer>ts =ph.test();ph.onNext(1);ph.onComplete();ts.assertResult(1);

Custom consumers

The utility classes can be found inhu.akarnokd.rxjava3.consumers package.

FlowableConsumers

subscribeAutoDispose

Wraps the givenonXXX callbacks into aDisposableSubscriber,adds it to the givenCompositeDisposable and ensures, that if the upstreamcompletes or this particularDisposable is disposed, theSubscriber is removedfrom the given composite.

The Subscriber will be removedafter the callback for the terminal event has been invoked.

CompositeDisposablecomposite =newCompositeDisposable();Disposabled =FlowableConsumers.subscribeAutoDispose(Flowable.just(1),composite,System.out::println,Throwable::printStackTrace, () ->System.out.println("Done"));assertEquals(0,composite.size());// --------------------------Disposabled2 =FlowableConsumers.subscribeAutoDispose(Flowable.never(),composite,System.out::println,Throwable::printStackTrace, () ->System.out.println("Done"));assertEquals(1,composite.size());d2.dispose();assertEquals(0,composite.size());

The Subscriber will be removed after the callback for the terminal event has been invoked.

ObservableConsumers

  • subscribeAutoDispose: Similar toFlowableConsumers.subscribeAutoDispose() but targetingObservables andObservers-likeconsumers.

SingleConsumers

  • subscribeAutoDispose: Similar toFlowableConsumers.subscribeAutoDispose() but targetingSingles andSingleObservers-likeconsumers.

MaybeConsumers

  • subscribeAutoDispose: Similar toFlowableConsumers.subscribeAutoDispose() but targetingMaybes andMaybeObservers-likeconsumers.

CompletableConsumers

  • subscribeAutoDispose: Similar toFlowableConsumers.subscribeAutoDispose() but targetingCompletables andCompletableObservers-likeconsumers.

About

RxJava 2.x & 3.x extra sources, operators and components and ports of many 1.x companion libraries.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors15

Languages


[8]ページ先頭

©2009-2025 Movatter.jp