
TheGroupBy operator divides an Observable that emits items into an Observable that emits Observables, each one of which emits some subset of the items from the original source Observable. Which items end up on which Observable is typically decided by a discriminating function that evaluates each item and assigns it a key. All items with the same key are emitted by the same Observable.

RxGroovy implements thegroupBy operator. The Observable it returns emits items of a particular subclass of Observable — theGroupedObservable. Objects that implement theGroupedObservable interface have an additional method —getkey — by which you can retrieve the key by which items were designated for this particularGroupedObservable.
The following sample code usesgroupBy to transform a list of numbers into two lists, grouped by whether or not the numbers are even:
def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);def groupFunc = { return(0 == (it % 2)); };numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted);[false, 1, 3, 5, 7, 9][true, 2, 4, 6, 8]Sequence complete
Another version ofgroupBy allows you to pass in a transformative function that changes the elements before they are emitted by the resultingGroupedObservables.
Note that whengroupBy splits up the source Observable into an Observable that emitsGroupedObservables, each of theseGroupedObservables begins to buffer the items that it will emit upon subscription. For this reason, if you ignore any of theseGroupedObservables (you neither subscribe to it or apply an operator to it that subscribes to it), this buffer will present a potential memory leak. For this reason, rather than ignoring aGroupedObservable that you have no interest in observing, you should instead apply an operator liketake(0) to it as a way of signalling to it that it may discard its buffer.
If you unsubscribe from one of theGroupedObservables, or if an operator liketake that you apply to theGroupedObservable unsubscribes from it, thatGroupedObservable will be terminated. If the source Observable later emits an item whose key matches theGroupedObservable that was terminated in this way,groupBy will create and emit anewGroupedObservable to match the key. In other words, unsubscribing from aGroupedObservable willnot causegroupBy to swallow items from its group. For example, see the following code:
Observable.range(1,5) .groupBy({ 0 }) .flatMap({ this.take(1) }) .subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted);12345
In the above code, the source Observable emits the sequence{ 1 2 3 4 5 }. When it emits the first item in this sequence, thegroupBy operator creates and emits aGroupedObservable with the key of0. TheflatMap operator applies thetake(1) operator to thatGroupedObservable, which gives it the item (1) that it emits and that also unsubscribes from theGroupedObservable, which is terminated. When the source Observable emits the second item in its sequence, thegroupBy operator creates and emits asecondGroupedObservable with the same key (0) to replace the one that was terminated.flatMap again appliestake(1) to this newGroupedObservable to retrieve the new item to emit (2) and to unsubscribe from and terminate theGroupedObservable, and this process repeats for the remaining items in the source sequence.
groupBy does not by default operate on any particularScheduler.
groupBy(Func1)groupBy(Func1,Func1)
RxJava implements thegroupBy operator. The Observable it returns emits items of a particular subclass of Observable — theGroupedObservable. Objects that implement theGroupedObservable interface have an additional method —getkey — by which you can retrieve the key by which items were designated for this particularGroupedObservable.
Another version ofgroupBy allows you to pass in a transformative function that changes the elements before they are emitted by the resultingGroupedObservables.
Note that whengroupBy splits up the source Observable into an Observable that emitsGroupedObservables, each of theseGroupedObservables begins to buffer the items that it will emit upon subscription. For this reason, if you ignore any of theseGroupedObservables (you neither subscribe to it or apply an operator to it that subscribes to it), this buffer will present a potential memory leak. For this reason, rather than ignoring aGroupedObservable that you have no interest in observing, you should instead apply an operator liketake(0) to it as a way of signalling to it that it may discard its buffer.
If you unsubscribe from one of theGroupedObservables, thatGroupedObservable will be terminated. If the source Observable later emits an item whose key matches theGroupedObservable that was terminated in this way,groupBy will create and emit a newGroupedObservable to match the key.
groupBy does not by default operate on any particularScheduler.
groupBy(Func1)groupBy(Func1,Func1)
RxJS implementsgroupBy. It takes one to three parameters:
var codes = [ { keyCode: 38}, // up { keyCode: 38}, // up { keyCode: 40}, // down { keyCode: 40}, // down { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 66}, // b { keyCode: 65} // a];var source = Rx.Observable.fromArray(codes) .groupBy( function (x) { return x.keyCode; }, function (x) { return x.keyCode; });var subscription = source.subscribe( function (obs) { // Print the count obs.count().subscribe(function (x) { console.log('Count: ' + x); }); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });Count: 2Count: 2Count: 2Count: 2Count: 1Count: 1Completed
groupBy is found in each of the following distributions:
rx.all.jsrx.all.compat.jsrx.coincidence.js
RxJS also implementsgroupByUntil. It monitors an additional Observable, and whenever that Observable emits an item, it closes any of the keyed Observables it has opened (it will open new ones if additional items are emitted by the source Observable that match the key).groupByUntil takes from two to four parameters:
var codes = [ { keyCode: 38}, // up { keyCode: 38}, // up { keyCode: 40}, // down { keyCode: 40}, // down { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 66}, // b { keyCode: 65} // a];var source = Rx.Observable .for(codes, function (x) { return Rx.Observable.return(x).delay(1000); }) .groupByUntil( function (x) { return x.keyCode; }, function (x) { return x.keyCode; }, function (x) { return Rx.Observable.timer(2000); });var subscription = source.subscribe( function (obs) { // Print the count obs.count().subscribe(function (x) { console.log('Count: ' + x); }); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });Count: 2Count: 2Count: 1Count: 1Count: 1Count: 1Count: 1Count: 1Completed
groupByUntil is found in each of the following distributions:
rx.all.jsrx.all.compat.jsrx.coincidence.js RxPHP implements this operator asgroupBy.
Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupBy.php$observable = \Rx\Observable::fromArray([21, 42, 21, 42, 21, 42]);$observable ->groupBy( function ($elem) { if ($elem === 42) { return 0; } return 1; }, null, function ($key) { return $key; } ) ->subscribe(function ($groupedObserver) use ($createStdoutObserver) { $groupedObserver->subscribe($createStdoutObserver($groupedObserver->getKey() . ": ")); });1: Next value: 210: Next value: 421: Next value: 210: Next value: 421: Next value: 210: Next value: 421: Complete!0: Complete!
RxPHP also has an operatorgroupByUntil.
Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupByUntil.php$codes = [ ['id' => 38], ['id' => 38], ['id' => 40], ['id' => 40], ['id' => 37], ['id' => 39], ['id' => 37], ['id' => 39], ['id' => 66], ['id' => 65]];$source = Rx\Observable ::fromArray($codes) ->concatMap(function ($x) { return \Rx\Observable::timer(100)->mapTo($x); }) ->groupByUntil( function ($x) { return $x['id']; }, function ($x) { return $x['id']; }, function ($x) { return Rx\Observable::timer(200); });$subscription = $source->subscribe(new CallbackObserver( function (\Rx\Observable $obs) { // Print the count $obs->count()->subscribe(new CallbackObserver( function ($x) { echo 'Count: ', $x, PHP_EOL; })); }, function (Throwable $err) { echo 'Error', $err->getMessage(), PHP_EOL; }, function () { echo 'Completed', PHP_EOL; }));Count: 2Count: 2Count: 1Count: 1Count: 1Count: 1Count: 1Count: 1Completed
RxPHP also has an operatorpartition.
Returns two observables which partition the observations of the source by the given function. The first will trigger observations for those values for which the predicate returns true. The second will trigger observations for those values where the predicate returns false. The predicate is executed once for each subscribed observer. Both also propagate all error observations arising from the source and each completes when the source completes.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/partition/partition.phplist($evens, $odds) = \Rx\Observable::range(0, 10, \Rx\Scheduler::getImmediate()) ->partition(function ($x) { return $x % 2 === 0; });//Because we used the immediate scheduler with range, the subscriptions are not asynchronous.$evens->subscribe($createStdoutObserver('Evens '));$odds->subscribe($createStdoutObserver('Odds '));Evens Next value: 0Evens Next value: 2Evens Next value: 4Evens Next value: 6Evens Next value: 8Evens Complete!Odds Next value: 1Odds Next value: 3Odds Next value: 5Odds Next value: 7Odds Next value: 9Odds Complete!
TBD