Creating a Reactive Data Layer with Realm and RxJava2

Realm is an object database that provides the ability to define your schema with classes, supports links directly between objects (instead of relations via foreign keys and join tables), and is known for its easy-to-use lazy-loaded data access.

With Realm’s new major version release, there are lots of new, exciting features available! Along with a major upgrade to Realm Sync and the addition of partial sync, there is support for RealmList of primitives (like RealmList<String>), and also in-built RxJava2 support.

To understand how to use Realm effectively with RxJava2, first we must delve into why Realm is suitable to be exposed as a reactive event source, and how to use its reactivity to the fullest.

Previously: Realm notifications and coarse grained change listeners

Even from the earliest versions of Realm, it was possible to add a change listener to a Realm instance, or a RealmResults that was obtained from the Realm by executing a query. This is also what the RxJava1 support was based on.

// from 0.86.0 - 0.88.3
Realm realm;
RealmResults<Task> results;

// when a write happens, this change listener will be called.
RealmChangeListener realmChangeListener = new RealmChangeListener() {
    @Override
    public void onChange() {
        Log.i("TAG", "Change occurred!");
    }
};

public void onCreate(Bundle bundle) {
    //...
    realm = Realm.getDefaultInstance();
    realm.addChangeListener(realmChangeListener);
    results = realm.where(Task.class).findAll();
    results.addChangeListener(realmChangeListener);
    // ...
}

public void onDestroy() {
    // ... remove listeners, etc.        
    realm.close();
}

These listeners were called when a Realm was written to, and when a query’s class or any of its linked classes were modified in a transaction, respectively.

Realm’s auto-updating mechanism

Realm manages that when a write transaction occurs on any thread, then this change event is dispatched to every thread that is associated with a run-loop (such as the Looper in Android) - the Realm is updated, and all RealmResults are updated along with it.

public class MyActivity extends AppCompatActivity {
        
    Realm realm;
    RealmResults<Task> results; // kept as strong reference!
    RealmChangeListener<RealmResults<Task>> listener = new RealmChangeListener<RealmResults<Task>>() {
        
    @Override
    public void onChange(RealmResults<Task> results) {
            if(results.isLoaded()) {
                // results is always up to date here
                // after a write to Realm from ANY thread!
                updateUi(results);
            }
        }
    }

    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.my_activity);
        ButterKnife.bind(this);

        realm = Realm.getDefaultInstance();
        results = realm.where(Task.class)
                                .findAllSortedAsync(TaskFields.ID); 
                                  // TaskFields is generated 
                                  // using https://github.com/cmelchior/realmfieldnameshelper
        results.addChangeListener(listener);
    }

    protected void onDestroy() {
        super.onDestroy();
        results.removeChangeListener(listener);
        results = null;
        realm.close();
    }

    private void updateUi(List<MyObject> objects) {
        // do something
    }
}

Get more development news like this

This works well, because each RealmResults is lazy-loaded, and data is read only when a given managed RealmObject is accessed. But as they just point to the data, then when the Realm is updated, so is the data that the RealmResults points to. Therefore, it changes in place, and we need to be notified of these changes. That is what the change listener is for!

Earlier RxJava support

The current RxJava1 support was added to Realm in 0.86.0, and can be seen as realm.asObservable() or realmResults.asObservable(). These methods expose the previously mentioned listeners as rx.Observable. This allowed us to react to changes whenever the Realm was written to, on threads where Realm is able to deliver notifications.

We can see this in RealmObservableFactory:

return Observable.create(new Observable.OnSubscribe<Realm>() {
    @Override
    public void call(final Subscriber<? super Realm> subscriber) {
        // ...
        final Realm observableRealm = Realm.getInstance(realmConfig);
        final RealmChangeListener<Realm> listener = new RealmChangeListener<Realm>() {
            @Override
            public void onChange(Realm realm) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(observableRealm);
                }
            }
        };
        observableRealm.addChangeListener(listener);
        subscriber.add(Subscriptions.create(new Action0() {
            @Override
            public void call() {
                observableRealm.removeChangeListener(listener);
                observableRealm.close();
            }
        }));
        subscriber.onNext(observableRealm);
    }
});

Of course, this is also why onCompleted() was never called - a listener doesn’t just stop emitting events, right?

Now: Fine-grained change listeners and collection changes

