Issue
I have two coroutines that run in parallel. One is a producer who changes the global state and sends an event to everyone in the SharedFlow
, and the other is a consumer who must receive the state. If the state already exists, then just print it, and if not, then subscribe to the SharedFlow
and collect the state in the future through a collect
method.
I discovered that my code is not thread-safe because in the line where the comment may occur the case that in parallel on this moment the new state will be emitted and will not be able to receive new state. So we missing the new state because we have't subscribed on flow yet. Please tell me how I can change the code so that the code becomes thread safe and we could get the new state by the flow without replacing MutableSharedFlow
with channel? Thank you very much!
Here is the code:
val mutableFlow: MutableSharedFlow<String> = MutableSharedFlow()
val flow: Flow<String> = mutableFlow
@Volatile
var state: String = ""
fun main(): Unit = runBlocking {
val consumerDelay = 2_000L
val producerDelay = 2_001L // emit state in 1 ms longer
launch {
delay(consumerDelay)
if (state.isNotEmpty()) {
println("The state has been updated...")
showState(state)
} else {
println("State hasn't been set yet, waiting... :(")
/* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
* HERE IS THE MOMENT WHEN PRODUCER MAY EMIT EVENT
* BUT WE HAVE NOT SUBSCRIBED YET AND WILL NOT RECEIVE IT
* IN THE FUTURE
* */
launch {
flow.collect { newState ->
showState(newState)
}
}
}
}
launch {
delay(producerDelay)
val newState = "New State"
state = newState // update the state variable
mutableFlow.emit(newState) // notify all subscribers
}
}
private fun showState(newState: String) {
println("New state is: $newState")
}
Moreover, it seems that wrapping code of the consumer and producer in mutex.withLock
will not help because we asynchronously launch new coroutine inside consumer for collecting state with flow.collect
Solution
You mustn't use an external state storage to duplicate a flow. To achieve this goal you have to use StateFlow instead of Flow.
Your code rewritten with state flow:
val mutableFlow = MutableStateFlow("")
val flow: StateFlow<String> = mutableFlow
fun main(): Unit = runBlocking {
val consumerDelay = 2_000L
val producerDelay = 2_001L // emit state in 1 ms longer
launch {
delay(consumerDelay)
flow.collect { state ->
showState(state)
}
}
launch {
delay(producerDelay)
mutableFlow.emit("New State")
}
}
private fun showState(newState: String) {
println("New state is: $newState")
}
Answered By - Viacheslav Smityukh
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.