Search icon CANCEL
Subscription
0
Cart icon
Your Cart (0 item)
Close icon
You have no products in your basket yet
Save more on your purchases! discount-offer-chevron-icon
Savings automatically calculated. No voucher code required.
Arrow left icon
All Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Newsletter Hub
Free Learning
Arrow right icon
timer SALE ENDS IN
0 Days
:
00 Hours
:
00 Minutes
:
00 Seconds

Understanding the Basics of RxJava

Save for later
  • 15 min read
  • 20 Jun 2017

article-image

In this article, by Tadas Subonis author of the book Reactive Android Programming, will go through the core basics of RxJava so that we can fully understand what it is, what are the core elements, and how they work.

Before that, let's take a step back and briefly discuss how RxJava is different from other approaches. RxJava is about reacting to results. It might be an item that originated from some source. It can also be an error. RxJava provides a framework to handle these items in a reactive way and to create complicated manipulation and handling schemes in a very easy-to-use interface. Things like waiting for an arrival of an item before transforming it become very easy with RxJava.

To achieve all this, RxJava provides some basic primitives:

  • Observables: A source of data
  • Subscriptions: An activated handle to the Observable that receives data
  • Schedulers: A means to define where (on which Thread) the data is processed

First of all, we will cover Observables--the source of all the data and the core structure/class that we will be working with. We will explore how are they related to Disposables (Subscriptions).

Furthermore, the life cycle and hook points of an Observable will be described, so we will actually know what's happening when an item travels through an Observable and what are the different stages that we can tap into.

Finally, we will briefly introduce Flowable--a big brother of Observable that lets you handle big amounts of data with high rates of publishing.

To summarize, we will cover these aspects:

  • What is an Observable?
  • What are Disposables (formerly Subscriptions)?
  • How items travel through the Observable?
  • What is backpressure and how we can use it with Flowable?

Let's dive into it!

(For more resources related to this topic, see here.)

Observables

Everything starts with an Observable. It's a source of data that you can observe for emitted data (hence the name). In almost all cases, you will be working with the Observable class. It is possible to (and we will!) combine different Observables into one Observable. Basically, it is a universal interface to tap into data streams in a reactive way.

There are lots of different ways of how one can create Observables. The simplest way is to use the .just() method like we did before:

Observable.just("First item", "Second item");

It is usually a perfect way to glue non-Rx-like parts of the code to Rx compatible flow.

When an Observable is created, it is not usually defined when it will start emitting data. If it was created using simple tools such as.just(), it won't start emitting data until there is a subscription to the observable. How do you create a subscription? It's done by calling .subscribe() :

Observable.just("First item", "Second item")
        .subscribe();

Usually (but not always), the observable be activated the moment somebody subscribes to it. So, if a new Observable was just created, it won't magically start sending data "somewhere".

Hot and Cold Observables

Quite often, in the literature and documentation terms, Hot and Cold Observables can be found.

Cold Observable is the most common Observable type. For example, it can be created with the following code:

Observable.just("First item", "Second item")
        .subscribe();

Cold Observable means that the items won't be emitted by the Observable until there is a Subscriber. This means that before the .subscribe() is called, no items will be produced and thus none of the items that are intended to be omitted will be missed, everything will be processed.

Hot Observable is an Observable that will begin producing (emitting) items internally as soon as it is created. The status updates are produced constantly and it doesn't matter if there is something that is ready to receive them (like Subscription). If there were no subscriptions to the Observable, it means that the updates will be lost.

Disposables

A disposable (previously called Subscription in RxJava 1.0) is a tool that can be used to control the life cycle of an Observable. If the stream of data that the Observable is producing is boundless, it means that it will stay active forever. It might not be a problem for a server-side application, but it can cause some serious trouble on Android. Usually, this is the common source of memory leaks.

Obtaining a reference to a disposable is pretty simple:

Disposable disposable = Observable.just("First item", "Second item")
        .subscribe();

Disposable is a very simple interface. It has only two methods: dispose() and isDisposed()

dispose() can be used to cancel the existing Disposable (Subscription). This will stop the call of .subscribe()to receive any further items from Observable, and the Observable itself will be cleaned up.

