Parallel flows

David Karnok edited this page Jan 18, 2017 · 2 revisions

Introduction

Version 2.0.5 introduced the ParallelFlowable API, which allows parallel execution of a few select operators such as map, filter, concatMap, flatMap, collect, reduce and so on. Note that this is a parallel mode for Flowable (a sub-domain specific language) rather than a new reactive base type.

Consequently, several typical operators such as take, skip and many others are not available. There is also no ParallelObservable, because backpressure is essential to avoid flooding the internal queues of the parallel operators: by expectation, we go parallel precisely because processing the data on one thread is slow.

The easiest way of entering the parallel world is by using Flowable.parallel:

ParallelFlowable<Integer> source = Flowable.range(1, 1000).parallel();

By default, the parallelism level is set to the number of available CPUs (Runtime.getRuntime().availableProcessors()) and the prefetch amount from the sequential source is set to Flowable.bufferSize() (128). Both can be specified via overloads of parallel().
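As a minimal sketch of those overloads, assuming RxJava 2.x (2.0.5 or later) on the classpath: the class and method names below are hypothetical, and the values 4 and 32 are arbitrary illustrations rather than recommendations.

```java
import io.reactivex.Flowable;
import io.reactivex.parallel.ParallelFlowable;

public class ParallelOverloadsSketch {

    // Defaults: parallelism = available CPUs, prefetch = Flowable.bufferSize()
    static ParallelFlowable<Integer> withDefaults() {
        return Flowable.range(1, 1000).parallel();
    }

    // Explicit parallelism level, default prefetch
    static ParallelFlowable<Integer> withParallelism(int parallelism) {
        return Flowable.range(1, 1000).parallel(parallelism);
    }

    // Explicit parallelism level and prefetch amount
    static ParallelFlowable<Integer> withParallelismAndPrefetch(int parallelism, int prefetch) {
        return Flowable.range(1, 1000).parallel(parallelism, prefetch);
    }

    public static void main(String[] args) {
        // parallelism() reports the level the flow was created with
        System.out.println(withDefaults().parallelism());
        System.out.println(withParallelism(4).parallelism());
        System.out.println(withParallelismAndPrefetch(4, 32).parallelism());
    }
}
```

None of these calls starts any work; they only describe the parallel flow.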

ParallelFlowable follows the same principles of parametric asynchrony as Flowable does; therefore, parallel() by itself doesn't introduce asynchronous consumption of the sequential source but only prepares the parallel flow. The asynchrony is defined via the runOn(Scheduler) operator.
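One way to observe this is to record which thread executes a mapper with and without runOn. The following is a sketch assuming RxJava 2.x; the class and method names are hypothetical, and Schedulers.computation() is just one possible choice of Scheduler.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RunOnThreadsSketch {

    // No runOn: the mapper is expected to run on the subscribing (calling) thread
    static Set<String> threadsWithoutRunOn() {
        List<String> names = Flowable.range(1, 100)
                .parallel()
                .map(v -> Thread.currentThread().getName())
                .sequential()
                .toList()
                .blockingGet();
        return new HashSet<>(names);
    }

    // With runOn: the mapper runs on threads provided by the Scheduler
    static Set<String> threadsWithRunOn() {
        List<String> names = Flowable.range(1, 100)
                .parallel()
                .runOn(Schedulers.computation())
                .map(v -> Thread.currentThread().getName())
                .sequential()
                .toList()
                .blockingGet();
        return new HashSet<>(names);
    }

    public static void main(String[] args) {
        System.out.println("without runOn: " + threadsWithoutRunOn());
        System.out.println("with runOn:    " + threadsWithRunOn());
    }
}
```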

ParallelFlowable<Integer> psource = source.runOn(Schedulers.io());

The parallelism level (ParallelFlowable.parallelism()) doesn't have to match the parallelism level of the Scheduler. The runOn operator will use as many Scheduler.Worker instances as defined by the parallelized source. This allows ParallelFlowable to work for CPU intensive tasks via Schedulers.computation(), blocking/IO bound tasks through Schedulers.io() and unit testing via TestScheduler. You can specify the prefetch amount on runOn as well.
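The sketch below, assuming RxJava 2.x, combines an explicit parallelism level with the runOn(Scheduler, prefetch) overload and collects the names of the threads that did the work. The class name, method name and the values 2 and 16 are hypothetical choices for illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RunOnParallelismSketch {

    // Runs a mapper on the io() Scheduler and returns the distinct worker thread names
    static Set<String> workerThreads(int parallelism, int prefetch) {
        List<String> names = Flowable.range(1, 100)
                .parallel(parallelism)            // parallelism is set by the source
                .runOn(Schedulers.io(), prefetch) // one Scheduler.Worker per parallel lane
                .map(v -> Thread.currentThread().getName())
                .sequential()
                .toList()
                .blockingGet();
        return new HashSet<>(names);
    }

    public static void main(String[] args) {
        // With parallelism 2, no more than two worker threads should show up
        System.out.println(workerThreads(2, 16));
    }
}
```

Because the number of workers follows the parallelism level, the io() Scheduler here should contribute at most two threads even though it could supply more.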

Once the necessary parallel operations have been applied, you can return to the sequential Flowable via the ParallelFlowable.sequential() operator.

Flowable<Integer> result = psource.filter(v -> v % 3 == 0).map(v -> v * v).sequential();

Note that sequential doesn't guarantee any ordering between values flowing through the parallel operators.
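Putting the steps together, here is a sketch of the full pipeline, assuming RxJava 2.x. Since the merged output may arrive in any order, this example sorts the collected list afterwards when a stable order is needed; the class and method names are hypothetical.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SequentialOrderingSketch {

    static List<Integer> squaresOfMultiplesOfThree() {
        List<Integer> merged = Flowable.range(1, 1000)
                .parallel()
                .runOn(Schedulers.computation())
                .filter(v -> v % 3 == 0)
                .map(v -> v * v)
                .sequential()   // back to a plain Flowable, order not guaranteed
                .toList()
                .blockingGet();

        // Restore a deterministic order after the parallel stage
        List<Integer> sorted = new ArrayList<>(merged);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        List<Integer> squares = squaresOfMultiplesOfThree();
        System.out.println(squares.size() + " values, smallest " + squares.get(0));
    }
}
```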

Parallel operators

TBD

Copyright (c) 2016-present, RxJava Contributors.
