





















































In this article written by Nickolay Tsvetinov, author of the book Learning Reactive Programming with Java 8, this article will present RxJava (https://github.com/ReactiveX/RxJava), an open source Java implementation of the reactive programming paradigm. Writing code using RxJava requires a different kind of thinking, but it will give you the power to create complex logic using simple pieces of well-structured code.
In this article, we will cover:
(For more resources related to this topic, see here.)
Reactive programming is a paradigm that revolves around the propagation of change. In other words, if a program propagates all the changes that modify its data to all the interested parties (users, other programs, components, and subparts), then this program can be called reactive.
A simple example of this is Microsoft Excel. If you set a number in cell A1 and another number in cell 'B1', and set cell 'C1' to SUM(A1, B1); whenever 'A1' or 'B1' changes, 'C1' will be updated to be their sum.
Let's call this the reactive sum.
What is the difference between assigning a simple variable c to be equal to the sum of the a and b variables and the reactive sum approach?
In a normal Java program, when we change 'a' or 'b', we will have to update 'c' ourselves. In other words, the change in the flow of the data represented by 'a' and 'b', is not propagated to 'c'. Here is this illustrated through source code:
int a = 4; int b = 5; int c = a + b; System.out.println(c); // 9 a = 6; System.out.println(c); // 9 again, but if 'c' was tracking the changes of 'a' and 'b', // it would've been 6 + 5 = 11
This is a very simple explanation of what "being reactive" means. Of course, there are various implementations of this idea and there are various problems that these implementations must solve.
The easiest way for us to answer this question is to think about the requirements we have while building applications these days.
While 10-15 years ago it was normal for websites to go through maintenance or to have a slow response time, today everything should be online 24/7 and should respond with lightning speed; if it's slow or down, users would prefer an alternative service. Today slow means unusable or broken. We are working with greater volumes of data that we need to serve and process fast.
HTTP failures weren't something rare in the recent past, but now, we have to be fault-tolerant and give our users readable and reasonable message updates.
In the past, we wrote simple desktop applications, but today we write web applications, which should be fast and responsive. In most cases, these applications communicate with a large number of remote services.
These are the new requirements we have to fulfill if we want our software to be competitive. So in other words we have to be:
Let's think about how to accomplish this:
If the application is event-driven, it can be decoupled into multiple self-contained components. This helps us become more scalable, because we can always add new components or remove old ones without stopping or breaking the system. If errors and failures are passed to the right component, which can handle them as notifications, the application can become more fault-tolerant or resilient. So if we build our system to be event-driven, we can more easily achieve scalability and failure tolerance, and a scalable, decoupled, and error-proof application is fast and responsive to users.
The Reactive Manifesto (http://www.reactivemanifesto.org/) is a document defining the four reactive principles that we mentioned previously. Each reactive system should be message-driven (event-driven). That way, it can become loosely coupled and therefore scalable and resilient (fault-tolerant), which means it is reliable and responsive (see the preceding diagram).
Note that the Reactive Manifesto describes a reactive system and is not the same as our definition of reactive programming. You can build a message-driven, resilient, scalable, and responsive application without using a reactive library or language.
Changes in the application data can be modeled with notifications, which can be propagated to the right handlers. So, writing applications using reactive programming is the easiest way to comply with the Manifesto.
To write reactive programs, we need a library or a specific programming language, because building something like that ourselves is quite a difficult task. Java is not really a reactive programming language (it provides some tools like the java.util.Observable class, but they are quite limited). It is a statically typed, object-oriented language, and we write a lot of boilerplate code to accomplish simple things (POJOs, for example). But there are reactive libraries in Java that we can use. In this article, we will be using RxJava (developed by people in the Java open source community, guided by Netflix).
You can download and build RxJava from Github (https://github.com/ReactiveX/RxJava). It requires zero dependencies and supports Java 8 lambdas. The documentation provided by its Javadoc and the GitHub wiki pages is well structured and some of the best out there. Here is how to check out the project and run the build:
$ git clone [email protected]:ReactiveX/RxJava.git $ cd RxJava/ $ ./gradlew build
Of course, you can also download the prebuilt JAR. For this article, we'll be using version 1.0.8.
If you use Maven, you can add RxJava as a dependency to your pom.xml file:
<dependency> <groupId>io.reactivex</groupId> <artifactId>rxjava</artifactId> <version>1.0.8</version> </dependency>
Alternatively, for Apache Ivy, put this snippet in your Ivy file's dependencies:
<dependency org="io.reactivex" name="rxjava" rev="1.0.8" />
If you use Gradle instead, update your build.gradle file's dependencies as follows:
dependencies { ... compile 'io.reactivex:rxjava:1.0.8' ... }
Now, let's take a peek at what RxJava is all about. We are going to begin with something well known, and gradually get into the library's secrets.
As a Java programmer, it is highly possible that you've heard or used the Iterator pattern. The idea is simple: an Iterator instance is used to traverse through a container (collection/data source/generator), pulling the container's elements one by one when they are required, until it reaches the container's end. Here is a little example of how it is used in Java:
List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); // (1) Iterator<String> iterator = list.iterator(); // (2) while(iterator.hasNext()) { // 3 // Prints elements (4) System.out.println(iterator.next()); }
Every java.util.Collection object is an Iterable instance which means that it has the method iterator(). This method creates an Iterator instance, which has as its source the collection. Let's look at what the preceding code does:
In this example, our program consumes the items from the List instance using the Iterator instance. It pulls the data (here, represented by strings) and the current thread blocks until the requested data is ready and received. So, for example, if the Iterator instance was firing a request to a web server on every next() method call, the main thread of our program would be blocked while waiting for each of the responses to arrive.
RxJava's building blocks are the observables. The Observable class (note that this is not the java.util.Observable class that comes with the JDK) is the mathematical dual of the Iterator class, which basically means that they are like the two sides of the same coin. It has an underlying collection or computation that produces values that can be consumed by a consumer. But the difference is that the consumer doesn't "pull" these values from the producer like in the Iterator pattern. It is exactly the opposite; the producer 'pushes' the values as notifications to the consumer.
Here is an example of the same program but written using an Observable instance:
List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); // (1) Observable<String> observable = Observable.from(list); // (2) observable.subscribe(new Action1<String>() { // (3) @Override public void call(String element) { System.out.println(element); // Prints the element (4) } });
Here is what is happening in the code:
Instances of the RxJava Observable class behave somewhat like asynchronous iterators, which notify that there is a next value their subscribers/consumers by themselves. In fact, the Observable class adds to the classic Observer pattern (implemented in Java—see java.util.Observable, see Design Patterns: Elements of Reusable Object-Oriented Software by the Gang Of Four) two things available in the Iterable type.
These listeners can be attached using the subscribe(Action1<? super T>, Action1 <Throwable>, Action0) method. Let's expand the Observable instance example by adding error and completed listeners:
List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); Observable<String> observable = Observable.from(list); observable.subscribe(new Action1<String>() { @Override public void call(String element) { System.out.println(element); } }, new Action1<Throwable>() { @Override public void call(Throwable t) { System.err.println(t); // (1) } }, new Action0() { @Override public void call() { System.out.println("We've finnished!"); // (2) } });
The new things here are:
We saw how we can use the Observable instances and that they are not so different from something familiar to us—the Iterator instance. These Observable instances can be used for building asynchronous streams and pushing data updates to their subscribers (they can have multiple subscribers).This is an implementation of the reactive programming paradigm. The data is being propagated to all the interested parties—the subscribers.
Coding using such streams is a more functional-like implementation of Reactive Programming. Of course, there are formal definitions and complex terms for it, but this is the simplest explanation.
Subscribing to events should be familiar; for example, clicking on a button in a GUI application fires an event which is propagated to the subscribers—handlers. But, using RxJava, we can create data streams from anything—file input, sockets, responses, variables, caches, user inputs, and so on. On top of that, consumers can be notified that the stream is closed, or that there has been an error. So, by using these streams, our applications can react to failure.
To summarize, a stream is a sequence of ongoing messages/events, ordered as they are processed in real time. It can be looked at as a value that is changing through time, and these changes can be observed by subscribers (consumers), dependent on it. So, going back to the example from Excel, we have effectively replaced the traditional variables with "reactive variables" or RxJava's Observable instances.
Now that we are familiar with the Observable class and the idea of how to use it to code in a reactive way, we are ready to implement the reactive sum, mentioned at the beginning of this article.
Let's look at the requirements our program must fulfill:
The first piece of code represents the main body of the program:
ConnectableObservable<String> input = from(System.in); // (1) Observable<Double> a = varStream("a", input); (2) Observable<Double> b = varStream("b", input); ReactiveSum sum = new ReactiveSum(a, b); (3) input.connect(); (4)
There are a lot of new things happening here:
This code is responsible for building dependencies in the program and starting it off. The a and b values are dependent on the user input and their sum is dependent on them.
Now let's look at the implementation of the from(InputStream) method, which creates an Observable instance with the java.io.InputStream source:
static ConnectableObservable<String> from(final InputStream stream) { return from(new BufferedReader(new InputStreamReader(stream))); // (1) } static ConnectableObservable<String> from(final BufferedReader reader) { return Observable.create(new OnSubscribe<String>() { // (2) @Override public void call(Subscriber<? super String> subscriber) { if (subscriber.isUnsubscribed()) { // (3) return; } try { String line; while(!subscriber.isUnsubscribed() && (line = reader.readLine()) != null) { // (4) if (line == null || line.equals("exit")) { // (5) break; } subscriber.onNext(line); // (6) } } catch (IOException e) { // (7) subscriber.onError(e); } if (!subscriber.isUnsubscribed()) // (8) subscriber.onCompleted(); } } }).publish(); // (9) }
This is one complex piece of code, so let's look at it step-by-step:
This illustrates a simplified way to turn Java's IO streams into Observable instances. Of course, with this main loop, the main thread of the program will block waiting for user input. This can be prevented using the right Scheduler instances to move the logic to another thread.
Now, every line the user types into the terminal is propagated as a notification by the ConnectableObservable instance created by this method. The time has come to look at how we connect our value Observable instances, representing the collectors of the sum, to this input Observable instance. Here is the implementation of the varStream(String, Observable) method, which takes a name of a value and source Observable instance and returns an Observable instance representing this value:
public static Observable<Double> varStream(final String varName, Observable<String> input) { final Pattern pattern = Pattern.compile("\^s*" + varName + "\s*[:|=]\s*(-?\d+\.?\d*)$"); // (1) return input .map(new Func1<String, Matcher>() { public Matcher call(String str) { return pattern.matcher(str); // (2) } }) .filter(new Func1<Matcher, Boolean>() { public Boolean call(Matcher matcher) { return matcher.matches() && matcher.group(1) != null; // (3) } }) .map(new Func1<Matcher, Double>() { public Double call(Matcher matcher) { return Double.parseDouble(matcher.group(1)); // (4) } }); }
The map() and filter() methods called on the Observable instance here are part of the fluent API provided by RxJava. They can be called on an Observable instance, creating a new Observable instance that depends on these methods and that transforms or filters the incoming data. Using these methods the right way, you can express complex logic in a series of steps leading to your objective:
This is how the values a and b are represented by streams of double values, changing in time. Now we can implement their sum. We implemented it as a class that implements the Observer interface, because I wanted to show you another way of subscribing to Observable instances—using the Observer interface. Here is the code:
public static final class ReactiveSum implements Observer<Double> { // (1) private double sum; public ReactiveSum(Observable<Double> a, Observable<Double> b) { this.sum = 0; Observable.combineLatest(a, b, new Func2<Double, Double, Double>() { // (5) public Double call(Double a, Double b) { return a + b; } }).subscribe(this); // (6) } public void onCompleted() { System.out.println("Exiting last sum was : " + this.sum); // (4) } public void onError(Throwable e) { System.err.println("Got an error!"); // (3) e.printStackTrace(); } public void onNext(Double sum) { this.sum = sum; System.out.println("update : a + b = " + sum); // (2) } }
This is the implementation of the actual sum, dependent on the two Observable instances representing its collectors:
Here is sample of what the output of this example would look like:
Reacitve Sum. Type 'a: <number>' and 'b: <number>' to try it. a:4 b:5 update : a + b = 9.0 a:6 update : a + b = 11.0
So this is it! We have implemented our reactive sum using streams of data.
In this article, we went through the reactive principles and the reasons we should learn and use them. It is not so hard to build a reactive application; it just requires structuring the program in little declarative steps. With RxJava, this can be accomplished by building multiple asynchronous streams connected the right way, transforming the data all the way through its consumer.
The two examples presented in this article may look a bit complex and confusing at first glance, but in reality, they are pretty simple.
If you want to read more about reactive programming, take a look at Reactive Programming in the Netflix API with RxJava, a fine article on the topic, available at http://techblog.netflix.com/2013/02/rxjava-netflix-api.html. Another fine post introducing the concept can be found here: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754.
And these are slides about reactive programming and RX by Ben Christensen, one of the creators of RxJava: https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014.
Further resources on this subject: