What is Reactive Programming?
Reactive programming is programming with asynchronous data streams. It lets you create streams of almost anything: events, failures, variables, messages and so on. With reactive programming in your application, you create streams and then perform actions on the data as those streams emit it.
Observer Pattern
The observer pattern is a software design pattern that defines a one-to-many relationship between objects. When the state of the observed object changes, all objects observing it are notified and updated.
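The one-to-many notification described above can be sketched in plain Java without any library. This is only an illustration: `ObservedValue` and `ObserverPatternDemo` are hypothetical names chosen for this sketch, and observers are modelled as simple `Consumer` callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical observed object: holds a value and notifies every registered observer on change. */
class ObservedValue<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private T value;

    void addObserver(Consumer<T> observer) {
        observers.add(observer);
    }

    void setValue(T newValue) {
        value = newValue;
        // One-to-many: a single change is pushed to all observers, in registration order.
        for (Consumer<T> observer : observers) {
            observer.accept(newValue);
        }
    }

    T getValue() {
        return value;
    }
}

class ObserverPatternDemo {
    /** Registers two observers on one observed value and returns what they received. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        ObservedValue<Integer> temperature = new ObservedValue<>();
        temperature.addObserver(v -> received.add("display:" + v));
        temperature.addObserver(v -> received.add("logger:" + v));
        temperature.setValue(21);
        temperature.setValue(23);
        return received;
    }

    public static void main(String[] args) {
        // prints [display:21, logger:21, display:23, logger:23]
        System.out.println(run());
    }
}
```

Both observers see every change, which is the behaviour ReactiveX builds on and extends with operators and schedulers.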
ReactiveX
ReactiveX is a polyglot implementation of reactive programming that extends the observer pattern and provides a set of data manipulation operators and threading abilities.
RxJava
RxJava is the JVM implementation of ReactiveX.
- Observable – a stream that emits data
- Observer – receives the data emitted by the observable
  - onSubscribe() – called when the subscription is made
  - onNext() – called each time the observable emits an item
  - onError() – called when an error occurs
  - onComplete() – called when the observable has emitted all of its items
- Subscription – the link created when an observer subscribes to an observable to receive its data. Many observers can subscribe to the same observable
- Scheduler – defines the thread on which the observable emits and the observer receives (for instance: a background thread, the UI thread)
  - subscribeOn(Schedulers.io())
  - observeOn(AndroidSchedulers.mainThread())
- Operators – manipulate the streamed data before the observer receives it
  - map()
  - flatMap()
  - concatMap() etc.
Example usage on Android
Tools, libraries, services used in the example:
- Libraries:
  - ButterKnife – simplifies view binding on Android
  - RxJava, RxAndroid – the reactive libraries
  - Retrofit2 – for network calls
- Fake online REST API:
  - https://jsonplaceholder.typicode.com/
    - /users – all users
    - /users/{id}/todos – todo list of the user with the specified id
    - /todos – todo lists of all users
- Java object generator from JSON file
What we want to achieve: fetch the users from the first endpoint (/users), show them in a RecyclerView, and then load each user's todo list to show the number of todos in the same RecyclerView, all without blocking the UI.
Here we define our endpoints. With the RxJava call adapter configured, Retrofit2 can return an RxJava Observable from network calls.
@GET("/users")
Observable<List<User>> getUsers();
@GET("/users/{id}/todos")
Observable<List<Todo>> getTodosByUserID(@Path("id") int id);
@GET("/todos")
Observable<List<Todo>> getTodos();
Let’s fetch users:
- .getUsers() – returns an observable of a list of users
- .subscribeOn(Schedulers.io()) – runs getUsers() on a background thread
- .observeOn(AndroidSchedulers.mainThread()) – switches to the UI thread
- .flatMap() – we pass the data to the RecyclerView adapter and return an Observable that emits each user, which is needed for fetching the todo lists
private Observable<User> getUsersObservable() {
    return ServicesProvider.getDummyApi()
            .getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap((Function<List<User>, ObservableSource<User>>) users -> {
                adapterRV.setData(users);
                return Observable.fromIterable(users);
            });
}
Now, fetch the todo list of each user using the 2nd endpoint.
Since we are not making another call inside the operator, the lambda does not need to return an Observable. So here we use map() instead of flatMap(), and the lambda returns a User (the method as a whole still returns Observable<User>).
private Observable<User> getTodoListByUserId(User user) {
    return ServicesProvider.getDummyApi()
            .getTodosByUserID(user.getId())
            .subscribeOn(Schedulers.io())
            .map(todoList -> {
                // sleep() is a helper from the example project (not shown here)
                sleep();
                user.setTodoList(todoList);
                return user;
            });
}
Now, fetch the todo list of each user using the 3rd endpoint.
The difference from the 2nd endpoint is that this one returns the todos of all users in a single list. Here we can see the filter() operator in use.
private Observable<User> getAllTodo(User user) {
    return ServicesProvider.getDummyApi()
            .getTodos()
            .subscribeOn(Schedulers.io())
            .flatMapIterable((Function<List<Todo>, Iterable<Todo>>) todoList -> todoList)
            .filter(todo -> todo.getUserId().equals(user.getId()) && todo.getCompleted())
            .toList().toObservable()
            .map(todoList -> {
                sleep();
                user.setTodoList(todoList);
                return user;
            });
}
- .flatMapIterable() – converts Observable<List<T>> to Observable<T>, which is needed to filter each item in the list
- .filter() – keeps only the completed todos that belong to the given user
- .toList().toObservable() – converts back to Observable<List<T>>
- .map() – sets the filtered list on the user object, which is used in the next code snippet
Now, as the last step, we call the methods:
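The flatten, filter and collect steps listed above can be tried without RxJava. The sketch below uses java.util.stream as a stand-in, so it is an analogy and not the RxJava operators themselves. `TodoItem` and `TodoFilterDemo` are hypothetical names, and the fields only mirror the userId and completed properties used in the filter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the Todo model; only the fields the filter needs. */
class TodoItem {
    final int userId;
    final String title;
    final boolean completed;

    TodoItem(int userId, String title, boolean completed) {
        this.userId = userId;
        this.title = title;
        this.completed = completed;
    }
}

class TodoFilterDemo {
    /** Keeps only the completed todos that belong to the given user. */
    static List<TodoItem> completedTodosOf(int userId, List<TodoItem> allTodos) {
        return allTodos.stream()                                // list -> single items (role of flatMapIterable)
                .filter(t -> t.userId == userId && t.completed) // role of filter()
                .collect(Collectors.toList());                  // single items -> list (role of toList())
    }

    public static void main(String[] args) {
        List<TodoItem> all = Arrays.asList(
                new TodoItem(1, "buy milk", true),
                new TodoItem(1, "write post", false),
                new TodoItem(2, "fix bug", true));
        for (TodoItem t : completedTodosOf(1, all)) {
            System.out.println(t.title); // prints "buy milk"
        }
    }
}
```

Unlike the stream version, the RxJava chain does this work asynchronously on the scheduler chosen with subscribeOn().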
getUsersObservable()
        .subscribeOn(Schedulers.io())
        // flatMap() also works here, but does not preserve the order of users
        .concatMap((Function<User, ObservableSource<User>>) this::getTodoListByUserId)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<User>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposables.add(d);
            }

            @Override
            public void onNext(User user) {
                adapterRV.updateData(user);
            }

            @Override
            public void onError(Throwable e) {
                // e.getMessage() may be null, so log the throwable itself
                Log.e(TAG, "Failed to load todo lists", e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "completed!");
            }
        });
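- subscribeOn() – selects the scheduler on which the source is subscribed. Only the subscribeOn() closest to the source takes effect, and getUsersObservable() already applies subscribeOn(Schedulers.io()), so this call is redundant here. The todo requests run in the background because getTodoListByUserId() and getAllTodo() apply their own subscribeOn(Schedulers.io())
- concatMap() – here we call one of our methods, getTodoListByUserId() or getAllTodo()
- .observeOn(), .subscribe() – every time a user's todo list has been fetched from the API on a background thread, the observable emits the user and triggers onNext(), so we update the RecyclerView on the UI thread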
- Left: getTodoListByUserId() with flatMap()
- Right: getAllTodo() (filter usage) with concatMap()
The difference between flatMap() and concatMap() is that the former may emit results in an arbitrary order (whichever inner observable emits first), while the latter preserves the order of the source items.
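The ordering difference can be made concrete with a small simulation in plain Java. It does not use RxJava or real threads: each request carries a made-up response time, and the two methods only compute the order in which results would arrive under each strategy. `SimulatedRequest`, `OrderingDemo` and the latency values are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical request: a user name plus a simulated response time in milliseconds. */
class SimulatedRequest {
    final String user;
    final int latencyMs;

    SimulatedRequest(String user, int latencyMs) {
        this.user = user;
        this.latencyMs = latencyMs;
    }
}

class OrderingDemo {
    /** flatMap-like: all requests start together, so results arrive in order of completion. */
    static List<String> mergedOrder(List<SimulatedRequest> requests) {
        List<SimulatedRequest> byFinishTime = new ArrayList<>(requests);
        byFinishTime.sort(Comparator.comparingInt((SimulatedRequest r) -> r.latencyMs));
        List<String> arrived = new ArrayList<>();
        for (SimulatedRequest r : byFinishTime) {
            arrived.add(r.user);
        }
        return arrived;
    }

    /** concatMap-like: each request starts only after the previous one finished, so source order is kept. */
    static List<String> concatenatedOrder(List<SimulatedRequest> requests) {
        List<String> arrived = new ArrayList<>();
        for (SimulatedRequest r : requests) {
            arrived.add(r.user);
        }
        return arrived;
    }

    public static void main(String[] args) {
        List<SimulatedRequest> requests = Arrays.asList(
                new SimulatedRequest("Ann", 300),
                new SimulatedRequest("Bob", 100),
                new SimulatedRequest("Cid", 200));
        System.out.println(mergedOrder(requests));       // prints [Bob, Cid, Ann]
        System.out.println(concatenatedOrder(requests)); // prints [Ann, Bob, Cid]
    }
}
```

In the RecyclerView example this means that with flatMap() the rows may be updated out of order, while with concatMap() they are updated from top to bottom.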
Disposable
When an observer subscribes to an observable, a Disposable is provided in the onSubscribe() method. It can later be used to cancel the background work, so that a callback is not delivered to an activity that has already been destroyed.
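private final CompositeDisposable disposables = new CompositeDisposable();

// Where we subscribe: collect every Disposable handed to onSubscribe().
observableObject.subscribe(new Observer<User>() {
    @Override
    public void onSubscribe(Disposable d) {
        disposables.add(d);
    }
    // onNext(), onError() and onComplete() omitted for brevity
});

// In the Activity: cancel all pending work when the activity is destroyed.
@Override
protected void onDestroy() {
    super.onDestroy();
    disposables.dispose();
}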
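To see why disposing matters, here is a minimal single-threaded sketch of the idea in plain Java. It is not RxJava's implementation: `SimpleDisposable`, `SimpleCompositeDisposable` and `SimpleSource` are hypothetical classes written only for this illustration, and thread-safety is deliberately ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical handle for one subscription; dispose() stops further deliveries. */
class SimpleDisposable {
    private boolean disposed;

    void dispose() { disposed = true; }

    boolean isDisposed() { return disposed; }
}

/** Hypothetical container that disposes everything added to it with one call. */
class SimpleCompositeDisposable {
    private final List<SimpleDisposable> items = new ArrayList<>();

    void add(SimpleDisposable d) { items.add(d); }

    void dispose() {
        for (SimpleDisposable d : items) {
            d.dispose();
        }
    }
}

/** Hypothetical source that delivers values only while its subscription is active. */
class SimpleSource<T> {
    private Consumer<T> callback;
    private SimpleDisposable handle;

    SimpleDisposable subscribe(Consumer<T> onNext) {
        callback = onNext;
        handle = new SimpleDisposable();
        return handle;
    }

    void emit(T value) {
        if (handle != null && !handle.isDisposed()) {
            callback.accept(value);
        }
    }
}

class DisposableDemo {
    /** Emits once before and once after dispose() and returns what reached the "screen". */
    static List<String> run() {
        List<String> screen = new ArrayList<>();
        SimpleCompositeDisposable disposables = new SimpleCompositeDisposable();
        SimpleSource<String> source = new SimpleSource<>();
        disposables.add(source.subscribe(screen::add));
        source.emit("first");  // delivered
        disposables.dispose(); // what onDestroy() does in the snippet above
        source.emit("late");   // ignored, the subscription is disposed
        return screen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [first]
    }
}
```

The value emitted after dispose() never reaches the callback, which is the effect we want once the activity is gone.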
Summary
In this post, I gave a brief introduction to reactive programming, the observer pattern and the ReactiveX library, along with a simple example on Android.
Why should you use RxJava in your projects?
- less boilerplate code
- easy thread management
- thread-safety
- easy error handling
Gitlab Repository
Example source code: https://gitlab.com/47northlabs/public/android-rxjava