Parallel flows
Version 2.0.5 introduced the ParallelFlowable API, which allows parallel execution of a few select operators such as map, filter, concatMap, flatMap, collect, reduce and so on. Note that this is a parallel mode for Flowable (a sub-domain specific language) rather than a new reactive base type.
Consequently, several typical operators such as take, skip and many others are not available. There is also no ParallelObservable, because backpressure is essential to avoid flooding the internal queues of the parallel operators: by expectation, we go parallel because processing the data is slow on one thread.
The easiest way of entering the parallel world is by using Flowable.parallel:
ParallelFlowable<Integer> source = Flowable.range(1, 1000).parallel();
By default, the parallelism level is set to the number of available CPUs (Runtime.getRuntime().availableProcessors()) and the prefetch amount from the sequential source is set to Flowable.bufferSize() (128). Both can be specified via overloads of parallel().
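As a minimal sketch of the defaults and overloads described above, the following compares the default parallelism level with an explicitly chosen one. It assumes the RxJava 2.x package layout (io.reactivex); on 3.x the imports would come from io.reactivex.rxjava3 instead. The class name ParallelLevels and its helper methods are hypothetical and exist only for illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.parallel.ParallelFlowable;

// Hypothetical demo class; not part of RxJava.
public class ParallelLevels {

    // parallel() without arguments: one rail per available processor.
    static int defaultParallelism() {
        ParallelFlowable<Integer> source = Flowable.range(1, 1000).parallel();
        return source.parallelism();
    }

    // parallel(parallelism, prefetch): both values chosen by the caller.
    static int explicitParallelism(int rails, int prefetch) {
        ParallelFlowable<Integer> source =
                Flowable.range(1, 1000).parallel(rails, prefetch);
        return source.parallelism();
    }

    public static void main(String[] args) {
        System.out.println("default rails:  " + defaultParallelism());
        System.out.println("explicit rails: " + explicitParallelism(4, 32));
    }
}
```

Nothing is subscribed here, so no items flow; the sketch only inspects how the parallel flow was configured.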
ParallelFlowable follows the same principles of parametric asynchrony as Flowable does; therefore, parallel() by itself doesn't introduce asynchronous consumption of the sequential source but only prepares the parallel flow. The asynchrony is defined via the runOn(Scheduler) operator.
ParallelFlowable<Integer> psource = source.runOn(Schedulers.io());
The parallelism level (ParallelFlowable.parallelism()) doesn't have to match the parallelism level of the Scheduler. The runOn operator will use as many Scheduler.Worker instances as defined by the parallelized source. This allows ParallelFlowable to work for CPU intensive tasks via Schedulers.computation(), blocking/IO bound tasks through Schedulers.io() and unit testing via TestScheduler. You can specify the prefetch amount on runOn as well.
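To illustrate that parallel() alone stays on the subscribing thread while runOn moves the work onto Scheduler workers, the sketch below records the names of the threads that execute a map step. It assumes RxJava 2.x imports; the class RunOnThreads, the method threadsUsed, and the values chosen (2 rails, prefetch 16) are hypothetical.

```java
import io.reactivex.Flowable;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.schedulers.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; not part of RxJava.
public class RunOnThreads {

    // Returns the names of all threads that executed the map function.
    static Set<String> threadsUsed(boolean useRunOn) {
        Set<String> names = ConcurrentHashMap.newKeySet();

        ParallelFlowable<Integer> rails = Flowable.range(1, 100).parallel(2);
        if (useRunOn) {
            // One Scheduler.Worker per rail, with a prefetch of 16 items.
            rails = rails.runOn(Schedulers.computation(), 16);
        }

        rails.map(v -> {
                 names.add(Thread.currentThread().getName());
                 return v;
             })
             .sequential()
             .blockingSubscribe(); // waits until the flow completes

        return names;
    }

    public static void main(String[] args) {
        System.out.println("without runOn: " + threadsUsed(false));
        System.out.println("with runOn:    " + threadsUsed(true));
    }
}
```

Without runOn, the only recorded name should be that of the calling thread. With runOn, at most two scheduler threads appear, because the flow has two rails; on a single-core machine both rails may share one thread.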
Once the necessary parallel operations have been applied, you can return to the sequential Flowable via the ParallelFlowable.sequential() operator.
Flowable<Integer> result = psource.filter(v -> v % 3 == 0).map(v -> v * v).sequential();
Note that sequential doesn't guarantee any ordering between values flowing through the parallel operators.
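Because the merged output may arrive in any order, a consumer that needs a deterministic sequence has to restore it explicitly. The sketch below runs the pipeline from this page end to end and applies Flowable.sorted() after sequential(). It assumes RxJava 2.x imports; the class name OrderedResult and the method sortedSquares are hypothetical, and sorting is only one possible way to re-establish order.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

// Hypothetical demo class; not part of RxJava.
public class OrderedResult {

    // Squares of the multiples of 3 in 1..1000, computed in parallel,
    // then sorted because sequential() does not preserve source order.
    static List<Integer> sortedSquares() {
        return Flowable.range(1, 1000)
                .parallel()
                .runOn(Schedulers.computation())
                .filter(v -> v % 3 == 0)
                .map(v -> v * v)
                .sequential()
                .sorted()      // natural ordering of Integer
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        List<Integer> squares = sortedSquares();
        System.out.println(squares.size() + " values, first " + squares.get(0)
                + ", last " + squares.get(squares.size() - 1));
    }
}
```

Note that sorted() has to buffer all values before emitting any of them, so this approach suits finite flows of moderate size.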
TBD
Copyright (c) 2016-present, RxJava Contributors.
Twitter @RxJava | Gitter @RxJava