Smarter Shared Kotlin Flows

val results: StateFlow<SearchResult> =
queryFlow.mapLatest { query ->
repository.search(query)
}.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000L),
initialValue = SearchResult.EMPTY
)

Synchronizing with the Lifecycle

LiveData

  • A LiveData instance becomes active when at least one of its observers enters the STARTED lifecycle state. This may happen when a subscribed observer enters the STARTED state or when a new started observer subscribes;
  • A LiveData instance becomes inactive when none of its observers remains in the STARTED lifecycle state. This may happen when a subscribed observer moves back from the STARTED state to the CREATED state or when a started observer unsubscribes.

SharedFlow

  • A SharedFlow instance becomes active when the first collector subscribes;
  • A SharedFlow instance becomes inactive when the last collector unsubscribes.

An alternative approach

A meme of Morpheus (from “The Matrix” movie) saying “What if I told you you can split the upstream Flow in two”.
  • The upper part of the upstream Flow should be synchronized with the lifecycle and should stop being collected when the lifecycle is inactive;
  • The lower part of the upstream Flow should never stop being collected but will temporarily not receive any new value from the upper part while the lifecycle is inactive.
    This way, operators like distinctUntilChanged() could be used in the lower part to successfully filter out values re-emitted by the upper part when it restarts and avoid unnecessary work further down the chain.
val results: StateFlow<Result> =
someTriggerFlow() // upper part
.someOperator(someFormOfLifecycle)
.distinctUntilChanged() // lower part
.map { someExpensiveLoadingOperation(it) }
.stateIn(
scope = viewModelScope,
started = SharingStarted.Eagerly,
initialValue = Result.empty()
)

Injecting an Android Lifecycle in the ViewModel ?

  • It makes the ViewModel more difficult to test because it adds an extra dependency to the Android-specific Lifecycle class;
  • It requires additional complex code to clear the Lifecycle references from the ViewModel after the LifecycleOwner gets destroyed, in order to prevent memory leaks;
  • It doesn’t take into account the fact that a single ViewModel may be observed by multiple components with different Lifecycles at the same time. For example it’s common to use a ViewModel to allow a Fragment to communicate with an Activity or another Fragment. Supporting these use cases would require reinventing much of what LiveData already handles out-of-the box;
  • There is a simpler solution.

Introducing the flowWhileShared() operator

@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.flowWhileShared(
subscriptionCount: StateFlow<Int>,
started: SharingStarted
): Flow<T> {
return started.command(subscriptionCount)
.distinctUntilChanged()
.flatMapLatest {
when
(it) {
SharingCommand.START -> this
SharingCommand.STOP,
SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> emptyFlow()
}
}
}
  • SharingStarted is used to emit a Flow of START and STOP commands in reaction to the changes of subscriptionCount. In 99% of the cases, the implementation we’re going to use is SharingStarted.WhileSubscribed() so that the START command will be emitted when subscriptionCount > 0 and the STOP command will be emitted when subscriptionCount == 0 to match the behavior of LiveData;
  • The flatMapLatest() operator is used to switch between the upstream Flow (this) and an empty Flow. Internally, this operator creates a child coroutine to collect the values of these Flows and cancels it on each new command.
The “Left Exit 12 Off Ramp” meme featuring flowWhileShared() as a car taking a sharp turn to the right on the highway to reach the exit, where the main road sign shows “this” and the right exit sign shows “emptyFlow()”.

Usage with SharedFlow or StateFlow

fun <T> stateFlow(
scope: CoroutineScope,
initialValue: T,
producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): StateFlow<T> {
val state = MutableStateFlow(initialValue)
scope.launch {
producer(state.subscriptionCount).collect(state)
}
return
state.asStateFlow()
}
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.results.collect { data ->
displayResult(data)
}
}
}

Use cases

Loading data in response to location updates

@OptIn(ExperimentalCoroutinesApi::class)
val location: StateFlow<List<Shop>> =
stateFlow(viewModelScope, emptyList()) { subscriptionCount ->
locationClient.locationFlow(
interval = 20000L,
smallestDisplacement = 30f
)
.flowWhileShared(
subscriptionCount,
SharingStarted.WhileSubscribed(1000L)
)
.distinctUntilChanged { old, new ->
old.distanceTo(new) < 30f
}
.mapLatest { location ->
shopsRepository.getShopsAtLocation(location)
}
.catch { emit(emptyList()) }
}

Reloading data on invalidation

  • The screen is currently visible (no early refresh in the background);
  • The data has actually changed since the last load.
interface CustomerRepository {
val customersVersion: StateFlow<Int>

suspend fun getAllCustomers(): List<Customer>

suspend fun getActiveCustomers(): List<Customer>
}
private val _customersVersion = MutableStateFlow(0)suspend fun updateCustomerName(customerId: Long, newName: String) {
// Insert some code to update the data here
_customersVersion.update { it + 1 }
}
fun RoomDatabase.createVersionFlow(vararg tables: String): StateFlow<Int> {
val stateFlow = MutableStateFlow(0)
invalidationTracker.addObserver(object : InvalidationTracker.Observer(tables) {
override fun onInvalidated(tables: Set<String>) {
stateFlow.update { it + 1 }
}
})
return stateFlow.asStateFlow()
}
@OptIn(ExperimentalCoroutinesApi::class)
val activeCustomers: Flow<List<Customer>> =
stateFlow(viewModelScope, emptyList()) { subscriptionCount ->
customersRepository.customersVersion
.flowWhileShared(
subscriptionCount, SharingStarted.WhileSubscribed()
)
.distinctUntilChanged()
.mapLatest {
customersRepository.getActiveCustomers()
}
}
Two timelines showing emitted values before and after applying the distinctUntilChanged() operator: the first timeline contains series of identical values and the second timeline only contains the first occurrence of each value.
Illustration from flowmarbles.com

Final tips

  • For one-shot data loading;
  • When the source of the upstream Flow is a UI element on the screen like a refresh button, thus new values will never be emitted when the screen is invisible and the upstream Flow collection doesn’t need to be canceled;
  • When the cost of reloading the same data from the repository is cheap, for example because the implementation is backed by a shared cache. It’s then acceptable to reload data every time the screen becomes visible.

Conclusion

Full source code

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store