Support "worker pool" pattern in actor builder and other related operators #172
Comments
Oops, this title made me think it was about Kotlin/Native "worker pool" support (-: See also:
Is this too naive an implementation?
@enleur This is close. However, I'd like to have a slightly more efficient implementation that launches coroutines only up to the configured limit.
As a proposal for an alternative implementation:
Usage example 1:
Usage example 2:
The main advantage: this is quite flexible with respect to the outer "looping" code. Also, you are not limited to exactly one 'request' parameter for the processing function; you may use a function with any number of parameters.
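The proposal's snippets and usage examples were lost in this rendering. A minimal sketch of such a worker-pool helper, with hypothetical names (`workerPool`, `process` are not library API), might look like:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch

// Hypothetical helper: launch `workers` coroutines that all drain the same
// channel; the processing function can close over any extra parameters.
fun <T> CoroutineScope.workerPool(
    workers: Int,
    input: ReceiveChannel<T>,
    process: suspend (T) -> Unit
): Job = launch {
    repeat(workers) {
        launch {
            for (item in input) process(item) // workers compete for items
        }
    }
}
```

The outer "looping" code stays in the caller's hands: it just sends into the channel that the pool drains.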
Does concurrent processing preserve the order of results? Should we introduce an optional parameter for that?
Sometimes you need order preserved, sometimes you do not. I wonder what the default should be, and whether it should be controlled by a boolean or there should be separate operators.
Note that an alternative design approach to solve the use-case of parallel processing is to introduce a dedicated
I am considering the following signature; it encapsulates the parallel blocks and allows reuse of all current operators.

```kotlin
suspend fun <E, R> ReceiveChannel<E>.parallel(
    parallelism: Int,
    block: suspend ProducerScope<R>.(ReceiveChannel<E>) -> Unit
): ReceiveChannel<R>
```

or

```kotlin
suspend fun <E, R> ReceiveChannel<E>.parallel(
    parallelism: Int,
    block: suspend ReceiveChannel<E>.() -> ReceiveChannel<R>
): ReceiveChannel<R>
```
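A sketch of how the first signature could be implemented (assuming a `CoroutineScope` parameter in place of the original extension-only form, since `produce` needs a scope):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

// Sketch only: fork `parallelism` workers that all read from the upstream
// channel and send into one shared output channel, which closes when all
// workers finish (the join).
@OptIn(ExperimentalCoroutinesApi::class)
fun <E, R> ReceiveChannel<E>.parallel(
    scope: CoroutineScope,
    parallelism: Int,
    block: suspend ProducerScope<R>.(ReceiveChannel<E>) -> Unit
): ReceiveChannel<R> = scope.produce {
    List(parallelism) {
        launch { block(this@produce, this@parallel) }
    }.joinAll()
}
```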
Let me take some time to expand on my previous message. The idea behind it is a regular fork/join strategy: multiple coroutines receive items from a single source channel, and their results are joined into a single output channel.

```kotlin
suspend fun <E, R> ReceiveChannel<E>.pipelines(
    parallelism: Int,
    block: suspend ReceiveChannel<E>.() -> ReceiveChannel<R>
): ReceiveChannel<R>
```
```kotlin
val ids: ReceiveChannel<Int> = loadIds()
val largeItem = ids
    .pipelines(5) {
        map { loadItem(it) }
            .filter { it.active }
    }
    .maxBy { it.size }
```

Unfortunately, with this syntax it is difficult to consume data in parallel, i.e. to run the terminal operation itself concurrently. So an alternative syntax can be:

```kotlin
suspend fun <E, R> ReceiveChannel<E>.fork(
    parallelism: Int,
    block: suspend (ReceiveChannel<E>) -> R
): List<R>
```
```kotlin
val largeItem = ids
    .fork(5) {
        it.map { loadItem(it) }
            .filter { it.active }
            .maxBy { it.size }
    }
    .filterNotNull()
    .maxBy { it.size }
```

Obviously, consuming items inside the fork block is less convenient, but it lets the terminal step run in parallel too. I suspect that both operators are useful.
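The `fork` operator above can be sketched quite compactly with `async`/`awaitAll` (a `coroutineScope`-based variant of the signature, not the original implementation):

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope

// Sketch: run `parallelism` copies of `block`, all competing for items from
// this channel, and collect each copy's result into a list (the join step).
suspend fun <E, R> ReceiveChannel<E>.fork(
    parallelism: Int,
    block: suspend (ReceiveChannel<E>) -> R
): List<R> = coroutineScope {
    List(parallelism) { async { block(this@fork) } }.awaitAll()
}
```

A failure in any worker cancels the `coroutineScope`, and with it the sibling workers, which matches structured-concurrency expectations.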
I want to bump this issue. |
A potential implementation of a concurrency-limited `consumeEach`:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.math.min
import kotlinx.coroutines.*

suspend inline fun <E> ReceiveChannel<E>.consumeEach(
    maxConcurrency: Int,
    initialConcurrency: Int = 10,
    coroutineContext: CoroutineContext = EmptyCoroutineContext,
    crossinline action: suspend (E) -> Unit
) =
    withContext(coroutineContext) {
        // maxConcurrency <= 0 means "unbounded".
        require(maxConcurrency <= 0 || initialConcurrency <= maxConcurrency) {
            "initialConcurrency must be less than or equal to maxConcurrency"
        }
        require(initialConcurrency >= 0) { "Can not have a negative initialConcurrency" }
        val busy = AtomicInteger(0)
        val startWorkers =
            if (maxConcurrency <= 0) initialConcurrency
            else min(maxConcurrency, initialConcurrency)
        val workers = MutableList(startWorkers) {
            launch {
                // Note: receive() can still throw if the channel closes between
                // this check and the call.
                while (isActive && !(isClosedForReceive && isEmpty)) {
                    busy.incrementAndGet()
                    action(this@consumeEach.receive())
                    busy.decrementAndGet()
                }
            }
        }
        // Spin up extra workers, one at a time, whenever all current workers are busy.
        if (maxConcurrency > initialConcurrency || maxConcurrency <= 0) {
            while (isActive && !(isClosedForReceive && isEmpty) &&
                (workers.size < maxConcurrency || maxConcurrency <= 0)
            ) {
                if (busy.get() == workers.size) {
                    val received = receive()
                    workers += launch {
                        busy.incrementAndGet()
                        action(received)
                        busy.decrementAndGet()
                        while (isActive && !(isClosedForReceive && isEmpty)) {
                            busy.incrementAndGet()
                            action(this@consumeEach.receive())
                            busy.decrementAndGet()
                        }
                    }
                }
                delay(10)
            }
        }
        workers.joinAll()
    }
```

I really dislike that while loop to check sizes; it may be possible to replace it with some kind of fake-blocking wait. Either way, it shouldn't be too terrible, as it quits once the spin-up is done and will mostly be waiting in `delay`. I'm also not sure the `busy` counter is the best way to detect saturation.
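The polling loop can be avoided entirely with `kotlinx.coroutines.sync.Semaphore` (an API added to the library after this comment). A sketch under a different, hypothetical name to avoid clashing with the real `consumeEach`:

```kotlin
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore

// Sketch: one coroutine per item, gated by a semaphore, so at most
// `maxConcurrency` actions run at once. Acquiring *before* launching makes
// the receive loop suspend when all slots are taken (natural backpressure).
suspend fun <E> ReceiveChannel<E>.consumeEachConcurrent(
    maxConcurrency: Int,
    action: suspend (E) -> Unit
) = coroutineScope {
    val permits = Semaphore(maxConcurrency)
    for (item in this@consumeEachConcurrent) {
        permits.acquire() // suspends until a slot frees up
        launch {
            try {
                action(item)
            } finally {
                permits.release()
            }
        }
    }
}
```

No `busy` counter, no `delay(10)` spin, and the channel iteration ends cleanly when the channel closes.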
This pattern is common enough even outside of actors (e.g. make a lot of web requests, but only have 10 going at a time) that it seems like it might be worth having a separate API for launching a limited number of coroutines at a time. Something like:

```kotlin
coroutineScope {
    limitedConcurrency(concurrency = 10) {
        (1..100).forEach {
            launch { doThing() }
        }
    }
}
```

Only 10 of the launched coroutines would be running at any moment, where any additional ones wait until a slot frees up.
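One way such a `limitedConcurrency` builder could be sketched (the names `limitedConcurrency` and `LimitedScope` are hypothetical, not library API): give the body a scope whose `launch` is gated by a shared semaphore.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore

// Hypothetical receiver scope: its `launch` starts a coroutine immediately,
// but the body only executes once a permit is available.
class LimitedScope(
    private val scope: CoroutineScope,
    private val permits: Semaphore
) {
    fun launch(block: suspend CoroutineScope.() -> Unit): Job =
        scope.launch {
            permits.acquire()
            try {
                block()
            } finally {
                permits.release()
            }
        }
}

suspend fun limitedConcurrency(
    concurrency: Int,
    body: suspend LimitedScope.() -> Unit
) = coroutineScope {
    body(LimitedScope(this, Semaphore(concurrency)))
}
```

Inside the body, the nested `launch` resolves to `LimitedScope.launch`, so the example above would work unchanged.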
I agree. Coming from RxJava, I really wish there were something like `flatMap()` with `maxConcurrency`, without requiring channels.
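For reference, the Flow API that later landed in kotlinx.coroutines offers roughly this RxJava-style operator: `flatMapMerge` takes a `concurrency` parameter capping how many inner flows are collected at once (it is marked `@FlowPreview`). Here `fetch` and the result type are placeholders:

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow

// At most 10 `fetch` calls are in flight at once, without touching channels.
@OptIn(FlowPreview::class)
fun <T> fetchAll(ids: Flow<Int>, fetch: suspend (Int) -> T): Flow<T> =
    ids.flatMapMerge(concurrency = 10) { id ->
        flow { emit(fetch(id)) }
    }
```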
It's been quite a while without updates; is this somewhere on the roadmap?
How does this work for handling errors? If one of the transformations fails, does there need to be a means of stopping the other transformations, and if so, how? Or maybe this doesn't matter, since that should be part of the transformation block.
The `actor` builder should natively support the "worker pool" pattern via an additional optional parameter `concurrency` (originally proposed as `parallelism`) that defaults to `1`, so that if you have a list of some requests, they can all be processed concurrently, with a specified limited concurrency, with simple code like this:

This particular pattern seems to be quite common, with requests being either stored in a list or received from some other channel, so the proposal is to add `concurrency` to `map` and `consumeEach`, too, to be able to write something like:

UPDATE: We will consistently call it `concurrency` here. We can have dozens of concurrent coroutines which run on a single CPU core. We will reserve the name `parallelism` to denote limits on the number of CPU cores that are used.