Since Realm 3.1, these so-called coarse grained change listeners became fine-grained change listeners, meaning that they are only called if that given RealmResults (or one of its RealmList field’s element) was modified.

On top of that, the exact changes are evaluated as well - that on a given position, an insertion / deletion / modification occurred (allowing proper RecyclerView animation, for example).

We can see this in action if we check the source code for the RealmRecyclerViewAdapter from realm-android-adapters.

public abstract class RealmRecyclerViewAdapter<T extends RealmModel, VH extends RecyclerView.ViewHolder>
        extends RecyclerView.Adapter<VH> {
    
    private OrderedRealmCollection<T> adapterData;

    private OrderedRealmCollectionChangeListener createListener() {
        return new OrderedRealmCollectionChangeListener() {
            @Override
            public void onChange(Object collection, OrderedCollectionChangeSet changeSet) {
                // null Changes means the async query returns the first time.
                if (changeSet == null) {
                    notifyDataSetChanged();
                    return;
                }
                // For deletions, the adapter has to be notified in reverse order.
                OrderedCollectionChangeSet.Range[] deletions = changeSet.getDeletionRanges();
                for (int i = deletions.length - 1; i >= 0; i--) {
                    OrderedCollectionChangeSet.Range range = deletions[i];
                    notifyItemRangeRemoved(range.startIndex, range.length);
                }

                OrderedCollectionChangeSet.Range[] insertions = changeSet.getInsertionRanges();
                for (OrderedCollectionChangeSet.Range range : insertions) {
                    notifyItemRangeInserted(range.startIndex, range.length);
                }

                if (!updateOnModification) {
                    return;
                }

                OrderedCollectionChangeSet.Range[] modifications = changeSet.getChangeRanges();
                for (OrderedCollectionChangeSet.Range range : modifications) {
                    notifyItemRangeChanged(range.startIndex, range.length);
                }
            }
        };
    }

    // ...

RxJava2 support with Flowable and changeset Observable

With all that in mind, the new major version of Realm introduces realm.asFlowable(), realmResults.asFlowable(), and realmResults.asChangesetObservable().

As RealmResults are always updated to be the latest version, it makes sense that you would need it as a Flowable with LATEST backpressure strategy.

return Flowable.create(new FlowableOnSubscribe <Realm>() {
    @Override
    public void subscribe(final FlowableEmitter<Realm> emitter) throws Exception {
        // ...
        final Realm observableRealm = Realm.getInstance(realmConfig);
        final RealmChangeListener<Realm> listener = new RealmChangeListener<Realm>() {
            @Override
            public void onChange(Realm realm) {
                if (!emitter.isCancelled()) {
                    emitter.onNext(realm);
                }
            }
        };
        observableRealm.addChangeListener(listener);

        // Cleanup when stream is disposed
        emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
            @Override
            public void run() {
                observableRealm.removeChangeListener(listener);
                observableRealm.close();
            }
        }));

        // Emit current value immediately
        emitter.onNext(observableRealm);
    }
}, BackpressureStrategy.LATEST);

For change sets, it might be useful to know every collection change to handle animations properly, so it is exposed as Observable.

Can I pass RealmResults between threads yet?

No, that limitation still stands. After all, RealmResults is a collection of thread-local proxy views to the database (and the RealmResults is a thread-local proxy view as well), one that sees the objects that satisfy the query conditions, and are available in that given local Realm instance that it was created from.

This however is also why any data access in Realm is lazy-loaded.

Because of Realm’s MVCC (multi-version concurrency control) architecture, each view is thread-local (meaning each thread only sees a given version of the Realm), therefore passing managed objects and managed results between threads isn’t really possible*.

*Async queries do actually pass the results from a background thread to the looper thread, but Realm manages that for you in the background automatically.

The good news is that if you use Realm as intended (which is to open and close the Realm instance for the duration of that thread), then you’ll be able to utilize the simplest form of threading for your local data access: not trying to do “threading things” at all!

No AsyncTask needed. No more manual callbacks in onPostExecute() (then causing memory leaks on rotation).

Just define a RealmResults, subscribe for changes (or expose it as Flowable) on the main thread, and it will automatically reflect any change made from any background thread write. Update the UI when change occurs.

And on background threads, just access the RealmResults via the synchronous query API.

Using Realm as the core of a reactive data layer

When people architect their data layer, the common setup is that there is a remote data source, a local data source, and an in-memory cache. For each data access, there is a selection between them by the so-called “repository”. However, as the repository doesn’t emit change events, any invalidation and refreshing is done manually.. This is the so-called “cold” repository implementation.

Cold Repository Pattern

Well with Realm, we don’t need any of that complexity! As Realm is lazy-loaded and always points to the same data, there is no need for an in-memory cache.

As we would like to design the application not to have to re-download the data when navigating back (even after putting the app in background for a long time), we ought to read from the local data source if it’s available (it’s faster than remote access!), and the synchronization should happen seamlessly in the background if necessary. Update the UI with the new data when done.

Therefore, instead of trying to read with a one-off operation from the cache, the local data store, or the remote store; we can instead just subscribe for changes to the local datasource and be up to date no matter what operation modified the database, where and when! This is the “hot”, reactive repository implementation.

Hot Repository Pattern

With that, we’ve turned our data access reactive.

Now we can simplify our data loading logic from something like this traditional SQLite-based non-reactive solution:

public void start() {
    loadTasks(false);
}

public void loadTasks(boolean forceUpdate) {
    loadTasks(forceUpdate, true);
}

/**
 * @param forceUpdate   Pass in true to refresh the data in the {@link TasksDataSource}
 * @param showLoadingUI Pass in true to display a loading icon in the UI
 */
private void loadTasks(boolean forceUpdate, final boolean showLoadingUI) {
    if (showLoadingUI) {
        dataLoading.set(true);
    }
    if (forceUpdate) {
        mTasksRepository.refreshTasks();
    }

    mTasksRepository.getTasks(new TasksDataSource.LoadTasksCallback() {
        @Override
        public void onTasksLoaded(List<Task> tasks) {
            List<Task> tasksToShow = new ArrayList<Task>();

            // We filter the tasks based on the requestType
            for (Task task : tasks) {
                switch (mCurrentFiltering) {
                    case ALL_TASKS:
                        tasksToShow.add(task);
                        break;
                    case ACTIVE_TASKS:
                        if (task.isActive()) {
                            tasksToShow.add(task);
                        }
                        break;
                    case COMPLETED_TASKS:
                        if (task.isCompleted()) {
                            tasksToShow.add(task);
                        }
                        break;
                    default:
                        tasksToShow.add(task);
                        break;
                }
            }
            if (showLoadingUI) {
                dataLoading.set(false);
            }
            mIsDataLoadingError.set(false);
            notifyPropertyChanged(BR.empty);
            updateUI(tasksToShow);
        }

        @Override
        public void onDataNotAvailable() {
            mIsDataLoadingError.set(true);
        }
    });
}

To this:

Realm realm;
RealmResults<Task> liveResults;

public void start() {
    realm = Realm.getDefaultInstance();
    reloadTasks();
}

public void reloadTasks() {
    if(liveResults != null && liveResults.isValid()) {
        liveResults.removeChangeListener(this);
    }
    dataLoading.set(true);
    liveResults = getFilteredResults(realm);
    liveResults.addChangeListener(this);
}

public void stop() {
    liveResults.removeChangeListener(this);
    liveResults = null;
    realm.close();
}

private RealmResults<Task> getFilteredResults(Realm realm) {
    switch(selectedFilter) {
        case ALL_TASKS:
            return tasksRepository.getTasks(realm);
        case ACTIVE_TASKS:
            return tasksRepository.getActiveTasks(realm);
        case COMPLETED_TASKS:
            return tasksRepository.getCompletedTasks(realm);
        default:
            throw new IllegalArgumentException("Invalid filter type [" + selectedFilter + "]");
    }
}

@Override
public void onChanged(RealmResults<Task> tasks) {
    if(!tasks.isLoaded()) {
        return; // loading...
    }
    notifyPropertyChanged(BR.empty);
    dataLoading.set(false);
    updateUi(tasks);
}

Much simpler, right? The big benefit here is that we no longer need to call a loadTasks() method manually. If any change happens, the UI is updated automatically.

But what if I want to eagerly evaluate the results on a background thread, pass it to the UI thread, and still retain Realm’s reactivity?

