Roger Hu edited this page Dec 18, 2015 · 91 revisions

Overview

One of the challenges in writing robust Android apps is that inputs change dynamically. In traditional imperative programming, a variable only holds the value that was last explicitly assigned to it. If a value it was derived from changes, the variable is not updated unless another line of code reassigns it. Consider the following example:

// init variables
int i, j, k; 

// Init inputs
i = 1;
j = 2;

// Set output value
k = i + j;

// Update a dependent value
j = 4;
k = ?  // What should k be?

Traditional asynchronous programming approaches tend to rely on callbacks to propagate these changes, but nesting callbacks can lead to a problem known as callback hell. Reactive programming (see an intro here) addresses these issues by providing a framework for describing outputs that reflect their changing inputs. RxJava, which is a port of the Reactive Extensions library from .NET, enables Android apps to be built in this style.
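To make the idea of outputs that reflect their changing inputs concrete, here is a minimal JDK-only sketch of the i, j, k example. It does not use RxJava; the Cell class and the sum() helper are hypothetical names invented for this illustration, and the sketch is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only sketch; none of these names come from RxJava.
final class ReactiveSumSketch {

    // A mutable value that notifies its listeners whenever it is set.
    static final class Cell {
        private int value;
        private final List<Runnable> listeners = new ArrayList<>();

        Cell(int initial) {
            value = initial;
        }

        int get() {
            return value;
        }

        void set(int newValue) {
            value = newValue;
            for (Runnable listener : listeners) {
                listener.run();
            }
        }

        void onChange(Runnable listener) {
            listeners.add(listener);
        }
    }

    // Builds a cell whose value is recomputed whenever a or b changes.
    static Cell sum(final Cell a, final Cell b) {
        final Cell result = new Cell(a.get() + b.get());
        Runnable recompute = new Runnable() {
            @Override
            public void run() {
                result.set(a.get() + b.get());
            }
        };
        a.onChange(recompute);
        b.onChange(recompute);
        return result;
    }

    public static void main(String[] args) {
        Cell i = new Cell(1);
        Cell j = new Cell(2);
        Cell k = sum(i, j);
        System.out.println(k.get()); // 3
        j.set(4);
        System.out.println(k.get()); // 5, without reassigning k by hand
    }
}
```

The relationship k = i + j is declared once, and every later change to an input flows through to the output.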

Advantages

Here are a few advantages of using RxJava on Android:

  • Simplifies the ability to chain async operations. If you need to make an API call that depends on another API call, you will likely end up implementing this call in the callback of the first one. RxJava provides a way to avoid creating layers of nested callbacks to address this issue. For this reason, RxJava became popular within Netflix in 2014 for abstracting away the complexities of performing dependent API calls.

  • Exposes a more explicit way of declaring how concurrent operations should run. Although RxJava is single-threaded by default, it lets you define explicitly which threading model should be used for both background and callback tasks. Since Android only allows UI updates on the main thread, using RxJava makes it clearer which operations will update the views.

  • Surfaces errors sooner. One issue with AsyncTask is that errors that occur on the background thread are hard to pass along when updating the UI thread using the onPostExecute() method. In addition, there are limitations in how many AsyncTasks can be dispatched concurrently as described in this blog post. RxJava surfaces these errors by delivering them to the observer's onError() callback.

  • Provides powerful constructs for manipulating streams of data. One mindset shift in using RxJava is thinking about everything in terms of data flowing through the system. Click events generated by the user, network calls, and data updates from external sources can all be described as asynchronous streams of data. The power of RxJava is that it enables these streams to be transformed, filtered, or used to create new streams of data with only a few lines of code.

Setup

Set up your app/build.gradle:

dependencies {
  compile 'io.reactivex:rxjava:1.0.16'
  compile 'io.reactivex:rxandroid:1.0.1'
}

Creating Asynchronous Streams

RxJava defines the concept of an Observable and an Observer. An Observable emits values over time, as either a finite or an infinite stream. In the Android world, most of the time we are working with finite streams of data.

An Observer watches for the values emitted by the Observable. When these events occur, the role of the observer is to respond to them. An Observable can be created from just about any type of input. For instance, it can be a set of strings that should be iterated:

Observable.just("a", "b", "c")  // generate an observable

Similar to the above example, the Retrofit library simply wraps a synchronous network call as an Observable type for use with RxJava. Declaring an endpoint's return type as Observable does this work automatically.

public interface MyApiEndpointInterface {
  @GET("/users/{username}")
  Observable<User> getUser(@Path("username") String username);
}

We can then create an instance of this interface and get back an Observable type:

MyApiEndpointInterface apiService =
    retrofit.create(MyApiEndpointInterface.class);

Observable<User> call = apiService.getUser(username);

To create an observer, the following interface must be implemented:

public interface Observer<T> {

    void onCompleted(); // will not be called if onError() is called

    void onError(Throwable e); 
    void onNext(T t);

}

Note that an Observer is a generic type. Its type parameter must match the type of value that the Observable will emit. For an observer to start watching an observable that will generate string types, it must subscribe to it:

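Observable.just("a", "b", "c").subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        // called once after the last item has been emitted
    }

    @Override
    public void onError(Throwable e) {
        // called instead of onCompleted() if the stream fails
    }

    @Override
    public void onNext(String s) {
        // called for each emitted item: "a", then "b", then "c"
    }
});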
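The order in which an observer's callbacks fire can be illustrated with a small JDK-only sketch. It is not RxJava itself: the Observer interface is copied from above, while emit(), items(), failing() and record() are hypothetical helpers invented for this example. It assumes a source that either yields all of its items or fails before emitting any.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical JDK-only sketch; only the Observer method names mirror RxJava 1.x.
final class EmissionOrderSketch {

    interface Observer<T> {
        void onCompleted();
        void onError(Throwable e);
        void onNext(T t);
    }

    // Stand-in for a source: load the items, then push them to the observer.
    static <T> void emit(Callable<List<T>> source, Observer<T> observer) {
        List<T> items;
        try {
            items = source.call();
        } catch (Exception e) {
            observer.onError(e);   // terminal event: onCompleted() is not called
            return;
        }
        for (T item : items) {
            observer.onNext(item);
        }
        observer.onCompleted();    // terminal event: called once after the last item
    }

    // A source that yields the given values.
    static Callable<List<String>> items(final String... values) {
        return new Callable<List<String>>() {
            @Override
            public List<String> call() {
                return Arrays.asList(values);
            }
        };
    }

    // A source that fails with the given message.
    static Callable<List<String>> failing(final String message) {
        return new Callable<List<String>>() {
            @Override
            public List<String> call() throws Exception {
                throw new Exception(message);
            }
        };
    }

    // Subscribes a logging observer and returns the callbacks it received, in order.
    static List<String> record(Callable<List<String>> source) {
        final List<String> log = new ArrayList<>();
        emit(source, new Observer<String>() {
            @Override
            public void onCompleted() {
                log.add("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                log.add("onError: " + e.getMessage());
            }

            @Override
            public void onNext(String s) {
                log.add("onNext: " + s);
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(items("a", "b", "c")));
        // [onNext: a, onNext: b, onNext: c, onCompleted]
        System.out.println(record(failing("boom")));
        // [onError: boom]
    }
}
```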

Schedulers

RxJava is synchronous by default, but work can be defined asynchronously using schedulers. For instance, we can define that the network call should be done on a background thread, but the callback should be done on the main UI thread. subscribeOn() defines where the work is done, and observeOn() defines where the callbacks are delivered. With Retrofit:

Observable<User> call = apiService.getUser(username);
call.subscribeOn(Schedulers.io())                // run the network request on a background thread
    .observeOn(AndroidSchedulers.mainThread());  // deliver callbacks on the main UI thread

The RxAndroid library also includes AndroidSchedulers.mainThread() for allowing callbacks to be fired on the main UI thread.
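The split between where work runs and where callbacks are delivered can be sketched with plain JDK executors. This is only an analogy, not how RxJava or RxAndroid implement schedulers: the two single-thread executors named "background" and "main" stand in for Schedulers.io() and AndroidSchedulers.mainThread(), and the Callback interface and run() helper are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only sketch; executors play the role of RxJava schedulers.
final class ThreadHopSketch {

    interface Callback<T> {
        void onResult(T value);
    }

    static ExecutorService singleThread(final String name) {
        return Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, name);
                t.setDaemon(true);
                return t;
            }
        });
    }

    // Runs the work on workExecutor (the role of subscribeOn) and hands the
    // result to the callback on callbackExecutor (the role of observeOn).
    static <T> void run(final Callable<T> work, ExecutorService workExecutor,
                        final ExecutorService callbackExecutor, final Callback<T> callback) {
        workExecutor.execute(new Runnable() {
            @Override
            public void run() {
                final T value;
                try {
                    value = work.call();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                callbackExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callback.onResult(value);
                    }
                });
            }
        });
    }

    // Returns {name of the thread that did the work, name of the thread that got the callback}.
    static String[] demo() {
        ExecutorService background = singleThread("background");
        ExecutorService main = singleThread("main");
        final String[] threads = new String[2];
        final CountDownLatch done = new CountDownLatch(1);
        run(new Callable<String>() {
            @Override
            public String call() {
                return Thread.currentThread().getName();
            }
        }, background, main, new Callback<String>() {
            @Override
            public void onResult(String workThread) {
                threads[0] = workThread;
                threads[1] = Thread.currentThread().getName();
                done.countDown();
            }
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the callback");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            background.shutdown();
            main.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println("work ran on: " + threads[0]);      // work ran on: background
        System.out.println("callback ran on: " + threads[1]);  // callback ran on: main
    }
}
```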

Using schedulers relies on queuing the work through bounded or unbounded thread pools. Here are a few options available that come with RxJava. See this link for all the possible options.

Name | Description
Schedulers.computation() | fixed number of threads (equal to the number of CPU cores)
Schedulers.immediate() | current thread
Schedulers.io() | backed by a thread pool that can grow as needed
Schedulers.newThread() | creates a new thread
Schedulers.trampoline() | schedules work on the current thread, but puts it on a queue
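The difference between running work immediately on the current thread and queuing it on the current thread is easiest to see when a task schedules another task. The following JDK-only sketch illustrates the two behaviors; it is not RxJava's implementation, it is not thread-safe, and the Worker, ImmediateWorker and TrampolineWorker names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded sketch of "immediate" versus "trampoline" scheduling.
final class TrampolineSketch {

    interface Worker {
        void schedule(Runnable task);
    }

    // Runs each task inline, at the point where it is scheduled.
    static final class ImmediateWorker implements Worker {
        @Override
        public void schedule(Runnable task) {
            task.run();
        }
    }

    // Queues tasks that are scheduled while another task is running,
    // and runs them after the current task has finished.
    static final class TrampolineWorker implements Worker {
        private final Deque<Runnable> queue = new ArrayDeque<>();
        private boolean draining;

        @Override
        public void schedule(Runnable task) {
            queue.add(task);
            if (draining) {
                return; // the outer call is already draining the queue
            }
            draining = true;
            try {
                Runnable next;
                while ((next = queue.poll()) != null) {
                    next.run();
                }
            } finally {
                draining = false;
            }
        }
    }

    // Schedules an outer task that schedules an inner task, and logs the order.
    static List<String> trace(final Worker worker) {
        final List<String> log = new ArrayList<>();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                log.add("outer start");
                worker.schedule(new Runnable() {
                    @Override
                    public void run() {
                        log.add("inner");
                    }
                });
                log.add("outer end");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace(new ImmediateWorker()));
        // [outer start, inner, outer end]
        System.out.println(trace(new TrampolineWorker()));
        // [outer start, outer end, inner]
    }
}
```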

Hot vs. Cold Observables

By default, Observables start to execute only when a subscriber is attached, and each subscription triggers its own execution. Observables that behave this way, such as those returned by Retrofit, are known as cold observables. A hot observable, by contrast, emits values regardless of whether anyone is subscribed. You can take a look at the Retrofit source code to see that the network request is made on subscription.

If you wish to change it so that multiple subscribers are attached before executing the request, you need to convert the Observable to a ConnectableObservable by calling publish(). To initiate the request, you need to call connect() on the observable:

Observable<User> call = apiService.getUser(username);

// convert Observable to ConnectableObservable, get a reference so we can call connect() later
ConnectableObservable<User> connectedObservable = call.publish();

Observer<User> observer = new Observer<User>() {
   @Override
   public void onCompleted() {

   }

   @Override
   public void onError(Throwable e) {

   }

   @Override
   public void onNext(User user) {
      // do work here
   }
};        

// observer is subscribing
connectedObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(observer);
// initiate the network request
connectedObservable.connect();
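To see why the publish() and connect() pair matters when several subscribers are involved, the following JDK-only sketch counts how often the underlying "request" runs. It only mimics the behavior and is not RxJava's implementation; PerSubscriberSource, ConnectableSource, Listener and run() are hypothetical names, and the source emits a single value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical JDK-only sketch of per-subscriber execution versus a shared one.
final class SharedSubscriptionSketch {

    interface Listener<T> {
        void onValue(T value);
    }

    // Every subscribe() call runs the work again.
    static final class PerSubscriberSource<T> {
        private final Callable<T> work;

        PerSubscriberSource(Callable<T> work) {
            this.work = work;
        }

        void subscribe(Listener<T> listener) {
            try {
                listener.onValue(work.call());
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }

        ConnectableSource<T> publish() {
            return new ConnectableSource<T>(this);
        }
    }

    // subscribe() only registers listeners; connect() runs the work once
    // and shares the value with all of them.
    static final class ConnectableSource<T> {
        private final PerSubscriberSource<T> upstream;
        private final List<Listener<T>> listeners = new ArrayList<>();

        ConnectableSource(PerSubscriberSource<T> upstream) {
            this.upstream = upstream;
        }

        void subscribe(Listener<T> listener) {
            listeners.add(listener);
        }

        void connect() {
            upstream.subscribe(new Listener<T>() {
                @Override
                public void onValue(T value) {
                    for (Listener<T> listener : listeners) {
                        listener.onValue(value);
                    }
                }
            });
        }
    }

    // Attaches two listeners and returns {requests made, values delivered}.
    static int[] run(boolean shared) {
        final int[] counts = {0, 0};
        PerSubscriberSource<String> source = new PerSubscriberSource<String>(new Callable<String>() {
            @Override
            public String call() {
                counts[0]++; // one simulated network request
                return "user";
            }
        });
        Listener<String> listener = new Listener<String>() {
            @Override
            public void onValue(String value) {
                counts[1]++;
            }
        };
        if (shared) {
            ConnectableSource<String> connectable = source.publish();
            connectable.subscribe(listener);
            connectable.subscribe(listener);
            connectable.connect();
        } else {
            source.subscribe(listener);
            source.subscribe(listener);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(run(false)[0]); // 2 requests for 2 subscribers
        System.out.println(run(true)[0]);  // 1 request shared by 2 subscribers
    }
}
```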

Chaining Observables

For a better understanding of how subscriptions can be chained and how RxJava works in general, it's best to first understand what happens beneath the surface when the subscribe() call is made. Under the covers, Subscriber objects are created. If we wish to chain the input, there are various operators available that map one Subscriber type to another.
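The idea of an operator mapping one subscriber type to another can be sketched without RxJava. In this simplified, hypothetical version, map() takes a downstream Observer<R> and returns an Observer<T> that converts each value before forwarding it; the Func interface and the lengths() helper are invented for this example, and RxJava's real operators also handle concerns such as unsubscription that are omitted here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only sketch of an operator that wraps a downstream observer.
final class MapOperatorSketch {

    interface Observer<T> {
        void onCompleted();
        void onError(Throwable e);
        void onNext(T t);
    }

    interface Func<T, R> {
        R call(T value);
    }

    // Wraps an Observer<R> so that it can be fed T values.
    static <T, R> Observer<T> map(final Func<T, R> fn, final Observer<R> downstream) {
        return new Observer<T>() {
            @Override
            public void onCompleted() {
                downstream.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                downstream.onError(e);
            }

            @Override
            public void onNext(T t) {
                downstream.onNext(fn.call(t)); // transform, then pass along
            }
        };
    }

    // Feeds words into a String observer that forwards their lengths downstream.
    static List<Integer> lengths(String... words) {
        final List<Integer> out = new ArrayList<>();
        Observer<Integer> collector = new Observer<Integer>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(Integer length) {
                out.add(length);
            }
        };
        Observer<String> upstream = map(new Func<String, Integer>() {
            @Override
            public Integer call(String word) {
                return word.length();
            }
        }, collector);
        for (String word : words) {
            upstream.onNext(word);
        }
        upstream.onCompleted();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(lengths("a", "bb", "ccc")); // [1, 2, 3]
    }
}
```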

For more context, watch this video talk.

Replacing AsyncTask

public Observable<Bitmap> getImageNetworkCall() {
    // Insert network call here!
}

Subscription subscription = getImageNetworkCall()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<Bitmap>() {

        @Override
        public void onCompleted() {
             // Update user interface if needed
        }

        @Override
        public void onError(Throwable e) {
             // Update user interface to handle error
        }

        @Override
        public void onNext(Bitmap bitmap) {
             // Handle result of network request
        }
});
