Functional on Android: Lambdas, Rx, and Streams in Your App

On Android, more and more developers are attracted to the functional programming-style concept of declarative data manipulation using lambdas. Java 8 has a new streams API, but it’s limited to Android N. Backports exist, but it’s RxJava that’s all the rage, with its elegant threading solution.

How do we use lambdas, streams, and Rx effectively on Android? What about orientation changes and background tasks? Check out Eric’s proposal from Mobilization 2016: stop worrying about the lifecycle, and cache your way into a blissful user experience.


Introduction (0:00)

I’ll be talking today about functional Android. What is the difference between functional and reactive? What do lambdas mean? How does it all come together?

I started using Rx when I saw that Retrofit supports it. You get this super elegant way of making network calls on background threads and getting the result back on your main thread. There are no AsyncTasks, no callbacks, and no boilerplate.

RateBeer and RxCupboard (1:14)

I got interested in it as part of my open source work, and I started playing around with it. If you know me, it’s probably because of these open source works: either through the RxCupboard library, or via one of my apps like RateBeer.

The RateBeer app is open source. I wouldn’t say that it has the best architecture, but for me, it served as a real playground for Rx. Rx brings functional concepts I knew from my background in science, from functional programming and PASCAL, but it’s not really Java.

Lambdas in Java 8 (2:06)

Java is an object-oriented programming language. In Java 8, we have lambdas which bring ideas of streaming to Java and making transformations. This is reactive, right?

It’s true that lambdas come from functional programming languages. We have all written code like this many times, for example when writing a comparator to sort a collection.


List<String> numbers = Arrays.asList("5", "3", "4", "1", "2");

Collections.sort(numbers, new Comparator<String>() {
    @Override
    public int compare(String s1, String s2) {
        Integer i1 = Integer.valueOf(s1);
        Integer i2 = Integer.valueOf(s2);
        return i1.compareTo(i2);
    }
});

Trimming unnecessary bits… (2:21)

Lambdas are great things. Android Studio will even tell you if you are writing it wrong: it will give you a little gray line and will try to collapse code into something that’s much more concise, even if you’re not using lambdas. It only shows the code that’s functional.

…into lambda form (3:15)

Because the compiler knows that you’re going to put a Comparator of String in here, we can remove that declaration. It also knows the method signature, so we can remove that along with the parameter types. What we are left with is really the core of the code.


List<String> numbers = Arrays.asList("5", "3", "4", "1", "2");

Collections.sort(numbers, (s1, s2) -> {
    Integer i1 = Integer.valueOf(s1);
    Integer i2 = Integer.valueOf(s2);
    return i1.compareTo(i2);
}); 

You could use Rx without lambdas and lambda notation, but I suggest you use Retrolambda or the Jack compiler.

As an anonymous inner class (4:01)

Our lambda is now written in one line of code, and we still have our comparator. If you are not using Java 8 natively, but an Android backport of lambdas such as Retrolambda, it will still compile this into an anonymous inner class. The source code, however, is now concise.


List<String> numbers = Arrays.asList("5", "3", "4", "1", "2");

Collections.sort(numbers, (s1, s2) -> Integer.valueOf(s1).compareTo(Integer.valueOf(s2)));

As a method reference (4:30)

You could even make it a little bit more concise. If the logic lives in a method, you can refer to that method with a method reference. I think this is concise code, and we will continue to use this notation throughout.


List<String> numbers = Arrays.asList("5", "3", "4", "1", "2");

Collections.sort(numbers, this::compareAsInt);

private int compareAsInt(String s1, String s2) {
    return Integer.valueOf(s1).compareTo(Integer.valueOf(s2));
}
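
As a minimal, self-contained sketch of the same idea, the sort can be run outside an activity like this. The class name `NumericSort` and its two methods are hypothetical, and `Comparator.comparingInt` is a Java 8 standard-library helper that the talk does not mention, so it assumes Java 8 library support is available.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical demo class; not part of the talk's code.
final class NumericSort {

    // Same logic as compareAsInt above, but static so no instance is needed.
    static int compareAsInt(String s1, String s2) {
        return Integer.valueOf(s1).compareTo(Integer.valueOf(s2));
    }

    // Sorts a copy of the input, passing a method reference as the Comparator.
    static List<String> sortWithMethodReference(List<String> input) {
        List<String> copy = new ArrayList<>(input);
        copy.sort(NumericSort::compareAsInt);
        return copy;
    }

    // Alternative: build the Comparator from a key extractor instead of writing compare logic.
    static List<String> sortWithKeyExtractor(List<String> input) {
        Comparator<String> byNumericValue = Comparator.comparingInt(Integer::parseInt);
        List<String> copy = new ArrayList<>(input);
        copy.sort(byNumericValue);
        return copy;
    }
}
```

Both methods compare the strings by their numeric value, so "10" sorts after "5" rather than before "2" as it would alphabetically.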

Streaming in Java 8 and RxJava (4:56)

Java 8 now has streams. You wouldn’t write code that loops over the numbers; instead, you take the list of numbers you already have and apply two transformations: converting each string to an integer and filtering the even numbers.

You would, of course, apply a mapping and a filter operator. We have a stream, and we map, and we filter, and we do something with the result.


List<String> numbers = Arrays.asList("5", "3", "4", "1", "2");

numbers.stream() // start streaming
    .map(Integer::valueOf) // convert to int
    .filter(i -> i % 2 == 0) // filter even
    .forEach(this::log);
    

Using StreamSupport… (6:09)

Even if you don’t want to or cannot use Java 8, you could simply use StreamSupport, which backports this feature, because Java 8 is not readily available for us on Android. Although Android N did add support for the streams API, using it means setting your minimum SDK to Android N, so it will probably be a long time until we can use that.

However, you could use the StreamSupport library with pretty much the same notation and the same performance. It is slightly less attractive perhaps, because you have to wrap the list, but that’s a really minor thing.


List<String> numbers = Arrays.asList("5", "3", "4", "1", "2");

StreamSupport.stream(numbers) // start streaming
    .map(Integer::valueOf)
    .filter(i -> i % 2 == 0)
    .forEach(this::log);
    

…or RxJava! (6:28)

Yet, how do these relate to Rx and reactive frameworks? If you write this in a reactive way in RxJava, it looks pretty much the same. You create an Observable from the list, map and filter it, subscribe to it, and that’s it.


List<String> numbers = Arrays.asList("5", "3", "4", "1", "2");

Observable.from(numbers) // create observable
    .map(Integer::valueOf)
    .filter(i -> i % 2 == 0)
    .subscribe(this::log); // start emitting
    

But there’s actually a vast difference between them. You could start out with streams in your application if you want to learn more about how these streams of data objects are going to be mapped and shown somewhere, but there’s no combination and no threading. You have no control over the threading.

Reactive Programming (7:05)

Reactive programming is much more than just the streaming API. It’s interesting to use it to just learn maybe, but the reactive aspect adds so much more. Even though it is just a source of data objects that are being emitted via an Observable, we also get the onComplete and the onError that we have seen.

These are very powerful concepts, and we get threading too. Collections can be streamed as well, but a collection just pushes its items through a stream once, and it’s a one-time use. There are different reactive frameworks, and Rx is just one example. RxJava 2 is at release candidate 5 right now, so we’re sure going to be doing RxJava 2 a lot soon.

Streaming and Reactive Interfaces (7:34)

Akka Stream exists, which is written in Scala, so you wouldn’t really easily pick that up in Android.

Reactor Core exists, which is also a really good reactive library, but you have to use proper Java 8 for it, so it’s not really useful on Android. Java 9 will add its own flavor called the Flow API. All it does right now is specify reactive interfaces that other libraries build upon. With RxJava 2 there will be interop with other reactive libraries.

The conclusion is still the same: use RxJava on Android, and not streams or one of the other reactive frameworks.

Now we’ll get to some more code. Part of the beauty of using Rx on Android is in the transformations of your data and the operators that we can apply, but definitely also the threading.

Observables in RxJava (9:21)

We use Rx all the time because we can do such elegant threading, but by nature, unless you do something explicitly, it’s not multi-threaded at all: it is synchronous.

Here, if we just map the numbers (which are IDs) through some blocking network call, filter the results, and log them, it’s still all synchronous code. If you do this on the main thread, it will still block.


List<Integer> ids = Arrays.asList(5, 3, 4, 1, 2);

Observable.from(ids)
    .map(i -> Network.blockingCall(i))
    .filter(item -> item.shouldShow())
    .subscribe(this::log);
    

RateBeer example (9:58)

We get some information from our beer database. We call the network and use Retrofit, because it has native support for Rx. With Retrofit, you just call getBeer with an ID, and you get an Observable. Then you map it. Maybe you only want to keep beers that have a rating higher than 80% and only show those in the interface.

These numbers, which we originally had on our list, will get emitted one by one through the chain and will get transformed into something we’ll show in our interface.


List<Integer> ids = Arrays.asList(5, 3, 4, 1, 2);

Observable.from(ids)
    .map(i -> Network.getBeer(i))
    .filter(beer -> beer.rating > 0.8F)
    .subscribe(this::showBeer);
    

Non-blocking Observables (10:48)

One thing to notice is that this is still synchronous. What we really want to do is tell Rx where we want to switch threads. We subscribe on the IO thread because we want to start emitting items on that thread. There, we do a mapping to get the beer information from the network, and we filter it. We’re still on the IO thread, and then we observe the result back on the main thread.

That’s the moment you switch to the next thread. We can do this multi-threading very beautifully on Android. We’re no longer blocking the user. Not only that, we’re actually doing reactive pulling.

Reactive pulling means that if you, for example, apply the operator take(1), we only need one item that we want to show in our interface: the first beer in this list of five IDs that has a rating higher than 80%.

Say the first beer you get from the network has a rating lower than 80%, so it’s not emitted. RxJava will then request a new item from the original source, which is your Observable of numbers: number three. We make the network request with this number, and if it represents a beer that has a rating higher than 80%, this item will get emitted further.


List<Integer> ids = Arrays.asList(5, 3, 4, 1, 2);

Observable.from(ids)
    .subscribeOn(io())
    .map(i -> Network.getBeer(i))
    .filter(beer -> beer.rating > 0.8F)
    .take(1) // only request 1
    .observeOn(mainThread())
    .subscribe(this::showBeer);
    

The operator take means that we can stop after one item. None of the other IDs will be used to start a network request. It’s concise and efficient code. Because it’s all in one chain, you can have a singleton somewhere that provides the Observable to get data from the network. You can put this in the Network class as getBeer, and then somewhere else in your application, where you actually want to use it, apply the take(1) operator on this Observable.
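
The pull-based behaviour can be observed without a network or an Rx dependency. The sketch below is only an analogy: it uses Java 8 streams, which are also lazy and pull-based, rather than RxJava. `LazyPullDemo`, `fakeGetBeer`, and the ratings table are hypothetical stand-ins for the talk’s `Network.getBeer`. It counts how many lookups actually happen before the first match is found.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; an analogy in plain Java streams, not RxJava code.
final class LazyPullDemo {

    // Made-up ratings keyed by beer ID; stands in for the remote beer database.
    private static final Map<Integer, Float> RATINGS = new HashMap<>();
    static {
        RATINGS.put(5, 0.60F);
        RATINGS.put(3, 0.85F);
        RATINGS.put(4, 0.90F);
        RATINGS.put(1, 0.70F);
        RATINGS.put(2, 0.95F);
    }

    // Counts how many "network" lookups were performed.
    final AtomicInteger lookups = new AtomicInteger();

    // Stand-in for a blocking network call that returns the rating of a beer.
    float fakeGetBeer(int id) {
        lookups.incrementAndGet();
        return RATINGS.get(id);
    }

    // findFirst() pulls IDs one at a time and stops as soon as one passes the filter.
    Optional<Float> firstHighlyRated(List<Integer> ids) {
        return ids.stream()
                .map(this::fakeGetBeer)
                .filter(rating -> rating > 0.8F)
                .findFirst();
    }
}
```

With the IDs 5, 3, 4, 1, 2 and these made-up ratings, only the first two IDs are looked up: ID 5 is rejected by the filter, ID 3 passes, and the remaining three IDs are never requested.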

RxBinding and RxAndroid (13:19)

Definitely use RxAndroid. You see that I observe on the main thread, and the main thread concept comes from Android, so you need RxAndroid. RxBinding is another really cool library that allows you to bind Rx streams to Android views. You can use it both as a source of emissions and as a target to apply results to a view.

Some people recommend not using Rx for UI. I agree with that as far as the Subscription side is concerned, but I still quite like it as a source. We can make one chain where we transform user input into some action on the screen. In the meantime, we can use the scan operator, which here adds one every time the user clicks a button. RxView.clicks creates a click listener for you and starts emitting events, one emission every time the user clicks. We are just counting the number of clicks; we map the count to a string and we show it.


RxView.clicks(button1)
    .scan(0, (integer, click) -> integer + 1) // running count
    .map(i -> Integer.toString(i)) // int to string
    .subscribe(RxTextView.text(textview1));
    

Switching threads (14:59)

You could just apply a click listener, but what if you did it on a different thread? We now want to subscribeOn the computation thread. RxView.clicks will emit on the computation thread, we do this complex operation of counting one extra and then we apply… and we get an exception.

CalledFromWrongThreadException? (15:27)

Why do we get this exception? We are still on the computation thread, and we cannot modify our views on this thread, so we add an observeOn for the main thread. Now that we have added the main thread, does it work?

IllegalStateException? (15:50)

No. We get an IllegalStateException, but why? We cannot modify views on a thread other than the main thread, but we also cannot attach the click listener from another thread. My general recommendation is to always write the subscribeOn as the very first operator.

First of all, I would apply them where you also subscribe, so if you have a source somewhere where you would just create an Observable, I wouldn’t necessarily apply operators there. Maybe to switch threads, but the moment that you subscribe, I would always put the subscribeOn as the first item.

It’s the source, it subscribes, so it starts emitting there. We use the main thread to start our emissions. And then of course, when you apply an observeOn, that’s the moment you switch to another thread, so that’s in line with your chain.

Combining UI and Rx (17:14)

There are more things we can do with RxBinding now that our widgets and UI events generate emissions as Observables. We have streams of view events, so we can combine them. We have two checkboxes here and a text field. Every time the user taps one of the checkboxes to turn it on or off, an emission is made. The same goes for the text field: every time the user types something in this edit text box, an emission is made. Then we can use Rx to filter, to map, to apply any operators we want.


Observable.combineLatest(
    RxCompoundButton.checkedChanges(agreeTermsCheck),
    RxCompoundButton.checkedChanges(noCommentCheck),
    RxTextView.textChanges(reviewEdit).map(s -> s.length() > 80),
    (agreeTerms, noComment, hasReview) -> agreeTerms && (noComment || hasReview))
    .subscribe(RxView.enabled(submitButton));
            

In this case, when the text changes, we first map it to something that checks whether the length of this comment is sufficient, for example 80 characters. Then, in the end, we combine these sources to say very concisely that the submit button should be enabled if you have agreed to the terms and conditions and if you either checked the “I have no comment” checkbox or wrote a comment that’s 80 characters long.
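
The combining function is ordinary boolean logic, so it can be pulled out and checked without any views. A minimal sketch, assuming a hypothetical `SubmitRule` class and reusing the threshold from the `s.length() > 80` mapping in the snippet:

```java
// Hypothetical helper class; mirrors the lambda passed to combineLatest.
final class SubmitRule {

    // Threshold taken from the mapping s.length() > 80 in the snippet.
    static final int MIN_REVIEW_LENGTH = 80;

    // True when the review text is longer than the threshold.
    static boolean hasReview(CharSequence text) {
        return text.length() > MIN_REVIEW_LENGTH;
    }

    // The submit button is enabled only if the terms are accepted and the user
    // either ticked "no comment" or wrote a sufficiently long review.
    static boolean canSubmit(boolean agreeTerms, boolean noComment, boolean hasReview) {
        return agreeTerms && (noComment || hasReview);
    }
}
```

Keeping the rule in a plain method like this is one way to unit test it separately from the Rx chain that feeds it.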

Delayed emissions (18:39)

Another example: what if we have a text field that is used as search input? Do we want to launch a new query every time the user enters or removes some text? No, typically we don’t care about every individual character, so we want to wait a little until the user has stopped typing, and only then do the actual query.


RxTextView.textChanges(searchEdit)
    .debounce(1, TimeUnit.SECONDS, mainThread()) // delay emission
    .subscribe(this::startQuery);
    

We can apply the debounce operator, which waits every time we get an emission, in this case a change in the text. Each time, it waits up to one second before emitting, and only the last item is emitted. So if I type three characters in quick succession, only the last emission, containing all three characters, will be pushed down the stream.
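
To make the timing rule concrete, here is a rough plain-Java sketch of the debounce idea built on a `ScheduledExecutorService`. It is not how RxJava implements the operator; the `Debouncer` class, the `typeQuickly` helper, and the 200 ms delay are all assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of RxJava: forwards a value only after a quiet period.
final class Debouncer<T> {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long delayMillis;
    private final Consumer<T> downstream;
    private ScheduledFuture<?> pending; // guarded by "this"

    Debouncer(long delayMillis, Consumer<T> downstream) {
        this.delayMillis = delayMillis;
        this.downstream = downstream;
    }

    // Each new value cancels the previously scheduled one and restarts the timer.
    synchronized void onNext(T value) {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = scheduler.schedule(
                () -> downstream.accept(value), delayMillis, TimeUnit.MILLISECONDS);
    }

    // Lets the last pending value through, then stops the worker thread.
    void close() {
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Simulates rapid typing and returns what reached the downstream consumer.
    static List<String> typeQuickly(String... inputs) {
        List<String> received = Collections.synchronizedList(new ArrayList<String>());
        Debouncer<String> debouncer = new Debouncer<String>(200, received::add);
        for (String input : inputs) {
            debouncer.onNext(input);
        }
        debouncer.close();
        return received;
    }
}
```

Calling `Debouncer.typeQuickly("p", "pi", "piw")` feeds three values in quick succession; the first two are cancelled before their timers fire, so only the last value reaches the consumer.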

Switching threads on and off the UI (19:41)

When we directly flatMap these three characters that the user typed in, for which we waited one second, we directly start a network request. Of course we have to do some threading: we subscribe on the main thread, we debounce on the IO thread, and we observe back on the main thread again. What we now have in one chain of code is a mechanism where we can delay the actual network queries, do the network query on a background thread, and then show the results in the interface again.


RxTextView.textChanges(searchEdit)
    .subscribeOn(mainThread()) // on ui
    .debounce(1, TimeUnit.SECONDS, io())
    .flatMap(query -> Network.searchBeer(query.toString()))
    .observeOn(mainThread()) // to ui
    .subscribe(this::showBeers);
    

I think this is a really great example of how you can use Rx even as a source for your Observables, transform the data, and show it again.

Some words of warning. First of all, you can see that debounce takes in a scheduler, so you explicitly have to give it one or it’ll pick one for you.

Another thing is that this is a so-called hot Observable: the user can keep typing and entering more text and never finishes, so there is no onComplete in this stream. The onComplete will never be called. Depending on what operators you want to apply at the end, this might be a problem. For example, the toList operator gathers all the emissions and emits them as a single list, and it relies on the fact that your stream actually completes. Since our stream never completes, because the user can keep on putting new information in there, this wouldn’t work, so you cannot use toList here.

Event buses in RxJava (21:43)

Another thing that you can use RxJava for is something like an event bus. We all know event buses; they’re really very simple to use. They are definitely simple in Rx. You use what’s called a Subject. A Subject is both an Observable and a Subscriber. That sounds complex, but it works exactly like an event bus. You create this Subject, you submit items to it by calling onNext, and every Subscriber that we have on this Subject will get this emission.


BehaviorSubject<Float> syncProgress = BehaviorSubject.create();

syncProgress.onNext(0.15F);
syncProgress.subscribe(RxView.progress(progressBar));

syncProgress.map(f -> f >= 1F).subscribe(RxView.visibility(syncedItemList)); 

So for example, a background service is doing some synchronization on Android. We have this Subject, and we push to it that we’re now synchronized at 15%. Everything in your interface that is currently subscribed will get this event and can, for instance, update a progress bar to 15%. Something else we can do is map it to check whether the synchronization has reached 100%, and only show the list if it has.

BehaviorSubject is a specific kind of Subject. It remembers the last value you’ve sent to it, so every time a new Subscriber subscribes, it will also emit that last item. This is very useful in this case, because the moment that you subscribe in your activity to this BehaviorSubject to get the syncProgress, you want to have the last known state.
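
The “remember the last value” behaviour can be sketched in a few lines of plain Java. This is a simplified illustration, not RxJava’s `BehaviorSubject`: the class name `LastValueSubject` is hypothetical, and it leaves out completion, errors, and unsubscribing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the idea behind a BehaviorSubject.
final class LastValueSubject<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T last;
    private boolean hasValue;

    // Stores the value and forwards it to everyone currently subscribed.
    synchronized void onNext(T value) {
        last = value;
        hasValue = true;
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }

    // A new subscriber immediately receives the most recent value, if there is one.
    synchronized void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (hasValue) {
            subscriber.accept(last);
        }
    }
}
```

A subscriber that arrives after progress 0.15 and 0.5 have been pushed immediately sees 0.5, which is the property that makes this style of Subject convenient for an activity that subscribes late.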

Caching in memory with different Subjects (23:31)

There are other types of Subjects. There’s a ReplaySubject that remembers everything that you emitted to it. Sometimes that’s useful.

There’s also a PublishSubject that’s more like your traditional event bus. It remembers nothing: every time you send something to it, all your current Subscribers get this event and then it’s forgotten. If something subscribes to it later, there is no information for it yet.

You can see that there is already a sort of caching going on, and I think caching can be solved quite elegantly in Rx. Why do I say there’s already some caching going on? With the BehaviorSubject, the last synchronization value is temporarily stored. You do need to keep the Subject somewhere accessible, but that last value is cached and readily available. If you used a ReplaySubject, it would even have all the values cached in memory.

Implementing the ReplaySubject cache (24:35)

Sometimes you want to cache in memory but don’t want to use a Subject, because it’s not something that lives throughout your application all the time. In that case we can apply the cache operator. If we have a source, a network call again, we can apply the cache operator to it. What this does is cache every value that it receives, so that all the Subscribers get the same values.

So here we have two Subscriptions. Normally, without the cache, this would make two network calls, but since we applied the cache operator, we only have one network call and the same value will be sent to both Subscriptions.


Observable cached = Network.getBeer(8)
    .replay(30, TimeUnit.MINUTES).autoConnect();

cached.subscribe(this::showBeerHere);
cached.subscribe(this::showBeerThere);

Cache is implemented in Rx as the replay operator plus an autoConnect. Why would you rewrite a cache as a replay with autoConnect?

Replay accepts arguments that say how long we want to cache a value. A plain cache is there indefinitely, and here we want a 30-minute cache. After 30 minutes, it will lose the value, so a new Subscription to the same Observable will say, “well, I no longer have a value; I have to make a new network request.”
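
The idea of a cache that expires after a fixed time can be sketched in plain Java as follows. This illustrates the time-limited caching concept only and makes no claim about how RxJava’s `replay` works internally. `ExpiringCache` and its injectable clock are assumptions made so the behaviour can be checked without waiting 30 minutes.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper: memoizes a value and re-fetches it once it is older than the TTL.
final class ExpiringCache<T> {
    private final Supplier<T> source;   // e.g. a wrapper around a network call
    private final long ttlMillis;       // how long a fetched value stays valid
    private final LongSupplier clock;   // returns "now" in milliseconds; injectable for tests
    private T value;
    private long fetchedAt;
    private boolean hasValue;

    ExpiringCache(Supplier<T> source, long ttlMillis, LongSupplier clock) {
        this.source = source;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Returns the cached value, calling the source again only when it has expired.
    synchronized T get() {
        long now = clock.getAsLong();
        if (!hasValue || now - fetchedAt >= ttlMillis) {
            value = source.get();
            fetchedAt = now;
            hasValue = true;
        }
        return value;
    }
}
```

In production code the clock could be `System::currentTimeMillis`; two calls to `get()` within the TTL then share one fetch, and the first call after the TTL triggers a new one.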

Lifecycle management (26:04)

Now, I’m sure you can think of many ways this can be used on Android, but one way we can use it is in the lifecycle. On Android we have this beautiful system where, if you rotate your device, the whole activity is stopped, destroyed, and recreated.

A brilliant choice, and one we have to deal with every day.

Now, how can we use this caching? Even if you don’t really want to cache anything beyond the time the user spends in this activity, it would still be nice to have the value readily available in memory after something as simple as rotating the device.

What you do is simply keep this cached Observable (or a Subject, if you prefer) somewhere that is persistent in memory. You could use a singleton. Maybe you have a repository pattern with a Network object that caches the value every time you call getBeer. Maybe you would prefer to put everything in a fragment, if fragments are your thing.

Using onRetainNonConfigurationInstance (27:37)

You could put it in a non-configuration instance as well. What you put in there is the cached Observable itself.

How does it work? Well, we have the onCreate of our activity, where normally you would just set up the Observable, the threading, and the Subscription. Now we rotate the device and we have to redo the network call.

How do we solve this? Using the cache operator is one suggestion, so we have the network Observable cached. We store it temporarily in a field, and then return it from onRetainNonConfigurationInstance. Of course, I only have one Observable here so I can return it directly, but you could use a simple wrapper class that contains multiple objects. Android will retain this during the reconfiguration of your activity, and you can get it out again in your onCreate. The next time onCreate is called, getLastNonConfigurationInstance will return non-null, and that is the cached Observable. We subscribe to it and, like magic, the value that is already cached in there will be re-used.


private Observable beerCall;
protected void onCreate(Bundle savedInstanceState) {
    ...
    beerCall = (Observable) getLastNonConfigurationInstance();

    if (beerCall == null) {
        beerCall = Network.findByBrewer("Piwoteka").cache();
    }

    beerCall
        .subscribeOn(io())
        .observeOn(mainThread())
        .subscribe(this::showBeers);
}

@Override
public Object onRetainNonConfigurationInstance() {
    return beerCall;
}

So what happens if the network call was not complete, or maybe not even started yet? It will just start a network call as you would expect. This is still all memory caching, however, and sometimes we want a little bit longer persistence.

RxCupboard (29:15)

What about database caching? I wrote a wrapper around Cupboard (a very simple way of replicating cursors).

It will also happily generate the whole database for you with all the tables, and do upgrades. While I like Cupboard, I wouldn’t recommend it for huge projects. In lots of cases where you have one or two tables and you just want to use plain old objects, we don’t want to write our own CREATE TABLE statements in such cases. We want minimal boilerplate.

Cupboard does rely heavily on defaults. For example, you have to have a primary key that is _ID. Rx tables have a primary key.

This is basically all the code that you have to write to create a connection with your database and generate the tables. Technically, you have to override SQLiteOpenHelper yourself and call into it from two methods, but it’s about four lines of code to create this whole object. The RxCupboard library that I wrote basically just wraps around Cupboard and makes everything Observable.

Basic queries (30:31)

You could do something like rxdb.query, where you query all the elements of the Beer class. The Beer class now automatically corresponds to a Beer table in your database, and I get all the items as an Observable of Beer. This supports reactive pull, so if I eventually apply operators and we filter and only take one, it will not convert all the rows in your database table into POJOs. It will only do this for the beers that you need.

Database operations (31:24)

You can do selections of course, with WHERE clauses, and you can get a specific item using get. We can also store data using Rx.put. It just accepts an item from your stream and happily puts it in the database. You could use it as the action in your Subscription, or you could put it somewhere in the middle of your stream as a side effect, so that everything that goes through your stream is automatically stored in the database. So how do we use that?

Emitting database changes (31:54)

Say we have our network call again. Everything that goes through the stream I just map, putting it in the database, and afterward I can still apply the rest of the operators as usual. I use toList here, but it could be any other operator you want, and now we have all the data in the database.

And since we are always putting and deleting through RxCupboard, it can emit these changes itself as a new Observable source. If you call changes, it will give you every database change that happens, or perhaps only the changes for one specific class.

What we can do then is, for example, request the actual item that has changed by calling entity. Or, in case we’re only interested in database insertions, we do something like this. The ofType here is just the Rx operator ofType, which keeps only the database insertions, so what this stream produces is only the Beer objects that are added to the database. So if some background process is synchronizing and adding objects all the time, then without any direct connection to it, my activity can have an Observable stream that gets all these new additions directly, readily available to show.


Observable changes = rxdb.changes();

Observable changes = rxdb.changes(Beer.class);

Observable addedBeers = rxdb.changes(Beer.class)
    .ofType(DatabaseChange.DatabaseInsert.class)
    .map(insertion -> (Beer) insertion.entity()); // cast :(
    

Unfortunately, due to type erasure, we need to cast here, which is a little bit ugly. Maybe I will introduce, next to changes, something like insertions that only emits database insertions, so that we don’t have to cast.

Caching in database (33:57)

How can we use that to cache instead of a memory cache then?

Well, we have this rxdb.get as a database source, an Observable that emits the items readily available from the database. In this case, we’re only interested in one item, and as a backup we also provide a way to ask for the same item via the network. And if we do get it from the network, we directly put it into the database.

How do we combine them? We take the concat operator from Rx. We supply it with the DB and fresh Observables, we subscribe to it, and we’re done. So in the case that the object was already readily available in the database, it will be emitted, and otherwise we’ll have to go to the network. But there’s one thing missing here.

It would query both. It will still do the network request, because nothing is stopping the second Observable from starting and making this network request, and we didn’t want that. So we have to apply first. What happens then is that the concat operator first subscribes to the DB Observable source. It emits the item, and the first operator says, “you’re only interested in one, so stop now,” and the network request never even has to start.
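
The “try the database first, fall back to the network, and stop at the first hit” pattern can be sketched in plain Java with lazy suppliers. This is an analogy to concat followed by first, not RxCupboard or RxJava code; `FirstAvailable` and `dbThenNetwork` are hypothetical names, and an empty `Optional` stands in for a source that emits nothing.

```java
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical helper: queries sources in order and stops at the first one that has a value.
final class FirstAvailable {

    // The network supplier is only invoked if the database supplier returns empty,
    // because the stream is evaluated lazily and findFirst() short-circuits.
    static <T> Optional<T> dbThenNetwork(Supplier<Optional<T>> db,
                                         Supplier<Optional<T>> network) {
        return Stream.of(db, network)
                .map(Supplier::get)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();
    }
}
```

When the database supplier returns a value, the network supplier is never called; when it returns empty, the network supplier is called exactly once.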

Using multiple sources (35:44)

A little bit differently, if you want to use both sources, because you have offline data already available but you also want to show any information that’s live, then you could do something like this. You query all the items by the brewery Piwoteka: you are from Łódź, so you’re interested in all its beers, but maybe they have released some new beers. So you also want to do a live network request. We merge both together, and then we get items both from the database and from the live source.

What is the problem with this approach? It will emit both, so if the same item is in the database and in the network response, it will emit it twice, and you probably don’t want that. You want to apply distinct. But the item can still differ between the two sources. The way that Rx decides whether an item is distinct is by using equals.


protected void onCreate(Bundle savedInstanceState) {
    ...
    Observable db = rxdb.query(Beer.class, "brewer = ?", "Piwoteka");

    Observable fresh = Network.findByBrewer("Piwoteka")
        .map(rxdb::putRx);

    Observable.merge(db, fresh)
        .distinct()
        .subscribeOn(io())
        .observeOn(mainThread())
        .subscribe(this::showBeers);
}
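
Because distinct relies on equals, the model class decides what counts as “the same beer”. A minimal sketch using Java 8 streams instead of Rx: the `Beer` class below is a hypothetical stand-in with made-up fields, and it bases equality on the ID only, so a fresher copy of a beer that is already in the database is treated as a duplicate.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model class; the fields are assumptions for illustration.
final class Beer {
    final long id;
    final String name;
    final float rating;

    Beer(long id, String name, float rating) {
        this.id = id;
        this.name = name;
        this.rating = rating;
    }

    // Two beers are considered equal when their IDs match, whatever the other fields say.
    @Override
    public boolean equals(Object other) {
        return other instanceof Beer && ((Beer) other).id == id;
    }

    // hashCode must agree with equals, since distinct implementations are typically hash-based.
    @Override
    public int hashCode() {
        return Long.hashCode(id);
    }
}

final class DistinctDemo {
    // Database items come first, so for duplicates the database copy is the one kept.
    static List<Beer> mergeDistinct(List<Beer> db, List<Beer> fresh) {
        return Stream.concat(db.stream(), fresh.stream())
                .distinct()
                .collect(Collectors.toList());
    }
}
```

If you would rather treat an updated rating as a different item, equals and hashCode would have to include the rating as well; which choice is right depends on what the screen should show.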

Here Be Dragons (37:15)

Be very much aware of leaking Subscriptions. Not every Subscription is going to leak. Something has to keep it alive, for example a hot Observable such as one generating click events, or a longer running operation. But it’s safest to assume that in almost all cases you can leak if you do not unsubscribe. So if you’re worried and you don’t want to think about it, always unsubscribe, and make sure it’s done symmetrically: if you subscribe in onStart, unsubscribe in onStop, and do the same with onResume and onPause.

You also have the RxLifecycle library, which can help a lot with this. You don’t have to unsubscribe manually; it will unsubscribe the moment that the view disappears. Obviously this only works if your activity lifecycle is known, so you have to extend your activity from a base activity, but it’s still good. Take a look at RxLifecycle.

Do not use .create() (38:19)

Another thing is to never, ever, ever create your own Observables using create. Always use from or just or a similar factory. For example, you might think: I have this click listener, and I want to turn it into a source that I can use as an Observable. Never do this with create, because then you have to deal with backpressure and unsubscribing manually. Just use fromEmitter, which is very elegant. You can even specify a backpressure mode: if there’s a problem with backpressure, should you drop the items, should you buffer them, or something else?

Be careful about blocking code (39:04)

Be very aware of blocking code. If I create a source with just and pass it the result of a blocking call, I might think, oh, this is brilliant, but this will not wait until you subscribe before executing the blocking call. It executes the blocking call directly. Why? Well, why wouldn’t it? You’re calling this code right here; it’s not wrapped inside an inner class. So what you want to do is defer it. Historically we did this with the defer operator, but then you had to combine defer and just. Now there is a fromCallable operator, which waits until you subscribe to the Observable before performing the network request.
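The difference is really a Java evaluation-order question, so it can be shown without RxJava. In the sketch below, Source is a hypothetical stand-in for an Observable with a single value; its just and fromCallable methods only borrow the names of the Rx operators, and blockingCall is a pretend network request that counts how often it runs.

```java
import java.util.concurrent.Callable;

// Hypothetical single-value stand-in for an Observable; not RxJava's API.
final class Source<T> {
    private final Callable<T> producer;

    private Source(Callable<T> producer) {
        this.producer = producer;
    }

    // Like just(value): the caller has already computed the value.
    static <T> Source<T> just(T value) {
        return new Source<>(() -> value);
    }

    // Like fromCallable(call): the work runs only when subscribe() is called.
    static <T> Source<T> fromCallable(Callable<T> call) {
        return new Source<>(call);
    }

    T subscribe() {
        try {
            return producer.call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

final class DeferSketch {
    static int calls = 0;

    // Pretend network request; counts how often it actually runs.
    static String blockingCall() {
        calls++;
        return "beers";
    }

    public static void main(String[] args) {
        Source<String> eager = Source.just(blockingCall());
        System.out.println("after just: " + calls);          // 1, ran immediately

        Source<String> lazy = Source.fromCallable(DeferSketch::blockingCall);
        System.out.println("after fromCallable: " + calls);  // still 1

        lazy.subscribe();
        System.out.println("after subscribe: " + calls);     // 2
    }
}
```

The argument to just is evaluated before just is even entered, so the call happens on whatever thread builds the chain; passing a method reference hands over the work itself instead of its result.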

RxJava 2 (39:53)

I already hinted that RxJava 2 is at release candidate 5 right now. At first glance, you won’t find that everything in the world has changed. However, I’ve heard that there are a couple of big differences that you should be aware of if you are interested in starting to use RxJava 2. It’s not more complex or anything, but be aware that we no longer have just an Observable; we also have a Flowable. They’re basically the same thing, except that the Observable no longer supports backpressure, whereas a Flowable does.

Now, backpressure is a complex topic that I did not discuss. In short: if something at the source starts sending events down your stream, and somewhere down your stream you cannot keep up with them, you have a problem with backpressure. Then you either have to drop the source emissions or cache them somehow.
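The two options, dropping or caching, can be sketched as a small single-threaded simulation in plain Java. This is an illustration under stated assumptions, not how RxJava implements backpressure: the producer emits one item per tick, the consumer handles one item every consumeEvery ticks, and run, consumeEvery, and capacity are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class BackpressureSketch {
    // Simulates a fast producer and a slow consumer joined by a buffer.
    // A small capacity models the "drop" strategy; Integer.MAX_VALUE
    // models the "cache everything" strategy.
    static List<Integer> run(int items, int consumeEvery, int capacity) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> delivered = new ArrayList<>();

        for (int tick = 1; tick <= items; tick++) {
            // Producer: emit one item per tick; it is lost if the buffer is full.
            if (buffer.size() < capacity) {
                buffer.add(tick);
            }
            // Consumer: only keeps up on every consumeEvery-th tick.
            if (tick % consumeEvery == 0 && !buffer.isEmpty()) {
                delivered.add(buffer.poll());
            }
        }
        // Once the producer is done, the consumer drains what is left.
        while (!buffer.isEmpty()) {
            delivered.add(buffer.poll());
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(6, 3, 2));                 // [1, 2, 4]
        System.out.println(run(6, 3, Integer.MAX_VALUE)); // [1, 2, 3, 4, 5, 6]
    }
}
```

Dropping keeps memory bounded but loses items 3, 5, and 6 here; caching delivers everything at the cost of a buffer that grows for as long as the source stays ahead.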

For example, the user starts clicking frantically on some button, or you have some other source that is going too quickly for you, maybe one that is just randomly emitting numbers. That goes wrong unless you implement backpressure in some way. RxJava 1 implements backpressure in practically every operator for you. RxJava 2 does the same, but only in Flowable and not in Observable, and you can switch from the one to the other. Subjects still exist, but you also have Processors, and Processors are the backpressure-aware version of a Subject.

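Also, historically, subscribe returned a Subscription, as we’ve seen. When you pass in a full Observer or Subscriber, it no longer does this; only the lambda overloads of subscribe still hand back something you can dispose, a Disposable. If you do need that handle because you want to unsubscribe (and you do), use subscribeWith instead of subscribe.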

Recommendations (42:28)

Definitely use RetroFit because it’s beautiful, not only the API itself, but also in combination with RxJava.

Android-ReactiveLocation is a way to get the user’s location changes, from GPS through Google Play services, as a source. You can really elegantly stream every change in the user’s location, filter so that the change has to be over a certain distance, and then do a network request.

RxTuples is recommended too, but it’s mostly useful when you’re combining Observables: you’ll run into situations where you need to pass several values along together, and that is where you want to use it.

RxCupboard I can recommend because I wrote it myself. However, I have to say that RxCupboard is based on Cupboard, which is such a simple tool that if you want to do more serious database work, perhaps you want to use something like greenDAO or Realm. They are both cool, and both are adding Rx support now as well.

Jack Compiler for Lambdas (44:31)

It’s true that Android did not support lambdas, but Java 8 has them, and you can use either RetroLambda, which people are sometimes scared of, or the Jack compiler. Android N introduced Jack, but lambda support is backported as well, so if you use Jack, you can use lambdas all the way back to Android 2.3. My RateBeer app uses lambdas all over the place. I switched to Jack and got zero errors, and everything was happy again.

Eric Kok

Eric works as a contractor at KBC Bank in Belgium, creating the next mobile banking app. He is also known for his open-source work published under the name 2312. He builds apps, such as RateBeer, and libraries, such as RxCupboard. He loves brewing and drinking craft beer, chatting about Android, and building with Legos with his son.

Edited by Billy Leet