Luckily, while it’s a bit tricky, it’s not impossible. All you need is your own HandlerThread that you use as Realm’s scheduler for executing asynchronous queries. That way, Realm can keep your results up to date even on a background looper thread.

Scheduler looperScheduler;
Scheduler writeScheduler;

public TaskRepository() {
    // read scheduler
    handlerThread = new HandlerThread("LOOPER_SCHEDULER");
    handlerThread.start();
    synchronized(handlerThread) {
        looperScheduler = AndroidSchedulers.from(handlerThread.getLooper());
    }

    // write scheduler
    writeScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
}

// this sample assumes that both Realm's models, and "immutable domain models" exist.
// As they are separate, a mapping between them is required.
// However, this mapping results in eager evaluation of results.
public Observable<List<Task>> getTasks() {
    return Observable.create((ObservableOnSubscribe<List<Task>>) emitter -> {
        Realm realm = Realm.getDefaultInstance();
        final RealmResults<DbTask> dbTasks = realm.where(DbTask.class).findAllSortedAsync(DbTaskFields.ID, Sort.ASCENDING);
        final RealmChangeListener<RealmResults<DbTask>> realmChangeListener = element -> {
            if(element.isLoaded() && !emitter.isDisposed()) {
                List<Task> tasks = mapFrom(element);
                if(!emitter.isDisposed()) {
                    emitter.onNext(tasks);
                }
            }
        };
        emitter.setDisposable(Disposables.fromAction(() -> {
            if(dbTasks.isValid()) {
                dbTasks.removeChangeListener(realmChangeListener);
            }
            realm.close();
        }));
        dbTasks.addChangeListener(realmChangeListener);
    })
    .subscribeOn(looperScheduler)
    .unsubscribeOn(looperScheduler);
}

public void insertTask(Task task) {
    Single.create((SingleOnSubscribe<Void>) singleSubscriber -> {
        try(Realm r = Realm.getDefaultInstance()) { // <-- auto-close
            r.executeTransaction(realm -> {
                realm.insertOrUpdate(taskMapper.toRealm(task));
            });
        }
    }).subscribeOn(writeScheduler.getScheduler()).subscribe();
}

Of course, eagerly evaluating every element in the RealmResults at once by copying it from the Realm means that we might need to re-introduce the in-memory cache, and eager evaluation is not as suitable for very large data sets either.

In fact, eager evaluation can be a lengthy operation depending on the size of the results, in which case we might want to pass the actual reading (copying) operation to another thread specifically for reading, and different from the thread you listen for notifications on.

But normally, you’d just want to stay on the same thread, and use Realm’s in-built Rx2 support instead.

Conclusion

With Realm’s Rx2 support, we can leverage the power of reactive event streams, and provide a unified interface for our data access - while providing a reactive data layer, of course.

// can be singleton too!
public class RealmTaskRepository implements TaskRepository {
    @Override
    public Flowable<List<Task>> getTasks() {
        try(Realm realm = Realm.getDefaultInstance()) {
            RealmQuery<Task> query = realm.where(Task.class);
            Flowable<RealmResults<Task>> flowable;
            if(realm.isAutoRefresh()) { // for looper threads. Use `switchMap()`!
                flowable = query
                      .findAllSortedAsync(TaskFields.ID)
                      .asFlowable()
                      .filter(RealmResults::isLoaded);
            } else { // for background threads
                flowable = Flowable.just(query.findAllSorted(TaskFields.ID));
            }
            // RealmResults<T> is a list, so we can return it as a List<T> 
            // but the compiler needs coercing
            
            // noinspection unchecked
            return (Flowable)flowable;
        }
    }
}

On top of the ability to expose Realm’s results as a Flowable, we can ditch managing an in-memory cache entirely, thanks to Realm’s lazy loading. No manual cache invalidation, as Realm manages that for us.

With that, we can listen for changes, and update the views when that happens. Turning data access reactive can be easy - Realm gives you the tools you need, out of the box.

Next Up: Demo: Build a Reactive App Using the Realm Platform

General link arrow white

About the content

This content has been published here with the express permission of the author.


Gabor Varadi

Gabor Varadi, otherwise known as Zhuinden, or EpicPandaForce, is an Android developer, and author of simple-stack. Long-time Realm evangelist. Admires elegant code, favors single-Activity design, and loves to discuss application architecture.

4 design patterns for a RESTless mobile integration »

close