Issue
I'm trying to execute a certain suspend function multiple times, in such a way that never more than N of these calls are running at the same time.
For those acquainted with Akka and Scala streaming libraries, something like mapAsync.
I wrote my own implementation using one input channel (as in Kotlin channels) and N output channels, but it seems cumbersome and not very efficient.
The code I'm currently using looks somewhat like this:
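    val inChannel = Channel<T>()
    // One output channel per worker: exactly n of them (0..n would create n + 1)
    val outChannels = (0 until n).map {
        Channel<T>()
    }
    // Distributor: hands incoming elements to the workers round-robin
    launch {
        var i = 0
        for (t in inChannel) {
            // send suspends until the worker is ready; the deprecated offer
            // would silently drop the element on a rendezvous channel
            outChannels[i].send(t)
            i = (i + 1) % n
        }
    }
    // Workers: each one processes its own channel sequentially
    outChannels.forEach { outChannel ->
        launch {
            for (t in outChannel) {
                fn(t)
            }
        }
    }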
Of course it has error management and everything, but still...
Edit: I ran the following test, and it failed:
    test("Parallelism is correctly capped") {
        val scope = CoroutineScope(Dispatchers.Default.limitedParallelism(3))
        var num = 0
        (1..100).map {
            scope.launch {
                num++
                println("started $it")
                delay(Long.MAX_VALUE)
            }
        }
        delay(500)
        assertEquals(3, num)
    }
Solution
Your question, as asked, calls for @marstran's answer. If what you want is that no more than N coroutines are actively executing at any given time (in parallel), then limitedParallelism is the way to go:
    val maxThreads: Int = TODO("some max number of threads")
    val limitedDispatcher = Dispatchers.Default.limitedParallelism(maxThreads)
    elements.forEach { elt ->
        scope.launch(limitedDispatcher) {
            doSomething(elt)
        }
    }
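Note that this caps threads, not coroutines in flight: a coroutine that suspends (in delay, for instance) hands its thread back, so another coroutine can start on it. That is why the test in the question sees more than 3 coroutines start. A minimal sketch of that behaviour follows; the function name startedWhileSuspended, the gate, and the counters are hypothetical names chosen for illustration, and the opt-in annotation is only there in case your kotlinx.coroutines version still marks limitedParallelism as experimental.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: counts how many coroutines have started on a limited
// dispatcher while every one of them is still suspended (none has finished).
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun startedWhileSuspended(tasks: Int, maxThreads: Int): Int {
    val limitedDispatcher = Dispatchers.Default.limitedParallelism(maxThreads)
    val started = AtomicInteger(0)
    val gate = CompletableDeferred<Unit>()
    return coroutineScope {
        repeat(tasks) {
            launch(limitedDispatcher) {
                started.incrementAndGet()
                gate.await() // suspends: the thread is free to start another coroutine
            }
        }
        // Poll until every coroutine has reached the gate. This loop would never
        // end if the dispatcher capped coroutines in flight at maxThreads.
        while (started.get() < tasks) delay(1)
        val startedBeforeRelease = started.get()
        gate.complete(Unit) // let all coroutines finish
        startedBeforeRelease
    }
}

fun main() = runBlocking {
    println(startedWhileSuspended(tasks = 100, maxThreads = 3)) // prints 100
}
```

All 100 coroutines are started and suspended at once even though at most 3 threads ever run them.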
Now, if what you want is to limit concurrency as well, so that at most N coroutines are in flight at the same time (potentially interleaving), regardless of threads, you could use a Semaphore instead:
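    // The coroutine Semaphore, not java.util.concurrent.Semaphore
    import kotlinx.coroutines.sync.Semaphore
    import kotlinx.coroutines.sync.withPermit

    val maxConcurrency: Int = TODO("some max number of concurrent coroutines")
    val semaphore = Semaphore(maxConcurrency)
    elements.forEach { elt ->
        scope.async {
            // Suspends (without blocking a thread) until a permit is free
            semaphore.withPermit {
                doSomething(elt)
            }
        }
    }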
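To check that the semaphore keeps its cap even while the work suspends, here is a small runnable sketch. The helper peakConcurrency, its counters, and the 10 ms delay standing in for doSomething are assumptions made for illustration, not part of the original code.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper: runs `tasks` coroutines under a semaphore and returns
// the highest number that held a permit at the same time.
suspend fun peakConcurrency(tasks: Int, maxConcurrency: Int): Int {
    val semaphore = Semaphore(maxConcurrency)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    coroutineScope {
        repeat(tasks) {
            launch {
                semaphore.withPermit {
                    val now = running.incrementAndGet()
                    peak.updateAndGet { maxOf(it, now) }
                    delay(10) // stand-in for doSomething(elt); suspends but keeps the permit
                    running.decrementAndGet()
                }
            }
        }
    }
    return peak.get()
}

fun main() = runBlocking {
    // The printed peak should never exceed maxConcurrency.
    println(peakConcurrency(tasks = 100, maxConcurrency = 3))
}
```

Unlike the limited dispatcher, the permit stays taken across the delay, so the remaining coroutines wait in withPermit until a running one finishes.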
You can also combine both approaches.
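A combined version might look like the sketch below: the limited dispatcher bounds how many threads do the work, and the semaphore bounds how many coroutines are in flight. The function processAll and its parameter names are hypothetical; the opt-in annotation is only there in case your kotlinx.coroutines version still marks limitedParallelism as experimental.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper combining both limits; returns once every element is processed.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun <T> processAll(
    elements: List<T>,
    maxThreads: Int,
    maxConcurrency: Int,
    doSomething: suspend (T) -> Unit,
) {
    val limitedDispatcher = Dispatchers.Default.limitedParallelism(maxThreads)
    val semaphore = Semaphore(maxConcurrency)
    coroutineScope {
        elements.forEach { elt ->
            launch(limitedDispatcher) {      // at most maxThreads run in parallel
                semaphore.withPermit {       // at most maxConcurrency are in flight
                    doSomething(elt)
                }
            }
        }
    }
}

fun main() = runBlocking {
    val seen = ConcurrentLinkedQueue<Int>()
    processAll((1..20).toList(), maxThreads = 2, maxConcurrency = 4) { seen.add(it) }
    println(seen.size) // every element was processed exactly once
}
```

Using coroutineScope here means the caller suspends until all elements are done; launching into an external scope, as in the snippets above, would return immediately instead.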
Answered By - Joffrey