isDisposed() has a pretty straightforward function--it checks whether the subscription is still active. However, it is not used very often in regular code as the subscriptions are usually unsubscribed and forgotten.

The disposed subscriptions (Disposables) cannot be re-enabled. They can only be created anew.

Finally, Disposables can be grouped using CompositeDisposable like this:

Disposable disposable = new CompositeDisposable(
        Observable.just("First item", "Second item").subscribe(),
        Observable.just("1", "2").subscribe(),
        Observable.just("One", "Two").subscribe()
);

It's useful in the cases when there are many Observables that should be canceled at the same time, for example, an Activity being destroyed.

Schedulers

As described in the documentation, a scheduler is something that can schedule a unit of work to be executed now or later. In practice, it means that Schedulers control where the code will actually be executed and usually that means selecting some kind of specific thread.

Most often, Subscribers are used to executing long-running tasks on some background thread so that it wouldn't block the main computation or UI thread. This is especially relevant on Android when all long-running tasks must not be executed on MainThread.

Schedulers can be set with a simple .subscribeOn() call:

Observable.just("First item", "Second item")
        .subscribeOn(Schedulers.io())
        .subscribe();

There are only a few main Schedulers that are commonly used:

  • Schedulers.io()
  • Schedulers.computation()
  • Schedulers.newThread()
  • AndroidSchedulers.mainThread()

The AndroidSchedulers.mainThread() is only used on Android systems.

Scheduling examples

Let's explore how schedulers work by checking out a few examples.

Let's run the following code:

Observable.just("First item", "Second item")
        .doOnNext(e -> Log.d("APP", "on-next:" + Thread.currentThread().getName() + ":" + e))
        .subscribe(e -> Log.d("APP", "subscribe:" + Thread.currentThread().getName() + ":" + e));

The output will be as follows:

on-next:main:First item
subscribe:main:First item
on-next:main:Second item
subscribe:main:Second item

Now let's try changing the code to as shown:

Observable.just("First item", "Second item")
        .subscribeOn(Schedulers.io())
        .doOnNext(e -> Log.d("APP", "on-next:" + Thread.currentThread().getName() + ":" + e))
        .subscribe(e -> Log.d("APP", "subscribe:" + Thread.currentThread().getName() + ":" + e));

Now, the output should look like this:

on-next:RxCachedThreadScheduler-1:First item
subscribe:RxCachedThreadScheduler-1:First item
on-next:RxCachedThreadScheduler-1:Second item
subscribe:RxCachedThreadScheduler-1:Second item

We can see how the code was executed on the main thread in the first case and on a new thread in the next.

Android requires that all UI modifications should be done on the main thread. So, how can we execute a long-running process in the background but process the result on the main thread? That can be done with .observeOn() method:

Observable.just("First item", "Second item")
        .subscribeOn(Schedulers.io())
        .doOnNext(e -> Log.d("APP", "on-next:" + Thread.currentThread().getName() + ":" + e))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(e -> Log.d("APP", "subscribe:" + Thread.currentThread().getName() + ":" + e));

The output will be as illustrated:

on-next:RxCachedThreadScheduler-1:First item
on-next:RxCachedThreadScheduler-1:Second item
subscribe:main:First item
subscribe:main:Second item

You will note that the items in the doOnNext block were executed on the "RxThread", and the subscribe block items were executed on the main thread.

Investigating the Flow of Observable

The logging inside the steps of an Observable is a very powerful tool when one wants to understand how they work. If you are in doubt at any point as to what's happening, add logging and experiment. A few quick iterations with logs will definitely help you understand what's going on under the hood.

Let's use this technique to analyze a full flow of an Observable. We will start off with this script:

private void log(String stage, String item) {
    Log.d("APP", stage + ":" + Thread.currentThread().getName() + ":" + item);
}



private void log(String stage) {
    Log.d("APP", stage + ":" + Thread.currentThread().getName());
}
Observable.just("One", "Two")
 .subscribeOn(Schedulers.io())
 .doOnDispose(() -> log("doOnDispose"))
 .doOnComplete(() -> log("doOnComplete"))
 .doOnNext(e -> log("doOnNext", e))
 .doOnEach(e -> log("doOnEach"))
 .doOnSubscribe((e) -> log("doOnSubscribe"))
 .doOnTerminate(() -> log("doOnTerminate"))
 .doFinally(() -> log("doFinally"))
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(e -> log("subscribe", e));

