Smarter Shared Kotlin Flows

Make the lifecycle available to the upstream Flow to skip unnecessary work

val results: StateFlow<SearchResult> =
    queryFlow.mapLatest { query ->
        repository.search(query)
    }.stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000L),
        initialValue = SearchResult.EMPTY
    )

Synchronizing with the Lifecycle

LiveData

The core reason LiveData handles lifecycles better than Flow is that the lifecycle state is automatically propagated upstream through all the LiveData instances, so they can all pause cooperatively while preserving their state. More precisely, what is propagated is a simplified, aggregated lifecycle state called the active state:

  • A LiveData instance becomes active when at least one of its observers enters the STARTED lifecycle state. This may happen when a subscribed observer enters the STARTED state or when a new started observer subscribes;
  • A LiveData instance becomes inactive when none of its observers remains in the STARTED lifecycle state. This may happen when a subscribed observer moves back from the STARTED state to the CREATED state or when a started observer unsubscribes.

SharedFlow

SharedFlow (and its subtype StateFlow) is a special kind of Flow that can broadcast the values emitted by a single source to multiple observers (called collectors) and replay the latest value to new observers, just like LiveData. A SharedFlow instance is both a collector and an emitter of values, and it collects the upstream values independently from the downstream collectors.

  • A SharedFlow instance becomes active when the first collector subscribes;
  • A SharedFlow instance becomes inactive when the last collector unsubscribes.
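
The subscription tracking behind these two rules can be observed directly. The following is a minimal sketch, assuming the kotlinx.coroutines library is on the classpath; the function name `subscriptionCountDemo` and the collector names are hypothetical and only serve the illustration. It records the value of `subscriptionCount` on a `MutableSharedFlow` as collectors come and go.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: records subscriptionCount as collectors subscribe and unsubscribe.
fun subscriptionCountDemo(): List<Int> = runBlocking {
    val shared = MutableSharedFlow<String>(replay = 1)
    val observed = mutableListOf<Int>()

    // No collector yet: the flow would be considered inactive.
    observed += shared.subscriptionCount.value

    // First collector subscribes: the count goes from 0 to 1 (inactive -> active).
    val collectorA = launch { shared.collect { } }
    shared.subscriptionCount.first { it == 1 } // suspend until the subscription is registered
    observed += shared.subscriptionCount.value

    // A second collector does not change the active state, only the count.
    val collectorB = launch { shared.collect { } }
    shared.subscriptionCount.first { it == 2 }
    observed += shared.subscriptionCount.value

    // Last collector unsubscribes: the count drops back to 0 (active -> inactive).
    collectorA.cancelAndJoin()
    collectorB.cancelAndJoin()
    shared.subscriptionCount.first { it == 0 }
    observed += shared.subscriptionCount.value

    observed
}
```

Calling `subscriptionCountDemo()` should return `[0, 1, 2, 0]`. Only the transitions between zero and non-zero matter for the active state.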

An alternative approach

A meme of Morpheus (from “The Matrix” movie) saying “What if I told you you can split the upstream Flow in two”.
  • The upper part of the upstream Flow should be synchronized with the lifecycle and should stop being collected when the lifecycle is inactive;
  • The lower part of the upstream Flow should never stop being collected but will temporarily not receive any new value from the upper part while the lifecycle is inactive.

This way, operators like distinctUntilChanged() could be used in the lower part to successfully filter out values re-emitted by the upper part when it restarts and avoid unnecessary work further down the chain.

val results: StateFlow<Result> =
    someTriggerFlow() // upper part
        .someOperator(someFormOfLifecycle)
        .distinctUntilChanged() // lower part
        .map { someExpensiveLoadingOperation(it) }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.Eagerly,
            initialValue = Result.empty()
        )

Injecting an Android Lifecycle in the ViewModel?

Mr Boushaba suggested a new operator, whenAtLeast(), working in combination with a custom Android Lifecycle-aware ViewModel. A similar option is to use the flowWithLifecycle() operator provided by the AndroidX Lifecycle library in a standard ViewModel. Both solutions require keeping a reference to an Android Lifecycle inside a ViewModel class. In my opinion, this should be avoided for several reasons:

  • It makes the ViewModel more difficult to test because it adds an extra dependency to the Android-specific Lifecycle class;
  • It requires additional complex code to clear the Lifecycle references from the ViewModel after the LifecycleOwner gets destroyed, in order to prevent memory leaks;
  • It doesn’t take into account the fact that a single ViewModel may be observed by multiple components with different Lifecycles at the same time. For example, it’s common to use a ViewModel to allow a Fragment to communicate with an Activity or another Fragment. Supporting these use cases would require reinventing much of what LiveData already handles out of the box;
  • There is a simpler solution.

Introducing the flowWhileShared() operator

It turns out we don’t need to deal with Android’s Lifecycle at all in the ViewModel, because MutableSharedFlow already provides something equivalent. In the previous section, we mentioned that it tracks and exposes the current number of subscribed collectors through the subscriptionCount property. This can be used to infer the active state, which is all we need. The property is used internally by shareIn() and stateIn(), but it can also be used as the input of a custom Flow operator that we are going to call flowWhileShared():

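import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingCommand
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest

@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.flowWhileShared(
    subscriptionCount: StateFlow<Int>,
    started: SharingStarted
): Flow<T> {
    return started.command(subscriptionCount)
        .distinctUntilChanged()
        .flatMapLatest {
            when (it) {
                // Collect the upstream Flow (the receiver) while sharing is started
                SharingCommand.START -> this
                // Otherwise switch to an empty Flow, which cancels the upstream collection
                SharingCommand.STOP,
                SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> emptyFlow()
            }
        }
}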
  • SharingStarted is used to emit a Flow of START and STOP commands in reaction to the changes of subscriptionCount. In the vast majority of cases, the implementation we’re going to use is SharingStarted.WhileSubscribed(), so that the START command is emitted when subscriptionCount > 0 and the STOP command is emitted when subscriptionCount == 0, matching the behavior of LiveData;
  • The flatMapLatest() operator is used to switch between the upstream Flow (this) and an empty Flow. Internally, this operator creates a child coroutine to collect the values of these Flows and cancels it on each new command.
The “Left Exit 12 Off Ramp” meme featuring flowWhileShared() as a car taking a sharp turn to the right on the highway to reach the exit, where the main road sign shows “this” and the right exit sign shows “emptyFlow()”.
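
To make the first point more concrete, here is a small sketch of the commands that `SharingStarted.WhileSubscribed()` produces as a subscription count changes. It assumes the kotlinx.coroutines library is available; `fakeSubscriptionCount` and the function name `sharingCommandsDemo` are hypothetical stand-ins, since a real count would come from a `MutableSharedFlow`.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingCommand
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: drives a fake subscription count and records the resulting commands.
fun sharingCommandsDemo(): List<SharingCommand> = runBlocking {
    // Stand-in for MutableSharedFlow.subscriptionCount (an assumption made for this illustration).
    val fakeSubscriptionCount = MutableStateFlow(0)
    val commands = Channel<SharingCommand>(Channel.UNLIMITED)

    val job = launch {
        SharingStarted.WhileSubscribed()
            .command(fakeSubscriptionCount)
            .distinctUntilChanged()
            .collect { commands.send(it) }
    }

    val received = mutableListOf<SharingCommand>()

    fakeSubscriptionCount.value = 1   // first subscriber appears
    received += commands.receive()

    fakeSubscriptionCount.value = 0   // last subscriber disappears
    received += commands.receive()

    fakeSubscriptionCount.value = 2   // subscribers are back
    received += commands.receive()

    job.cancelAndJoin()
    received
}
```

Calling `sharingCommandsDemo()` should return `[START, STOP, START]`. No command is emitted for the initial count of zero, because WhileSubscribed() waits for the first START before emitting anything.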

Usage with SharedFlow or StateFlow

Unfortunately, this new operator can’t be used in combination with shareIn() or stateIn(), because they don’t publicly expose the subscriptionCount property of the underlying MutableSharedFlow. Instead, we need to create and manage our own instance of MutableSharedFlow or MutableStateFlow.

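import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch

fun <T> stateFlow(
    scope: CoroutineScope,
    initialValue: T,
    producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): StateFlow<T> {
    val state = MutableStateFlow(initialValue)
    // Collect the produced Flow into the state for as long as the scope is alive
    scope.launch {
        producer(state.subscriptionCount).collect(state)
    }
    return state.asStateFlow()
}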

On the UI side, the resulting StateFlow is collected as usual, for example from a Fragment:

viewLifecycleOwner.lifecycleScope.launch {
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.results.collect { data ->
            displayResult(data)
        }
    }
}
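
The interplay between the two functions can be checked outside of Android. The sketch below assumes kotlinx.coroutines is available and repeats both helpers so that it is self-contained. The upstream Flow, the `viewModelJob` stand-in for a ViewModel scope, and the function name `upstreamActivityDemo` are hypothetical. It records when the upstream Flow starts and stops being collected as a downstream collector subscribes and unsubscribes twice.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.flowWhileShared(subscriptionCount: StateFlow<Int>, started: SharingStarted): Flow<T> =
    started.command(subscriptionCount)
        .distinctUntilChanged()
        .flatMapLatest { command ->
            when (command) {
                SharingCommand.START -> this
                SharingCommand.STOP,
                SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> emptyFlow()
            }
        }

fun <T> stateFlow(
    scope: CoroutineScope,
    initialValue: T,
    producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): StateFlow<T> {
    val state = MutableStateFlow(initialValue)
    scope.launch { producer(state.subscriptionCount).collect(state) }
    return state.asStateFlow()
}

// Hypothetical demo function: logs when the upstream Flow is collected and canceled.
fun upstreamActivityDemo(): List<String> = runBlocking {
    val events = Channel<String>(Channel.UNLIMITED)

    // A cold upstream Flow that holds a "resource" for as long as it is collected.
    val upstream = flow<Int> {
        events.trySend("upstream started")
        try {
            awaitCancellation()
        } finally {
            events.trySend("upstream stopped")
        }
    }

    // Stand-in for viewModelScope: same dispatcher as runBlocking, but with its own Job.
    val viewModelJob = Job()
    val state = stateFlow(CoroutineScope(coroutineContext + viewModelJob), 0) { subscriptionCount ->
        upstream.flowWhileShared(subscriptionCount, SharingStarted.WhileSubscribed())
    }

    val observed = mutableListOf<String>()
    repeat(2) {
        val collector = launch { state.collect { } } // the screen becomes visible
        observed += events.receive()
        collector.cancelAndJoin()                    // the screen is hidden
        observed += events.receive()
    }

    viewModelJob.cancelAndJoin()
    observed
}
```

Calling `upstreamActivityDemo()` should return "upstream started", "upstream stopped", "upstream started", "upstream stopped". The upstream Flow is collected only while the StateFlow has at least one collector, even though the coroutine launched by stateFlow() stays alive the whole time.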

Use cases

Let’s look at some real-world examples of how to use these new functions together.

Loading data in response to location updates

Suppose we have a locationFlow() function similar to the one described in this article, which delivers location updates as a cold Flow<Location> and actively uses the device’s GPS while collected. We want to load a list of shops around the current location from an API and refresh that list only when the device has moved by at least 30 meters, to avoid unnecessary API calls. At the same time, we want to make sure the GPS resource is freed while the screen is not visible.

@OptIn(ExperimentalCoroutinesApi::class)
val location: StateFlow<List<Shop>> =
    stateFlow(viewModelScope, emptyList()) { subscriptionCount ->
        locationClient.locationFlow(
            interval = 20000L,
            smallestDisplacement = 30f
        )
            .flowWhileShared(
                subscriptionCount,
                SharingStarted.WhileSubscribed(1000L)
            )
            .distinctUntilChanged { old, new ->
                old.distanceTo(new) < 30f
            }
            .mapLatest { location ->
                shopsRepository.getShopsAtLocation(location)
            }
            .catch { emit(emptyList()) }
    }
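
One detail of the distinctUntilChanged() call above is worth spelling out: the custom comparison is made against the last value that was let through, not against the previous upstream value. The following sketch, assuming kotlinx.coroutines is available, replaces Android's Location with plain one-dimensional positions in meters. The positions and the function name `thresholdDemo` are hypothetical.

```kotlin
import kotlin.math.abs
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: positions are distances in meters along a straight line.
fun thresholdDemo(): List<Float> = runBlocking {
    flowOf(0f, 10f, 25f, 35f, 40f, 70f)
        // Two positions are considered equivalent when they are less than 30 meters apart.
        // "old" is the last position that passed the filter, so small moves accumulate.
        .distinctUntilChanged { old, new -> abs(new - old) < 30f }
        .toList()
}
```

Calling `thresholdDemo()` should return `[0.0, 35.0, 70.0]`. The position 35 passes because it is 35 meters away from the last accepted position 0, even though it is only 10 meters away from the previous upstream value 25. This is also why a position re-emitted after the upstream Flow restarts is filtered out when the device has not moved far enough.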

Reloading data on invalidation

Whenever possible, an application should present the latest version of the data to the user without requiring a manual refresh. To achieve this, a repository can send an invalidation signal to the observer of the data after each completed write operation (create/update/delete), eventually triggering a refresh.

Ideally, that refresh should only happen when both of these conditions are met:

  • The screen is currently visible (no early refresh in the background);
  • The data has actually changed since the last load.

interface CustomerRepository {
    val customersVersion: StateFlow<Int>

    suspend fun getAllCustomers(): List<Customer>

    suspend fun getActiveCustomers(): List<Customer>
}

private val _customersVersion = MutableStateFlow(0)

suspend fun updateCustomerName(customerId: Long, newName: String) {
    // Insert some code to update the data here
    _customersVersion.update { it + 1 }
}

fun RoomDatabase.createVersionFlow(vararg tables: String): StateFlow<Int> {
    val stateFlow = MutableStateFlow(0)
    invalidationTracker.addObserver(object : InvalidationTracker.Observer(tables) {
        override fun onInvalidated(tables: Set<String>) {
            stateFlow.update { it + 1 }
        }
    })
    return stateFlow.asStateFlow()
}

@OptIn(ExperimentalCoroutinesApi::class)
val activeCustomers: Flow<List<Customer>> =
    stateFlow(viewModelScope, emptyList()) { subscriptionCount ->
        customersRepository.customersVersion
            .flowWhileShared(
                subscriptionCount, SharingStarted.WhileSubscribed()
            )
            .distinctUntilChanged()
            .mapLatest {
                customersRepository.getActiveCustomers()
            }
    }
Two timelines showing emitted values before and after applying the distinctUntilChanged() operator: the first timeline contains series of identical values and the second timeline only contains the first occurrence of each value.
Illustration from flowmarbles.com
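
The two conditions above (screen visible, data actually changed) can be verified with a small simulation. This is a sketch under stated assumptions, not the article's repository code: it requires kotlinx.coroutines, repeats both helpers to stay self-contained, uses a plain `MutableStateFlow` named `customersVersion` in place of a real repository, and relies on short real-time delays to let commands propagate. The function name `reloadOnInvalidationDemo` is hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.flowWhileShared(subscriptionCount: StateFlow<Int>, started: SharingStarted): Flow<T> =
    started.command(subscriptionCount)
        .distinctUntilChanged()
        .flatMapLatest { command ->
            when (command) {
                SharingCommand.START -> this
                SharingCommand.STOP,
                SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> emptyFlow()
            }
        }

fun <T> stateFlow(
    scope: CoroutineScope,
    initialValue: T,
    producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): StateFlow<T> {
    val state = MutableStateFlow(initialValue)
    scope.launch { producer(state.subscriptionCount).collect(state) }
    return state.asStateFlow()
}

// Hypothetical demo function: logs every (simulated) load of the customers list.
@OptIn(ExperimentalCoroutinesApi::class)
fun reloadOnInvalidationDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val customersVersion = MutableStateFlow(0) // stand-in for the repository's version counter
    val viewModelJob = Job()                   // stand-in for the ViewModel's scope

    val customers = stateFlow(CoroutineScope(coroutineContext + viewModelJob), emptyList<String>()) { count ->
        customersVersion
            .flowWhileShared(count, SharingStarted.WhileSubscribed())
            .distinctUntilChanged()
            .mapLatest { version ->
                events += "load v$version" // simulated expensive load
                listOf("customers@v$version")
            }
    }

    // Simulates the screen being visible for a moment, then hidden again.
    suspend fun showScreenBriefly() {
        val collector = launch { customers.collect { } }
        delay(50)
        collector.cancelAndJoin()
        delay(50) // give the STOP command time to propagate
    }

    showScreenBriefly()        // first visit: version 0 is loaded
    showScreenBriefly()        // same version re-emitted by the upper part: no reload
    customersVersion.value = 1 // invalidation while the screen is hidden
    delay(50)
    events += "still hidden after invalidation"
    showScreenBriefly()        // version changed: reload on return

    viewModelJob.cancelAndJoin()
    events
}
```

Calling `reloadOnInvalidationDemo()` should return "load v0", "still hidden after invalidation", "load v1". The second visit triggers no load, and the invalidation is only acted upon once the screen is visible again.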

Final tips

For many cases, the standard shareIn() and stateIn() operators should still be preferred over the complexity of flowWhileShared(). In particular:

  • For one-shot data loading;
  • When the source of the upstream Flow is a UI element on the screen, like a refresh button: new values will never be emitted while the screen is invisible, so the upstream Flow collection doesn’t need to be canceled;
  • When reloading the same data from the repository is cheap, for example because the implementation is backed by a shared cache. It’s then acceptable to reload data every time the screen becomes visible.

Conclusion

Using the new flowWhileShared() operator, we managed to synchronize a portion of an upstream Flow with the lifecycle of the downstream SharedFlow or StateFlow, which makes it possible to skip unnecessary work with the help of an operator like distinctUntilChanged(). All of this works without including any LiveData or Android-specific Lifecycle code in the ViewModel of Android applications.

Christophe Beyls

Android developer from Belgium, blogging about advanced programming topics.