Notes on Reactive Programming Part II: Writing Some Code
In this article we continue the series on Reactive Programming, and we concentrate on explaining some concepts through actual code samples. The end result should be that you understand a bit better what makes Reactive different, and what makes it functional. The examples here are quite abstract, but they give you a way to think about the APIs and the programming style, and start to get a feel for how it is different. We will see the elements of Reactive, and learn how to control the flow of data, and process in background threads if necessary.
Setting Up a Project
We will use the Reactor libraries to illustrate the points we need to make. The code could just as easily be written with other tools. If you want to play with the code and see it working without having to copy-paste anything, there are working samples with tests in GitHub.
To get started grab a blank project from https://start.spring.io and add the Reactor Core dependency. With Maven:

    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>3.0.0.RC2</version>
    </dependency>

With Gradle it’s very similar:

    compile 'io.projectreactor:reactor-core:3.0.0.RC2'

Now let’s write some code.
What Makes it Functional?
The basic building block of Reactive is a sequence of events, and two protagonists, a publisher and a subscriber to those events. It’s also OK to call a sequence a "stream" because that’s what it is. If we need to, we will use the word "stream" with a small "s", but Java 8 has a java.util.stream.Stream which is different, so try not to get confused. We will try to concentrate the narrative on the publisher and subscriber anyway (that’s what Reactive Streams does).
Reactor is the library we are going to use in samples, so we’ll stick to the notation there, and call the publisher a Flux (it implements the interface Publisher from Reactive Streams). The RxJava library is very similar and has a lot of parallel features, so in that case we would be talking about an Observable instead, but the code would be very similar. (Reactor 2.0 called it a Stream, which is confusing if we need to talk about Java 8 Streams as well, so we’ll only use the new code in Reactor 3.0.)
Generators
A Flux is a publisher of a sequence of events of a specific POJO type, so it is generic, i.e. Flux<T> is a publisher of T. Flux has some static convenience methods to create instances of itself from a variety of sources. For example, to create a Flux from an array:

    Flux<String> flux = Flux.just("red", "white", "blue");

We just generated a Flux, and now we can do stuff with it. There are actually only two things you can do with it: operate on it (transform it, or combine it with other sequences), or subscribe to it (it’s a publisher).
Single Valued Sequences
Often you encounter a sequence that you know has only one or zero elements, for example a repository method that finds an entity by its id. Reactor has a Mono type representing a single-valued or empty Flux. Mono has a very similar API to Flux but more focused, because not all operators make sense for single-valued sequences. RxJava also has a bolt-on (in version 1.x) called Single, and also Completable for an empty sequence. The empty sequence in Reactor is Mono<Void>.
Operators
There are a lot of methods on a Flux and nearly all of them are operators. We aren’t going to look at them all here because there are better places to look for that (like the Javadocs). We only need to get a flavour for what an operator is, and what it can do for you.
For instance, to ask for the internal events inside a Flux to be logged to standard out, you can call the .log() method. Or you can transform it using a map():

    Flux<String> flux = Flux.just("red", "white", "blue");

    Flux<String> upper = flux
      .log()
      .map(String::toUpperCase);

In this code we transformed the strings in the input by converting them to upper case. So far, so trivial.
What’s interesting about this little sample (mind-blowing, even, if you’re not used to it) is that no data have been processed yet. Nothing has even been logged because literally, nothing happened (try it and you will see). Calling operators on a Flux amounts to building a plan of execution for later. It is completely declarative, and it’s why people call it "functional". The logic implemented in the operators is only executed when data starts to flow, and that doesn’t happen until someone subscribes to the Flux (or equivalently to the Publisher).
The same declarative, functional approach to processing a sequence of data exists in all Reactive libraries, and also in Java 8 Streams. Consider this similar-looking code, using a Stream with the same contents as the Flux:

    Stream<String> stream = Stream.of("red", "white", "blue");
    Stream<String> upper = stream.map(value -> {
        System.out.println(value);
        return value.toUpperCase();
    });

The observation we made about Flux applies here: no data is processed, it’s just a plan of execution. There are, however, some important differences between Flux and Stream, which make Stream an inappropriate API for Reactive use cases. Flux has a lot more operators, many of which are just conveniences, but the real difference comes when you want to consume the data, so that’s what we need to look at next.
Tip
There is a useful blog by Sebastien Deleuze on Reactive Types, where he describes the differences between the various streaming and reactive APIs by looking at the types they define, and how you would use them. The differences between Flux and Stream are highlighted there in more detail.
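To check the claim that declaring a Java 8 Stream pipeline processes nothing, the sketch below counts how many items the map() function has seen before and after a terminal operation. It uses only the JDK, and the class name LazyStreamDemo and its run() helper are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamDemo {

    /** Returns {items seen before collect(), items seen after collect()}. */
    static int[] run() {
        List<String> seen = new ArrayList<>();

        // Declaring the pipeline only builds a plan; the lambda has not run yet.
        Stream<String> upper = Stream.of("red", "white", "blue")
                .map(value -> {
                    seen.add(value);
                    return value.toUpperCase();
                });
        int before = seen.size();

        // A terminal operation pulls every item through the plan in one go.
        List<String> result = upper.collect(Collectors.toList());
        int after = seen.size();

        System.out.println(result);
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("seen before collect(): " + counts[0]);
        System.out.println("seen after collect(): " + counts[1]);
    }
}
```

The terminal collect() plays roughly the role that subscribe() plays for a Flux, with the difference that it always consumes the whole sequence at once.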
Subscribers
To make the data flow you have to subscribe to the Flux using one of the subscribe() methods. Only those methods make the data flow. They reach back through the chain of operators you declared on your sequence (if any) and request the publisher to start creating data. In the samples we have been working with, this means the underlying collection of strings is iterated. In a more complicated use case it might trigger a file to be read from the filesystem, or a pull from a database or a call to an HTTP service.
Here’s a call to subscribe() in action:

    Flux.just("red", "white", "blue")
      .log()
      .map(String::toUpperCase)
      .subscribe();

The output is:
    09:17:59.665 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@3ffc5af1)
    09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
    09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
    09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
    09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
    09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onComplete()

So we can see from this that the effect of subscribe() without an argument is to request the publisher to send all data: there’s only one request() logged and it’s "unbounded". We can also see callbacks for each item that is published (onNext()), for the end of the sequence (onComplete()), and for the original subscription (onSubscribe()). If you needed to, you could listen for those events yourself using the doOn*() methods in Flux, which are themselves operators, not subscribers, so they don’t cause any data to flow on their own.
The subscribe() method is overloaded, and the other variants give you different options to control what happens. One important and convenient form is subscribe() with callbacks as arguments. The first argument is a Consumer, which gives you a callback with each of the items, and you can also optionally add a Consumer for an error if there is one, and a vanilla Runnable to execute when the sequence is complete. For example, just with the per-item callback:
    Flux.just("red", "white", "blue")
      .log()
      .map(String::toUpperCase)
      .subscribe(System.out::println);

Here’s the output:
    09:56:12.680 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@59f99ea)
    09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
    09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
    RED
    09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
    WHITE
    09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
    BLUE
    09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onComplete()

We could control the flow of data, and make it "bounded", in a variety of ways. The raw API for control is the Subscription you get from a Subscriber. The equivalent long form of the short call to subscribe() above is:
    .subscribe(new Subscriber<String>() {

        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String t) {
            System.out.println(t);
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onComplete() {
        }

    });

To control the flow, e.g. to consume at most 2 items at a time, you could use the Subscription more intelligently:
    .subscribe(new Subscriber<String>() {

        private long count = 0;
        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(2);
        }

        @Override
        public void onNext(String t) {
            count++;
            if (count >= 2) {
                count = 0;
                subscription.request(2);
            }
        }
    ...

This Subscriber is "batching" items 2 at a time. It’s a common use case so you might think about extracting the implementation to a convenience class, and that would make the code more readable too. The output looks like this:
    09:47:13.562 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@61832929)
    09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - request(2)
    09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
    09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
    09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - request(2)
    09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
    09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onComplete()

In fact the batching subscriber is such a common use case that there are convenience methods already available in Flux. The batching example above can be implemented like this:
    Flux.just("red", "white", "blue")
      .log()
      .map(String::toUpperCase)
      .subscribe(null, 2);

(Note the call to subscribe() with a request limit.) Here’s the output:
    10:25:43.739 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4667ae56)
    10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - request(2)
    10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
    10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
    10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - request(2)
    10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
    10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onComplete()

Tip
A library that will process sequences for you, like Spring Reactive Web, can handle the subscriptions. It’s good to be able to push these concerns down the stack because it saves you from cluttering your code with non-business logic, making it more readable and easier to test and maintain. So as a rule, it is a good thing if you can avoid subscribing to a sequence, or at least push that code into a processing layer, and out of the business logic.
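The batching subscriber described earlier in this section could be pulled out into a convenience class along the following lines. This is only a sketch under some assumptions: it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams ones, instead of Reactor, so that it runs without any dependency. BatchingSubscriber, publisherOf() and run() are hypothetical names, and the tiny publisher is single-threaded and skips the validation a real one needs (for example rejecting non-positive requests).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

class BatchingDemo {

    /** Hypothetical convenience class: asks upstream for items in fixed-size batches. */
    static final class BatchingSubscriber<T> implements Flow.Subscriber<T> {
        private final long batchSize;
        private final Consumer<? super T> consumer;
        private Flow.Subscription subscription;
        private long count = 0;

        BatchingSubscriber(long batchSize, Consumer<? super T> consumer) {
            this.batchSize = batchSize;
            this.consumer = consumer;
        }

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(batchSize);      // first batch
        }

        @Override public void onNext(T item) {
            consumer.accept(item);
            if (++count >= batchSize) {           // batch used up: ask for the next one
                count = 0;
                subscription.request(batchSize);
            }
        }

        @Override public void onError(Throwable error) { }
        @Override public void onComplete() { }
    }

    /** Minimal synchronous publisher over fixed items; every signal is recorded in 'events'. */
    static <T> Flow.Publisher<T> publisherOf(List<T> items, List<String> events) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private long demand = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override public void request(long n) {
                events.add("request(" + n + ")");
                demand += n;
                if (emitting) {
                    return;                       // called from inside onNext(): the loop below carries on
                }
                emitting = true;
                while (!done && demand > 0 && index < items.size()) {
                    demand--;
                    T item = items.get(index++);
                    events.add("onNext(" + item + ")");
                    subscriber.onNext(item);
                }
                emitting = false;
                if (!done && index == items.size()) {
                    done = true;
                    events.add("onComplete()");
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { done = true; }
        });
    }

    /** Subscribes with a batch size of 2 and returns the recorded signals. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flow.Publisher<String> publisher = publisherOf(List.of("red", "white", "blue"), events);
        publisher.subscribe(new BatchingSubscriber<String>(2, item -> { /* business logic goes here */ }));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded signals follow the same request(2), onNext, onNext, request(2) pattern as the batching log output in this section, and the business logic is reduced to the Consumer passed to the constructor.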
Threads, Schedulers and Background Processing
An interesting feature of all the logs above is that they are all on the "main" thread, which is the thread of the caller to subscribe(). This highlights an important point: Reactor is extremely frugal with threads, because that gives you the greatest chance of the best possible performance. That might be a surprising statement if you’ve been wrangling threads and thread pools and asynchronous executions for the last 5 years, trying to squeeze more juice out of your services. But it’s true: in the absence of any imperative to switch threads, even if the JVM is optimized to handle threads very efficiently, it is always faster to do computation on a single thread. Reactor has handed you the keys to control all the asynchronous processing, and it assumes you know what you are doing.
Flux provides a few configurer methods that control the thread boundaries. For example, you can configure the subscriptions to be handled in a background thread using Flux.subscribeOn():
    Flux.just("red", "white", "blue")
      .log()
      .map(String::toUpperCase)
      .subscribeOn(Schedulers.parallel())
      .subscribe(null, 2);

The result can be seen in the output:
    13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@58663fc3)
    13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
    13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
    13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
    13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
    13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
    13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()

Tip
If you write this code yourself, or copy-paste it, remember to wait for the processing to stop before the JVM exits.
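One common way to wait is a CountDownLatch that is released when the sequence terminates. The sketch below shows the idea with the JDK's SubmissionPublisher standing in for a Flux that is subscribed on a background scheduler (an assumption made so the example runs without Reactor). WaitForCompletionDemo and run() are illustrative names, and the five second timeout is arbitrary.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class WaitForCompletionDemo {

    static List<String> run() {
        List<String> consumed = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        // SubmissionPublisher hands items to its subscribers asynchronously through an
        // executor, so submit() returning does not mean the items have been consumed.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE);
                }
                @Override public void onNext(String item) {
                    consumed.add(item.toUpperCase());
                }
                @Override public void onError(Throwable error) {
                    finished.countDown();
                }
                @Override public void onComplete() {
                    finished.countDown();
                }
            });
            publisher.submit("red");
            publisher.submit("white");
            publisher.submit("blue");
        } // close() lets the subscriber see onComplete() after the pending items

        // Without this wait the caller could carry on (or exit) before anything is consumed.
        try {
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("sequence did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(consumed);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With Reactor the same latch could be counted down from a completion callback; in a test, blocking like this is usually acceptable.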
Note that the subscription, and all the processing, takes place on a single background thread "parallel-1-1". This is because we asked for the subscriber to our main Flux to be in the background. This is fine if the item processing is CPU intensive (but pointless being in a background thread, in point of fact, since you pay for the context switch but don’t get the results any faster). You might also want to be able to perform item processing that is I/O intensive and possibly blocking. In this case, you would want to get it done as quickly as possible without blocking the caller. A thread pool is still your friend, and that’s what you get from Schedulers.parallel(). To switch the processing of the individual items to separate threads (up to the limit of the pool) we need to break them out into separate publishers, and for each of those publishers ask for the result in a background thread. One way to do this is with an operator called flatMap(), which maps the items to a Publisher (potentially of a different type), and then back to a sequence of the new type:
    Flux.just("red", "white", "blue")
      .log()
      .flatMap(value ->
         Mono.just(value.toUpperCase())
           .subscribeOn(Schedulers.parallel()),
         2)
      .subscribe(value -> {
        log.info("Consumed: " + value);
      });

Note here the use of flatMap() to push the items down into a "child" publisher, where we can control the subscription per item instead of for the whole sequence. Reactor has built-in default behaviour to hang onto a single thread as long as possible, so we need to be explicit if we want it to process specific items or groups of items in a background thread. Actually, this is one of a handful of recognized tricks for forcing parallel processing (see the Reactive Gems issue for more detail).
The output looks like this:
    15:24:36.596 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17)
    15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - request(2)
    15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
    15:24:36.613 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
    15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
    15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(1)
    15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
    15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
    15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
    15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE

Notice that there are now multiple threads consuming the items, and the concurrency hint in the flatMap() ensures that there are 2 items being processed at any given time, as long as they are available. We see request(1) a lot because the system is trying to keep 2 items in the pipeline, and generally they don’t finish processing at the same time. Reactor tries to be very smart here in fact, and it pre-fetches items from the upstream Publisher to try to eliminate waiting time for the subscriber (we aren’t seeing that here because the numbers are low: we are only processing 3 items).
Tip
Three items ("red", "white", "blue") might be too few to convincingly see more than one background thread, so it might be better to generate more data. You could do that with a random number generator, for instance.
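As a rough, JDK-only analogy for this tip (it is not Reactor code), the sketch below generates a larger batch of random numbers and processes each one on a pool of two threads, recording which threads did the work. CompletableFuture stands in for the per-item child publisher and the pool size stands in for the concurrency hint; both substitutions, the class name ManyItemsDemo and the fixed seed are assumptions made for illustration.

```java
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ManyItemsDemo {

    /** Processes 'count' random numbers on a two-thread pool; returns the thread names used. */
    static Set<String> run(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        try {
            List<CompletableFuture<Integer>> pending = new Random(42)
                    .ints(count, 0, 100)          // 'count' pseudo-random values in [0, 100)
                    .boxed()
                    .map(n -> CompletableFuture.supplyAsync(() -> {
                        threads.add(Thread.currentThread().getName());
                        return n * 2;             // stand-in for real per-item work
                    }, pool))
                    .collect(Collectors.toList());

            // Wait for every item, otherwise the pool could be shut down mid-flight.
            pending.forEach(CompletableFuture::join);
        } finally {
            pool.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("Threads used: " + run(50));
    }
}
```

With only a handful of items a single pool thread often does all the work, which is the effect the tip warns about. Larger counts make it more likely that both threads show up.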
Flux also has a publishOn() method which is the same, but for the listeners (i.e. onNext() or consumer callbacks) instead of for the subscriber itself:
    Flux.just("red", "white", "blue")
      .log()
      .map(String::toUpperCase)
      .subscribeOn(Schedulers.newParallel("sub"))
      .publishOn(Schedulers.newParallel("pub"), 2)
      .subscribe(value -> {
          log.info("Consumed: " + value);
      });

The output looks like this:
    15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@172ed57)
    15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
    15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
    15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
    15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
    15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
    15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
    15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
    15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
    15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE

Notice that the consumer callbacks (logging "Consumed: …") are on the publisher thread pub-1-1. If you take out the subscribeOn() call, you might see all of the 2nd chunk of data processed on the pub-1-1 thread as well. This, again, is Reactor being frugal with threads: if there’s no explicit request to switch threads it stays on the same one for the next call, whatever that is.
Note
We changed the code in this sample from subscribe(null, 2) to adding a prefetch=2 to the publishOn(). In this case the fetch size hint in subscribe() would have been ignored.
Extractors: The Subscribers from the Dark Side
There is another way to subscribe to a sequence, which is to call Mono.block() or Mono.toFuture() or Flux.toStream() (these are the "extractor" methods: they get you out of the Reactive types into a less flexible, blocking abstraction). Flux also has converters collectList() and collectMap() that convert from Flux to Mono. They don’t actually subscribe to the sequence, but they do throw away any control you might have had over the subscription at the level of the individual items.
Warning
A good rule of thumb is "never call an extractor". There are some exceptions (otherwise the methods would not exist). One notable exception is in tests because it’s useful to be able to block to allow results to accumulate.
These methods are there as an escape hatch to bridge from Reactive to blocking, if you need to adapt to a legacy API, for instance Spring MVC. When you call Mono.block() you throw away all the benefits of the Reactive Streams. This is the key difference between Reactive Streams and Java 8 Streams: the native Java Stream only has the "all or nothing" subscription model, the equivalent of Mono.block(). Of course subscribe() can block the calling thread as well, so it’s just as dangerous as the converter methods, but you have more control: you can prevent it from blocking by using subscribeOn() and you can drip the items through by applying back pressure and periodically deciding whether to continue.
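To make the "all or nothing" nature of an extractor concrete, here is a small JDK-only analogy (Reactor is not used, so treat it as an illustration of the idea and not of Reactor's API). SubmissionPublisher.consume() subscribes with unbounded demand and returns a CompletableFuture, and join() then blocks the caller until the sequence ends, which is broadly the shape of converting to a future and blocking on it. ExtractorDemo and run() are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class ExtractorDemo {

    static List<String> run() {
        List<String> items = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // consume() subscribes for us and requests everything up front, so there is no
        // per-item flow control left; all we get back is a future for the end of the sequence.
        CompletableFuture<Void> done = publisher.consume(item -> items.add(item.toUpperCase()));

        publisher.submit("red");
        publisher.submit("white");
        publisher.submit("blue");
        publisher.close();

        // join() parks the calling thread until onComplete() (or an error) arrives.
        done.join();
        return List.copyOf(items);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Compared with a hand-written subscriber that calls request(n) as it goes, the caller here has no way to slow the publisher down or to stop early. That is the control the extractor methods give up.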
Conclusion
In this article we have covered the basics of the Reactive Streams and Reactor APIs. If you need to know more there are plenty of places to look, but there’s no substitute for hands-on coding, so use the code in GitHub (for this article in tests in the project called "flux"), or head over to the Lite RX Hands On workshop. So far, really this is just overhead, and we haven’t learned much that we couldn’t have done in a more obvious way using non-Reactive tools. The next article in the series will dig a little deeper into the blocking, dispatching and asynchronous sides of the Reactive model, and show you what opportunities there are to reap the real benefits.