It can be seen that it has lots of additional and unfamiliar steps (more about this later). They represent different stages during the processing of an Observable. So, what's the output of the preceding script?:

doOnSubscribe:main
doOnNext:RxCachedThreadScheduler-1:One
doOnEach:RxCachedThreadScheduler-1
doOnNext:RxCachedThreadScheduler-1:Two
doOnEach:RxCachedThreadScheduler-1
doOnComplete:RxCachedThreadScheduler-1
doOnEach:RxCachedThreadScheduler-1
doOnTerminate:RxCachedThreadScheduler-1
doFinally:RxCachedThreadScheduler-1
subscribe:main:One
subscribe:main:Two
doOnDispose:main

Let's go through some of the steps. First of all, by calling .subscribe() the doOnSubscribe block was executed. This started the emission of items from the Observable as we can see on the doOnNext and doOnEach lines. Finally, the stream finished at termination life cycle was activated--the doOnComplete, doOnTerminate and doOnFinally.

Unlock access to the largest independent learning library in Tech for FREE!
Get unlimited access to 7500+ expert-authored eBooks and video courses covering every tech area you can think of.
Renews at £15.99/month. Cancel anytime

Also, the reader will note that the doOnDispose block was called on the main thread along with the subscribe block.

The flow will be a little different if .subscribeOn() and .observeOn() calls won't be there:

doOnSubscribe:main
doOnNext:main:One
doOnEach:main
subscribe:main:One
doOnNext:main:Two
doOnEach:main
subscribe:main:Two
doOnComplete:main
doOnEach:main
doOnTerminate:main
doOnDispose:main
doFinally:main

You will readily note that now, the doFinally block was executed after doOnDispose while in the former setup, doOnDispose was the last. This happens due to the way Android Looper schedulers code blocks for execution and the fact that we used two different threads in the first case.

The takeaway here is that whenever you are unsure of what is going on, start logging actions (and the thread they are running on) to see what's actually happening.

Flowable

