- Notifications
You must be signed in to change notification settings - Fork7.6k
Transforming Observables
This page shows operators with which you can transform items that are emitted by reactive sources, such asObservables.
buffercastconcatMapconcatMapCompletableconcatMapCompletableDelayErrorconcatMapDelayErrorconcatMapEagerconcatMapEagerDelayErrorconcatMapIterableconcatMapMaybeconcatMapMaybeDelayErrorconcatMapSingleconcatMapSingleDelayErrorflatMapflatMapCompletableflatMapIterableflatMapMaybeflatMapObservableflatMapPublisherflatMapSingleflatMapSingleElementflattenAsFlowableflattenAsObservablegroupBymapscanswitchMapwindow
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/buffer.html
Collects the items emitted by a reactive source into buffers, and emits these buffers.
Observable.range(0,10) .buffer(4) .subscribe((List<Integer>buffer) ->System.out.println(buffer));// prints:// [0, 1, 2, 3]// [4, 5, 6, 7]// [8, 9]
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/map.html
Converts each item emitted by a reactive source to the specified type, and emits these items.
Observable<Number>numbers =Observable.just(1,4.0,3f,7,12,4.6,5);numbers.filter((Numberx) ->Integer.class.isInstance(x)) .cast(Integer.class) .subscribe((Integerx) ->System.out.println(x));// prints:// 1// 7// 12// 5
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns a reactive source, and emits the items that result from concatenating the results of these function applications.
Observable.range(0,5) .concatMap(i -> {longdelay =Math.round(Math.random() *2);returnObservable.timer(delay,TimeUnit.SECONDS).map(n ->i); }) .blockingSubscribe(System.out::print);// prints 01234
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns aio.reactivex.rxjava3.core.CompletableSource, subscribes to them one at a time and returns aCompletable that completes when all sources completed.
Observable<Integer>source =Observable.just(2,1,3);Completablecompletable =source.concatMapCompletable(x -> {returnCompletable.timer(x,TimeUnit.SECONDS) .doOnComplete(() ->System.out.println("Info: Processing of item\"" +x +"\" completed")); });completable.doOnComplete(() ->System.out.println("Info: Processing of all items completed")) .blockingAwait();// prints:// Info: Processing of item "2" completed// Info: Processing of item "1" completed// Info: Processing of item "3" completed// Info: Processing of all items completed
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns aio.reactivex.rxjava3.core.CompletableSource, subscribes to them one at a time and returns aCompletable that completes when all sources completed. Any errors from the sources will be delayed until all of them terminate.
Observable<Integer>source =Observable.just(2,1,3);Completablecompletable =source.concatMapCompletableDelayError(x -> {if (x.equals(2)) {returnCompletable.error(newIOException("Processing of item\"" +x +"\" failed!")); }else {returnCompletable.timer(1,TimeUnit.SECONDS) .doOnComplete(() ->System.out.println("Info: Processing of item\"" +x +"\" completed")); }});completable.doOnError(error ->System.out.println("Error: " +error.getMessage())) .onErrorComplete() .blockingAwait();// prints:// Info: Processing of item "1" completed// Info: Processing of item "3" completed// Error: Processing of item "2" failed!
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns a reactive source, and emits the items that result from concatenating the results of these function applications. Any errors from the sources will be delayed until all of them terminate.
Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS) .concatMapDelayError(x -> {if (x.equals(1L))returnObservable.error(newIOException("Something went wrong!"));elsereturnObservable.just(x,x *x); }) .blockingSubscribe(x ->System.out.println("onNext: " +x),error ->System.out.println("onError: " +error.getMessage()));// prints:// onNext: 2// onNext: 4// onNext: 3// onNext: 9// onError: Something went wrong!
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns a reactive source, and emits the items that result from concatenating the results of these function applications. UnlikeconcatMap, this operator eagerly subscribes to all sources.
Observable.range(0,5) .concatMapEager(i -> {longdelay =Math.round(Math.random() *3);returnObservable.timer(delay,TimeUnit.SECONDS) .map(n ->i) .doOnNext(x ->System.out.println("Info: Finished processing item " +x)); }) .blockingSubscribe(i ->System.out.println("onNext: " +i));// prints (lines beginning with "Info..." can be displayed in a different order):// Info: Finished processing item 2// Info: Finished processing item 0// onNext: 0// Info: Finished processing item 1// onNext: 1// onNext: 2// Info: Finished processing item 3// Info: Finished processing item 4// onNext: 3// onNext: 4
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns a reactive source, and emits the items that result from concatenating the results of these function applications. Aboolean value must be specified, which iftrue indicates that all errors from all sources will be delayed until the end, otherwise iffalse, an error from the main source will be signalled when the current source terminates. UnlikeconcatMapDelayError, this operator eagerly subscribes to all sources.
Observable<Integer>source =Observable.create(emitter -> {emitter.onNext(1);emitter.onNext(2);emitter.onError(newError("Fatal error!"));});source.doOnError(error ->System.out.println("Info: Error from main source " +error.getMessage())) .concatMapEagerDelayError(x -> {returnObservable.timer(1,TimeUnit.SECONDS).map(n ->x) .doOnSubscribe(it ->System.out.println("Info: Processing of item\"" +x +"\" started")); },true) .blockingSubscribe(x ->System.out.println("onNext: " +x),error ->System.out.println("onError: " +error.getMessage()));// prints:// Info: Processing of item "1" started// Info: Processing of item "2" started// Info: Error from main source Fatal error!// onNext: 1// onNext: 2// onError: Fatal error!
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns ajava.lang.Iterable, and emits the items that result from concatenating the results of these function applications.
Observable.just("A","B","C") .concatMapIterable(item ->List.of(item,item,item)) .subscribe(System.out::print);// prints AAABBBCCC
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns aio.reactivex.rxjava3.core.MaybeSource, and emits the items that result from concatenating theseMaybeSources.
Observable.just("5","3,14","2.71","FF") .concatMapMaybe(v -> {returnMaybe.fromCallable(() ->Double.parseDouble(v)) .doOnError(e ->System.out.println("Info: The value\"" +v +"\" could not be parsed."))// Ignore values that can not be parsed. .onErrorComplete(); }) .subscribe(x ->System.out.println("onNext: " +x));// prints:// onNext: 5.0// Info: The value "3,14" could not be parsed.// onNext: 2.71// Info: The value "FF" could not be parsed.
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns aio.reactivex.rxjava3.core.MaybeSource, and emits the items that result from concatenating theseMaybeSources. Any errors from the sources will be delayed until all of them terminate.
DateTimeFormatterdateFormatter =DateTimeFormatter.ofPattern("dd.MM.uuuu");Observable.just("04.03.2018","12-08-2018","06.10.2018","01.12.2018") .concatMapMaybeDelayError(date -> {returnMaybe.fromCallable(() ->LocalDate.parse(date,dateFormatter)); }) .subscribe(localDate ->System.out.println("onNext: " +localDate),error ->System.out.println("onError: " +error.getMessage()));// prints:// onNext: 2018-03-04// onNext: 2018-10-06// onNext: 2018-12-01// onError: Text '12-08-2018' could not be parsed at index 2
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns aio.reactivex.rxjava3.core.SingleSource, and emits the items that result from concatenating theseSingleSources.
Observable.just("5","3,14","2.71","FF") .concatMapSingle(v -> {returnSingle.fromCallable(() ->Double.parseDouble(v)) .doOnError(e ->System.out.println("Info: The value\"" +v +"\" could not be parsed."))// Return a default value if the given value can not be parsed. .onErrorReturnItem(42.0); }) .subscribe(x ->System.out.println("onNext: " +x));// prints:// onNext: 5.0// Info: The value "3,14" could not be parsed.// onNext: 42.0// onNext: 2.71// Info: The value "FF" could not be parsed.// onNext: 42.0
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns aio.reactivex.rxjava3.core.SingleSource, and emits the items that result from concatenating the results of these function applications. Any errors from the sources will be delayed until all of them terminate.
DateTimeFormatterdateFormatter =DateTimeFormatter.ofPattern("dd.MM.uuuu");Observable.just("24.03.2018","12-08-2018","06.10.2018","01.12.2018") .concatMapSingleDelayError(date -> {returnSingle.fromCallable(() ->LocalDate.parse(date,dateFormatter)); }) .subscribe(localDate ->System.out.println("onNext: " +localDate),error ->System.out.println("onError: " +error.getMessage()));// prints:// onNext: 2018-03-24// onNext: 2018-10-06// onNext: 2018-12-01// onError: Text '12-08-2018' could not be parsed at index 2
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns a reactive source, and emits the items that result from merging the results of these function applications.
Observable.just("A","B","C") .flatMap(a -> {returnObservable.intervalRange(1,3,0,1,TimeUnit.SECONDS) .map(b ->'(' +a +", " +b +')'); }) .blockingSubscribe(System.out::println);// prints (not necessarily in this order):// (A, 1)// (C, 1)// (B, 1)// (A, 2)// (C, 2)// (B, 2)// (A, 3)// (C, 3)// (B, 3)
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns aio.reactivex.rxjava3.core.CompletableSource, and returns aCompletable that completes when all sources completed.
Observable<Integer>source =Observable.just(2,1,3);Completablecompletable =source.flatMapCompletable(x -> {returnCompletable.timer(x,TimeUnit.SECONDS) .doOnComplete(() ->System.out.println("Info: Processing of item\"" +x +"\" completed"));});completable.doOnComplete(() ->System.out.println("Info: Processing of all items completed")) .blockingAwait();// prints:// Info: Processing of item "1" completed// Info: Processing of item "2" completed// Info: Processing of item "3" completed// Info: Processing of all items completed
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns ajava.lang.Iterable, and emits the elements from theseIterables.
Observable.just(1,2,3,4) .flatMapIterable(x -> {switch (x %4) {case1:returnList.of("A");case2:returnList.of("B","B");case3:returnList.of("C","C","C");default:returnList.of(); } }) .subscribe(System.out::println);// prints:// A// B// B// C// C// C
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns aio.reactivex.rxjava3.core.MaybeSource, and emits the items that result from merging theseMaybeSources.
Observable.just(9.0,16.0, -4.0) .flatMapMaybe(x -> {if (x.compareTo(0.0) <0)returnMaybe.empty();elsereturnMaybe.just(Math.sqrt(x)); }) .subscribe(System.out::println,Throwable::printStackTrace, () ->System.out.println("onComplete"));// prints:// 3.0// 4.0// onComplete
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to the item emitted by aMaybe orSingle, where that function returns anio.reactivex.rxjava3.core.ObservableSource, and returns anObservable that emits the items emitted by thisObservableSource.
Single<String>source =Single.just("Kirk, Spock, Chekov, Sulu");Observable<String>names =source.flatMapObservable(text -> {returnObservable.fromArray(text.split(",")) .map(String::strip);});names.subscribe(name ->System.out.println("onNext: " +name));// prints:// onNext: Kirk// onNext: Spock// onNext: Chekov// onNext: Sulu
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to the item emitted by aMaybe orSingle, where that function returns anorg.reactivestreams.Publisher, and returns aFlowable that emits the items emitted by thisPublisher.
Single<String>source =Single.just("Kirk, Spock, Chekov, Sulu");Flowable<String>names =source.flatMapPublisher(text -> {returnFlowable.fromArray(text.split(",")) .map(String::strip);});names.subscribe(name ->System.out.println("onNext: " +name));// prints:// onNext: Kirk// onNext: Spock// onNext: Chekov// onNext: Sulu
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns aio.reactivex.rxjava3.core.SingleSource, and emits the items that result from merging theseSingleSources.
Observable.just(4,2,1,3) .flatMapSingle(x ->Single.timer(x,TimeUnit.SECONDS).map(i ->x)) .blockingSubscribe(System.out::print);// prints 1234
Note:Maybe::flatMapSingle returns aSingle that signals an error notification if theMaybe source is empty:
Maybe<Object>emptySource =Maybe.empty();Single<Object>result =emptySource.flatMapSingle(x ->Single.just(x));result.subscribe(x ->System.out.println("onSuccess will not be printed!"),error ->System.out.println("onError: Source was empty!"));// prints:// onError: Source was empty!
UseMaybe::flatMapSingleElement -- which returns aMaybe -- if you don't want this behaviour.
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to the item emitted by aMaybe, where that function returns aio.reactivex.rxjava3.core.SingleSource, and returns aMaybe that either emits the item emitted by thisSingleSource or completes if the sourceMaybe just completes.
Maybe<Integer>source =Maybe.just(-42);Maybe<Integer>result =source.flatMapSingleElement(x -> {returnSingle.just(Math.abs(x));});result.subscribe(System.out::println);// prints 42
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to the item emitted by aMaybe orSingle, where that function returns ajava.lang.Iterable, and returns aFlowable that emits the elements from thisIterable.
Single<Double>source =Single.just(2.0);Flowable<Double>flowable =source.flattenAsFlowable(x -> {returnList.of(x,Math.pow(x,2),Math.pow(x,3));});flowable.subscribe(x ->System.out.println("onNext: " +x));// prints:// onNext: 2.0// onNext: 4.0// onNext: 8.0
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to the item emitted by aMaybe orSingle, where that function returns ajava.lang.Iterable, and returns anObservable that emits the elements from thisIterable.
Single<Double>source =Single.just(2.0);Observable<Double>observable =source.flattenAsObservable(x -> {returnList.of(x,Math.pow(x,2),Math.pow(x,3));});observable.subscribe(x ->System.out.println("onNext: " +x));// prints:// onNext: 2.0// onNext: 4.0// onNext: 8.0
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/groupby.html
Groups the items emitted by a reactive source according to a specified criterion, and emits these grouped items as aGroupedObservable orGroupedFlowable.
Observable<String>animals =Observable.just("Tiger","Elephant","Cat","Chameleon","Frog","Fish","Turtle","Flamingo");animals.groupBy(animal ->animal.charAt(0),String::toUpperCase) .concatMapSingle(Observable::toList) .subscribe(System.out::println);// prints:// [TIGER, TURTLE]// [ELEPHANT]// [CAT, CHAMELEON]// [FROG, FISH, FLAMINGO]
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/map.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source and emits the results of these function applications.
Observable.just(1,2,3) .map(x ->x *x) .subscribe(System.out::println);// prints:// 1// 4// 9
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/scan.html
Applies the givenio.reactivex.rxjava3.functions.BiFunction to a seed value and the first item emitted by a reactive source, then feeds the result of that function application along with the second item emitted by the reactive source into the same function, and so on until all items have been emitted by the reactive source, emitting each intermediate result.
Observable.just(5,3,8,1,7) .scan(0, (partialSum,x) ->partialSum +x) .subscribe(System.out::println);// prints:// 0// 5// 8// 16// 17// 24
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/flatmap.html
Applies the givenio.reactivex.rxjava3.functions.Function to each item emitted by a reactive source, where that function returns a reactive source, and emits the items emitted by the most recently projected of these reactive sources.
Observable.interval(0,1,TimeUnit.SECONDS) .switchMap(x -> {returnObservable.interval(0,750,TimeUnit.MILLISECONDS) .map(y ->x); }) .takeWhile(x ->x <3) .blockingSubscribe(System.out::print);// prints 001122
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/window.html
Collects the items emitted by a reactive source into windows, and emits these windows as aFlowable orObservable.
Observable.range(1,10)// Create windows containing at most 2 items, and skip 3 items before starting a new window. .window(2,3) .flatMapSingle(window -> {returnwindow.map(String::valueOf) .reduce(newStringJoiner(", ","[","]"),StringJoiner::add); }) .subscribe(System.out::println);// prints:// [1, 2]// [4, 5]// [7, 8]// [10]
Copyright (c) 2016-present, RxJava Contributors.
Twitter @RxJava |Gitter @RxJava

