Smarter Shared Kotlin Flows

Make the lifecycle available to the upstream Flow to skip unnecessary work

Christophe Beyls
12 min readJun 6, 2022

This is the second part of a series of articles about using Kotlin Flow on Android.

In the first part, we described the main limitation of Kotlin Flow when used inside ViewModel classes:

When a SharedFlow or StateFlow using the SharingStarted.WhileSubscribed() strategy is collected again after the user navigates back to an Activity or Fragment, its source upstream Flow will always restart from scratch, sometimes resulting in unnecessary work being performed when the previously cached data was still valid.

val results: StateFlow<SearchResult> =
queryFlow.mapLatest { query ->
repository.search(query)
}.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000L),
initialValue = SearchResult.EMPTY
)

In the above example, repository.search() will be executed again even if the latest value of queryFlow didn’t change in the meantime. This means a potential unnecessary network request or database query.

LiveData doesn’t suffer from this issue because its observers don’t need to unsubscribe when they become inactive: LiveData is lifecycle-aware and will postpone the delivery of new results until it becomes active, while also ensuring that the same result will never be delivered to the same observer twice (even when it becomes active again). For more details I invite you to read the full article.

At the end of this first part, we concluded that there was no simple and correct way to avoid performing this unnecessary work when relying only on the standard shareIn() or stateIn() operators.

In this second part, we are going to solve that efficiency problem by designing a new Flow operator that will allow SharedFlows to integrate better with the lifecycle.

Synchronizing with the Lifecycle

LiveData

The core reason why LiveData deals with lifecycles better than Flow is because the lifecycle state is automatically propagated upstream through all the LiveData instances so they can all pause cooperatively while preserving their state. To be exact, the information that is propagated is a simplified and aggregated lifecycle state called the active state:

  • A LiveData instance becomes active when at least one of its observers enters the STARTED lifecycle state. This may happen when a subscribed observer enters the STARTED state or when a new started observer subscribes;
  • A LiveData instance becomes inactive when none of its observers remains in the STARTED lifecycle state. This may happen when a subscribed observer moves back from the STARTED state to the CREATED state or when a started observer unsubscribes.

When applying a LiveData transformation like map() or switchMap(), the upstream LiveData instance automatically inherits the active state of the downstream LiveData instance.

When inactive, LiveData instances will automatically postpone delivering new results to observers downstream. Additionnaly, LiveData implementations may be notified in order to stop doing active work. For example, the LiveData coroutine builder will cancel its coroutine block when it becomes inactive.

SharedFlow

SharedFlow (and its subclass StateFlow) is a special kind of Flow that is able to broadcast the values emitted by a single source to multiple observers (called collectors) and replay the latest value to new observers, just like LiveData. A SharedFlow instance is both a collector and an emitter of values, and the upstream values are collected by SharedFlow independently from the downstream collectors.

Flow and SharedFlow do not support the Android Lifecycle class directly, but AndroidX Lifecycle APIs like Lifecycle.repeatOnLifecycle() must be used in the UI layer to synchronize the Flow collection with the Android Lifecycle: Flows must be collected only when the UI is in the STARTED lifecycle state or higher and the collecting coroutine should be canceled as soon as the UI becomes invisible.

SharedFlow implementations like MutableSharedFlow are also able to expose the current number of subscribed collectors which, when used in combination with the Lifecycle synchronization APIs mentioned above, can be used to infer something equivalent to the active state of LiveData:

  • A SharedFlow instance becomes active when the first collector subscribes;
  • A SharedFlow instance becomes inactive when the last collector unsubscribes.

The shareIn() and stateIn() extension functions are the simplest ways of creating a SharedFlow instance from a source upstream Flow. They accept a SharingStarted argument which is the strategy used to start and stop collecting the upstream Flow (from a separate coroutine) in response to the changes of “active state”.

In conclusion, SharedFlow can be made lifecycle-aware, but when using these limited APIs the only thing that can be done in response to an active state change is to launch or cancel a coroutine collecting the entire upstream Flow. The upstream Flow cannot just pause and resume where it left off: canceling the coroutine means losing the entire internal state of the current collection and having to restart the upstream Flow from scratch during the next collection.

This makes it impractical to use operators like distinctUntilChanged() in the upstream Flow because the last emitted value will be forgotten as soon as the coroutine collecting the Flow gets canceled by the SharedFlow strategy. Flow operators are not designed to work across multiple collections.

An alternative approach

A meme of Morpheus (from “The Matrix” movie) saying “What if I told you you can split the upstream Flow in two”.

We need to find a trade-off. Instead of canceling the coroutine collecting the entire upstream Flow, we should keep it always active but split the upstream Flow into 2 parts:

  • The upper part of the upstream Flow should be synchronized with the lifecycle and should stop being collected when the lifecycle is inactive;
  • The lower part of the upstream Flow should never stop being collected but will temporarily not receive any new value from the upper part while the lifecycle is inactive.
    This way, operators like distinctUntilChanged() could be used in the lower part to successfully filter out values re-emitted by the upper part when it restarts and avoid unnecessary work further down the chain.
val results: StateFlow<Result> =
someTriggerFlow() // upper part ↑
.someOperator(someFormOfLifecycle)
.distinctUntilChanged() // lower part ↓
.map { someExpensiveLoadingOperation(it) }
.stateIn(
scope = viewModelScope,
started = SharingStarted.Eagerly,
initialValue = Result.empty()
)

This approach was proposed by Hicham Boushaba in his article “Making cold Flows lifecycle-aware” which served as a starting point for this one.

To make the upper part of the Flow lifecycle-aware, we need to use an operator taking some form of lifecycle as input, launching a child coroutine to collect it when the lifecycle is active and canceling the coroutine when inactive, while forwarding the values downstream to the lower part.

Injecting an Android Lifecycle in the ViewModel ?

Mr Boushaba suggested a new operator whenAtLeast() working in combination with a custom Android Lifecycle-aware ViewModel. A similar option is to use the flowWithLifecycle() operator provided by the AndroidX Lifecycle library in a standard ViewModel. Both these solutions require keeping a reference to an Android Lifecycle inside a ViewModel class. In my opinion this should be avoided for different reasons:

  • It makes the ViewModel more difficult to test because it adds an extra dependency to the Android-specific Lifecycle class;
  • It requires additional complex code to clear the Lifecycle references from the ViewModel after the LifecycleOwner gets destroyed, in order to prevent memory leaks;
  • It doesn’t take into account the fact that a single ViewModel may be observed by multiple components with different Lifecycles at the same time. For example it’s common to use a ViewModel to allow a Fragment to communicate with an Activity or another Fragment. Supporting these use cases would require reinventing much of what LiveData already handles out-of-the box;
  • There is a simpler solution.

Introducing the flowWhileShared() operator

It turns out we don’t need to deal with Android’s Lifecycle at all in the ViewModel because MutableSharedFlow already provides something equivalent. In the previous section, we mentioned that it tracks and exposes the current number of subscribed collectors through the subscriptionCount property and this can be used to infer the active state which is all we need. It is used internally by shareIn() and stateIn() but can also be used as input value for a custom Flow operator that we are going to call flowWhileShared():

@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.flowWhileShared(
subscriptionCount: StateFlow<Int>,
started: SharingStarted
): Flow<T> {
return started.command(subscriptionCount)
.distinctUntilChanged()
.flatMapLatest {
when (it) {
SharingCommand.START -> this
SharingCommand.STOP,
SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> emptyFlow()
}
}
}

The implementation is straightforward and relies on two existing Flow building blocks:

  • SharingStarted is used to emit a Flow of START and STOP commands in reaction to the changes of subscriptionCount. In 99% of the cases, the implementation we’re going to use is SharingStarted.WhileSubscribed() so that the START command will be emitted when subscriptionCount > 0 and the STOP command will be emitted when subscriptionCount == 0 to match the behavior of LiveData;
  • The flatMapLatest() operator is used to switch between the upstream Flow (this) and an empty Flow. Internally, this operator creates a child coroutine to collect the values of these Flows and cancels it on each new command.
The “Left Exit 12 Off Ramp” meme featuring flowWhileShared() as a car taking a sharp turn to the right on the highway to reach the exit, where the main road sign shows “this” and the right exit sign shows “emptyFlow()”.

