Issue
I am just starting with the usage of coroutines/flow (and kotlin in general) and I am struggling to convert a callbackFlow to a sharedFlow.
I have put together the simple example below just to show what I have tried, without success. My code is more complex but I believe this example reflects what are the issues of what I am trying to achieve.
fun main() = runBlocking {
getMySharedFlow().collect{
println("collector 1 value: $it")
}
getMySharedFlow().collect{
println("collector 2 value: $it")
}
}
val sharedFlow = MutableSharedFlow<Int>()
suspend fun getMySharedFlow(): SharedFlow<Int> {
println("inside sharedflow")
getMyCallbackFlow().collect{
println("emitting to sharedflow value: $it")
sharedFlow.emit(it)
}
return sharedFlow
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: ()->Unit) {
println("fetching something...")
myCallBack()
}
The idea is that the fetchSomethingContinuously
is only called once, independent the number of collectors of the sharedFlow. But as you can see from the output, collectors never gets the values:
inside sharedflow
inside callbackflow producer
fetching something...
fetched something
emitting to sharedflow value: 1
emitting to sharedflow value: 2
emitting to sharedflow value: 3
I looked in the shareIn operator, but not sure how to use it exactly.
How could I achieve something like this? Any tips would be very appreciated.
Solution
So what you are missing here is the fact that calls to collect
, emit()
and awaitClose()
are suspending and will finish only after the respective operation is done.
The function getMySharedFlow()
doesn't even return in order to apply collect on it because it is collecting the callbackFlow
, the callbackFlow
is stuck at the call to awaitClose()
which in turn doesn't finish because fetchSomethingContinuously
did not end the callback with the close()
function.
You need to create some explicit parallelism here, not live in the suspending world. A working variant of your sample code would be:
val sharedFlow = MutableSharedFlow<Int>()
suspend fun startSharedFlow() {
println("Starting Shared Flow callback collection")
getMyCallbackFlow().collect {
println("emitting to sharedflow value: $it")
sharedFlow.emit(it)
}
}
fun main() = runBlocking<Unit> {
launch {
startSharedFlow()
}
launch {
sharedFlow.collect {
println("collector 1 value: $it")
}
}
launch {
sharedFlow.collect {
println("collector 2 value: $it")
}
}
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
//close() - call close here if you need to signal that this callback is done sending elements
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: () -> Unit) {
println("fetching something...")
myCallBack()
}
Calls to launch
allow for asynchronous execution of emitting and collecting values.
Also, about the shareIn()
operator, it just creates a SharedFlow from the specified upstream, like you wanted to do. Also, you can specify when to start sharing with the started
parameter. More on this here.
This is how you would use it in your example:
fun main() = runBlocking<Unit> {
val sharedFlow = getMyCallbackFlow().shareIn(this, started = SharingStarted.Eagerly)
launch {
sharedFlow.collect {
println("collector 1 value: $it")
}
}
launch {
sharedFlow.collect {
println("collector 2 value: $it")
}
}
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
//close() - call close here if you need to signal that this callback is done sending elements
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: () -> Unit) {
println("fetching something...")
myCallBack()
}
Answered By - Halex
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.