360andev christina lee facebook

Intro to RxJava

RxJava is an incredibly powerful tool for making Android code easier to maintain and less bug prone, but as a new Android developer, it might not always be clear what exactly RxJava is doing and why doing things that way helps make code better. In this 360|AnDev talk we’ll discuss how things were traditionally done in Android, problematic areas with these approaches, and how RxJava can help solve some of these issues. In addition to providing the motivation for RxJava, this talk will also provide the audience with examples of the most common RxJava use cases as well as common pitfalls to avoid when integrating RxJava in a project for the first time. No prior Rx knowledge is assumed.


Introduction (0:00)

My name is Christina, and I’m here to talk to you about Intro to RX. Just a caveat before we dive in, I think a lot of intro presentations are of the form, here’s this thing, it’s really easy. Use this one or two lines and your life is better. I really, really wish I could give that presentation. I would love to give that presentation.

Unfortunately, RX isn’t something where that’s possible to do. RX is complex and it’s intriguing and it’s fun. It’s anything but simple, in a lot of respects. So, bear with me. There’s a lot of content. I’m going to do my best to make it understandable, but it’s not going to be simple. It’s not going to be one line, and I hope that’s alright.

Background (0:50)

Before we dive into what RX is, let’s take a look at the current landscape because if there is no problem, you don’t need a solution. Two things that interest us here today: Why do we need asynchronous work? And what is the state of that work on Android right now?

The first question is really easy to answer. It’s because users are our friends, and we care about them, and we want them to have a good experience. If you use asynchronous work, you can leverage the power of servers, and other really complicated back end processes that can do things that you can’t do if you’re limited to your mobile device. You can deliver awesome experiences to your users without slowing them down, freezing the main thread, or giving them janky performance.

As to the second point, there are a lot of tools for you to use. This is by no means a comprehensive listing. It barely scratches the surface of all the different things you could use to address asynchronous concerns on Android, but they’re some of the most popular ones. I’m sure a lot of you have heard of Async Task, and probably used it, but there’s also Futures, Event busses, and, of course, Observables, which we’re going to talk about today. Because this list isn’t comprehensive, I’ve linked to a More Info blog, if you want to go check that out after the talk. It goes through everything on Swift and Android, or almost everything.

I do want to caveat that when I say Futures here, I will use Futures in the canonical sense by just saying Futures. I do not mean the built-in Futures. I mean ListenableFutures. There is a lot wrong with the built-in version of Futures that can really trip you up. Here I’m using Futures to represent ListenableFutures, which use Futures under the surface, but add a lot of nice functionality to it.

We have a lot of tools, and because we have a lot of tools and we’re only going to end up using one or maybe two, we need a way to evaluate those tools. I’m not going to stand here and say that I have the answer for what the best evaluation metrics are, but I’m going to offer my perspective on it. Three of the things that I think about are when they run, how they run, and whom they affect. When I look at these three things, it really gives me some perspective on what tool I should use.

If we look at Async as an example, you can notice that AsyncTask need to be executed explicitly. This is awesome because it means that we have control of when it starts. The work can’t easily be modified in AsyncTask though, so in terms of how they run, you kick it off, and if you do want to compose things, you can get into a couple levels of callbacks. It’s not ideal, it’s hard to bring things together. In terms of output, a lot of the data is very “side effect”-y.

If you’re not familiar with the term side effect, one, it’s an awesome concept. I would encourage you to go look it up, but two, it has a corollary to what we use in the medical sense. If I go and take a cold medicine, the goal of that medication is to cure me of my cold. If it turns me green, that’s a side effect. That was not the goal. I don’t want to be green.

Similarly here, if you try to do something, and then it reads from the network, or it hits state, and that’s non-deterministic, that’s considered a side effect because you can’t always know exactly what’s going to happen based on your function signature and the like.


private class SomeTask extends
		AsyncTask<ParamsType, ProgressType, ResultType> {

	//some implementation here
	protected ResultType doInBackground(ParamsType params)

	protected void onPostExecute(ResultType result)
}

new SomeTask().execute(params)

If we look at the code for this, you can see that bottom line is where we’re executing this explicitly. You can see that there’s a doInBackground there, where we kick something off, and then once that finishes running, we get that callback in the onPostExecute, but it’s not quite sure how you would compose things from there, or change them once you kick it off.

If we have evaluation metrics, we should probably have some ideals that we’re shooting for, because otherwise what’s the point of having metrics? I’m going to offer some ideals, which I think we should shoot for.

First, explicit execution. If you can start something, you can control it. This is not always true with Futures, wherein if you create them, they’re running. You want to have as much control as possible. In this case, it’ll help you because you can create them and you can play with them before they ever kick off.

You also want easy thread management. This is a talk about asynchronous behavior. Thread management is key if you want it to be easy. You want them to be easily transformable and with as few side effects as possible.

I’m going to dive into each of these, but the basics are the same. Explicit execution means that you can play around with these observables. You can create them, store them, modify them, and right when you get them perfect, right when they have all of the data that you need, they look exactly the way you need, you can kick them off and not a moment before. Again, in terms of thread management, this is asynchronous work. If you can’t easily switch threads, or understand which thread you’re on, that’s bad because you’re probably going to make mistakes because of that.

We want that to be easy because it’s readable and because it’s better for you as a coder. In terms of easily composable, it would be great if asynchronous work was one and done. We kick something off, network requests, it relies on nothing, we don’t need to do anything when it comes back, we do it and forget about it, and great. But, the world doesn’t work that way, and often things are interdependent, and if they’re interdependent, you want to be able to play nicely together. If you hit the cache and then the network, you don’t want to have intermediate steps there. You want to be able to chain those together in a way that makes sense. If your asynchronous library is easily composable, it’ll introduce less room for error, and that’s a good thing, that’s a win for us.

Get more development news like this

Lastly, minimizing side effects. The fewer side effects you have, the more easily you can reason about your code, the more easily you can read it, and you can bring someone on to understand it as well because it doesn’t depend on state, it’s deterministic, and it’s just much easier to talk about.

Rx (6:46)

I set these standards. You probably know where this is going. I personally think that RX meets a lot of these and I’m going to tell you why. RX is explicitly started. You have to subscribe to it if you want anything to happen. Before that time, you’re still building or creating or storing, and things aren’t kicked off yet.

It is very easy with RX to understand which thread you are on because there are single line operators, subscribeOn and observeOn, where you can assign the thread. If you’re reading code, you’ve seen observeOn IO thread, you know that’s happening on the IO thread. They’re easy to transform, and to filter, and to map, and to do all sorts of other operations on. This is where they really shine. They’re also really good at minimizing side effects. This is asynchronous work, so they’re never going to get rid of them. You’re hitting the network, you’re doing very state bound stuff by definition, but they do a really good job of minimizing that.

Fast facts about RX. One, RX is for asynchronous events. It’s for composing them in particular. Two, RX stands for Reactive Extensions. Three, RX comes in a lot of different flavors. I’m giving a talk today in the context of Android for RxJava, but you can do this in JavaScript. You can do this in Swift. You can do this in many different flavors.

The three key points of RX are the ones that are on screen now. RX has observables, it has LINQ queries, and it has schedulers. LINQ queries are based on these concepts, even though it doesn’t explicitly use them. Between the observables and the queries and the schedulers, we get the world that is RX.

To dive in there, observables represent asynchronous data streams. This is usually a part that’s hard for people to understand when they first come to RX, so I’m going to really dive into that later. But for now, just assume that they’re streams. Queries allow you to compose those streams with operators, and schedulers, unsurprisingly, allow you to manage concurrency within those streams.

To review: These are the three core parts of RX. We can represent asynchronous data streams, we can query and combine those streams with operators, and we can manage the concurrency of those streams with schedulers.

Observables (9:12)

To jump into the first piece of that: Observables. Observables, like I said, are streams of data. They’re pull based, with the caveat that you can use some more advanced features in RX to do some push based stuff, but in general, they’re pull based. You can create, store them, pass them around, etc., because they are explicitly executed. That last point is what I really, really want to drive home.

Observables are here because they help us hide away all of the complexity that can come up with threading and synchronization. It’s not that that complexity is gone, it’s that this library will handle it for you, and we hope that they’ll handle it better because that’s their full-time job. RX is usually very difficult primarily because of the concepts of streams and observables.

I was talking to some people last night. I asked them how they thought about streams, how it finally clicked for them, and someone said the Factory Analogy is what really made this make sense. I’m going to throw this out here. Hopefully it’ll help some people. If not, just think of them as streams, but hopefully this will help.

The Factory Analogy says that you start with some raw material. Maybe you’re making cars, so you have steel come into your factory. And this is the raw material, this is our network request. We’re starting somewhere. And then, once you have that raw material, you send it down a couple of conveyor belts. You might pound it into sheet metal, or shape it into the car’s body, or do something like that. You also might introduce some other resources from a different factory. Maybe you need rubber to make tires. As you progress down all of these conveyor belts, you shape it, and you add to it, and eventually, when this car gets pushed out the door, you have your final product, which in our case is where we get the data out of the stream.

So we put something in, some raw material, we shape it in many different ways, and then at the end, we get the product that we want.

Two key phases of the observable lifecycle, as I’ll call it, is you want to put data into them, and you want to get data out. Why else would you have a stream? This is for data. Looking at that first part: putting data in. There are a lot of different ways that you can do that, and I’ll show you a few of them. The simplest is by saying observable.just.


Observable.just("Hello World!")

Here is a string, I only want this, so I’m going to put that thing in.


val names: Array<String> =
	arrayOf("Christina", "Nicole", "Alison")

//Will output Christina --> Nicole --> Alison --> X
Observable.from(names)

The second, you can use observables with iterables. I’ve made an array here of me and some of my friends’ names: Christina, Nicole, and Alison. I can say Observable.from this iterable, and what would come out of that observable, is Christina, Nicole, Alison, and then it’s done.


