Issue
I'm not sure how to use Kotlin flows when a series of parallel requests needs to converge into a single result. As an example, take a scenario with two endpoints: one returns a list of user IDs, and the other returns whether a given user is older than 30.
fun callEndpoint1(): Flow<List<String>> {
    // Actual call to server returning a list of N userIDs
    // For simulation purposes it returns the following
    return flowOf(listOf("1", "2", "3", "4", "5"))
}

fun callEndpoint2(userId: String): Flow<Boolean> {
    // Actual call to server returning a boolean
    // For simulation purposes it returns the following
    return flowOf(userId.toInt() % 2 == 0)
}
fun calculateTotalPeopleOver30(): Flow<Int> {
    return callEndpoint1().map { listIds ->
        // wrong approach returning 0, should return 2
        var result = 0
        listIds.forEach { id ->
            callEndpoint2(id).map { isOver30 ->
                if (isOver30) result++
            }
        }
        result
    }
}
This approach is wrong because the variable result will be returned before it has had the chance to store the results coming from the different parallel calls.
Assuming there is no endpoint that can process a batch of IDs in one call, what would be the correct way to approach the issue?
I found a way to make it work, but only by leaking into another class the knowledge of what we need to achieve, which is not what I want. I include it here for illustration purposes only:
fun calculateTotalPeopleOver30(): Flow<List<Boolean>> {
    return callEndpoint1().map { listIds ->
        val result = arrayListOf<Boolean>()
        listIds.forEach { id ->
            result.add(callEndpoint2(id).first())
        }
        result.toList()
    }
}

fun coroutineLauncher(scope: CoroutineScope) {
    scope.launch {
        calculateTotalPeopleOver30()
            .collect { list ->
                println("people over 30 are ${list.filter { it }.size}")
            }
    }
}
Solution
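Your example doesn't work because a Flow will not do anything until you collect it. It is a lazy construct: map is an intermediate operator that only returns a new Flow, and that Flow is discarded here without ever being collected. This means that this piece of code doesn't actually do anything: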
listIds.forEach { id ->
    callEndpoint2(id).map { isOver30 ->
        if (isOver30) result++
    }
}
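The laziness described above can be checked with a small experiment. The following is only a sketch, assuming kotlinx.coroutines is on the classpath; countWithoutCollecting and countWithCollecting are hypothetical helper names introduced for the comparison, and callEndpoint2 is the simulated endpoint from the question.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Simulated endpoint from the question: even IDs count as "over 30".
fun callEndpoint2(userId: String): Flow<Boolean> = flowOf(userId.toInt() % 2 == 0)

// Hypothetical helper: applies `map` but never collects the resulting flow.
fun countWithoutCollecting(ids: List<String>): Int {
    var result = 0
    ids.forEach { id ->
        // `map` only describes a transformation; this lambda never runs
        // because nothing collects the flow it returns.
        callEndpoint2(id).map { isOver30 ->
            if (isOver30) result++
            isOver30
        }
    }
    return result
}

// Hypothetical helper: same logic, but each flow is collected.
suspend fun countWithCollecting(ids: List<String>): Int {
    var result = 0
    ids.forEach { id ->
        // `collect` is a terminal operator, so the lambda runs for every emitted value.
        callEndpoint2(id).collect { isOver30 -> if (isOver30) result++ }
    }
    return result
}

fun main() = runBlocking {
    val ids = listOf("1", "2", "3", "4", "5")
    println(countWithoutCollecting(ids)) // prints 0
    println(countWithCollecting(ids))    // prints 2
}
```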
To solve this, you need to collect the Flow returned by callEndpoint2(id). Instead of counting manually, you can use count, which is a terminal operator: it collects the Flow and returns the number of elements that match the predicate.
val result = listIds.map { id ->
    callEndpoint2(id)
        .count { it }
}.sum()
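For context, here is how that snippet might sit inside the outer map from the question. This is a minimal sketch assuming the simulated endpoints from the question; calculateTotalSequentially is a hypothetical name chosen to distinguish it from the other versions.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Simulated endpoints from the question.
fun callEndpoint1(): Flow<List<String>> = flowOf(listOf("1", "2", "3", "4", "5"))
fun callEndpoint2(userId: String): Flow<Boolean> = flowOf(userId.toInt() % 2 == 0)

// Hypothetical name: counts the matching users one ID at a time.
fun calculateTotalSequentially(): Flow<Int> =
    callEndpoint1().map { listIds ->
        // List.map is inline, so the suspending `count` can be called here.
        listIds.map { id ->
            callEndpoint2(id).count { it } // 1 if the user is over 30, otherwise 0
        }.sum()
    }

fun main() = runBlocking {
    println(calculateTotalSequentially().first()) // prints 2
}
```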
Now, there is another problem with this: the counting will not be performed in parallel. It will finish counting the result of one callEndpoint2(id) before continuing with the next.
To run the calls concurrently, you can convert your list of IDs to a Flow and use flatMapMerge to call callEndpoint2 for several IDs at once. The default concurrency of flatMapMerge is 16, but you can configure this with the concurrency parameter. The resulting code is this:
fun calculateTotal(): Flow<Int> {
    return callEndpoint1()
        .map { listIds ->
            listIds.asFlow()
                .flatMapMerge { id -> callEndpoint2(id) }
                .count { isOver30 -> isOver30 }
        }
}
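To see the effect of the concurrency parameter, the endpoint can be given an artificial delay. This is a sketch under stated assumptions: the 100 ms delay and the concurrency argument on calculateTotal are additions made for the demonstration, and the @OptIn annotation is there because flatMapMerge and DEFAULT_CONCURRENCY are marked as preview or experimental API in the kotlinx.coroutines versions I am aware of.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Simulated endpoint from the question.
fun callEndpoint1(): Flow<List<String>> = flowOf(listOf("1", "2", "3", "4", "5"))

// Simulated endpoint with an artificial 100 ms latency (an assumption for this demo).
fun callEndpoint2(userId: String): Flow<Boolean> = flow {
    delay(100)
    emit(userId.toInt() % 2 == 0)
}

// The `concurrency` argument is added here only to compare settings.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun calculateTotal(concurrency: Int = DEFAULT_CONCURRENCY): Flow<Int> =
    callEndpoint1().map { listIds ->
        listIds.asFlow()
            // Up to `concurrency` inner flows are collected at the same time.
            .flatMapMerge(concurrency) { id -> callEndpoint2(id) }
            .count { isOver30 -> isOver30 }
    }

fun main() = runBlocking {
    val concurrentMs = measureTimeMillis {
        println(calculateTotal().first()) // prints 2
    }
    val sequentialMs = measureTimeMillis {
        println(calculateTotal(concurrency = 1).first()) // prints 2
    }
    // With five IDs, the concurrent run should take roughly one delay
    // and the sequential run roughly five; exact timings vary.
    println("concurrent: $concurrentMs ms, sequential: $sequentialMs ms")
}
```

The caller only needs to collect the resulting Flow<Int>, so the knowledge of how the total is computed stays inside calculateTotal.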
Answered By - marstran