Flowable can be regarded as a special type of Observable (but internally it isn't). It has almost the same method signature like the Observable as well.

The difference is that Flowable allows you to process items that emitted faster from the source than some of the following steps can handle. It might sound confusing, so let's analyze an example.

Assume that you have a source that can emit a million items per second. However, the next step uses those items to do a network request. We know, for sure, that we cannot do more than 50 requests per second:

understanding-basics-rxjava-img-0

That poses a problem. What will we do after 60 seconds? There will be 60 million items in the queue waiting to be processed. The items are accumulating at a rate of 1 million items per second between the first and the second steps because the second step processes them at a much slower rate.

Clearly, the problem here is that the available memory will be exhausted and the programming will fail with an OutOfMemory (OOM) exception.

For example, this script will cause an excessive memory usage because the processing step just won't be able to keep up with the pace the items are emitted at.

PublishSubject<Integer> observable = PublishSubject.create();


observable
        .observeOn(Schedulers.computation())
        .subscribe(v -> log("s", v.toString()), this::log);



for (int i = 0; i < 1000000; i++) {
    observable.onNext(i);
}
private void log(Throwable throwable) {
    Log.e("APP", "Error", throwable);
}

By converting this to a Flowable, we can start controlling this behavior:

observable.toFlowable(BackpressureStrategy.MISSING)
        .observeOn(Schedulers.computation())
        .subscribe(v -> log("s", v.toString()), this::log);

Since we have chosen not to specify how we want to handle items that cannot be processed (it's called Backpressuring), it will throw a MissingBackpressureException. However, if the number of items was 100 instead of a million, it would have been just fine as it wouldn't hit the internal buffer of Flowable. By default, the size of the Flowable queue (buffer) is 128.

There are a few Backpressure strategies that will define how the excessive amount of items should be handled.

Drop Items

Dropping means that if the downstream processing steps cannot keep up with the pace of the source Observable, just drop the data that cannot be handled. This can only be used in the cases when losing data is okay, and you care more about the values that were emitted in the beginning.

There are a few ways in which items can be dropped.

The first one is just to specify Backpressure strategy, like this:

observable.toFlowable(BackpressureStrategy.DROP)

Alternatively, it will be like this:

observable.toFlowable(BackpressureStrategy.MISSING)
        .onBackpressureDrop()

A similar way to do that would be to call .sample(). It will emit items only periodically, and it will take only the last value that's available (while BackpressureStrategy.DROP drops it instantly unless it is free to push it down the stream). All the other values between "ticks" will be dropped:

observable.toFlowable(BackpressureStrategy.MISSING)
        .sample(10, TimeUnit.MILLISECONDS)
        .observeOn(Schedulers.computation())
        .subscribe(v -> log("s", v.toString()), this::log);

Preserve Latest Item

Preserving the last items means that if the downstream cannot cope with the items that are being sent to them, stop emitting values and wait until they become available. While waiting, keep dropping all the values except the last one that arrived and when the downstream becomes available to send the last message that's currently stored.

Like with Dropping, the "Latest" strategy can be specified while creating an Observable:

observable.toFlowable(BackpressureStrategy.LATEST)

Alternatively, by calling .onBackpressure():

observable.toFlowable(BackpressureStrategy.MISSING)
        .onBackpressureLatest()

Finally, a method, .debounce(), can periodically take the last value at specific intervals:

observable.toFlowable(BackpressureStrategy.MISSING)
        .debounce(10, TimeUnit.MILLISECONDS)

Buffering

It's usually a poor way to handle different paces of items being emitted and consumed as it often just delays the problem.

However, this can work just fine if there is just a temporal slowdown in one of the consumers. In this case, the items emitted will be stored until later processing and when the slowdown is over, the consumers will catch up. If the consumers cannot catch up, at some point the buffer will run out and we can see a very similar behavior to the original Observable with memory running out.

Enabling buffers is, again, pretty straightforward by calling the following:

observable.toFlowable(BackpressureStrategy.BUFFER)

or

observable.toFlowable(BackpressureStrategy.MISSING)
        .onBackpressureBuffer()

If there is a need to specify a particular value for the buffer, one can use .buffer():

observable.toFlowable(BackpressureStrategy.MISSING)
        .buffer(10)

Completable, Single, and Maybe Types

Besides the types of Observable and Flowable, there are three more types that RxJava provides:

  • Completable: It represents an action without a result that will be completed in the future
  • Single: It's just like Observable (or Flowable) that returns a single item instead of a stream
  • Maybe: It stands for an action that can complete (or fail) without returning any value (like Completable) but can also return an item like Single

However, all these are used quite rarely. Let's take a quick look at the examples.

Completable

Since Completable can basically process just two types of actions--onComplete and onError--we will cover it very briefly.

Completable has many static factory methods available to create it but, most often, it will just be found as a return value in some other libraries. For example, the Completable can be created by calling the following:

Completable completable = Completable.fromAction(() -> {
    log("Let's do something");
});

Then, it is to be subscribed with the following:

completable.subscribe(() -> {
    log("Finished");
}, throwable -> {
    log(throwable);
});

Single

Single provides a way to represent an Observable that will return just a single item (thus the name). You might ask, why it is worth having it at all? These types are useful to tell the developers about the specific behavior that they should expect.

To create a Single, one can use this example:

Single.just("One item")

The Single and the Subscription to it can be created with the following:

Single.just("One item")
        .subscribe((item) -> {
            log(item);
        }, (throwable) -> {
            log(throwable);
        });

Make a note that this differs from Completable in that the first argument to the .subscribe() action now expects to receive an item as a result.

Maybe

Finally, the Maybe type is very similar to the Single type, but the item might not be returned to the subscriber in the end.

The Maybe type can be created in a very similar fashion as before:

Maybe.empty();

or like

Maybe.just("Item");

However, the .subscribe() can be called with arguments dedicated to handling onSuccess (for received items), onError (to handle errors), and onComplete (to do a final action after the item is handled):

Maybe.just("Item")
        .subscribe(
                s -> log("success: " + s),
                throwable -> log("error"),
                () -> log("onComplete")
        );

Summary

In this article, we covered the most essentials parts of RxJava.

Resources for Article:


Further resources on this subject: