
RxSwift: Deep Cuts

In the last few years, reactive programming ideas have taken the iOS community by storm. Now that the dust has settled, we are starting to run across code that is sometimes confusing and unreadable. Libraries like RxSwift bring great power, but how can we bear the burden of this great responsibility? I’ll try to answer this question in my talk, drawing on the experience of the everyday use of RxSwift in a non-trivial app.


Introduction

My name is Krzysztof Siejkowski, and I will be talking about reactive programming using the RxSwift library. The theme of this talk is just how little we know about the observable the moment that we are subscribing to it.

This talk tells the story of one bug hunt and three big questions. Let’s meet the hero of our story, Alex. Alex is an iOS developer working on a popular app. He keeps his finger on the pulse of the community, uses the latest and greatest tools, and always seeks the best solutions.

At a mobile development conference, the speaker proposed reactive programming as a better, unified way to write asynchronous code. Instead of using the whole pantheon of patterns, like target-action, completion blocks, delegation and so on, you can use a single, highly composable pattern called the observable, which makes the app easier to maintain and easier to reason about.

Alex found this paradigm fresh and fascinating, so he evangelized it among his coworkers. The whole team agreed to use RxSwift as the core of their architecture. Everything went smoothly until they began to get reports of a bug where the activity indicator just spins indefinitely when the reload button is pressed.

From my perspective, this is the type of bug most commonly encountered when using reactive programming: the data does not propagate properly.

Alex quickly found the actual lines of code that were responsible for handling the refreshing logic.

dataProvider.refreshData()
	.subscribe(
		onNext: { [weak self] in
			self?.update(with: $0)
		},
		onError: { [weak self] in
			if let error = $0 as? ReasonableError {
				self?.showUserMessage(with: error.reason)
			}
		}
	)
	.disposed(by: disposeBag)

First, the refreshData method is called on the dataProvider service, and it returns the observable that emits the refresh data. We subscribe to this observable by providing two closures that define how the events are consumed. If everything succeeds, the update method is called with the new data.

If anything fails, the user is shown a localized message describing the error. What can go wrong with such a basic usage of RxSwift?

First, Alex asked whether the self reference inside the subscription closure could be nil at the moment the event arrives. If so, update would be called on a nil object, so nothing would ever happen. That would explain the lack of refreshing.

What References Does the Observable Carry?

To answer this, consider what happens under the hood when you create an observable and subscribe to it. First, an object is allocated in memory, and the type is a subclass of the observable type.

Here, the Just object is the source of events, and that flow of events is shaped by adding more operators: here, it’s map and filter. Each layer is a different observable instance of its own type, and each of those instances keeps a reference to the previous one. Together, they make a chain.

From the perspective of the client, all that can be seen is the latest, outermost observable. The underlying structure is completely hidden under the observable type.


[Figure: composed observable]

Many of the operators take closures as parameters, and those closures store references to whatever has been used inside them.

[Figure: closures]

When you call subscribe on the observable, there is a cascade of new object creation. These objects are called sinks. Each contains the logic required to process the events coming from the previous sink.

[Figure: sinks]

To perform this logic, the sinks must have all the required tools. For example, if there’s a filter sink, it has to have the filtering closure. This means that those closures are passed into the sinks, and sometimes it means that the observables are no longer required after subscription.

The moment you subscribe, there are other objects created called sink disposers. Sink disposers are objects that implement the disposable protocol with one method called dispose. This method contains the logic required to stop the subscription and clean up the resources.

Sink disposers are also structured in a chain. The chain passes the information from the top to the bottom, from the generation to the observer. As a result, sink disposers create a lot of reference cycles, which keep the subscription alive indefinitely.

[Figure: reference cycles]

To break the cycle, you can call the dispose method on the disposable object. This nils out the reference to its sink, and then it tells the sink disposer above it to do the same. As a result, the subscription ends, and everything is deallocated.
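The cycle and its release can be sketched in plain Swift. This is only an illustration under assumptions: Sink and SinkDisposer here are simplified stand-ins written for this example, not RxSwift's internal classes, and makeSubscription and demonstrateCycle are hypothetical helpers.

```swift
// Simplified stand-ins for the internal sink and sink disposer objects.
final class Sink {
    var disposer: SinkDisposer?            // strong reference: sink -> disposer
    private let onDeinit: () -> Void
    init(onDeinit: @escaping () -> Void) { self.onDeinit = onDeinit }
    deinit { onDeinit() }
}

final class SinkDisposer {
    private var sink: Sink?                // strong reference: disposer -> sink
    init(sink: Sink) {
        self.sink = sink
        sink.disposer = self               // closes the reference cycle
    }
    // Breaks the cycle by dropping both references.
    func dispose() {
        sink?.disposer = nil
        sink = nil
    }
}

// Hypothetical helper that plays the role of subscribe.
func makeSubscription(onSinkFreed: @escaping () -> Void) -> SinkDisposer {
    return SinkDisposer(sink: Sink(onDeinit: onSinkFreed))
}

func demonstrateCycle() -> (aliveBeforeDispose: Bool, freedAfterDispose: Bool) {
    var sinkWasFreed = false
    // Only a weak handle is kept, as if the caller had ignored the disposable.
    weak var disposer = makeSubscription(onSinkFreed: { sinkWasFreed = true })
    // The cycle keeps both objects alive although nobody owns them.
    let aliveBeforeDispose = disposer != nil && !sinkWasFreed
    disposer?.dispose()
    return (aliveBeforeDispose, sinkWasFreed)
}
```

Nothing outside the pair holds a strong reference, yet both objects survive until dispose is called, which is the behavior described above.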

Memory Management Bugs

There are two properties of the RxSwift API that make it easy to create memory management related bugs.

  • It uses closures extensively, and closures can capture many references.
  • The subscription is kept alive by itself. As a result, it contains reference cycles between sink disposers and sinks.

In RxSwift, if something is not deallocated properly, it’s leaked.

Fortunately, there are three simple rules to follow to minimize the possibility of any memory-related bugs in reactive programming with RxSwift.

Dispose Your Subscriptions

Always dispose your subscriptions. The method you choose for disposing is up to you in your particular situation.

Watch Your References

Avoid strong references in closures if possible, especially strong references to self.

Take Care with Instance Methods

Do not pass instance methods as operator parameters. Instance methods always keep a strong reference to self. This is why you should always pass a closure instead.

[Figure: instance methods]
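The difference can be observed without RxSwift at all. The sketch below is an assumption-laden illustration: Screen, Subscription, and screenIsFreed are hypothetical names, and Subscription merely stands in for any operator or subscription that stores the closure it receives.

```swift
final class Screen {
    private let onDeinit: () -> Void
    init(onDeinit: @escaping () -> Void) { self.onDeinit = onDeinit }
    deinit { onDeinit() }
    func update(with value: Int) {}
}

// Stand-in for an operator or subscription that stores its closure.
final class Subscription {
    let handler: (Int) -> Void
    init(handler: @escaping (Int) -> Void) { self.handler = handler }
}

// Reports whether the screen is deallocated while the subscription is alive.
func screenIsFreed(whileSubscribedWith makeHandler: (Screen) -> (Int) -> Void) -> Bool {
    var freed = false
    var screen: Screen? = Screen(onDeinit: { freed = true })
    let subscription = Subscription(handler: makeHandler(screen!))
    screen = nil                           // drop the only outside reference
    let result = freed
    withExtendedLifetime(subscription) {}  // keep the subscription alive until here
    return result
}

// Passing the instance method: the method value holds the screen strongly.
let freedWithMethod = screenIsFreed { screen in
    return screen.update(with:)
}

// Passing a closure with a weak capture: the screen can be deallocated.
let freedWithWeakClosure = screenIsFreed { screen in
    return { [weak screen] value in screen?.update(with: value) }
}
```

In this sketch freedWithMethod ends up false, because the stored method value keeps the screen alive, while freedWithWeakClosure ends up true.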

Looking back at Alex’s code, we can see that there is no possibility for self to be nil before the subscription finishes, because the subscription is tied to the dispose bag, which disposes it when the bag itself is deallocated.

dataProvider.refreshData()
	.subscribe(
		onNext: { [weak self] in
			self?.update(with: $0)
		},
		onError: { [weak self] in
			if let error = $0 as? ReasonableError {
				self?.showUserMessage(with: error.reason)
			}
		}
	)
	.disposed(by: disposeBag)

In fact, Alex may just as well use an unowned reference, replacing [weak self] with [unowned self], for the sake of readability. But that doesn’t solve the problem that the screen is not refreshing.

Call UI from Main Thread

Alex also considered that the bug might be a threading issue. After all, the update method works on some UI elements, and calling UI elements from a background thread may lead to strange and undefined behavior: sometimes crashes, sometimes glitches.

There are some simple rules to follow to find out which thread the particular part of the observable stream will execute on.

How Schedulers Work

First, let’s quickly refresh the concept of schedulers in RxSwift. Schedulers are an abstraction over various methods of performing work in a particular context. They define where, when, and how the work will be executed. Many things can be seen as part of a work performing context. This is the protocol of the basic scheduler:

func schedule<StateType>(
	_ state: StateType,
	action: @escaping (StateType) -> Disposable
) -> Disposable

It takes two arguments, state and action, and it is up to the scheduler to decide where, how, and when the action closure will be executed, with state passed as its first parameter.

If you write a custom scheduler, you can make a scheduler that doesn’t work at all, or that doesn’t work on Mondays, for example. In practice, however, schedulers are most often used as wrappers around existing mechanisms in the Cocoa API, such as threads, operation queues, and dispatch queues.
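To make the idea concrete, here is a minimal sketch of two custom schedulers built around the schedule signature shown above. Everything in it is an assumption made for illustration: Disposable, NopDisposable, and Scheduler are local stand-ins rather than RxSwift's own protocols, and NoMondaysScheduler is the joke scheduler from this paragraph, with the weekday injected so that it can be exercised on any day.

```swift
import Foundation

// Local stand-ins; RxSwift ships its own disposable and scheduler protocols.
protocol Disposable { func dispose() }
struct NopDisposable: Disposable { func dispose() {} }

protocol Scheduler {
    func schedule<StateType>(
        _ state: StateType,
        action: @escaping (StateType) -> Disposable
    ) -> Disposable
}

// Runs the action right away, on whatever thread schedule was called on.
struct ImmediateScheduler: Scheduler {
    func schedule<StateType>(
        _ state: StateType,
        action: @escaping (StateType) -> Disposable
    ) -> Disposable {
        return action(state)
    }
}

// Refuses to do any work on Mondays (weekday 2 in the Gregorian calendar).
struct NoMondaysScheduler: Scheduler {
    var weekday: () -> Int = {
        Calendar(identifier: .gregorian).component(.weekday, from: Date())
    }

    func schedule<StateType>(
        _ state: StateType,
        action: @escaping (StateType) -> Disposable
    ) -> Disposable {
        guard weekday() != 2 else { return NopDisposable() }
        return action(state)
    }
}

// Collects the states whose actions were actually executed.
func runSchedulers() -> [String] {
    var log: [String] = []
    let record: (String) -> Disposable = { state in
        log.append(state)
        return NopDisposable()
    }
    _ = ImmediateScheduler().schedule("immediate", action: record)
    _ = NoMondaysScheduler(weekday: { 2 }).schedule("monday", action: record)
    _ = NoMondaysScheduler(weekday: { 4 }).schedule("wednesday", action: record)
    return log
}
```

The scheduler alone decides whether and where the action runs; the caller only hands over the state and the closure.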

Let’s look at some examples. The MainScheduler is for executing the work on the main queue:

let mainQueue = DispatchQueue.main

// simplified essence of schedule method
if DispatchQueue.isMain {
	action(state)
} else {
	mainQueue.async {
		action(state)
	}
}

This first checks whether it’s already on the main queue. If it is, it does the job right away. And if it isn’t, it uses the .async method to pass the job to be done into the main queue.

The serial DispatchQueue scheduler is very similar but it works on a non-main DispatchQueue.

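// internal serial dispatch queue of given properties
// (the label is a placeholder; the serial queue targets a global queue)
let queue = DispatchQueue(label: "serial-queue", target: DispatchQueue.global(qos: qos.qosClass))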

// simplified essence of schedule method
queue.async {
	action(state)
}

Internally, it creates the serial queue with the parameters provided by the client in the initializer, then it always passes the work to be done to this queue using the .async method.

The OperationQueue scheduler does something very similar but uses an OperationQueue. You provide the OperationQueue in the initializer; the scheduler then wraps the job in a BlockOperation and adds it to the queue:

// operation queue provided by client in the initializer
let operationQueue: OperationQueue

// simplified essence of schedule method
operationQueue.addOperation(
	BlockOperation {
		action(state)
	}
)

There are many other schedulers defined in RxSwift, but the idea is the same. For example, some can execute the job in a delayed manner, and others can do it repeatedly.

Operators and Schedulers

There is a set of operators that require you to pass in schedulers when you create them. What they do with the schedulers depends on the actual operator:

interval(1, scheduler)

delay(2, scheduler)

throttle(3, scheduler)

Other operators are scheduler-agnostic. You do not pass any information about the job execution context to them:

map { foo($0) }

flatMap { bar($0) }

filter { $0 != wanted }

Lastly, there are two scheduler-defining operators used for defining the operation context for the other operators in the chain: observeOn(scheduler) and subscribeOn(scheduler).

Suppose we have an observable that is composed entirely of scheduler-agnostic operators:

Observable
	.just(42)
	.filter { $0 > 33 }
	.map { "\($0)" }
	.subscribe { print($0) }
	.disposed(by: disposeBag)

There’s no information in this chain about the execution context. All the operations will be executed in the thread that the subscribe method was called on, because this is the only thread there is.

But you can change that using the observeOn operator. It defines what scheduler will be used for execution of all the operators down the road. The observable is always single-threaded because it’s a sequence of events - observeOn lets you switch the thread for this observable.

Observable
	.just(42)
	.observeOn(greenScheduler)
	.filter { $0 > 33 }
	.map { "\($0)" }
	.subscribe { print($0) }
	.disposed(by: disposeBag)

Note that the operators above the observeOn call are still executed on the subscription thread. observeOn only changes the context for the operators down the road, never for those before it. You can use as many observeOn operators as you want. With observeOn alone, however, it’s impossible to change the first execution context, that is, the thread on which events are generated.

There is another operator dedicated to that, called subscribeOn. It specifies what scheduler the subscription logic will be executed on. Because the generation of events is part of the subscription logic, subscribeOn defines the initial scheduler, the first execution context.

Event generation is a single step, so changing its context multiple times along the way does not make sense. Only the first subscribeOn call matters, and the others are ignored. Also, if you explicitly pass a scheduler to the generating operator, then any later subscribeOn calls are ignored because the scheduler is already defined.

[Figure: subscribeOn]

These are the rules for keeping track of the execution context for scheduler-agnostic operators. But what about scheduler-using operators? The situation becomes less clear. First, let’s divide those scheduler-using operators into two groups. Some of them use schedulers for the generation of events, and others use them for internal work.

Operators that use schedulers for event generation are easier to reason about because you can almost always safely assume that they will be passing the events further in the execution context used for the generation. And the RxSwift library documentation even says to explicitly provide the scheduler to send elements on, for example:

// Scheduler to send elements on.
.fromScheduled([1, 2, 3], greenScheduler)

Operators that use schedulers for internal work give fewer guarantees. Sometimes they pass events in the execution context that you gave them to do the work, and sometimes they pass them in the execution context of the previous operator. Here are some examples:

// Scheduler to run the throttle timers on.
.throttle(1, blueScheduler)

// Scheduler to run the subscription delay timer on.
.delay(1, greenScheduler)

// Scheduler to run buffering timers on.
.buffer(timeSpan: 1, count: 3, scheduler: redScheduler)

To make things even harder, there is also nothing preventing the observable from changing the execution context from event to event. The simple example is a merge observable. Here is a merge observable that is composed of two observables working on different execution contexts - different schedulers:

Observable<Int>
.merge([
	Observable<Int>.interval(3, redScheduler),
	Observable<Int>.interval(5, greenScheduler),
])
.subscribe { [unowned self] in
	// sometimes red, sometimes green
	self.work(with: $0)
}
.disposed(by: disposeBag)

Nothing prevents you from writing code like this. The types must match, but the schedulers do not. This subscription closure sometimes executes in the red scheduler context, and sometimes in the green scheduler context, depending on the timers, and this can lead to bad bugs. How can we protect ourselves?

First, read the documentation to learn about the properties of the particular operator that uses the scheduler. It might state explicitly in the documentation what it uses the scheduler for, and sometimes that’s enough. Otherwise, you can read the source code.

If you are still in doubt, then ensure the execution context with the observeOn operator.

Finally, you can use RxCocoa Traits, like Driver or ControlProperty, because they are guaranteed to work in the main scheduler context.

With Alex’s bug, because there is no way of asking the refresh data observable “what is your execution context?”, Alex decided to choose the safest option and just add the observeOn operator before subscription:

dataProvider.refreshData()
	.observeOn(MainScheduler.instance)
	...

This, however, did not stop the bug from happening. But what if the observer closure is never executed because the event is never received? How can we know whether the observable is emitting an event? How can we know whether it will ever emit any event?

What is the Protocol?

There are many ways that events might not get passed to the observer. The subscription may end before events are emitted, there might be a bug in the implementation of the generation logic, or the observable stream might be defined in a way that it filters out some class of events, and they are not propagated.

An observable generates a sequence of events, and there is a basic set of requirements that each of those sequences must meet. A sequence always consists of zero or more next events, and the event that closes the sequence, that is, its last element, is either completed or error.

.next(data)
.completed
.error(error)
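This grammar is small enough to check mechanically. The following sketch assumes a local Event enum with the same three cases as listed above (it is a stand-in, not the library's type), and isValidSequence and TimeoutError are hypothetical names introduced for the example.

```swift
// Local stand-in with the same three cases as the events listed above.
enum Event<Element> {
    case next(Element)
    case completed
    case error(Error)
}

// Hypothetical error used only to build sample sequences.
struct TimeoutError: Error {}

// Returns true when the events follow the grammar:
// zero or more next events, optionally closed by one completed or error.
func isValidSequence<Element>(_ events: [Event<Element>]) -> Bool {
    for (index, event) in events.enumerated() {
        switch event {
        case .next:
            continue
        case .completed, .error:
            // A terminal event is only allowed in the last position.
            return index == events.count - 1
        }
    }
    // No terminal event: the sequence is still open or never finishes.
    return true
}

let finished: [Event<Int>] = [.next(1), .next(2), .completed]
let failedImmediately: [Event<Int>] = [.error(TimeoutError())]
let stillOpen: [Event<Int>] = [.next(1)]
let brokenProtocol: [Event<Int>] = [.next(1), .completed, .next(2)]
```

The first three sample sequences satisfy the requirements; the last one does not, because a next event follows the terminal completed event.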

One very basic question is: will the sequence of events ever finish? Will a completed or error event ever be emitted? If we are absolutely 100% sure that it will finish, we can ignore memory management. Here’s example code for a subscription that is known to end, because the observable will emit a completed or error event:

// you may ignore disposable (though I recommend not to!)
_ = observable
	.subscribe(onNext: { [weak self] in
		self?.work(on: $0)
	})

Here, however, is a subscription to a non-finishing observable:

let disposable = observable
	.subscribe(onNext: { [weak self] in
		self?.work(on: $0)
	})

// ensure subscription disposed, either manually...
disposable.dispose()
// ...or by handing it over to a dispose bag
disposable.disposed(by: disposeBag)

If it does not finish, then it’s our job to ensure that the subscription will be disposed. So whether the communication protocol of the observable finishes or does not finish actually defines whether we use the disposable object or not.

Another question we may ask is: how many events can we expect from the observable? If it’s a one-off sequence, which means it contains only one piece of data, then each time we want to fetch new data, we need to create a new subscription. In a way, we are pulling the data using the observable as a tool whenever we want it.

let serial = SerialDisposable()

// call each time you want the fresh data
func fetchFreshData() {
	serial.disposable = observable
		.subscribe(onNext: { [unowned self] in
			self.work(on: $0)
		})
}

If it’s a long sequence, meaning there will be multiple events, then it could be the observable’s job to push the data to you. Then we subscribe only once, and expect that we will be given the information at the right moment in the future:

let disposeBag = DisposeBag()

// call only once in the object lifetime
func listenForFreshData() {
	observable
		.subscribe(onNext: { [unowned self] in
			self.work(on: $0)
		})
		.disposed(by: disposeBag)
}

How does it start emitting? One possibility is that the moment you subscribe to the observable, it immediately and synchronously calls the observer closure with the first piece of data. And this is, for example, the logic that BehaviorSubject has. Sometimes you subscribe at a time when you are not ready to handle the update yet, and you have to skip the first piece of information:

// if not ready to handle data at subscribe time
dataProvider
	.data()
	.skip(1)
	.subscribe(onNext: { [unowned self] in
		self.work(on: $0)
	})
	.disposed(by: disposeBag)

When the opposite situation occurs, and the observable does not push you the first piece of information, the initial value has to come from somewhere else: it can be cached in another object and exposed to clients, so that they can populate their UI with it.

As an example, the PublishSubject follows this protocol. It does not provide you with the latest data, only with the updates that follow:

// provide separate method for fetching initial data
let initialData = dataProvider.getInitialData()

dataProvider
	.dataUpdates()
	.subscribe(onNext: { [unowned self] in
		self.work(on: $0)
	})
	.disposed(by: disposeBag)

Exposing Communication Protocols

How can we prevent ourselves from getting lost in this maze of hidden communication protocols? The best thing is to shine some light on them, and there are multiple ways to do that.

First, expose the protocol in the type system. This is something that both RxSwift and RxCocoa do already. There are a number of traits in RxCocoa that guarantee certain properties of the communication protocol, for example Driver, Signal, and ControlProperty.

There are more traits in the core RxSwift library. For example, there are Single, Completable, Maybe, etc. types. The whole purpose of these types is to expose information about the communication protocol: e.g. how many events will be sent, whether it finishes or not.

Using traits whenever you can makes the protocol much easier to understand. But sometimes there are no traits for your situation. In these cases, you might write your own wrappers. That’s fine. Or you can just use plain old comments in your documentation to explain the protocol of the particular observable to other developers.

For example, here’s a complicated domain logic observable.

/*
Returns an Observable that emits at most three times,
starting with the first event emitted immediately
and synchronously upon subscription.
Times of other two events are not guaranteed.
May not complete, but never errors out.
Doesn’t cache any data.
*/

public func thirdTimeLucky() -> Observable<Data>

Use Conventions

Often, when you are working on a big app with a big team, common patterns and conventions develop, including conventions about the communication protocol.

As an example, the following is part of a service that provides data for view models. The team agreed, as a convention, that the best way is to expose two members that return observables:

final class DataProvider {

	private let proxySubject = PublishSubject<Data>()

	var data: Observable<Data> {
		return proxySubject.asObservable()
	}

	// networkService is assumed to be declared elsewhere in the class
	func refreshData() -> Observable<Void> {
		return networkService
			.requestData()
			.do(onNext: { [proxySubject] in proxySubject.onNext($0) })
			.map { _ in }
	}
}

The refreshData method does not give you data. It causes a refresh, and tells you whether the refresh succeeded or errored.

Code Distance

The last method of dealing with protocols is to limit the scope of the observable so that the number of places influenced by changes or characteristics in the protocol is limited. This limits the potential for bugs.

There’s one concept that I’ve found extremely useful when thinking about limiting scope. The concept is called code distance. When there’s an action in the code, what matters is the distance at which the action causes some reaction. In other words, it shows whether relationships and couplings are local, or whether things that are conceptually unrelated are allowed to influence each other.

I like to think about the actual code distance in terms of the app architecture. If a change in the network layer is not visible anywhere in the persistence or view model layers, but it influences the view layer, there’s a long code distance relationship.

[Figure: long code distance]

There is one pattern that I’ve grown to dislike: NotificationCenter. It lives in these long code distance relationships!

However, if the change in any layer can be seen somewhere nearby, it means that the logic is kept at a short code distance. For example, if a change in the observable protocol in the network layer affects the services layer or the persistence layer, it indicates that the code distance is short.

The rule of thumb is to keep the observable in as small a scope as possible, because this helps you reason about what causes the event, what the guarantees and properties of the event are, and what effects the event creates.

Conclusion

In the meantime, Alex has discovered and fixed the source of the bug! The problem was in the onError closure, and it’s very closely related to code distance. When working on a network service, one of the team members forgot to ensure that all thrown errors conform to the ReasonableError type. For such an error, the conditional cast to ReasonableError fails, so the onError closure silently does nothing. In fact, there was nothing wrong with the subscription code:

dataProvider.refreshData()
	.observeOn(MainScheduler.instance)
	.subscribe(
		onNext: { [unowned self] in
			self.update(with: $0)
		},
		onError: { [unowned self] in
			if let error = $0 as? ReasonableError {
				self.showUserMessage(with: error.reason)
			}
		}
	)
	.disposed(by: disposeBag)
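The failure mode is easy to reproduce in isolation. In the sketch below, only the name ReasonableError and its reason property come from the code above; the protocol declaration, the two concrete error types, and the userMessage function are assumptions made up for the illustration.

```swift
// Assumed shape of the error protocol used by the subscription code.
protocol ReasonableError: Error {
    var reason: String { get }
}

// A well-behaved error from the network layer (hypothetical).
struct NoConnection: ReasonableError {
    let reason = "No internet connection"
}

// An error whose author forgot the ReasonableError conformance (hypothetical).
struct DecodingFailure: Error {}

// Mirrors the onError closure: returns the message that would be shown,
// or nil when the closure would do nothing at all.
func userMessage(for error: Error) -> String? {
    if let error = error as? ReasonableError {
        return error.reason
    }
    return nil  // the cast failed, so the error is silently dropped
}

let shownMessage = userMessage(for: NoConnection())
let droppedMessage = userMessage(for: DecodingFailure())
```

Here shownMessage carries the reason, while droppedMessage is nil: a change far away in the network layer decides whether the view layer reacts at all, which is the long code distance described earlier.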


About the content

This talk was delivered live in October 2017 at Mobilization. The video was transcribed by Realm and is published here with the permission of the conference organizers and speakers.

Krzysztof Siejkowski

Krzysztof (or Chris) is an iOS developer at Polidea, a hardware-friendly software house in Warsaw, Poland. He’s a co-organizer of Mobile Warsaw, a community for mobile developers, and a Swift enthusiast. A cultural anthropologist by training, he tries to see programming techniques from a humanistic perspective.
