Reactive Java or RxJava is an implementation and enhancement of the observer pattern. It was intended for use in event driven schemes where nesting synchronous or asynchronous callback methods becomes overly complex. It abstracts some of the more complex tasks associated with asynchronous operations including threading and concurrency.
This example builds on the basics described inPart One of this topic. If you are not familiar with the basics, please look back at this previous article before moving forward with these more advanced concepts. This article will examine asynchronous or concurrency tasks that are made easier with RxJava. The concurrency problems that RxJava is suitable to solve and we will look at in this example include:
- Nested callbacks
- Making asynchronous calls
- Aggregating or combining asynchronous calls
- Streaming
You can also check this tutorial in the following video:
1. Setup
We used Eclipse Neon, Java 8, Maven 3.3.9 and RxJava 2.0.0. At the time of this sample, we ran into issues using the last version, 2.0.7, with Eclipse as the source was not available. This example uses a simple Java application to demonstrate Reactive functionality.
pom.xml
1 2 3 4 5 | <dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.0.0</version></dependency> |
2. Simple Asynchronous Call
Let’s start by exploring the asynchronous capabilities in RxJava. In the next few examples we will use RxJava to spawn new threads to do various tasks. The default behavior of anObservable is to observe on the same thread where the subscribe method is called. You can introduce an asynchronous process using thesubscribeOn method. Here we will look at a simple asynchronous call in RxJava.
Notice that in this example we are subscribing on theSchedulers.newThread() scheduler and using lambda notation to execute the serialize method of theFlowableEmitter interface. TheFlowableEmitter interface requires that we call theonNext,onError andonComplete methods sequentially. In this example, we will simplify things and use lambda notation to call theserialize method which will ensure that these methods are serialized. The body of this method will act as the worker thread and will sleep for three seconds while the process moves ahead. You can see from the output that the flowable messages, “RxJavaExampleAdvanced:51 – Executing async flowable…”, are logged after the main thread finishes “RxJavaExampleAdvanced:59 – Finished simple async”.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 | publicstaticvoidsimpleAsync() { logger.info("Starting simple async"); Flowable.create((FlowableEmitter s) -> { try{ logger.info("Executing async flowable."); Thread.sleep(3000); logger.info("Finished async flowable."); }catch(Exception e) { } s.onComplete(); }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread()).subscribe(); logger.info("Finished simple async");} |
Output
1 2 3 4 | 2017-07-16 10:35:03 INFO RxJavaExampleAdvanced:47 - Starting simple async2017-07-16 10:35:03 INFO RxJavaExampleAdvanced:59 - Finished simple async2017-07-16 10:35:03 INFO RxJavaExampleAdvanced:51 - Executing async flowable.2017-07-16 10:35:06 INFO RxJavaExampleAdvanced:53 - Finished async flowable. |
3. Asynchronous Web Service Call
A common usage of RxJava is to make long running calls or calls with unpredictable finish times asynchronously. This allows the code to handle other tasks while it is waiting for the long running call to finish. You might see a client user interface make a web service call for data asynchronously so it can finish displaying the components to the user that are not dependent on the data in the service call. In the next example, we will explore using RxJava to make asynchronous calls to a web service. We make one simple call to a web service or API that returns a String result. You’ll notice that the API is executed on a new thread “RxNewThreadScheduler-1” not the main thread. Again we use theFlowableEmitter interface with lambda notation to execute theserialize method which makes a rest call to our API on a new thread.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | publicstaticvoidsimpleAsyncAPICalls() { logger.info("Starting async api"); logger.info("Main Thread: {}", Thread.currentThread().getName()); Flowable.create((FlowableEmitter s) -> { try{ logger.info("Emitted thread: {}", Thread.currentThread().getName()); logger.info("Result: {}", result); s.onNext(result); }catch(Exception e) { s.onError(e); } s.onComplete(); }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread()).subscribe(logger::info); logger.info("Ending async api");}privatestaticString makeCallString(String URI) { RestTemplate restTemplate =newRestTemplate(); String result = restTemplate.getForObject(URI, String.class); returnresult;} |
Output
1 2 3 4 5 6 | 2017-07-29 10:49:25 INFO RxJavaExampleAdvanced:63 - Starting async api2017-07-29 10:49:25 INFO RxJavaExampleAdvanced:64 - Main Thread: main2017-07-29 10:49:26 INFO RxJavaExampleAdvanced:77 - Ending async api2017-07-29 10:49:26 INFO RxJavaExampleAdvanced:68 - Emitted thread: RxNewThreadScheduler-12017-07-29 10:49:26 INFO RxJavaExampleAdvanced:69 - Result: Hello Stream!2017-07-29 10:49:26 INFO RxJavaExampleAdvanced:? - Hello Stream! |
4. Multiple Asynchronous Web Service Calls
Many times you’ll need to make multiple calls to a web service. In this next example we’ll leverage the map function of RxJava to execute and return the response from multiple API calls. As a reminder, the map function returns the data type of theFlowable whileflatMap will return theFlowable object. Using theFlowable that is returned from a flatMap call would allow you to take further reactive actions on the response. You’ll see this in later examples but in this case we are just outputing the result and will not need to make use of theFlowable.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 | publicstaticvoidmultipleAsyncAPICalls() { logger.info("Starting multi async api"); logger.info("Main Thread: {}", Thread.currentThread().getName()); Flowable.fromArray("http://localhost:8080/jcg/service/stream/no","http://localhost:8080/jcg/service/stream/no", .map(newFunction() { intresultCount =0; @Override publicString apply(String t)throwsException { String result = makeCallString(t); logger.info("Emitted thread: {}", Thread.currentThread().getName()); logger.info("Result {}: {}", resultCount++, result); returnresult +" on "+ Thread.currentThread().getName(); } }).subscribeOn(Schedulers.newThread()).subscribe(logger::info); logger.info("Ending multi async api");} |
Output
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 | 2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:123 - Starting multi async api2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:124 - Main Thread: main2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:137 - Ending multi async api2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-12017-07-16 10:46:12 INFO RxJavaExampleAdvanced:133 - Result 0: Hello Stream!2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-12017-07-16 10:46:12 INFO RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-12017-07-16 10:46:12 INFO RxJavaExampleAdvanced:133 - Result 1: Hello Stream!2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-12017-07-16 10:46:12 INFO RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-12017-07-16 10:46:12 INFO RxJavaExampleAdvanced:133 - Result 2: Hello Stream!2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-12017-07-16 10:46:12 INFO RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-12017-07-16 10:46:12 INFO RxJavaExampleAdvanced:133 - Result 3: Hello Stream!2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-1 |
While this code will operate the emitted calls on another thread it does not create a new thread for each item. If we want to have our process execute on a new thread for each item we will need to do something slightly different. As a developer leveraging these reactive mechanisms you need to think carefully about which calls you want to make synchronously versus those that you want to make asynchronously. If you need to use the results of earlier calls in later calls, you will want each to execute synchronously. If you just need all the data returned and you are not concerned with the order in which it is returned, you might have a good candidate to make each call asynchronously which will improve the overall performance.
We will start with a simple asynchronous example where multiple items are emitted and build on that to use the same technique for multiple API calls. There’s a lot going on in this small sample. We are using theflatMap method to operate on an array of items and transform the array to another Observable for each item. We’ll need to do that to ensure that we cansubscribeOn each item which will execute those corresponding operations on a different thread. This is how we ensure that each emitted item is handled asychronously rather than the group sequentially executed on a separate thread. Notice that we have a different thread for both of the items in this example;"Thread[RxNewThreadScheduler-1,5,main]","Thread[RxNewThreadScheduler-2,5,main]".
01 02 03 04 05 06 07 08 09 10 | publicstaticvoidsimpleAsyncMulti() { logger.info("Starting multi async"); Observable.just(1,2) .flatMap(item -> Observable.just(item.toString()).subscribeOn(Schedulers.newThread()) .doOnNext(i -> logger.info(Thread.currentThread().toString()))) .subscribe(logger::info); logger.info("Ending multi async");} |
Output
1 2 3 4 5 6 | 2017-07-16 10:36:49 INFO RxJavaExampleAdvanced:63 - Starting multi async2017-07-16 10:36:49 INFO RxJavaExampleAdvanced:70 - Ending multi async2017-07-16 10:36:49 INFO RxJavaExampleAdvanced:67 - Thread[RxNewThreadScheduler-1,5,main]2017-07-16 10:36:49 INFO RxJavaExampleAdvanced:67 - Thread[RxNewThreadScheduler-2,5,main]2017-07-16 10:36:49 INFO RxJavaExampleAdvanced:? - 12017-07-16 10:36:49 INFO RxJavaExampleAdvanced:? - 2 |
Now lets take a look at an asynchronous example where we spawn a new thread for each API call. Again we map each emitted value to a newObservable with a single item andsubscribeOn to a new thread. Again, you can see in this sample that the thread id that is executing each request is different, i.e.Thread:Thread[RxNewThreadScheduler-4,5,main].
1 2 3 4 5 6 7 | Observable .flatMap(item -> Observable.just(item).subscribeOn(Schedulers.newThread()).doOnNext(i -> { logger.info(makeCallString(i)); logger.info(Thread.currentThread().toString()); })).subscribe(System.out::println); |
Output
01 02 03 04 05 06 07 08 09 10 11 12 | 2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Hello Stream!2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-4,5,main]http://localhost:8080/jcg/service/stream/no2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Hello Stream!2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-3,5,main]http://localhost:8080/jcg/service/stream/no2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Hello Stream!2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Hello Stream!2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-2,5,main]http://localhost:8080/jcg/service/stream/no2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-1,5,main]http://localhost:8080/jcg/service/stream/no |
5. Combine Asynchronous Results
To build on this technique, we will make multiple API calls and zip or combine the results. We are making multiple API calls asychronously, each on its own thread, and using the zip method to combine the results.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | publicstaticvoidflatMapZipAsyncAPICalls() { Flowable result = Flowable.create((FlowableEmitter s) -> { try{ logger.info("Emitted thread: {}", Thread.currentThread().getName()); logger.info("Result: {}", r); s.onNext(r); }catch(Exception e) { s.onError(e); } s.onComplete(); }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread()); Flowable result2 = Flowable.create((FlowableEmitter s) -> { try{ logger.info("Emitted thread: {}", Thread.currentThread().getName()); logger.info("Result: {}", r); s.onNext(r); }catch(Exception e) { s.onError(e); } s.onComplete(); }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread()); Flowable.zip(result, result2, (s, s2) -> s + s2).subscribe(System.out::println);} |
Output
1 2 3 4 5 | 2017-08-14 17:59:43 INFO RxJavaExampleAdvanced:120 - Emitted thread: RxNewThreadScheduler-12017-08-14 17:59:43 INFO RxJavaExampleAdvanced:121 - Result: ["1","2","3"]2017-08-14 17:59:43 INFO RxJavaExampleAdvanced:131 - Emitted thread: RxNewThreadScheduler-22017-08-14 17:59:43 INFO RxJavaExampleAdvanced:132 - Result: ["test1","test2","test3"]["1","2","3"]["test1","test2","test3"] |
6. Streaming Results
Finally, let’s examine streaming the results of asynchronous API calls where the results are emitted as they are available. This example just builds on the concepts introduced previously with the key addition of theObservableHttp calls that leverage aCloseableHttpAsyncClient.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | publicstaticvoidstreamObserable()throwsURISyntaxException, IOException, InterruptedException { logger.info("Executing Streaming Observable Over Http"); CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault(); httpclient.start(); ObservableHttp httpclient) .toObservable().flatMap(newFunc1<ObservableHttpResponse, rx.Observable>() { @Override publicrx.Observable call(ObservableHttpResponse response) { returnresponse.getContent().map(newFunc1() { @Override publicString call(byte[] bb) { logger.info("timestamp inner " + SimpleDateFormat.getDateTimeInstance().format(newDate().getTime())); logger.info("counter: "+ RxJavaExample3.counter++); returnnewString(bb); } }); } }).buffer(5, TimeUnit.SECONDS,5, rx.schedulers.Schedulers.io()) .subscribeOn(rx.schedulers.Schedulers.io()).subscribe(newAction1<List>() { @Override publicvoidcall(List resp) { logger.info("timestamp "+ SimpleDateFormat.getDateTimeInstance().format(newDate().getTime())); logger.info(resp.toString()); } });} |
Output
01 02 03 04 05 06 07 08 09 10 11 12 13 | 2017-08-14 18:06:20 INFO RxJavaExampleAdvanced:143 - Executing Streaming Observable Over Http2017-08-14 18:06:23 INFO RxJavaExampleAdvanced:157 - timestamp inner Aug 14, 2017 6:06:23 PM2017-08-14 18:06:23 INFO RxJavaExampleAdvanced:159 - counter: 02017-08-14 18:06:25 INFO RxJavaExampleAdvanced:157 - timestamp inner Aug 14, 2017 6:06:25 PM2017-08-14 18:06:25 INFO RxJavaExampleAdvanced:159 - counter: 12017-08-14 18:06:26 INFO RxJavaExampleAdvanced:170 - timestamp Aug 14, 2017 6:06:26 PM2017-08-14 18:06:26 INFO RxJavaExampleAdvanced:171 - [data:Message 2, data:Message 1]2017-08-14 18:06:27 INFO RxJavaExampleAdvanced:157 - timestamp inner Aug 14, 2017 6:06:27 PM2017-08-14 18:06:27 INFO RxJavaExampleAdvanced:159 - counter: 22017-08-14 18:06:31 INFO RxJavaExampleAdvanced:170 - timestamp Aug 14, 2017 6:06:31 PM2017-08-14 18:06:31 INFO RxJavaExampleAdvanced:171 - [data:Message 0]2017-08-14 18:06:36 INFO RxJavaExampleAdvanced:170 - timestamp Aug 14, 2017 6:06:36 PM2017-08-14 18:06:36 INFO RxJavaExampleAdvanced:171 - [] |
Here is the Spring Boot streaming resource that ties it together.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | @Controller@RequestMapping("/stream")publicclassStreamController { privateSseEmitter sseEmitter;... @RequestMapping("/event2") publicSseEmitter getRealTimeMessageAction2(HttpServletRequest request) { SseEmitter sseEmitter =newSseEmitter(); runAsync(sseEmitter); returnsseEmitter; } privatevoidrunAsync(SseEmitter sseEmitter) { for(inti =0; i <3; i++) { AsyncThread at =newAsyncThread(); at.setEmitter(sseEmitter); at.setSleep((6- (i *2)) *1000); at.setMessageId(i); at.start(); } } privateclassAsyncThreadextendsThread { privateSseEmitter sseEmitter; privateintsleep; privateintid; publicvoidsetEmitter(SseEmitter sseEmitter) { this.sseEmitter = sseEmitter; } publicvoidsetSleep(intsleep) { this.sleep = sleep; } publicvoidsetMessageId(intid) { this.id = id; } publicvoidrun() { try{ try{ Thread.sleep(this.sleep); logger.info("Timestamp:"+ SimpleDateFormat.getDateTimeInstance().format(newDate().getTime())); this.sseEmitter.send("Message "+this.id); }catch(InterruptedException e) { logger.error(e.getMessage()); } }catch(IOException e) { logger.error(e.getMessage()); } } } |
7. Summary
In this example, we looked at making asynchronous calls using RxJava, including calls to RESTful web services. We examined how certain usages make all asynchronous calls on a single thread and how to use Observables to make each call on a separate thread. We also looked at combining the results of multiple calls and finally streaming results of service calls.
8. Download the Source Code
Here we demonstrated how to use the basic RxJava operations.

Thank you!
We will contact you soon.



