Introduction
Developing a complex Android app that has lots of network connections, user interactions, and animations often means writing code that is full of nested callbacks. Such code, sometimes called callback hell, is not only lengthy and hard to understand, but also error-prone. ReactiveX offers an alternative approach to managing asynchronous tasks and events that is both clear and concise.
RxJava is a JVM implementation of ReactiveX, developed by Netflix, and is very popular among Java developers. In this tutorial, you will learn how to use RxJava bindings for Android, or RxAndroid for short, in your Android projects.
1. Setting Up RxAndroid
To use RxAndroid in an Android Studio project, add it as a compile dependency in the app module’s build.gradle.
compile 'io.reactivex:rxandroid:0.25.0'
2. Basics of Observers and Observables
When working with ReactiveX, you will be using observables and observers extensively. You can think of an observable as an object that emits data and an observer as an object that consumes that data. In RxJava and RxAndroid, observers are instances of the Observer interface, and observables are instances of the Observable class.
The Observable class has many static methods, called operators, to create Observable objects. The following code shows you how to use the just operator to create a very simple Observable that emits a single String.
Observable<String> myObservable = Observable.just("Hello"); // Emits "Hello"
The observable we just created will emit its data only when it has at least one observer. To create an observer, you create a class that implements the Observer interface. The Observer interface has intuitively named methods to handle the different types of notifications it can receive from the observable. Here’s an observer that can print the String emitted by the observable we created earlier:
Observer<String> myObserver = new Observer<String>() {
    @Override
    public void onCompleted() {
        // Called when the observable has no more data to emit
    }
    @Override
    public void onError(Throwable e) {
        // Called when the observable encounters an error
    }
    @Override
    public void onNext(String s) {
        // Called each time the observable emits data
        Log.d("MY OBSERVER", s);
    }
};
To assign an observer to an observable, you should use the subscribe method, which returns a Subscription object. The following code makes myObserver observe myObservable:
Subscription mySubscription = myObservable.subscribe(myObserver);
As soon as an observer is added to the observable, it emits its data. Therefore, if you execute the code now, you will see Hello printed in Android Studio’s logcat window.
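If it helps to see why nothing happens until subscribe is called, here is a rough plain-Java model of the idea. TinyObserver and TinyJust are hypothetical stand-ins made up for this illustration. They are not RxJava classes, and they leave out subscriptions, error handling, and threading.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for illustration only; NOT the RxJava Observer interface.
interface TinyObserver<T> {
    void onNext(T item);
    void onCompleted();
    void onError(Throwable e); // A failing source would call this instead of onCompleted
}

// Hypothetical stand-in for an observable created with the just operator.
final class TinyJust<T> {
    private final T value;

    TinyJust(T value) {
        this.value = value; // Only stored here; nothing is emitted yet
    }

    // The single item is emitted when an observer subscribes, not before.
    void subscribe(TinyObserver<T> observer) {
        observer.onNext(value);
        observer.onCompleted();
    }
}

final class TinyJustDemo {
    // Subscribes a recording observer and returns the notifications it received, in order.
    static List<String> record(TinyJust<String> source) {
        final List<String> events = new ArrayList<String>();
        source.subscribe(new TinyObserver<String>() {
            @Override public void onNext(String item) { events.add("onNext:" + item); }
            @Override public void onCompleted() { events.add("onCompleted"); }
            @Override public void onError(Throwable e) { events.add("onError"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(new TinyJust<String>("Hello"))); // [onNext:Hello, onCompleted]
    }
}
```

Creating the TinyJust object has no visible effect. The observer receives onNext followed by onCompleted only once subscribe runs, which mirrors the order of notifications described above.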
You might have noticed that we didn’t use the onCompleted and the onError methods in myObserver. As these methods are often left unused, you also have the option of using the Action1 interface, which contains a single method named call.
Action1<String> myAction = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.d("My Action", s);
    }
};
When you pass an instance of Action1 to the subscribe method, the call method is invoked whenever the observable emits data.
Subscription mySubscription = myObservable.subscribe(myAction);
To detach an observer from its observable while the observable is still emitting data, you can call the unsubscribe method on the Subscription object.
mySubscription.unsubscribe();
3. Using Operators
Now that you know how to create observers and observables, let me show you how to use ReactiveX’s operators, which can create, transform, and perform other operations on observables. Let’s start by creating a slightly more advanced Observable, one that emits items from an array of Integer objects. To do so, you have to use the from operator, which can generate an Observable from arrays and lists.
// Emits each item of the array, one at a time
Observable<Integer> myArrayObservable = Observable.from(new Integer[]{1, 2, 3, 4, 5, 6});
myArrayObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer i) {
        Log.d("My Action", String.valueOf(i)); // Prints the number received
    }
});
When you run this code, you will see each of the numbers of the array printed one after another.
If you have used JavaScript, Ruby, or Kotlin, you might be familiar with higher-order functions such as map and filter, which can be used when working with arrays. ReactiveX has operators that can perform similar operations on observables. However, because Java 7 doesn’t have lambdas and higher-order functions, we’ll have to do it with classes that simulate lambdas. To simulate a lambda that takes one argument, you will have to create a class that implements the Func1 interface.
Here’s how you can use the map operator to square each item of myArrayObservable:
// Input and output are both Integer
myArrayObservable = myArrayObservable.map(new Func1<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) {
        return integer * integer; // Square the number
    }
});
Note that the call to the map operator returns a new Observable; it doesn’t change the original Observable. That is why the code above assigns the result back to myArrayObservable. If you subscribe to myArrayObservable now, you will receive squares of the numbers.
Operators can be chained. For example, the following code block uses the skip operator to skip the first two numbers, and then the filter operator to ignore odd numbers:
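myArrayObservable
    .skip(2) // Skip the first two items
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            // Ignores any item for which this returns false
            return integer % 2 == 0;
        }
    });
// Once subscribed to, emits 4 and 6 for the original array, or 16 and 36
// if myArrayObservable was reassigned to the squared observable above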
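To double-check which items survive such a chain, you can run the same steps on an ordinary list. The sketch below uses java.util.stream, which is not RxJava and needs Java 8 or later, but its map, skip, and filter methods behave the same way for a fixed list of items. The class and method names are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class OperatorChainSketch {
    static final List<Integer> NUMBERS = Arrays.asList(1, 2, 3, 4, 5, 6);

    // skip(2), then keep even numbers, applied to the original items.
    static List<Integer> skipThenFilter() {
        return NUMBERS.stream()
                .skip(2)                  // Drops 1 and 2
                .filter(n -> n % 2 == 0)  // Keeps only even numbers
                .collect(Collectors.toList());
    }

    // The same chain applied after squaring every item first.
    static List<Integer> squareThenSkipThenFilter() {
        return NUMBERS.stream()
                .map(n -> n * n)          // 1, 4, 9, 16, 25, 36
                .skip(2)                  // Drops 1 and 4
                .filter(n -> n % 2 == 0)  // Keeps only even numbers
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(skipThenFilter());           // [4, 6]
        System.out.println(squareThenSkipThenFilter()); // [16, 36]
    }
}
```

The order of the operators matters: skip counts items, not values, so squaring before skipping changes which numbers reach the filter.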
4. Handling Asynchronous Jobs
The observers and observables we created in the previous sections worked on a single thread, Android’s UI thread. In this section, I will show you how to use ReactiveX to manage multiple threads and how ReactiveX solves the problem of callback hell.
Assume you have a method named fetchData that can be used to fetch data from an API. Let’s say it accepts a URL as its parameter and returns the contents of the response as a String. The following code snippet shows how it could be used.
String content = fetchData("http://www.google.com"); // fetches the contents of google.com as a String
This method needs to run on its own thread, because Android does not allow network operations on the UI thread. This means you would either create an AsyncTask or create a Thread that uses a Handler.
With ReactiveX, however, you have a third option that is slightly more concise. Using the subscribeOn and observeOn operators, you can explicitly specify which thread should run the background job and which thread should handle the user interface updates.
The following code creates a custom Observable using the create operator. When you create an Observable in this manner, you have to implement the Observable.OnSubscribe interface and control what it emits by calling the onNext, onError, and onCompleted methods yourself.
Observable<String> fetchFromGoogle = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        try {
            String data = fetchData("http://www.google.com");
            subscriber.onNext(data); // Emit the contents of the URL
            subscriber.onCompleted(); // Nothing more to emit
        } catch (Exception e) {
            subscriber.onError(e); // In case there are network errors
        }
    }
});
When the Observable is ready, you can use subscribeOn and observeOn to specify the threads it should use and subscribe to it.
fetchFromGoogle
    .subscribeOn(Schedulers.newThread()) // Create a new Thread
    .observeOn(AndroidSchedulers.mainThread()) // Use the UI thread
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            view.setText(view.getText() + "\n" + s); // Change a View
        }
    });
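If the division of labor between the two operators feels abstract, the following plain-Java sketch shows the same hand-off without RxJava or Android. It is only an analogy, assuming Java 8 or later: one executor plays the role of the background thread chosen with subscribeOn, and a second executor, whose thread I named fake-ui, plays the role of the UI thread chosen with observeOn. All names in it are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopSketch {
    // Returns "<thread that did the work> -> <thread that received the result>".
    static String run() {
        // Stand-in for the thread that runs the background job
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        // Stand-in for Android's UI thread
        ExecutorService ui =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-ui"));
        try {
            return CompletableFuture
                    // The work itself runs on the background executor
                    .supplyAsync(() -> Thread.currentThread().getName(), background)
                    // The result is handled on the other executor
                    .thenApplyAsync(
                            worker -> worker + " -> " + Thread.currentThread().getName(), ui)
                    .join();
        } finally {
            background.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // background -> fake-ui
    }
}
```

The job never touches the thread that handles the result, and the result handler never runs on the background thread. That separation is what keeps slow network calls away from the UI.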
You might still be thinking that the reactive approach isn’t drastically better than using the AsyncTask or Handler classes. You are right: you don’t really need ReactiveX if you have to manage only one background job.
Now consider a scenario that would result in a complex codebase if you used the conventional approach. Let’s say you have to fetch data from two (or more) websites in parallel and update a View only when all the requests have completed. If you follow the conventional approach, you would have to write a lot of extra code to make sure that all the requests completed without errors.
Consider another scenario in which you have to start a background job only after another background job has completed. Using the conventional approach, this would result in nested callbacks.
With ReactiveX’s operators, both scenarios can be handled with very little code. For example, if you have to use fetchData to fetch the contents of two websites, for example Google and Yahoo, you would create two Observable objects, and use the subscribeOn method to make them run on different threads.
fetchFromGoogle = fetchFromGoogle.subscribeOn(Schedulers.newThread());
fetchFromYahoo = fetchFromYahoo.subscribeOn(Schedulers.newThread());
To handle the first scenario in which both requests need to run in parallel, you can use the zip operator and subscribe to the Observable it returns.
// Fetch from both simultaneously
Observable<String> zipped = Observable.zip(fetchFromGoogle, fetchFromYahoo, new Func2<String, String, String>() {
    @Override
    public String call(String google, String yahoo) {
        // Do something with the results of both threads
        return google + "\n" + yahoo;
    }
});
Similarly, to handle the second scenario, you can use the concat operator to run the jobs one after another.
Observable<String> concatenated = Observable.concat(fetchFromGoogle, fetchFromYahoo); // Emit the results one after another
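To make the difference between the two scenarios concrete, here is a small plain-Java sketch that you can run without RxJava. It is an analogy rather than the RxJava implementation, and it assumes Java 8 or later: CompletableFuture.thenCombine plays the role of zip, and thenCompose plays the role of concat. The fetchData method here is a hypothetical stub that returns a fixed string instead of touching the network.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CombineSketch {
    // Hypothetical stub standing in for fetchData; it performs no network access.
    static String fetchData(String url) {
        return "contents of " + url;
    }

    // Like zip: start both jobs, then combine once both results are available.
    static String inParallel() {
        CompletableFuture<String> google =
                CompletableFuture.supplyAsync(() -> fetchData("http://www.google.com"));
        CompletableFuture<String> yahoo =
                CompletableFuture.supplyAsync(() -> fetchData("http://www.yahoo.com"));
        return google.thenCombine(yahoo, (g, y) -> g + "\n" + y).join();
    }

    // Like concat: the second job is started only after the first has finished.
    static List<String> oneAfterAnother() {
        List<String> results = Collections.synchronizedList(new ArrayList<String>());
        CompletableFuture
                .supplyAsync(() -> fetchData("http://www.google.com"))
                .thenAccept(results::add)
                .thenCompose(ignored ->
                        CompletableFuture.supplyAsync(() -> fetchData("http://www.yahoo.com")))
                .thenAccept(results::add)
                .join();
        return new ArrayList<String>(results);
    }

    public static void main(String[] args) {
        System.out.println(inParallel());
        System.out.println(oneAfterAnother());
    }
}
```

In the first method a single combined value comes out, and only after both jobs are done. In the second, each result is delivered separately, and always in the order in which the jobs were chained.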
5. Handling Events
RxAndroid has a class named ViewObservable that makes it easy to handle events associated with View objects. The following code snippet shows you how to use ViewObservable to create an Observable that emits the click events of a Button.
// Get a reference to the Button in the layout
Button myButton = (Button) findViewById(R.id.my_button);
// Create an Observable that emits the Button's click events
Observable<OnClickEvent> clicksObservable = ViewObservable.clicks(myButton);
You can now subscribe to clicksObservable and use any of the operators you learned about in the previous sections. For example, if you want your app to skip the first four clicks of the button and start responding from the fifth click onwards, you could use the following implementation:
clicksObservable
    .skip(4) // Skip the first 4 clicks
    .subscribe(new Action1<OnClickEvent>() {
        @Override
        public void call(OnClickEvent onClickEvent) {
            Log.d("Click Action", "Clicked!"); // Printed from the fifth click onwards
        }
    });
Conclusion
In this tutorial, you learned how to use ReactiveX’s observers, observables, and operators to handle multiple asynchronous operations and events. As working with ReactiveX involves functional, reactive programming, a programming paradigm most Android developers are not used to, don’t be too hard on yourself if you don’t get it right the first time. You should also know that ReactiveX code will be a lot more readable if you use a modern programming language, such as Kotlin, that supports higher-order functions.
To learn more about reactive extensions, I encourage you to browse the resources available at ReactiveX.