Note: for simplicity, the special “stop and reset replay cache” command which acts on the cache of the parent SharedFlow is ignored and treated like a regular “stop” command.

Usage with SharedFlow or StateFlow

Unfortunately this new operator can’t be used in combination with shareIn() or stateIn() because they don’t expose the subscriptionCount field of MutableSharedFlow publicly. Instead we need to create and manage our own instance of MutableSharedFlow or MutableStateFlow.

To reduce boilerplate code and avoid repetition, we can create a custom StateFlow factory function:

fun <T> stateFlow(
scope: CoroutineScope,
initialValue: T,
producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): StateFlow<T> {
val state = MutableStateFlow(initialValue)
scope.launch {
producer(state.subscriptionCount).collect(state)
}
return state.asStateFlow()
}

This function immediately launches a coroutine from the provided scope and passes the subscriptionCount as argument to an upstream Flow producer lambda. The produced Flow is then collected by the MutableStateFlow (which implements FlowCollector<T>) and finally the MutableStateFlow is returned as a read-only StateFlow.

At the end of this article you will find the source code of an optimized version of this function (minimizing the number of classes generated by the compiler) and a variant for SharedFlow.

Of course this entire system only works as intended if all the SharedFlows and StateFlows in the ViewModel are collected from the UI layer using the AndroidX Lifecycle APIs mentioned above, for example:

viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.results.collect { data ->
displayResult(data)
}
}
}

Use cases

Let’s look at some real-world examples of how to use these new functions together.

Loading data in response to location updates

Suppose we have a locationFlow() function similar to the one described in this article which allows to receive location updates in the form of a cold Flow<Location>, actively using the device’s GPS while collected. We want to load a list of shops around the current location from an API and update that list every time the device moves by at least 30 meters to avoid unnecessary API calls. At the same time, we want to make sure the GPS resource is freed while the screen is not visible.

The ViewModel will contain a StateFlow builder similar to this:

@OptIn(ExperimentalCoroutinesApi::class)
val location: StateFlow<List<Shop>> =
stateFlow(viewModelScope, emptyList()) { subscriptionCount ->
locationClient.locationFlow(
interval = 20000L,
smallestDisplacement = 30f
)
.flowWhileShared(
subscriptionCount,
SharingStarted.WhileSubscribed(1000L)
)
.distinctUntilChanged { old, new ->
old.distanceTo(new) < 30f
}
.mapLatest { location ->
shopsRepository.getShopsAtLocation(location)
}
.catch { emit(emptyList()) }
}

(Error handling has been simplified for the demonstration)

The stateFlow() factory function takes a producer lambda returning the upstream Flow as last argument. That lambda allows subscriptionCount to be used in the upstream Flow.

The main part of the upstream Flow will be collected with no interruption as long as the ViewModel stays in memory. However the part before flowWhileShared(), in this case locationFlow(), will only be collected while the StateFlow is active (has at least one subscription) so the GPS will stop when it becomes inactive. Thanks to the SharingStarted.WhileSubscribed(1000L) strategy, the collection will be canceled one second after the UI is hidden and the number of subscriptions reaches 0. That extra delay allows to avoid the cost of restarting location updates from scratch during configuration changes (when a new UI instance will resume the Flow collection almost immediately).

When the user leaves the screen and comes back to it later, the collection of locationFlow() will restart and distinctUntilChanged() (which is still being collected with no interruption) will take care of filtering out new locations that are too close to the previously emitted one. This way, we avoid the unnecessary work of calling the API again if the device did not move more than 30 meters since the last time the UI was visible.

The latest result will still be cached by StateFlow and automatically replayed to the current UI when it starts collecting it.

We managed to create the most efficient implementation solving that problem without having to rely on LiveData.

Reloading data on invalidation

Whenever possible, an application should present the latest version of the data to the user without the need of a manual refresh. To achieve this, a repository can send an invalidation signal to the observer of the data after each completed write operation (create/update/delete), eventually triggering a refresh.

To achieve maximum efficiency, the data should only be reloaded from storage or network if:

  • The screen is currently visible (no early refresh in the background);
  • The data has actually changed since the last load.

