Reactive Java (RxJava) Tutorial: Introduction

Andy Beck, April 4th, 2017 (Last Updated: November 21st, 2020)

Reactive Java, or RxJava, is an implementation and enhancement of the observer pattern. It is intended for event-driven designs where nesting synchronous or asynchronous callback methods becomes overly complex. The key addition RxJava makes to the observer pattern is the ability to signal when event processing has completed or an error has occurred.

The primary components of Reactive Java are the Observable/Flowable, the Subscriber and operators. An observable collects items and emits them to a subscriber, which performs an operation on each emitted item. Operators provide a way to manipulate the data emitted by an observable before it reaches the subscriber. I will use the terms flowable and observable interchangeably in this example, as they operate in a similar manner. The main difference is that a flowable supports back pressure where an observable does not. Back pressure determines what happens when the source emits items faster than downstream consumers can handle them.
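As a rough illustration of how the three pieces fit together, the sketch below wires a source, two operators and a subscriber into one chain. The class name PipelineSketch and the choice of filter and map are assumptions made for this illustration only; they are not part of the original sample project.

```java
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical class name, used only for this sketch.
public class PipelineSketch {

    // Emits 1..5, keeps the odd values, multiplies each by 10 and collects the results.
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        Flowable.range(1, 5)                 // source: emits 1, 2, 3, 4, 5
                .filter(n -> n % 2 == 1)     // operator: keep odd values only
                .map(n -> n * 10)            // operator: transform each value
                .subscribe(received::add);   // subscriber: act on each emitted item
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [10, 30, 50]
    }
}
```

Everything here runs synchronously on the calling thread, so the list is fully populated by the time subscribe returns.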

1. Setup

We used Eclipse Neon, Java 8, Maven 3.3.9, Apache Commons CLI 1.4 and RxJava 2.0.0. At the time of writing, I ran into issues using the latest version, 2.0.7, with Eclipse because the source was not available. This example uses a simple Java application to demonstrate reactive functionality. I used a command line switch with Apache's CLI tool to move back and forth between examples.

pom.xml

<dependency>
  <groupId>io.reactivex.rxjava2</groupId>
  <artifactId>rxjava</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>commons-cli</groupId>
  <artifactId>commons-cli</artifactId>
  <version>1.4</version>
</dependency>

2. Flowables and Subscribers

We will start with a simple example that gathers, emits and acts on a single item. It consists of a Flowable, a Consumer, a Subscriber and the subscribe method. A Flowable is just like an Observable, but it supports back pressure, which defines how a consumer handles emitted items.

2.1 Basics

We will look at a few examples that emit and consume items with a Flowable and a Consumer using the subscribe method. These examples require at least a high-level understanding of lambdas and Java 8. As a quick primer for what lies ahead, the lambda expressions used here are a simplified notation for anonymous inner classes, or a way of passing functions as parameters to methods. In our first example we will compare the three different notations you will see in this article.

Anonymous inner class

Flowable.just("Hello world").subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
                System.out.println(t);
        }
});

Lambda

Flowable.just("Hello world").subscribe(s -> System.out.println(s));

Method Reference

Flowable.just("Hello world").subscribe(System.out::println);

2.2 Subscriber Details

Now let's explore the subscriber a little more. The subscribe method on a Flowable provides the option to implement onNext, onError and onComplete. In the first example we saw the subscriber supply only the onNext handling through the Consumer interface, but now let's look at one that implements all three.

Flowable.fromArray(1,2,3,4).subscribe(
        i -> System.out.printf("Entry %d\n", i),
        e -> System.err.printf("Failed to process: %s\n", e),
        () -> System.out.println("Done")
);
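The listing above only exercises the success path. The sketch below, which assumes a hypothetical class name ErrorSignalSketch and an artificial failure appended with concatWith and Flowable.error, suggests what the subscriber sees when the stream fails: the error callback runs and the completion callback does not.

```java
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical class name, used only for this sketch.
public class ErrorSignalSketch {

    // Emits two items and then fails, recording every signal the subscriber receives.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flowable.just(1, 2)
                // Artificial failure appended after the two items (an assumption for illustration).
                .concatWith(Flowable.<Integer>error(new IllegalStateException("boom")))
                .subscribe(
                        i -> log.add("Entry " + i),                           // onNext
                        e -> log.add("Failed to process: " + e.getMessage()), // onError
                        () -> log.add("Done"));                               // onComplete
        return log;
    }

    public static void main(String[] args) {
        // prints [Entry 1, Entry 2, Failed to process: boom]
        System.out.println(run());
    }
}
```

A stream ends with either onError or onComplete, never both, which is why "Done" is missing from the log.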

Two key interfaces to understand when using a Flowable are Consumer and Subscriber. When you subscribe to a flowable you can either pass a Subscriber or pass individual callbacks: a Consumer for onNext, a Consumer for onError and an Action for onComplete. These callbacks are optional and provide a convenient way to work with observables.

Consumer

// Consumer<T> { void accept(T t) throws Exception; }

Flowable.fromArray(1, 2, 3, 4).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer t) throws Exception {
        System.out.printf("Entry %d\n", t);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable t) throws Exception {
        System.err.printf("Failed to process: %s\n", t);
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Done");
    }
});

Subscriber

// Subscriber<T> { void onSubscribe(Subscription s); void onNext(T t); void onError(Throwable t); void onComplete(); }

Subscriber<Integer> subscriber = new Subscriber<Integer>() {

    @Override
    public void onSubscribe(Subscription s) {
        // A Flowable emits nothing until the subscriber requests items.
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer t) {
        System.out.printf("Entry %d\n", t);
    }

    @Override
    public void onError(Throwable t) {
        System.err.printf("Failed to process: %s\n", t);
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }

};
Flowable.fromArray(1, 2, 3, 4).subscribe(subscriber);
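The Subscription passed to onSubscribe is also how a subscriber applies back pressure. As a minimal sketch (the class name OneAtATimeSketch and the log format are assumptions for this illustration), the subscriber below asks for a single item up front and requests the next one only after it has handled the current one.

```java
import io.reactivex.Flowable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.List;

// Hypothetical class name, used only for this sketch.
public class OneAtATimeSketch {

    // Consumes a three-item Flowable one request at a time and records each signal.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flowable.fromArray(1, 2, 3).subscribe(new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                log.add("subscribed");
                s.request(1); // ask for the first item only
            }

            @Override
            public void onNext(Integer t) {
                log.add("Entry " + t);
                subscription.request(1); // ask for the next item once this one is handled
            }

            @Override
            public void onError(Throwable t) {
                log.add("Failed to process: " + t);
            }

            @Override
            public void onComplete() {
                log.add("Done");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [subscribed, Entry 1, Entry 2, Entry 3, Done]
        System.out.println(run());
    }
}
```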

2.3 Flowables

To create your own flowable you implement FlowableOnSubscribe and provide the back pressure strategy. The back pressure strategy indicates how emitted items are handled when the consumer cannot keep up: by buffering them for the consumer, dropping them, keeping only the latest or signaling an error. In this implementation we will use the onNext method to send a few integers and buffer the items until the downstream consumer is ready.

Flowable<Integer> flowable = Flowable.create((FlowableEmitter<Integer> emitter) -> {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
}, BackpressureStrategy.BUFFER);
flowable.subscribe(System.out::println);
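To get a feel for how the strategy changes what the consumer sees, the sketch below swaps BUFFER for BackpressureStrategy.DROP and pairs it with a subscriber that only ever requests two items. The class name DropStrategySketch, the five-item source and the request size are assumptions chosen for illustration.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.List;

// Hypothetical class name, used only for this sketch.
public class DropStrategySketch {

    // The source pushes five items, but the subscriber has asked for only two.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flowable<Integer> source = Flowable.create((FlowableEmitter<Integer> emitter) -> {
            for (int i = 1; i <= 5; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, BackpressureStrategy.DROP); // items with no outstanding request are discarded

        source.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(2); // demand for two items, never topped up
            }

            @Override
            public void onNext(Integer t) {
                log.add("Entry " + t);
            }

            @Override
            public void onError(Throwable t) {
                log.add("Failed to process: " + t);
            }

            @Override
            public void onComplete() {
                log.add("Done");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [Entry 1, Entry 2, Done]
        System.out.println(run());
    }
}
```

With BUFFER, items 3 to 5 would instead be held until the subscriber requested more.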

3. Transformations

There are many different transformation operators; a full list is available in the RxJava documentation. Two that we will examine in this section are map and flatMap. The map method takes each emitted item and modifies it; you can change the value or even the type. The flatMap method maps each emitted item to a new flowable/observable, and the items those emit are merged into a single stream that the subscriber processes. In other words, map turns one emitted item into exactly one result, while flatMap can turn one emitted item into zero, one or many results. Let's look at a couple of examples.

3.1 Map

As you can see in this example, the Flowable emits a String that it transforms to an int that it will send to the Subscriber.

Flowable.just("1").map(s -> Integer.parseInt(s)).subscribe(System.out::println);

3.2 FlatMap (and ConcatMap, which preserves order)

In this example we take the emitted items and apply the flatMap method to each of them, which in turn responds with a new flowable/observable of type Integer. This is the critical difference between map and flatMap: the function passed to map returns the transformed item itself, while the function passed to flatMap returns a new flowable whose items are then emitted downstream. This is powerful when a complex transformation needs to produce multiple items from a single input. When the inner flowables are asynchronous, flatMap may interleave their items; concatMap performs the same mapping but preserves the source order.

FlatMap Verbose

Observable.fromArray(1, 2, 3, 4).flatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer t) throws Exception {
            return Observable.just(t + 50);
        }
}).subscribe(System.out::println);

FlatMap Change the Type

Observable.fromArray(1,2,3,4).flatMap(t -> Observable.just(Integer.toString(t+50))).subscribe(s -> System.out.println(s));

FlatMap Flowable

Flowable.fromArray(1,2,3,4).flatMap(t -> Flowable.just(t+50)).subscribe(System.out::println);
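The examples above map each item to a single-item stream, so they do not show the one-to-many case. As a sketch under assumed inputs (the class name FlatMapSketch and the comma-separated strings are made up for illustration), the code below splits each line into several words with both concatMap and flatMap.

```java
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical class name, used only for this sketch.
public class FlatMapSketch {

    // concatMap subscribes to the inner flowables one after another, so source order is kept.
    static List<String> splitInOrder() {
        List<String> words = new ArrayList<>();
        Flowable.just("a,b", "c,d,e")
                .concatMap(line -> Flowable.fromArray(line.split(",")))
                .subscribe(words::add);
        return words;
    }

    // flatMap merges the inner flowables and makes no ordering promise,
    // so the result is sorted here before it is returned.
    static List<String> splitAnyOrder() {
        List<String> words = new ArrayList<>();
        Flowable.just("a,b", "c,d,e")
                .flatMap(line -> Flowable.fromArray(line.split(",")))
                .subscribe(words::add);
        Collections.sort(words);
        return words;
    }

    public static void main(String[] args) {
        System.out.println(splitInOrder());  // prints [a, b, c, d, e]
        System.out.println(splitAnyOrder()); // prints [a, b, c, d, e] after sorting
    }
}
```

Two input items produce five output items in both cases, which is something map alone cannot do.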

4. Schedulers

Schedulers make observables asynchronous and define which thread they use. We will examine this topic in the next example when we look more closely at asynchronous calls, but the next code snippet contains a simple example. In the example below the output is written on a separate thread because of the subscribeOn method. Passing Schedulers.io() to this method runs the subscription, and therefore the write to output, on a thread taken from the I/O scheduler's thread pool.

Flowable.just("Hello world").subscribeOn(Schedulers.io()).subscribe(System.out::println);
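Because the work now happens on another thread, a short-lived main method may exit before anything is printed. One way to observe the thread switch is to block until the result arrives. The sketch below assumes a hypothetical class name SubscribeOnSketch and uses fromCallable with blockingFirst purely for demonstration.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

// Hypothetical class name, used only for this sketch.
public class SubscribeOnSketch {

    // Returns the name of the thread on which the source was actually evaluated.
    static String workerThreadName() {
        return Flowable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.io()) // evaluate the callable on an I/O scheduler thread
                .blockingFirst();             // wait on the calling thread for the first item
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + workerThreadName());
    }
}
```

The two printed names should differ, since the callable runs on a scheduler thread rather than on the caller. Blocking like this is convenient in demos and tests; in an event-driven application you would normally stay asynchronous.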

5. Summary

In this example we explored the basics of processing data and events with RxJava. We saw how to build a flowable, how to apply transformations to the items it emits and how to subscribe to flowables. Two areas well suited to RxJava are cases where your UI is processing events and cases where you need to process asynchronous calls to services.

In the next example, we will take a deeper dive into asynchronous requests and the benefits of leveraging RxJava. You can take a look here.

6. Download the Source Code

Here we demonstrated how to use the basic RxJava operations.

Download
You can download the Eclipse project here: Reactive Java (RxJava) Tutorial: Introduction
Andy Beck

Andy is a Senior Software Engineer who has sixteen years of experience working on web, desktop and mobile applications. He holds a Bachelor's and a Master's degree in Computer Science from the University of Virginia and George Washington University respectively. He specializes in working with Java web services and has significant experience working with web applications, databases and continuous integration and deployments. He is currently working as a technical lead at a financial technology organization where he supports mobile application services in Java.
