GroupBy

Divide an Observable into a set of Observables that each emit a different subset of items from the original Observable.

The GroupBy operator divides an Observable that emits items into an Observable that emits Observables, each one of which emits some subset of the items from the original source Observable. Which items end up on which Observable is typically decided by a discriminating function that evaluates each item and assigns it a key. All items with the same key are emitted by the same Observable.
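
The key-assignment step can be sketched without any Rx library. The following plain-JavaScript sketch is only a synchronous model of the idea, not how any ReactiveX implementation works internally: arrays stand in for Observables, and `groupByKey` and `isEven` are hypothetical names introduced here for illustration.

```javascript
// Synchronous model of GroupBy: arrays stand in for Observables.
// groupByKey is a hypothetical helper, not part of any Rx library.
function groupByKey(items, keySelector) {
  const groups = new Map(); // key -> items with that key, in arrival order
  for (const item of items) {
    const key = keySelector(item);
    if (!groups.has(key)) {
      groups.set(key, []); // the first item with this key opens a new group
    }
    groups.get(key).push(item);
  }
  return groups;
}

// Discriminating function: the key is whether the number is even.
const isEven = (n) => n % 2 === 0;
const grouped = groupByKey([1, 2, 3, 4, 5, 6, 7, 8, 9], isEven);

for (const [key, members] of grouped) {
  console.log(key, members);
}
```

In this model the group keyed `false` collects 1, 3, 5, 7, 9 and the group keyed `true` collects 2, 4, 6, 8. Groups appear in the order in which their keys are first seen.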

Language-Specific Information:

groupBy

RxGroovy implements the groupBy operator. The Observable it returns emits items of a particular subclass of Observable, the GroupedObservable. Objects that implement the GroupedObservable interface have an additional method, getKey, by which you can retrieve the key by which items were designated for this particular GroupedObservable.

The following sample code uses groupBy to transform a list of numbers into two lists, grouped by whether or not the numbers are even:

Sample Code

def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
def groupFunc = { return(0 == (it % 2)); };

numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);

[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete

Another version of groupBy allows you to pass in a transformative function that changes the elements before they are emitted by the resulting GroupedObservables.

Note that when groupBy splits up the source Observable into an Observable that emits GroupedObservables, each of these GroupedObservables begins to buffer the items that it will emit upon subscription. If you ignore any of these GroupedObservables (you neither subscribe to it nor apply an operator to it that subscribes to it), this buffer presents a potential memory leak. So rather than ignoring a GroupedObservable that you have no interest in observing, apply an operator like take(0) to it as a way of signalling that it may discard its buffer.

If you unsubscribe from one of the GroupedObservables, or if an operator like take that you apply to the GroupedObservable unsubscribes from it, that GroupedObservable will be terminated. If the source Observable later emits an item whose key matches the GroupedObservable that was terminated in this way, groupBy will create and emit a new GroupedObservable to match the key. In other words, unsubscribing from a GroupedObservable will not cause groupBy to swallow items from its group. For example, see the following code:

Sample Code

Observable.range(1,5)
          .groupBy({ 0 })
          .flatMap({ it.take(1) })  // "it" is the GroupedObservable passed to the closure
          .subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);

1
2
3
4
5

In the above code, the source Observable emits the sequence { 1 2 3 4 5 }. When it emits the first item in this sequence, the groupBy operator creates and emits a GroupedObservable with the key of 0. The flatMap operator applies the take(1) operator to that GroupedObservable, which gives it the item (1) that it emits and that also unsubscribes from the GroupedObservable, which is terminated. When the source Observable emits the second item in its sequence, the groupBy operator creates and emits a second GroupedObservable with the same key (0) to replace the one that was terminated. flatMap again applies take(1) to this new GroupedObservable to retrieve the new item to emit (2) and to unsubscribe from and terminate the GroupedObservable, and this process repeats for the remaining items in the source sequence.
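
The terminate-and-recreate behaviour described above can be modelled in a few lines of plain JavaScript. This is a rough synchronous sketch under stated assumptions, not the RxGroovy implementation: `groupByWithTake` is a hypothetical helper, arrays stand in for Observables, and `takeCount` (assumed to be at least 1) plays the role of the argument to take.

```javascript
// Model of the behaviour described above: once a group is terminated,
// the next item with the same key opens a fresh group.
// groupByWithTake is a hypothetical helper; takeCount is assumed to be >= 1.
function groupByWithTake(items, keySelector, takeCount) {
  const open = new Map(); // key -> currently open group
  const emitted = [];     // every group ever emitted, in order
  for (const item of items) {
    const key = keySelector(item);
    let group = open.get(key);
    if (group === undefined) {
      group = { key, items: [] };
      open.set(key, group);
      emitted.push(group); // a new group is emitted downstream
    }
    group.items.push(item);
    if (group.items.length >= takeCount) {
      open.delete(key); // take(n) is satisfied, so the group terminates
    }
  }
  return emitted;
}

// Same shape as the sample: every item gets key 0, and each group takes one item.
const regrouped = groupByWithTake([1, 2, 3, 4, 5], () => 0, 1);
console.log(regrouped.length);                   // number of groups created
console.log(regrouped.flatMap((g) => g.items));  // items in emission order
```

With a constant key and a take count of 1, the model creates one group per source item, and no item is dropped, which matches the five values printed by the sample.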

groupBy does not by default operate on any particular Scheduler.

groupBy

RxJava implements the groupBy operator. The Observable it returns emits items of a particular subclass of Observable, the GroupedObservable. Objects that implement the GroupedObservable interface have an additional method, getKey, by which you can retrieve the key by which items were designated for this particular GroupedObservable.

Another version of groupBy allows you to pass in a transformative function that changes the elements before they are emitted by the resulting GroupedObservables.

Note that when groupBy splits up the source Observable into an Observable that emits GroupedObservables, each of these GroupedObservables begins to buffer the items that it will emit upon subscription. If you ignore any of these GroupedObservables (you neither subscribe to it nor apply an operator to it that subscribes to it), this buffer presents a potential memory leak. So rather than ignoring a GroupedObservable that you have no interest in observing, apply an operator like take(0) to it as a way of signalling that it may discard its buffer.

If you unsubscribe from one of the GroupedObservables, that GroupedObservable will be terminated. If the source Observable later emits an item whose key matches the GroupedObservable that was terminated in this way, groupBy will create and emit a new GroupedObservable to match the key.

groupBy does not by default operate on any particular Scheduler.

groupBy

RxJS implements groupBy. It takes one to three parameters:

  1. (required) a function that accepts an item from the source Observable and returns its key
  2. a function that accepts an item from the source Observable and returns an item to be emitted in its place by one of the resulting Observables
  3. a function used to compare two keys for identity (that is, whether items with two keys should be emitted on the same Observable)

Sample Code

var codes = [
    { keyCode: 38}, // up
    { keyCode: 38}, // up
    { keyCode: 40}, // down
    { keyCode: 40}, // down
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 66}, // b
    { keyCode: 65}  // a
];

var source = Rx.Observable.fromArray(codes)
    .groupBy(
        function (x) { return x.keyCode; },
        function (x) { return x.keyCode; });

var subscription = source.subscribe(
    function (obs) {
        // Print the count
        obs.count().subscribe(function (x) {
            console.log('Count: ' + x);
        });
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

Count: 2
Count: 2
Count: 2
Count: 2
Count: 1
Count: 1
Completed
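
The sample above passes only the first two parameters. To show how all three might interact, here is a plain-JavaScript sketch that does not use RxJS: `groupWith` is a hypothetical helper, arrays stand in for Observables, and the defaults chosen for the optional parameters (identity and strict equality) are assumptions of this sketch rather than documented RxJS behaviour.

```javascript
// groupWith is a hypothetical helper modelling the three parameters:
// key selector, element selector, and key comparer.
function groupWith(items, keySelector, elementSelector, keyComparer) {
  const toElement = elementSelector || ((x) => x);    // assumed default: item unchanged
  const sameKey = keyComparer || ((a, b) => a === b); // assumed default: strict equality
  const groups = []; // [{ key, elements }] in order of first appearance
  for (const item of items) {
    const key = keySelector(item);
    // A custom comparer rules out a plain Map lookup, so search linearly.
    let group = groups.find((g) => sameKey(g.key, key));
    if (!group) {
      group = { key, elements: [] };
      groups.push(group);
    }
    group.elements.push(toElement(item));
  }
  return groups;
}

const words = ["Apple", "avocado", "Banana", "blueberry", "apricot"];
const byInitial = groupWith(
  words,
  (w) => w[0],                                  // key: first letter
  (w) => w.length,                              // element: word length
  (a, b) => a.toLowerCase() === b.toLowerCase() // comparer: ignore case
);
console.log(byInitial);
```

Because the comparer ignores case, "Apple", "avocado", and "apricot" land in one group (keyed by the first key seen, "A") and the group holds their lengths 5, 7, 7. The second group, keyed "B", holds 6 and 9.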

groupBy is found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.coincidence.js

groupByUntil

RxJS also implements groupByUntil. It monitors an additional Observable, and whenever that Observable emits an item, it closes any of the keyed Observables it has opened (it will open new ones if additional items are emitted by the source Observable that match the key). groupByUntil takes from two to four parameters:

  1. (required) a function that accepts an item from the source Observable and returns its key
  2. a function that accepts an item from the source Observable and returns an item to be emitted in its place by one of the resulting Observables
  3. (required) a function that returns an Observable, the emissions from which trigger the termination of any open Observables
  4. a function used to compare two keys for identity (that is, whether items with two keys should be emitted on the same Observable)

Sample Code

var codes = [
    { keyCode: 38}, // up
    { keyCode: 38}, // up
    { keyCode: 40}, // down
    { keyCode: 40}, // down
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 66}, // b
    { keyCode: 65}  // a
];

var source = Rx.Observable
    .for(codes, function (x) { return Rx.Observable.return(x).delay(1000); })
    .groupByUntil(
        function (x) { return x.keyCode; },
        function (x) { return x.keyCode; },
        function (x) { return Rx.Observable.timer(2000); });

var subscription = source.subscribe(
    function (obs) {
        // Print the count
        obs.count().subscribe(function (x) { console.log('Count: ' + x); });
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed
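
To see why the counts come out this way, it may help to replay the timeline by hand. The sketch below is a simplified plain-JavaScript model, not RxJS: `groupByUntilModel` is a hypothetical helper, each item is tagged with an assumed arrival time (one item per 1000 ms), and every group is assumed to close a fixed 2000 ms after it opens. It also assumes that a group whose timer is due at the same instant as an item closes before that item is delivered, which is what the printed counts suggest.

```javascript
// Timeline model of groupByUntil; groupByUntilModel is a hypothetical helper.
// events: [{ time, value }] sorted by time; each group closes `duration` ms after opening.
function groupByUntilModel(events, keySelector, duration) {
  const open = new Map(); // key -> { key, openedAt, count }
  const closed = [];      // groups in the order they closed

  const closeExpired = (now) => {
    for (const [key, group] of open) {
      if (now >= group.openedAt + duration) {
        open.delete(key); // deleting during Map iteration is safe in JavaScript
        closed.push(group);
      }
    }
  };

  for (const { time, value } of events) {
    closeExpired(time); // assumption: a timer due at `time` fires before the item
    const key = keySelector(value);
    if (!open.has(key)) {
      open.set(key, { key, openedAt: time, count: 0 });
    }
    open.get(key).count += 1;
  }
  closed.push(...open.values()); // source completion closes whatever is still open
  return closed;
}

const keyCodes = [38, 38, 40, 40, 37, 39, 37, 39, 66, 65];
const timeline = keyCodes.map((keyCode, i) => ({ time: (i + 1) * 1000, value: { keyCode } }));
const closedGroups = groupByUntilModel(timeline, (x) => x.keyCode, 2000);
console.log(closedGroups.map((g) => g.count));
```

Under these assumptions the two 38s and the two 40s each arrive within one 2000 ms window, so they share a group. The repeated 37 and 39 arrive after their first group has already closed, so each opens a new group with a count of 1.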

groupByUntil is found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.coincidence.js

RxPHP implements this operator as groupBy.

Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupBy.php

$observable = \Rx\Observable::fromArray([21, 42, 21, 42, 21, 42]);
$observable
    ->groupBy(
        function ($elem) {
            if ($elem === 42) {
                return 0;
            }
            return 1;
        },
        null,
        function ($key) {
            return $key;
        }
    )
    ->subscribe(function ($groupedObserver) use ($createStdoutObserver) {
        $groupedObserver->subscribe($createStdoutObserver($groupedObserver->getKey() . ": "));
    });

1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Complete!
0: Complete!

RxPHP also has an operator groupByUntil.

Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupByUntil.php

$codes = [
    ['id' => 38],
    ['id' => 38],
    ['id' => 40],
    ['id' => 40],
    ['id' => 37],
    ['id' => 39],
    ['id' => 37],
    ['id' => 39],
    ['id' => 66],
    ['id' => 65]
];

$source = Rx\Observable
    ::fromArray($codes)
    ->concatMap(function ($x) {
        return \Rx\Observable::timer(100)->mapTo($x);
    })
    ->groupByUntil(
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return Rx\Observable::timer(200);
        });

$subscription = $source->subscribe(new CallbackObserver(
    function (\Rx\Observable $obs) {
        // Print the count
        $obs->count()->subscribe(new CallbackObserver(
            function ($x) {
                echo 'Count: ', $x, PHP_EOL;
            }));
    },
    function (Throwable $err) {
        echo 'Error', $err->getMessage(), PHP_EOL;
    },
    function () {
        echo 'Completed', PHP_EOL;
    }));

Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed

RxPHP also has an operator partition.

Returns two observables which partition the observations of the source by the given function. The first will trigger observations for those values for which the predicate returns true. The second will trigger observations for those values where the predicate returns false. The predicate is executed once for each subscribed observer. Both also propagate all error observations arising from the source and each completes when the source completes.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/partition/partition.php

list($evens, $odds) = \Rx\Observable::range(0, 10, \Rx\Scheduler::getImmediate())
    ->partition(function ($x) {
        return $x % 2 === 0;
    });

//Because we used the immediate scheduler with range, the subscriptions are not asynchronous.
$evens->subscribe($createStdoutObserver('Evens '));
$odds->subscribe($createStdoutObserver('Odds '));

Evens Next value: 0
Evens Next value: 2
Evens Next value: 4
Evens Next value: 6
Evens Next value: 8
Evens Complete!
Odds Next value: 1
Odds Next value: 3
Odds Next value: 5
Odds Next value: 7
Odds Next value: 9
Odds Complete!
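
The same split can be modelled in plain JavaScript to make the two-output shape explicit. This is a loose synchronous sketch, not the RxPHP implementation: `partitionModel` is a hypothetical helper, arrays stand in for Observables, and evaluating the predicate separately for each output is only an analogy for the per-observer evaluation described above.

```javascript
// partitionModel is a hypothetical helper; arrays stand in for Observables.
function partitionModel(items, predicate) {
  // Each output applies the predicate independently, so the predicate
  // runs once per item for each of the two results.
  const matching = items.filter((x) => predicate(x));
  const rest = items.filter((x) => !predicate(x));
  return [matching, rest];
}

const range = Array.from({ length: 10 }, (_, i) => i); // 0..9, like range(0, 10)
const [evens, odds] = partitionModel(range, (x) => x % 2 === 0);
console.log(evens, odds);
```

Here `evens` holds 0, 2, 4, 6, 8 and `odds` holds 1, 3, 5, 7, 9, the same split the RxPHP sample prints.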
