Room With RxJava

Android Room Persistent And RxJava2

We all want to make our apps reactive: less boilerplate code, plus the power of asynchronous and observable queries. The good news is that the Room library supports async queries that return LiveData or RxJava2 observable types. LiveData and observables give you an automatic update whenever the data changes.

Now let’s say we need to read all the employees from our local database. To do that, we could write the following query in the DAO class.

@Query("SELECT * FROM employee")
fun getAllEmployees(): List<Employee>

This query has three disadvantages:

  1. It is a blocking, synchronous call.
  2. We have to call it from a background thread ourselves and then pass the result back to the main thread, which is tedious.
  3. We have to call the method again every time the employee list is modified.
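To make the second and third points concrete, here is a rough sketch of the manual threading a blocking query pushes onto the caller. It uses only the JDK: `EmployeeDao` is a hypothetical in-memory stand-in rather than a Room DAO, employees are plain strings, and `loadEmployees` is a name made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class ManualThreadingSketch {

    // Hypothetical stand-in for a DAO whose query blocks the calling thread.
    interface EmployeeDao {
        List<String> getAllEmployees();
    }

    // Runs the blocking query on a worker thread and hands the result to the callback.
    // Returns the name of the thread the query ran on, purely for illustration.
    static String loadEmployees(EmployeeDao dao, Consumer<List<String>> callback) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(() -> {
                List<String> employees = dao.getAllEmployees(); // blocking call, off the caller's thread
                callback.accept(employees);                     // note: still on the worker thread
                return Thread.currentThread().getName();
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("query failed", e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        EmployeeDao dao = () -> Arrays.asList("Alice", "Bob");
        String queryThread = loadEmployees(dao, employees -> System.out.println("Loaded " + employees));
        System.out.println("Query ran on: " + queryThread);
    }
}
```

Even in this small form, the caller owns the executor, the error handling and the hand-off of the result. On Android the callback would also have to be posted to the main thread, and nothing here re-runs the query when the table changes.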

So, as I said earlier, Room provides asynchronous queries with the help of the RxJava Maybe, Single, and Flowable types.

To start using Room with RxJava2, add the following dependency to the build.gradle file.

// RxJava with Room
implementation "android.arch.persistence.room:rxjava2:$current_version"

Note: By the way, I wrote a pretty good article on how to work with Room Persistence; go and check it out.

Flowable

So, we’re going to reuse our previous example of reading all employees from the database. In the DAO, we specify Flowable as the method’s return type.

@Query("SELECT * FROM employee")
fun getAllEmployees() : Flowable<List<Employee>>

Subscribe and get the data.

roomDb.employeeDao().getAllEmployees()
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new Consumer<List<Employee>>() {
           @Override
           public void accept(List<Employee> employees) {
                 // Called with the current list, and again after every change
                 // showListOfEmployees(employees);
           }
       });

subscribeOn is not needed in the case of Flowable: the database query will not be executed on the UI thread. To consume the stream in the UI, however, we use observeOn with the main thread. Now, whenever the data in the database is modified, we receive the latest data in the accept method. Most importantly, we don’t need to run the query again each time something changes.
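To picture what receiving the latest data on every change means, here is a toy model in plain Java. It is only a sketch of the idea and not how Room implements it: `EmployeeTable` and its methods are made up for illustration, employees are plain strings, and everything runs on one thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ObservableQuerySketch {

    // Toy in-memory "table" that pushes the full query result to subscribers on every write.
    static class EmployeeTable {
        private final List<String> rows = new ArrayList<>();
        private final List<Consumer<List<String>>> subscribers = new ArrayList<>();

        // Subscribing delivers the current result immediately, then again after each change.
        void subscribe(Consumer<List<String>> subscriber) {
            subscribers.add(subscriber);
            subscriber.accept(new ArrayList<>(rows));
        }

        void insert(String name) {
            rows.add(name);
            for (Consumer<List<String>> subscriber : subscribers) {
                subscriber.accept(new ArrayList<>(rows)); // re-emit a snapshot of the whole result
            }
        }
    }

    public static void main(String[] args) {
        EmployeeTable table = new EmployeeTable();
        table.subscribe(employees -> System.out.println("accept: " + employees));
        table.insert("Alice");
        table.insert("Bob");
    }
}
```

Running it prints `accept: []`, then `accept: [Alice]`, then `accept: [Alice, Bob]`: the subscriber never asks again, yet it always sees the latest list.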

Let’s take another example where we need to fetch only one record. In this example, we also return a Flowable.

@Query("SELECT * FROM employee WHERE id = :id")
fun getEmployeeById(id: Long): Flowable<Employee>

Subscribe and get the data.

roomDb.employeeDao().getEmployeeById(employeeId)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Employee>() {
        @Override
        public void accept(Employee employee) {
            // showEmployee(employee)
        }
    });

If the employee is found in the database, it arrives in the accept method immediately after the subscription. Whenever the employee is modified in the database, the new version also arrives in accept.

Problem:

If there is no record, nothing arrives after the subscription: no value, no error, and no completion. The Flowable simply stays silent, so it looks as if the request were still executing.

The above example can be corrected as follows.

@Query("SELECT * FROM employee WHERE id = :id")
fun getEmployeeById(id: Long): Flowable<List<Employee>>

Although we expect only one record, if there is none we at least get an empty list instead of complete silence.

Single

Consider the same example with a single record query, but using Single. In the DAO, we specify Single as the method’s return type.

@Query("SELECT * FROM employee WHERE id = :id")
fun getEmployeeById(id: Long): Single<Employee>

Subscribe and get the data.

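roomDb.employeeDao().getEmployeeById(employeeId)
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new DisposableSingleObserver<Employee>() {
           @Override
           public void onSuccess(Employee employee) {
                 // showEmployee(employee)
           }

           @Override
           public void onError(Throwable e) {
                 // Must be implemented too; called when the query fails or finds no row
           }
       });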

If such a record exists in the database, it arrives in onSuccess. After that, the Single is considered complete, and nothing more arrives on subsequent updates, even if the record is modified.

Note: Unlike Flowable, with Single you must use subscribeOn to specify the thread that executes the query. Otherwise, onError receives a java.lang.IllegalStateException: Cannot access database on the main thread.

Problem:

If there is no such entry in the database, we get an EmptyResultSetException (query returned an empty result) in onError. After that, the Single is considered complete, and even if such a record later appears in the database, nothing more arrives.

Maybe

Consider the same example with a single record query, but using Maybe. A Maybe delivers at most one result after we subscribe and is then considered complete. In the DAO, we specify Maybe as the method’s return type.

@Query("SELECT * FROM employee WHERE id = :id")
fun getEmployeeById(id: Long): Maybe<Employee>

Subscribe and get the data.

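roomDb.employeeDao().getEmployeeById(employeeId)
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new DisposableMaybeObserver<Employee>() {
           @Override
           public void onSuccess(Employee employee) {
               // showEmployee(employee)
           }
           @Override
           public void onError(Throwable e) {
               // Must be implemented too; called when the query fails
           }
           @Override
           public void onComplete() {
               // Must be implemented too; called when no employee was found
           }
       });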

As you can see, with Maybe you also need to specify the thread on which the query runs and the thread on which you observe the result. If the employee is found in the database, it arrives in the onSuccess method. After that, the Maybe is considered complete, so it never reports later modifications of the employee.

If no employee is found in the database, onComplete is called instead.
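The different outcomes of a one-shot query can be summed up in a small plain-Java sketch. Treat it as a model of the behaviour described above, not as library code: the `Observer` interface and the `deliverAs...` helpers are hypothetical stand-ins rather than RxJava or Room classes, and `NoSuchElementException` stands in for Room's EmptyResultSetException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

class SingleVsMaybeSketch {

    // Hypothetical observer exposing the three callbacks discussed in the text.
    interface Observer<T> {
        void onSuccess(T value);
        void onError(Throwable error);
        void onComplete();
    }

    // Single-like delivery: a missing row is reported as an error.
    static <T> void deliverAsSingle(Optional<T> row, Observer<T> observer) {
        if (row.isPresent()) {
            observer.onSuccess(row.get());
        } else {
            observer.onError(new NoSuchElementException("query returned an empty result"));
        }
    }

    // Maybe-like delivery: a missing row just completes without a value.
    static <T> void deliverAsMaybe(Optional<T> row, Observer<T> observer) {
        if (row.isPresent()) {
            observer.onSuccess(row.get());
        } else {
            observer.onComplete();
        }
    }

    // Returns the name of the callback that fired for the given query result.
    static String firstEvent(boolean singleLike, Optional<String> row) {
        List<String> events = new ArrayList<>();
        Observer<String> observer = new Observer<String>() {
            @Override public void onSuccess(String value) { events.add("onSuccess(" + value + ")"); }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        };
        if (singleLike) {
            deliverAsSingle(row, observer);
        } else {
            deliverAsMaybe(row, observer);
        }
        return events.get(0);
    }

    public static void main(String[] args) {
        System.out.println("Single, row found: " + firstEvent(true, Optional.of("Alice")));
        System.out.println("Single, no row:    " + firstEvent(true, Optional.empty()));
        System.out.println("Maybe, row found:  " + firstEvent(false, Optional.of("Alice")));
        System.out.println("Maybe, no row:     " + firstEvent(false, Optional.empty()));
    }
}
```

In each case exactly one callback fires and nothing follows it, which is why neither type can report later changes to the row.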

Protip

To update data in the database, you can simply return Unit or Int (the number of rows updated) because, at the time of writing, Room does not support reactive return types for updates. You can, however, wrap the method call in a Completable. Let’s see how we can update the employee.

@Update
fun updateEmployee(emp: Employee)

Call method and update the data.

Completable.fromAction {
    roomDb
       .employeeDao()
       .updateEmployee(updatedEmployee)
}.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            // data updated
        }

Conclusion

Single and Maybe are suitable for one-time data retrieval. The difference is that Maybe allows for the record being absent, whereas Single is the more logical choice when the record should be in the database; if it is not, you get an error. Flowable is more suitable if you want to be notified automatically whenever the data changes.

That’s it! This was my demonstration of how we can efficiently use RxJava2 and Room together. I hope you have learned something from this post. If you have a question about it, please comment below.

Thank you for being here and keep reading.

5 Comments

  1. onError: java.lang.IllegalStateException. Cannot access database on the Main Thread.

    I am getting this same error

    1. Hey HariniPasupathy,
      This error occurs if you try to access your database on the main thread; to avoid it, you need to use the subscribeOn method with Schedulers.io().

  2. Hi. You wrote a very useful article. Thank you. But I don’t quite understand the case where we don’t need to get updates for a List, as opposed to the Flowable case. What should we use as an observable?

    1. If you do not want to get updates, you can simply use Single instead of Observable. A Single is considered complete after the result arrives in onSuccess; with Observable, on the other hand, you are notified every time the records are updated.

  3. awesome article, come up with more articles like this…and some big real-time example…
    Thanks…