A natural way to represent data that can be invalidated is to return a Flow of results from the repository, rather than a single result.

However since we want to avoid refreshing the data while the screen is not visible, we need to cancel the collection of that Flow during that time. And since each Flow collection is self-contained, it will always start by loading the data, no matter if it has changed or not since the previous collection from the same screen.

For example, this kind of Flow is created by the Room Jetpack library to provide support for observable data sets in your DAOs. The Room implementation performs a database query when the Flow collection starts, then every time a related table is updated. This means that an unnecessary database query is performed every time a screen becomes visible again and the data set hasn’t changed in the meantime.

By comparison, the LiveData implementation generated by Room is more efficient because it will only perform a database query if a table has changed since the last time the LiveData was active.

To make the Kotlin Flow code more efficient we need to use versioning.

The version of the data set should be exposed as a StateFlow<Int> in the repository, along with functions to retrieve current snapshots of the data. For example:

interface CustomerRepository {
val customersVersion: StateFlow<Int>

suspend fun getAllCustomers(): List<Customer>

suspend fun getActiveCustomers(): List<Customer>
}

The repository may be backed by a remote API or local storage like a database; it doesn’t matter.

Every time the data set is updated successfully, the version number must be incremented atomically:

private val _customersVersion = MutableStateFlow(0)

suspend fun updateCustomerName(customerId: Long, newName: String) {
// Insert some code to update the data here
_customersVersion.update { it + 1 }
}

Tip: if your repository is backed by a Room database, you don’t need to keep track of changes manually, you can use Room’s InvalidationTracker to monitor a list of tables:

fun RoomDatabase.createVersionFlow(
scope: CoroutineScope,
vararg tables: String
): Flow<Int> {
return flow {
var version = 0
invalidationTrackerFlow(*tables).collect {
emit(version++)
}
}.stateIn(
scope = scope,
started = SharingStarted.Eagerly,
initialValue = null
).filterNotNull()
}

All the pieces of the puzzle are finally put together in the ViewModel:

@OptIn(ExperimentalCoroutinesApi::class)
val activeCustomers: Flow<List<Customer>> =
stateFlow(viewModelScope, emptyList()) { subscriptionCount ->
customersRepository.customersVersion
.flowWhileShared(
subscriptionCount, SharingStarted.WhileSubscribed()
)
.distinctUntilChanged()
.mapLatest {
customersRepository.getActiveCustomers()
}
}

Thanks again to the distinctUntilChanged() operator, the current version is compared against the previous one (if any) as soon as the StateFlow becomes active, and the data is only loaded if the version number changed. Problem solved.

Two timelines showing emitted values before and after applying the distinctUntilChanged() operator: the first timeline contains series of identical values and the second timeline only contains the first occurrence of each value.
Illustration from flowmarbles.com

Final tips

For many cases, the standard shareIn() and stateIn() operators should still be preferred over the complexity of flowWhileShared(). In particular:

  • For one-shot data loading;
  • When the source of the upstream Flow is a UI element on the screen like a refresh button, thus new values will never be emitted when the screen is invisible and the upstream Flow collection doesn’t need to be canceled;
  • When the cost of reloading the same data from the repository is cheap, for example because the implementation is backed by a shared cache. It’s then acceptable to reload data every time the screen becomes visible.

Conclusion

Using the new flowWhileShared() operator, we managed to synchronize a portion of an upstream Flow with the lifecycle of the downstream SharedFlow or StateFlow, allowing to skip unnecessary work with the help of an operator like distinctUntilChanged(). All of this without having to include any LiveData or Android-specific Lifecycle code in the ViewModel of Android applications.

Full source code

If you liked this article and found it helpful, please share it on social networks and post your feedback in the comments section. I’m curious to learn what you think about this pattern and how it could be improved. Expect reading about more uses cases for Kotlin Flow on Android in a following part.

Special thanks to Hicham Boushaba who gave me the original idea for writing this blog post.

--

--

Christophe Beyls
Christophe Beyls

Written by Christophe Beyls

Android developer from Belgium, blogging about advanced programming topics.

Responses (7)