String[] names = {"Christina", "Nicole", "Alison"

//Will output: -> Christina -> Nicole -> Alison -> x
Observable.from(names)

The most heavyweight of any of the creations is using .create.


Observable.create<String> { s ->
	s.onNext("I created an observable!")
	s.onCompleted()
}

You’ll see here, that what happens is, I get some subscription S, and I’m particularly managing what’s being sent down. In this case, I say subscription, send in onNext with this string I created in observable. I don’t want to put any more strings in there, so let’s just tell you that you’re completed. Then I say subscription, you’re complete. This is an observable right here. Nothing has yet happened, but I’ve built one, and the behavior of it would be, it emits the string, and then it completes itself.


Observable.create(new OnSubscribe<String>() {
	@Override
	public void call(Subscriber<? super String> subscriber)
		subscriber.onNext("I created an Observable!"
		subscriber.onCompleted();
	}
})

You might notice that that was not in Java. This is actually written in Kotlin. I will be using Kotlin so I can fit all the code on the slide, but I’m going to try to explain as much as possible.

There is also that onComplete there, and that’s a little bit crazy because we’re doing all of these onNexts, and we were using arrays, and all of a sudden, there’s onComplete. What does complete even mean? That’s not even a putting data in thing, that’s a getting data out thing, and you’re right. This is a really opportune time to switch over to the getting data out part of observables because without understanding how you receive data on the other end of a stream, it’s really hard to understand how you should put data into the stream.

What do we need from a stream? Primarily, we want data. We want a way to say, hey, what’s the next piece of data? I hit the network, what did I get back? We also want to know, is there anything left to do? Because if there’s not, I’m done, I’m going to move on, I’m going to recycle my assets. We really want to be able to know: should I still care about this stream or can I move on?

Lastly, because the world is not perfect (the bane of our existence), did any errors happen that I should know about? What does that look like in code? When I want the next value, I can call onNext. When I want to know if something’s completed, I can call onComplete, and when I want to know if something’s error-ed, I can call onError.

Hopefully these names make a lot of sense. They’re very readable. It’s one of my favorite parts about RX. Nothing should surprise you here. Next value, onNext. Completion, onComplete. Error, onError.


Observable.create<String> { s ->
	val expensiveThing = doExpensiveComputationHere()
	s.onNext(expensiveThing)

	val otherExpensiveThing = doOtherExpensiveComputation()
	s.onNext(otherExpensiveThing)

	//all done
	s.onComplete()
}

What does this look like in practice? Again, we have that top level function create, so we’re saying observable.create. In this instance, I’ve typed it with string, so this would be an observable that emitted a bunch of different strings. You could, you know, put a different type signature in there.

Again, reading this Kotlin as if it’s pseudo-code, we have some subscription, and I have a function that’s very expensive. Maybe it’s reading from disk or something. Let me go ahead and do that and get the value back. Once I have that value, let me put it onto that stream by calling onNext. We call the function, get a value, and we say, s.onNext that value. Maybe I’m not done, maybe now I want to hit the network, or I want to do another disk read, or who knows what. There’s some other expensive function again. I’ll call that, and then I’ll get the value, and I can push that on the stream as well using onNext again.

Maybe I only have those two expensive things, maybe I have more. In this case, we’re done after those two calls, so then I’ll tell the stream, it’s complete, and I’ll say onComplete. To go into each of those steps a little bit more, you can call onNext as little or as often as you want. Now, I have an asterisk on the screen because it’s not entirely a truthful statement. And to that end, I’ve linked below to some performance issues that you can encounter if you do this too much. For an intro to RX talk, you probably will not run into that, but it’s something to note, that if you’re doing really heavy processing, you could run into some performance things, and you want to be careful with this, and handle them correctly.

Pop quiz with onNext. I really want to make sure people get it. Can anybody tell me what this stream looks like? What would the output be? It’s not a trick question. It’s really as easy as you think it is. Two and three, exactly, that’s it! It just puts two, puts three, and then it’s done! Similarly, output for this. It’s going to look like apple, banana, orange, pineapple, and I’m done. And to really drive this point home, you can do this as well. You can build up some binary numbers. It’s going to look exactly the same. You’re going to output those one by one, and then the stream’s going to close. I feel like there’s a lot of mysteriousness around onNext. It really is just that simple.

Moving on to onError. It’s really key, this one point that I’m about to make. If there’s one thing that you take away from this, I hope that it’s this because it’ll save you a lot of trouble. onError and onComplete both end streams. They are both terminal events for streams. The only difference between them is that when you call onError, the stream is ending because something went wrong; when you call onComplete, the stream is ending because you finished your work, there’s nothing to do, move along. But they both end the stream, and the only difference is why the stream is ending.

One great thing about onError in RX is that all errors are handled in a single place. You can combine a bunch of different observables, any error that happens will be surfaced in your onError. If it happens far upstream, if it happens downstream, it’ll all come to you in the form of the onError. This is different from things like Futures, which have individual success and failure callbacks, and then maybe you need to tell another Future about this Future’s error, so how do you propagate this, and blah, blah, blah. This propagates down to the lowest level, all errors are here. If you find an error, it’ll be here. Look here if you want errors. That’s the gist of it.


Observable.create<String> { s ->
	try {
		val result = functionThatMightError()
		s.onNext(result)
	} catch (e: Error) {
		s.onError(e)
	}
	s.onCompleted()
}

Returning to that first point about terminal events, here’s an example that I want to walk through to just drive home this point. So, we have an observable, we’ve created it. We put some try-catch in there because there’s a function that might error. I don’t know what that’s doing, maybe it’s hitting the network, but it could error. So, in the case that we assume that that function succeeds, we call it, we have an onNext, it doesn’t hit the cache, and then we do onComplete. So that output, it calls onNext with whatever value that was, and then it calls onComplete. But this is not the case if that function errors.


Observable.create<String> { s ->
	try {
		val result = functionThatMightError()
		s.onNext(result)
	} catch (e: Error) {
		s.onError(e)
	}
	s.onCompleted() //<--NOPE
}

This is exactly the same observable creation as before, but this time let’s assume that function that might error, does error. You might expect that it would call onNext, and then onError, and then onComplete. But remember, onError is a terminal event for streams, so what you get is onError. That’s something that’s probably unexpected, so take a moment, let it sink in. After onError that stream is complete, there’s nothing to call onComplete for, because that’s the successful stream ending. We’ve already ended it in a non-successful way. onComplete really is what you think it is. It tells you that the stream is done. Go ahead and clean up after yourself, recycle anything that you’re holding onto and then move on.

A review: We have our onNext, we have our onComplete, and we have onError. Each of those do what you would expect them to do: propagating the next piece of data, telling you that a stream is complete, or telling you that some part of that stream had an error, and here it is.

If you remember when I talked about LINQ queries, the key point there is that the thing that really distinguishes observables from a lot of different approaches to asynchronous work is that they really shine in terms of being able to compose them and transform them in different ways that you need to in your code.

Operators (21:40)

We’re going to dive into operators right now. There are so many operators. I couldn’t even begin to tell you all of them. You have operators from filtering, and for taking a few and then skipping a few, and for mapping, and for FlatMapping, and for anything that you could imagine, there are operators for that. What do they look like in practice?


Observable.from([1,2,3,4]).map { num ->
	num + 1
}

Output: 2 --> 3 --> 4 --> 5

If you look at this screen, the observable.from of an array is exactly what we saw earlier in the presentation. If we have that observable on its own, it would emit the numbers 1, 2, 3, 4, and then it would cease.


Integer[] numbers = {1, 2, 3, 4};
Observable<Integer> obs = Observable.from(numbers).map(
	new Func1<Integer, Integer>() {
		@Override
		public Integer call(Integer integer) {
			return integer + 1;
		}
	}
);

//Outputs: 2 --> 3 --> 4 --> 5 --> x

It would call its onComplete. Here though, we’re adding one extra step, which is a map. This map has a variable that comes in, so the function is given some variable called number. What we want to do is we want to say, return number plus one. One will come down, and we’ll have one, and we’ll say, but we don’t want one anymore, we want one plus one, so two will be emitted. Then two will come down the stream, and we’ll have two plus one, so three. And all of a sudden, an observable that used to emit 1, 2, 3, 4 now emits 2, 3, 4, 5.


Observable.from([1,2,3,4]).filter { num ->
	num % 2 == 0
}

Output: 2 --> 4

We can also do filter. Here, we have the number, and we return a Boolean expression, which is if this number mod two is equal to zero then pass it on. If it’s not, then that doesn’t get emitted on the stream. Here mod two is one, so it’s not going to get passed on. Two mod two is zero, so it will. So our output goes 1, 2, 3, 4; and after the filter, becomes 2 and 4.


Integer[] numbers = {1, 2, 3, 4};
Observable<Integer> obs = Observable.from(numbers).filter(
	new Func1<Integer, Boolean>() {
		@Override
		public Boolean call(Integer integer) {
			return integer % 2 == 0;
		}
	}
);

//Output: 2 --> 4 --> x

This is just a “give me evens” number function. We can also do things like combining and merging observables.


val schoolFriendsObs =
	Observable.from(arrayOf("Mo", "Dave"))
val workFriendsObs =
	Observable.from(arrayOf("Nicole", "Alison"))

val allFriendsObs = Observable.merge(
	schoolFriendsObs,
	workFriendsObs
)

I don’t have that many friends. They all fit on the slide. You can see the first array is my school friends. I have Mo and Dave. They’re great. Then I have some work friends. I have Nicole and Alison. There’s an observable for them too. Then at the bottom, I combine all my friends. Let me create an observable from the merged output of my school friends and of my work friends. This output looks like this.


//Note: Output is only in the same
//order because input was synchronous
I/RxKotlinHelper: onNext(Mo)
I/RxKotlinHelper: onNext(Dave)
I/RxKotlinHelper: onNext(Nicole)
I/RxKotlinHelper: onNext(Alison)
I/RxKotlinHelper: onComplete()

We have Mo, we have Dave, we have Nicole, we have Alison, and then it’s done.

Schedulers (24:42)

All the elements of both observables are represented in this output, and then we complete because there’s nothing left to process. Now, I left a note here for you. The reason these are in order is because it’s an observable with names that are strings, so it’s synchronous. If you’re doing network stuff, it’s perfectly valid for it to be Nicole, Mo, Dave, and then Alison. They don’t have to be ordered like that.

Moving on to that last part, the schedulers. This is where it gets fun because we are talking about asynchronous work. If you do not subscribe, nothing happens. Again, you can see that asterisk because I’m lying to you. There’s this caveat here. Advanced performance stuff. You probably don’t need to know about it right now, but there are a few considerations for this if you are doing something that really matters for it.

But in our context, in intro to Java context, if you do not subscribe, nothing happens. All of those slides in the past where I was saying observable.create thing, then I put output, that was not totally true. That was the output that those observables would emit, but they did not actually emit it because I never said subscribe on them. I created an object that if you were to subscribe to, it would’ve emitted those things.

Something about subscribers. They can take a different number of functions. They can take no functions if you want to kick off that work and you don’t care about what it returns. They can take three functions, if you want to give it onNext, onComplete, and onError. A maximum of three. A minimum of zero.

I’m going to stand up here and really strongly suggest that you always pass at least an onError, because if you don’t, your stack trace is going to be completely unreadable, and you’re really going to hate yourself. Please, at least, pass an onError to it. It’s really important, if anything goes wrong, for you to be able to understand what went wrong.

Then, in terms of saying where work should execute, we have these two different operators that we can use, which are subscribeOn and observeOn and I’m going to go into those later. But just know that that’s how we’re going to manage where work is run.

What does this look like in code?


val names = arrayOf("Christina", "Nicole", "Alison"

Observable.from(names).subscribe(
	{ next ->
		Log.i(TAG, "onNext($next)")
	}
)

I have an array with the same names as always. Say observable.from this array, and then that last line is what’s new. I say .subscribe, and then in this case, I’m passing it only one function. I’m passing it an onNext function, and that line there says, the next piece of data I get, print it out. That’s it.

The output looks like this:


I/RxKotlinHelper: onNext(Christina)
I/RxKotlinHelper: onNext(Nicole)
I/RxKotlinHelper: onNext(Alison)

It prints the names and then nothing else happens. Now, if we want to be more rigorous, we can pass all three functions.


val names = arrayOf("Christina", "Nicole", "Alison"

Observable.from(names)
	.subscribe(
		{ next ->
			Log.i(TAG, "onNext($next)")
		},
		{ error ->
			Log.i(TAG, "onError($error)")
		},
		{
			Log.i(TAG, "onCompleted()")

This is a full function suite. We pass it onNext, which is that first function there. We pass it onError, log the error, and then that last one is onComplete, and we’ll log onComplete. This output should look familiar.


I/RxKotlinHelper: onNext(Christina)
I/RxKotlinHelper: onNext(Nicole)
I/RxKotlinHelper: onNext(Alison)
I/RxKotlinHelper: onCompleted()

We’ve added one line here though, which is, it does the onNext, so as it used to, but now we’ve added an onComplete, so when the completion event happens, we have a print. There’s that extra print at the bottom that says onCompleted.

What I have thus far ignored is that .subscribe will return a subscription. You may not need to use that subscription, but it should shock no one that if you have a reference to a subscription, you can call unsubscribe on it, and that looks a little something like this:


val sub: Subscription = SomeObs.subscribe{ next ->
	Log.i(TAG, "onNext($next)")
}
sub.unsubscribe()

I call subscribe on some observable, let’s say it’s the one I created on the last slide, and then I print something that happens in onNext. Again it would print out names or something, and then if I don’t care about that work, and I don’t want to listen for it anymore, I can call unsubscribe because I have the reference to this subscription. You don’t need the reference, but it’s really good in terms of cleaning up after yourself if you keep hold of it when you subscribeOn something.

This is the meat of observables and of schedulers and of all the subscription processes. What about the threads? Where is my work running?

To recap, we have these two operators that we can use to tell observables where they should be doing work. subscribeOn should only be used once. You can add it more than once, but it’s not going to matter. The subscribeOn that happens closest to the point at which you create your observable is going to take precedent. If you call subscribeOn in a first line on main thread, and then 10 lines down, you call it on the IO thread, it will subscribe on the main thread. It’s going to ignore anything downstream of that. You should use it once, and that’s it. It defaults also to the thread if you do not explicitly say where it should subscribe on. It defaults to the thread where you create the observable. Oftentimes this is main thread, but if you create an observable on a computation thread of some sort, you should probably put a subscribeOn to make sure it’s running where you want because it will default to the computation thread as the place where it runs.

The key point about subscribeOn is that that first block of code is always going to be executed on your subscribeOn. At the point at which you subscribe, code will start running on that thread. Maybe down the line, you’re doing other stuff, and you want to change the thread with an observeOn, that’s fine. But subscribeOn tells you where your code starts executing. That very first block will always run on subscribeOn until you get a chance to change it, if you so choose.

That leads us to observeOn. Maybe you’ve subscribed on one thread, maybe it’s a computation thread, but now you want to surface something and put it in the UI, so you want to switch back to the main thread. That’s where observeOn comes in handy. You can use this as many times as needed.

There’s the dreaded caveat again. There’s some backpressure issues. Again, if you’re not doing really performant required things, you probably won’t run into this, but if you are, it’s in the notes.

It is important to understand with observeOn that it affects everything downstream. If I put an observable on line two of a ten-line observable chain, it will effect lines three through ten. Unless I put another observable on line six, or observeOn on line six, in which case, that observable now will effect everything downstream of that. Everything below your observeOn will happen on whatever thread you tell it to observeOn. And we can look at that in code.

Subscribe/Observe on (30:33)

I’m going to explain this line by line.


Observable.from(arrayOf("Red", "Orange", "Blue"))
	.doOnNext { color ->
		Log.i(TAG, "Color $color pushed through
				on ${Thread.currentThread()}")
	}.observeOn(Schedulers.io()).map { color ->
		color.length
	}.subscribe { length ->
		Log.i(TAG, "Length $length being recieved
			on ${Thread.currentThread()}")
	}

We start with an observable. You’ve seen this a million times at this point. It’s going to emit a bunch of colors. That’s it. Now I’m going to add a doOnNext. You haven’t seen this yet, but doOnNext is somewhat self-explanatory. When onNext fires, I’m going to do this function that I provide. In this case, the function that I provide is just a log. I want it to print out what thread it’s occurring on. That’s all that’s going on there. Print the thread when onNext happens.

Now, maybe something else needs to happen, and I don’t want it to be on that thread anymore, so I put an observeOn, and I say, let’s switch our thread over to IO. Then I want to do something like map it. I have a string of colors. Maybe I only care about the length of the string. That’s what’s important. I’ll say: take this string, and then, instead of having a string, give me back the string’s length instead. Then you get the full finished product. I add that subscribe and it prints out the thread that I’m finally getting my output on. What does this output look like?


I/Rx: Color Red pushed through on Thread[main,5,main]
I/Rx: Color Orange pushed through on Thread[main,
I/Rx: Color Blue pushed through on Thread[main,5

I/Rx: Length 3 being
	recieved on Thread[RxIoScheduler-2,5,main]
I/Rx: Length 6 being
	recieved on Thread[RxIoScheduler-2,5,main]
I/Rx: Length 4 being
	recieved on Thread[RxIoScheduler-2,5,main]

It starts on the main thread because I didn’t give it a subscribeOn, and I wrote this observable on the main thread of my sample app. After I switch over to the IO thread, unsurprisingly, it starts emitting things on the IO thread. You can see in those bottom three lines, it says that we’re on the IO scheduler. Now, let’s say I want to use a explicit subscribeOn.


Observable.from(arrayOf("Red", "Orange", "Blue"))
	.doOnNext { color ->
		Log.i(TAG, "Color $color pushed through
			on ${Thread.currentThread()}")
	}.observeOn(Schedulers.io())
	.map { color -> color.length }
	.subscribeOn(Schedulers.computation())
	.subscribe { length ->
		Log.i(TAG, "Length $length being recieved
			on ${Thread.currentThread()}")
}

I don’t want this to be run on the main thread. This is exactly the same observable we just built up line by line, but there’s a subscribeOn call there, and I put it on the computation thread. How does that change things?


//Output trimmed to fit

I/Rx: Red pushed on Thread[RxComputationScheduler]
I/Rx: Length 3 recieved on Thread[RxIoScheduler]

I/Rx: Orange pushed on Thread[RxComputationScheduler]
I/Rx: Length 6 recieved on Thread[RxIoScheduler]

I/Rx: Blue pushed on Thread[RxComputationScheduler]
I/Rx: Length 4 recieved on Thread[RxIoScheduler]

Now you can see that the very first work that was done, it was done on the computation scheduler. After that, once we switch over to the IO thread and we map it to the links, that’s happening on the IO scheduler. This is one of those examples where you can see that we can start work in one place, we can switch over to a different scheduler, and it can be done elsewhere. I can also add a different observeOn to put it on main thread and you would see that ripple through the output as well.

Rx and Android (32:46)

RX and Android. I haven’t talked about Android at all. I’ve been going through observables and RxJava in general. Let’s talk about how this can impact your life as Android developers. Again, this is just like operators. I can’t tell you all the ways you can use RxJava in your app because the sky’s the limit. Whether you should be using them in places is a totally different question, but you can use it in many different places.

Here are a few somewhat contrived examples. You can bind clicks, and then filter for a certain quadrant, or you can see if people double tapped by using an aggregator on your observable, given a time frame. You can FlatMap cache hits with the network. You can handle auth flows in a single stream, so you get user input and you send it to the network, and then you get a response back. Maybe hit the network a second time, fetch user details. All of that can happen in a single stream.

I’m going to walk through a few examples, but before I do, I just want to say, they’re here to give you flavor on what this can look like in an Android app. If you don’t understand all of them, that’s okay. The idea here is to get a notion of what is possible.


fun Button.debounce(length: Long, unit: TimeUnit) {
	setEnabled(false)

Observable.timer(length, unit)
	.observeOn(AndroidSchedulers.mainThread())
	.subscribe {
		setEnabled(true)
	}
}

Here’s an example of debouncing a button. I’m not sure why you would need to do this, but you can do it! What’s happening here is I’m adding something to a button class called debounce. I can give it some unit of time and then I can start with saying setEnabled to false. I’ll set a timer observable. It’ll fire. When it fires, I setEnabled to true.

In the time that setEnabled is false, users won’t be able to click on this button. Then after my time has elapsed, and onNext has been called, and Enabled(false) is turned to true, then users will be able to click again.


fun size(): Observable<Int> = Database.with(ctx)
	.load(DBType.photoDetails)
	.orderByTs(Database.SORT_ORDER.DESC)
	.limit(24)
	.map { it.size() }

I used to work for a photo-sharing app and we had a bunch of photos stored in our database. One case that we used - this is a pared down example of it - is I would get my database with the context, and I would say: hey, load up some photo details for me. I really care about the most recent ones though. Can you order those by time stamp for me? And then, oh, by the way, I can’t deal with that many at once, so just give me like the last 24. That’s all I really care about. Then, maybe all I care about is the size of those photos. I want to do something with the UI, who knows. Then that last line says, hey, instead of photo details, give me the size of the photo. Map it to its size.

This may look very dense. It is. If you try to do this without RX, this would be a five page slide. It would just go on and on. I would be flipping forever. The fact that you can do this in six lines is incredible. This is really complicated stuff and it’s pretty readable.


return Observable.create<Unit> { s ->
	val outputFile = writeOutputFile(mediaFile)

	when (type) {
		Type.Photo -> addPicToGallery(ctx, outputFile)
		Type.Video -> addVideoToGallery(ctx, outputFile)
		else -> {
			s.onError(Error("Unexpected download type!"))
	}
}
s.onNext(Unit)
s.onCompleted()

You can do the canonical example of IO. You can maybe download something. We have some function that writes an output file. I don’t know what’s going on there, but we get a file back. Maybe we check the type. If it’s a photo, put it somewhere. If it’s a video, put it somewhere else. If it’s none of those, then what is going on here? Throw an error. And then, in this case right here, we’re not sending data through the stream because we don’t care.

What we’re using this observable from, is that toast that comes up and says “download completed” or “download failed.” We don’t need to send that file down the stream because we have no use for it. All we care about is, did this complete or did it fail?

You’ll notice there that we have the onError. If it errors, if we don’t know what type of file, we don’t know what to do with it, and if not, we just fire onNext with no argument. Unit is the null type here, so it’s saying fire onNext, but I’m not going to give you any argument to your onNext function. It’s just going to exist. Then, we say, we’re complete here. Wrap it up. You can see also there’s a subscribeOn IO, so you can put a subscribeOn.


fun codeObservable(): Observable<String?> {
	val filter = IntentFilter(SmsUtility.INTENT_VERIF_CODE)
	return ContentObservable.fromLocalBroadcast(this
		.map { intent ->
			intent.getStringExtra(SmsUtility.KEY_VERIF_CODE)
	}
}

There are also ways to lift things from local broadcasts. All of those apps that give you codes, they text you codes, and then you have to input them into your app, and that’s how you authenticate. Android’s great because you can scare people by just reading their text messages. This is an example of that happening.

Someone sends you a code. You auto-read their text message, pull that code out and get it in a LocalBroadcast. But LocalBroadcast can be a little bit clumsy, so let’s go ahead and transform that into an observable. What’s happening here is we have the IntentFilter to get the verification code and we give that to a ContentObservable.fromLocalBroadcast.

We don’t need the intent here. That’s not what we’re interested in. What we really care about is that verification code, so I can make sure it’s the right one. That last line says: you gave me an intent, but really all I care about is the verification code. Pull that out of that dictionary and you can see that the return type is a String?. That means that String could exist or it could be null. If that key doesn’t exist in my intent, it’s going to return a null. If it does exist, it’s going to return the code, and I can do further checking.


timedAuthObservable
	.observeOn(Schedulers.io())
	.flatMap { code ->
		userModel.sendVerifyResponse(code)
	}.flatMap {
		userModel.getSuggUsername().onErrorReturn {
	}.observeOn(AndroidSchedulers.mainThread())
	.subscribe({ suggestedUsername -->
		//update UI with suggested username
})

You can also build up really complex onboarding flows. This is pared down in pseudo-code, but a timedAuthObservable is that situation that you run into where you input your credentials, and it’s seeing if they’re valid, so you see the spinner. But then maybe you’re in bad network, so it will come up with connection failed, and you hate yourself. That’s a timedAuthObservable.

You can put that on the IO thread because you’re hitting the network. Then maybe you want to do things downstream. What this is, is not important. I’ve added some pseudo-code, like you want to verify your response, and then now that you’re logged in, you want to get a suggested username, so they can finish onboarding with their suggested username.

What’s going on here is less important than how it’s going on, which is that your entire auth flow is now on these eight lines of code, and you’re managing where that code is running very easily by using observeOn and subscribeOn.


//Verify with backend, then prepare data for UI
override fun getVerifiedData(code: String):
Observable<Unit>

	return UserService.noAuthClient
		.verifyUser(authToken, code)
		.flatMap {
			UserService.authClient.fetchUserDetails()
		}.map{ data ->
			loadableUserState.loadFromData(data)
		}.observeOn(AndroidSchedulers.mainThread())
}

You can also do things like this. If you’re not authed, kick off an IO event to get authed, and then once you’re authed, you can then call a function, now that you’re authed, that hits an authed response. This is not easy to do without observables because you have something that’s getting the authentication and then you store that, and then you kick off this totally separate task that does the authed result for you.

Here it doesn’t matter that you’re un-authed to start because by the time you get to that FlatMap user service auth client, you are authed because that’s the way streams work. You go from the top of the stream to the bottom, so that by the time that function gets hit, you’re authed. Or an error was thrown, and you’ll know that the error is thrown. This is really powerful and may not look like much, but you can start out as a not logged in user, and you can compose that with logged in stuff downstream after your IO.

Conclusion (39:56)

Let’s summarize. I’m not here to tell you that RX is the only game in town and that it’s always the right solution. As you’ve seen in this presentation, there’s a lot of things that are non-trivial about it and that can be confusing. That’s not my point. My point here is that RX is one of the most powerful games in town. While it may not be right for everything, if you’re doing really high performance stuff, if you’re doing stuff where you hit the network, or you’re reading from disk a lot, or you’re doing a lot of asynchronous work, it really can be the right tool for the job.

It’s a heavyweight, but that comes with a lot of benefits, which is that it can do a lot of stuff. There’s a steep learning curve. I get that. It sucks. It’s like walking on glass as we were talking about last night. It’s unpleasant. But once you master it, it can really help you reduce errors.

Once you learn to think in this way, things become a lot easier to read because they’re mapped together in a logical chain of events. You can see that something hits the network, gets transformed, and then is put on the UI thread to be used by your UI. And you can see that all in one place. It’s not going off and disappearing somewhere. You’re not storing it in state. It’s all in a chain. That chain represents an entire action and you aren’t introducing extra opportunities for errors by using stateful storage.

Lastly, I get that streams can be a huge paradigm shift. We don’t think like this often because it’s not how most apps are built, but if you can switch over to thinking of data as streams, your entire app’s data flow can be brought to the surface. Right now, when you’re using other async primitives, what can happen is you send something off to a thread here, and then maybe there’s a thread here, and then this activity is doing something, and you can have asynchronous stuff going on in a bunch of different places.

What happens if you use threads is that it forces those to be connected and so you can easily write a GUI. These already exist, where you kick off a network event, and you can see exactly where it flows through your app, all the way to the point that you stick it in your UI, or put it somewhere else to be used later. The first call to the network, the transformations when you’re bringing in other data, or filtering data or reducing data, all of that can be surfaced in a GUI. That’s powerful because it gives you a way to talk about your app that everybody can understand.

If you have a road map, you know where you’re going, and that’s really useful, not just for developers, but for product people. To tell them how the pipeline works, where you’re getting your data, how it’s changing, and where it ends up.

Lastly, this is the best summary that I can give you about RX. If you have a car, if you have some old clunker of a car, and all you’re doing is going to the store, that’s absolutely fine. The streets are what? 25 MPH speed limits? You can make that. Maybe the seat are not so comfortable, but you can adjust to fix that. Maybe it doesn’t start up, and you have to jiggle the key, but that’s fine. It works, and you get there, and it’s great.

However, if you have to go cruising for road trips, or you’re getting out on the freeway, I guarantee that if you bring a Tesla, and you bring an old car, you will notice the difference because the power is there, the safety is there. They’ve learned a lot in the last 20 years about driving, so that Tesla, I mean have you seen its safety record? It’s world renowned. And that’s because they’ve watched what’s happened before, they’ve corrected for those mistakes, and they’ve built it into new technology.

If you’re going to the store with your Tesla, you probably won’t notice that much of a difference because 25 MPH is not what that car is built for. You don’t need to go zero to 60 to get to the grocery store. It may not be your best option. But if you’re driving across the country, if you were doing something bigger, something more complicated, something longer lasting, if you were doing something more monumental, the Tesla is what you want. It fits that use case. It’ll make your life more enjoyable. You’ll be safer doing it, and it’s just going to be, quite frankly, a lot more fun.

Thank you!

Next Up: RxJava #2 – RxJava for the Rest of Us

General link arrow white

About the content

This talk was delivered live in July 2016 at 360 AnDev. The video was recorded, produced, and transcribed by Realm, and is published here with the permission of the conference organizers.

Christina Lee

Christina is currently the Android lead at Highlight. In addition to trying to convince the world to use Kotlin on a daily basis, she also enjoys building beautiful UIs, extolling the virtues of Rx, and reading well documented APIs.

4 design patterns for a RESTless mobile integration »

close