- Notifications
You must be signed in to change notification settings - Fork7.6k
Creating Observables
This page shows methods that create reactive sources, such asObservables.
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/just.html
Constructs a reactive type by taking a pre-existing object and emitting that specific object to the downstream consumer upon subscription.
Stringgreeting ="Hello world!";Observable<String>observable =Observable.just(greeting);observable.subscribe(item ->System.out.println(item));
There exist overloads with 2 to 9 arguments for convenience, which objects (with the same common type) will be emitted in the order they are specified.
Observable<Object>observable =Observable.just("1","A","3.2","def");observable.subscribe(item ->System.out.print(item),error ->error.printStackTrace(), () ->System.out.println());
Constructs a sequence from a pre-existing source or generator type.
Note: These static methods use the postfix naming convention (i.e., the argument type is repeated in the method name) to avoid overload resolution ambiguities.
ReactiveX documentation:http://reactivex.io/documentation/operators/from.html
Available in:Flowable,Observable,Maybe,Single,Completable
Signals the items from ajava.lang.Iterable source (such asLists,Sets orCollections or customIterables) and then completes the sequence.
List<Integer>list =newArrayList<>(Arrays.asList(1,2,3,4,5,6,7,8));Observable<Integer>observable =Observable.fromIterable(list);observable.subscribe(item ->System.out.println(item),error ->error.printStackTrace(), () ->System.out.println("Done"));
Available in:Flowable,Observable,Maybe,Single,Completable
Signals the elements of the given array and then completes the sequence.
Integer[]array =newInteger[10];for (inti =0;i <array.length;i++) {array[i] =i;}Observable<Integer>observable =Observable.fromArray(array);observable.subscribe(item ->System.out.println(item),error ->error.printStackTrace(), () ->System.out.println("Done"));
Note: RxJava does not support primitive arrays, only (generic) reference arrays.
Available in:Flowable,Observable,Maybe,Single,Completable
When a consumer subscribes, the givenjava.util.concurrent.Callable is invoked and its returned value (or thrown exception) is relayed to that consumer.
Callable<String>callable = () -> {System.out.println("Hello World!");return"Hello World!");}Observable<String>observable =Observable.fromCallable(callable);observable.subscribe(item ->System.out.println(item),error ->error.printStackTrace(), () ->System.out.println("Done"));
Remark: InCompletable, the actual returned value is ignored and theCompletable simply completes.
Available in:Flowable,Observable,Maybe,Single,Completable
When a consumer subscribes, the givenio.reactivex.function.Action is invoked and the consumer completes or receives the exception theAction threw.
Actionaction = () ->System.out.println("Hello World!");Completablecompletable =Completable.fromAction(action);completable.subscribe(() ->System.out.println("Done"),error ->error.printStackTrace());
Note: the difference betweenfromAction andfromRunnable is that theAction interface allows throwing a checked exception while thejava.lang.Runnable does not.
Available in:Flowable,Observable,Maybe,Single,Completable
When a consumer subscribes, the givenio.reactivex.function.Action is invoked and the consumer completes or receives the exception theAction threw.
Runnablerunnable = () ->System.out.println("Hello World!");Completablecompletable =Completable.fromRunnable(runnable);completable.subscribe(() ->System.out.println("Done"),error ->error.printStackTrace());
Note: the difference betweenfromAction andfromRunnable is that theAction interface allows throwing a checked exception while thejava.lang.Runnable does not.
Available in:Flowable,Observable,Maybe,Single,Completable
Given a pre-existing, already running or already completedjava.util.concurrent.Future, wait for theFuture to complete normally or with an exception in a blocking fashion and relay the produced value or exception to the consumers.
ScheduledExecutorServiceexecutor =Executors.newSingleThreadScheduledExecutor();Future<String>future =executor.schedule(() ->"Hello world!",1,TimeUnit.SECONDS);Observable<String>observable =Observable.fromFuture(future);observable.subscribe(item ->System.out.println(item),error ->error.printStackTrace(), () ->System.out.println("Done"));executor.shutdown();
Wraps or converts another reactive type to the target reactive type.
The following combinations are available in the various reactive types with the following signature pattern:targetType.from{sourceType}()
Available in:
| targetType \ sourceType | Publisher | Observable | Maybe | Single | Completable |
|---|---|---|---|---|---|
| Flowable | ![]() | ||||
| Observable | ![]() | ||||
| Maybe | ![]() | ![]() | |||
| Single | ![]() | ![]() | |||
| Completable | ![]() | ![]() | ![]() | ![]() |
*Note: not all possible conversion is implemented via thefrom{reactive type} method families. Check out theto{reactive type} method families for further conversion possibilities.
Flux<Integer>reactorFlux =Flux.fromCompletionStage(CompletableFuture.<Integer>completedFuture(1));Observable<Integer>observable =Observable.fromPublisher(reactorFlux);observable.subscribe(item ->System.out.println(item),error ->error.printStackTrace(), () ->System.out.println("Done"));
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/create.html
Creates a cold, synchronous and stateful generator of values.
intstartValue =1;intincrementValue =1;Flowable<Integer>flowable =Flowable.generate(() ->startValue, (s,emitter) -> {intnextValue =s +incrementValue;emitter.onNext(nextValue);returnnextValue;});flowable.subscribe(value ->System.out.println(value));
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/create.html
Construct asafe reactive type instance which when subscribed to by a consumer, runs an user-provided function and provides a type-specificEmitter for this function to generate the signal(s) the designated business logic requires. This method allows bridging the non-reactive, usually listener/callback-style world, with the reactive world.
ScheduledExecutorServiceexecutor =Executors.newSingleThreadedScheduledExecutor();ObservableOnSubscribe<String>handler =emitter -> {Future<Object>future =executor.schedule(() -> {emitter.onNext("Hello");emitter.onNext("World");emitter.onComplete();returnnull; },1,TimeUnit.SECONDS);emitter.setCancellable(() ->future.cancel(false));};Observable<String>observable =Observable.create(handler);observable.subscribe(item ->System.out.println(item),error ->error.printStackTrace(), () ->System.out.println("Done"));Thread.sleep(2000);executor.shutdown();
Note:Flowable.create() must also specify the backpressure behavior to be applied when the user-provided function generates more items than the downstream consumer has requested.
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/defer.html
Calls an user-providedjava.util.concurrent.Callable when a consumer subscribes to the reactive type so that theCallable can generate the actual reactive instance to relay signals from towards the consumer.defer allows:
- associating a per-consumer state with such generated reactive instances,
- allows executing side-effects before an actual/generated reactive instance gets subscribed to,
- turn hot sources (i.e.,
Subjects andProcessors) into cold sources by basically making those hot sources not exist until a consumer subscribes.
Observable<Long>observable =Observable.defer(() -> {longtime =System.currentTimeMillis();returnObservable.just(time);});observable.subscribe(time ->System.out.println(time));Thread.sleep(1000);observable.subscribe(time ->System.out.println(time));
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/range.html
Generates a sequence of values to each individual consumer. Therange() method generatesIntegers, therangeLong() generatesLongs.
Stringgreeting ="Hello World!";Observable<Integer>indexes =Observable.range(0,greeting.length());Observable<Character>characters =indexes .map(index ->greeting.charAt(index));characters.subscribe(character ->System.out.print(character),error ->error.printStackTrace(), () ->System.out.println());
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/interval.html
Periodically generates an infinite, ever increasing numbers (of typeLong). TheintervalRange variant generates a limited amount of such numbers.
Observable<Long>clock =Observable.interval(1,TimeUnit.SECONDS);clock.subscribe(time -> {if (time %2 ==0) {System.out.println("Tick"); }else {System.out.println("Tock"); }});
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/timer.html
After the specified time, this reactive source signals a single0L (then completes forFlowable andObservable).
Observable<Long>eggTimer =Observable.timer(5,TimeUnit.MINUTES);eggTimer.blockingSubscribe(v ->System.out.println("Egg is ready!"));
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/empty-never-throw.html
This type of source signals completion immediately upon subscription.
Observable<String>empty =Observable.empty();empty.subscribe(v ->System.out.println("This should never be printed!"),error ->System.out.println("Or this!"), () ->System.out.println("Done will be printed."));
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/empty-never-throw.html
This type of source does not signal anyonNext,onSuccess,onError oronComplete. This type of reactive source is useful in testing or "disabling" certain sources in combinator operators.
Observable<String>never =Observable.never();never.subscribe(v ->System.out.println("This should never be printed!"),error ->System.out.println("Or this!"), () ->System.out.println("This neither!"));
Available in:Flowable,Observable,Maybe,Single,Completable
ReactiveX documentation:http://reactivex.io/documentation/operators/empty-never-throw.html
Signal an error, either pre-existing or generated via ajava.util.concurrent.Callable, to the consumer.
Observable<String>error =Observable.error(newIOException());error.subscribe(v ->System.out.println("This should never be printed!"),e ->e.printStackTrace(), () ->System.out.println("This neither!"));
A typical use case is to conditionally map or suppress an exception in a chain utilizingonErrorResumeNext:
Observable<String>observable =Observable.fromCallable(() -> {if (Math.random() <0.5) {thrownewIOException(); }thrownewIllegalArgumentException();});Observable<String>result =observable.onErrorResumeNext(error -> {if (errorinstanceofIllegalArgumentException) {returnObservable.empty(); }returnObservable.error(error);});for (inti =0;i <10;i++) {result.subscribe(v ->System.out.println("This should never be printed!"),error ->error.printStackTrace(), () ->System.out.println("Done"));}
Copyright (c) 2016-present, RxJava Contributors.
Twitter @RxJava |Gitter @RxJava

