RxJava
One of the challenges in writing robust Android apps is the dynamic nature of changing inputs. In traditional imperative programming models, values must be explicitly assigned to variables for them to be updated. If an input changes, any value that depends on it stays stale until another line of code recomputes it. Consider the following example:
// init variables
int i, j, k;
// Init inputs
i = 1;
j = 2;
// Set output value
k = i + j;
// Update a dependent value
j = 4;
k = ? // What should k be?
Traditional asynchronous programming approaches tend to rely on callbacks to propagate these changes, but nesting callbacks can lead to a problem known as callback hell. Reactive programming (see an intro here) addresses these issues by providing a framework for describing outputs in terms of their changing inputs. RxJava, which is a port of the Reactive Extensions library from .NET, enables Android apps to be built in this style.
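To make the `k = i + j` question concrete, here is a minimal plain-Java sketch of an output that tracks its inputs. It is not RxJava: the `Cell` class and its `set`, `onChange`, and `sum` methods are hypothetical names invented for this illustration, and the sketch ignores threading and unsubscription entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical helper for illustration only; this is not an RxJava class.
final class Cell {
    private int value;
    private final List<IntConsumer> listeners = new ArrayList<>();

    Cell(int initial) {
        this.value = initial;
    }

    int get() {
        return value;
    }

    // Store the new value and notify everything that depends on this cell.
    void set(int newValue) {
        value = newValue;
        for (IntConsumer listener : listeners) {
            listener.accept(newValue);
        }
    }

    void onChange(IntConsumer listener) {
        listeners.add(listener);
    }

    // Build a cell that holds a + b and is recomputed whenever a or b changes.
    static Cell sum(Cell a, Cell b) {
        Cell result = new Cell(a.get() + b.get());
        a.onChange(v -> result.set(v + b.get()));
        b.onChange(v -> result.set(a.get() + v));
        return result;
    }
}

final class ReactiveSumDemo {
    public static void main(String[] args) {
        Cell i = new Cell(1);
        Cell j = new Cell(2);
        Cell k = Cell.sum(i, j);
        System.out.println(k.get()); // prints 3
        j.set(4);                    // no explicit "k = i + j" needed here
        System.out.println(k.get()); // prints 5
    }
}
```

Here `k` is declared once as a function of `i` and `j`, so the answer to "what should k be?" after `j` is updated is 5, without any further assignment.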
Here are a few advantages of using RxJava on Android:
- Simplifies the ability to chain async operations. If you need to make an API call that depends on another API call, you will likely end up making the second call inside the callback of the first one. RxJava provides a way to avoid creating layers of callbacks to address this issue. For this reason, RxJava became popular within Netflix in 2014 for abstracting away the complexities of performing dependent API calls.
- Exposes a more explicit way of declaring how concurrent operations should run. Although RxJava is single-threaded by default, it lets you define explicitly which threading model should be used for both background and callback tasks. Since Android only allows UI updates on the main thread, using RxJava makes the code clearer about which operations update the views.
- Surfaces errors sooner. One issue with AsyncTask is that errors that occur on the background thread are hard to pass along when updating the UI thread using the onPostExecute() method. In addition, there are limitations on how many AsyncTasks can be dispatched concurrently, as described in this blog post. RxJava provides a way for these errors to be surfaced.
- Provides powerful constructs for manipulating streams of data. One mindset shift in using RxJava is thinking about everything in terms of data flows in the system. Click events generated by the user, network calls, and data updates from external sources can all be described as asynchronous streams of data. The power of RxJava is that it enables these streams to be transformed, filtered, or used to create new streams of data with only a few lines of code.
Set up your app/build.gradle:
dependencies {
    compile 'io.reactivex:rxjava:1.0.16'
    compile 'io.reactivex:rxandroid:1.0.1'
}
RxJava defines the concept of an Observable and an Observer. An Observable emits values over time, as either a finite or an infinite stream. In the Android world, most of the time we are working with finite streams of data.
An Observer watches for the values emitted by the Observable and responds to these events as they occur. An Observable can be created from just about any type of input. For instance, it can be a set of strings that should be iterated:
Observable.just("a", "b", "c") // generate an observable
It can also be the network response from an API call, such as those exposed in Retrofit.
public interface MyApiEndpointInterface {
    @GET("/users/{username}")
    Observable<User> getUser(@Path("username") String username);
}
MyApiEndpointInterface apiService =
    retrofit.create(MyApiEndpointInterface.class);
Observable<User> call = apiService.getUser(username);
To implement an observer, the following interface must be implemented:
public interface Observer<T> {
    void onCompleted(); // will not be called if onError() is called
    void onError(Throwable e);
    void onNext(T t);
}
Note that Observer is a generic type. Its type parameter must match the type of value that the Observable will emit. For an observer to start watching an observable that emits strings, it must subscribe to it:
Observable.just("a", "b", "c").subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
    }
});
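The order in which the three callbacks fire can be sketched in plain Java. The block below is not RxJava code: `Observer` is redeclared with the same shape as the interface shown earlier so that the sketch compiles on its own, and `JustSketch.emit` and `RecordingObserver` are hypothetical names standing in for `Observable.just(...).subscribe(...)`. It assumes the simplest possible source, one that emits a fixed list of values synchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Same shape as the Observer interface shown earlier, redeclared so the
// sketch compiles without RxJava on the classpath.
interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

// Hypothetical stand-in for Observable.just(...).subscribe(...); not RxJava code.
final class JustSketch {
    @SafeVarargs
    static <T> void emit(Observer<T> observer, T... items) {
        try {
            for (T item : items) {
                observer.onNext(item);   // one call per emitted value
            }
        } catch (Throwable e) {
            observer.onError(e);         // terminal event: onCompleted() is skipped
            return;
        }
        observer.onCompleted();          // terminal event: no more values follow
    }
}

// Observer that records every callback it receives, in order.
final class RecordingObserver implements Observer<String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onCompleted() {
        events.add("completed");
    }

    @Override
    public void onError(Throwable e) {
        events.add("error: " + e.getMessage());
    }

    @Override
    public void onNext(String s) {
        events.add("next: " + s);
    }
}

final class ObserverContractDemo {
    public static void main(String[] args) {
        RecordingObserver observer = new RecordingObserver();
        JustSketch.emit(observer, "a", "b", "c");
        System.out.println(observer.events);
        // prints [next: a, next: b, next: c, completed]
    }
}
```

The point of the sketch is the contract: zero or more `onNext()` calls, followed by exactly one of `onCompleted()` or `onError()`.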
RxJava is synchronous by default, but work can be made asynchronous using schedulers. For instance, we can specify that the network call should run on a background thread while the callbacks are delivered on the main UI thread. subscribeOn() controls the thread on which the work itself is done, and observeOn() controls the thread on which the observer's callbacks are invoked. For example, with Retrofit:
Observable<User> call = apiService.getUser(username)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());
AndroidSchedulers.mainThread() is provided by the RxAndroid library and allows callbacks to be fired on the main UI thread.
Using schedulers relies on queuing the work through bounded or unbounded thread pools. Here are a few options available that come with RxJava. See this link for all the possible options.
Name | Description |
---|---|
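Schedulers.computation() | fixed number of threads (equal to the number of CPUs) |
Schedulers.immediate() | runs work immediately on the current thread |
Schedulers.io() | backed by a cached thread pool that grows as needed, intended for I/O-bound work |
Schedulers.newThread() | creates a new thread for each unit of work |
Schedulers.trampoline() | queues work on the current thread, to run after the current work completes |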
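The split between where work runs and where its result is delivered can be illustrated with plain JDK executors. This is only an analogy under stated assumptions, not how RxJava schedulers are implemented: the cached thread pool named "background" plays roughly the role of an I/O scheduler, the single-threaded executor named "main-sketch" stands in for a main thread, and `SchedulerSketch` and `runOnTwoThreads` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration using JDK executors; not RxJava's scheduler API.
final class SchedulerSketch {
    // Runs the "work" on a background pool and the "callback" on a separate
    // single-threaded executor, returning the names of both threads.
    static String[] runOnTwoThreads() {
        ExecutorService background =
                Executors.newCachedThreadPool(r -> new Thread(r, "background"));
        ExecutorService main =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-sketch"));
        try {
            return CompletableFuture
                    // The work itself: executed on the background pool.
                    .supplyAsync(() -> Thread.currentThread().getName(), background)
                    // The callback: executed on the single "main" thread.
                    .thenApplyAsync(workThread ->
                            new String[] {workThread, Thread.currentThread().getName()}, main)
                    .join();
        } finally {
            background.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = runOnTwoThreads();
        System.out.println("work ran on: " + threads[0]);
        System.out.println("callback ran on: " + threads[1]);
    }
}
```

Choosing a scheduler in RxJava is a similar decision: which pool of threads executes the work, and which thread receives the result.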
To better understand how subscriptions can be chained and how RxJava works in general, it helps to first understand what happens beneath the surface when subscribe() is called. Under the covers, Subscriber objects are created. To chain operations, various operators are available that map one Subscriber type to another.
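The idea of an operator mapping one subscriber type to another can be sketched in plain Java. The block below is a simplified illustration, not RxJava's implementation: it uses the three-method `Observer` shape from earlier in place of `Subscriber`, and `MapSketch.map` and `CollectingObserver` are hypothetical names. The operator takes the downstream observer (which expects values of type `R`) and returns an upstream observer (which accepts values of type `T`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Same shape as the Observer interface shown earlier, redeclared so the
// sketch compiles without RxJava on the classpath.
interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

// Hypothetical illustration of a map-style operator; not RxJava's implementation.
final class MapSketch {
    // Wraps a downstream Observer<R> in an Observer<T> that converts each
    // value before passing it on. Terminal events are forwarded unchanged.
    static <T, R> Observer<T> map(Function<T, R> transform, Observer<R> downstream) {
        return new Observer<T>() {
            @Override
            public void onCompleted() {
                downstream.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                downstream.onError(e);
            }

            @Override
            public void onNext(T t) {
                downstream.onNext(transform.apply(t));
            }
        };
    }
}

// Observer that stores the values it receives and whether the stream completed.
final class CollectingObserver<T> implements Observer<T> {
    final List<T> values = new ArrayList<>();
    boolean completed;

    @Override
    public void onCompleted() {
        completed = true;
    }

    @Override
    public void onError(Throwable e) {
        // Ignored in this sketch.
    }

    @Override
    public void onNext(T t) {
        values.add(t);
    }
}

final class MapSketchDemo {
    public static void main(String[] args) {
        CollectingObserver<Integer> lengths = new CollectingObserver<>();
        Observer<String> upstream = MapSketch.map(String::length, lengths);
        upstream.onNext("a");
        upstream.onNext("bcd");
        upstream.onCompleted();
        System.out.println(lengths.values); // prints [1, 3]
    }
}
```

Chaining several operators amounts to nesting wrappers like this one, with each layer handing its transformed values to the next.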
For more context, watch this video talk.
public Observable<Bitmap> getImageNetworkCall() {
    // Insert network call here!
}
Subscription subscription = getImageNetworkCall()
    .subscribeOn(Schedulers.io())                 // do the network call on a background thread
    .observeOn(AndroidSchedulers.mainThread())    // deliver callbacks on the main UI thread
    .subscribe(new Observer<Bitmap>() {
        @Override
        public void onCompleted() {
            // Update user interface if needed
        }
        @Override
        public void onError(Throwable e) {
            // Update user interface to handle error
        }
        @Override
        public void onNext(Bitmap bitmap) {
            // Handle result of network request
        }
    });
- https://github.com/ReactiveX/RxJava/wiki/The-RxJava-Android-Module
- http://saulmm.github.io/when-Iron-Man-becomes-Reactive-Avengers2/
- http://blog.stablekernel.com/replace-asynctask-asynctaskloader-rx-observable-rxjava-android-patterns/
- https://www.youtube.com/watch?v=_t06LRX0DV0/
- https://vimeo.com/144812843
- http://code.hootsuite.com/asynchronous-android-programming-the-good-the-bad-and-the-ugly/
- https://www.youtube.com/watch?v=va1d4MqLUGY&feature=youtu.be/
- http://www.slideshare.net/TimoTuominen1/rxjava-architectures-on-android-android-livecode-berlin/
- https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014
Created by CodePath with much help from the community. Contributed content licensed under cc-wiki with attribution required. You are free to remix and reuse, as long as you attribute and use a similar license.