awaitFirst function #424
Comments
POC combined with #410:
fun main(args: Array<String>) = runBlocking {
    val result = job {
        val fetch1 = fork { longRunJob(1) }
        val fetch2 = fork { longRunJob(2) }
        listOf(fetch1, fetch2).awaitFirst()
    }
    println("The winner is $result")
}

suspend fun <R> job(parent: Job? = null, block: suspend Job.() -> R): R {
    val scope = Job(parent)
    try {
        return scope.block()
    } finally {
        scope.cancel()
    }
}

suspend fun <R> Job.fork(block: suspend () -> R): Deferred<R> = async(parent = this) { block() }

suspend fun longRunJob(i: Int): Int = i // TODO
I think it lacks use cases and is not very reliable: who will be responsible for checking the remaining jobs for exceptions or cancellations? The value of such an extension in a real-world application is unclear as well. I was never satisfied with [...] Let's see whether there is demand for such an extension, though.
Hi @qwwdfsad. This is more generic, so [...] Here is a POC:
fun <E : Job> Iterable<E>.joinChannel(): ReceiveChannel<E> = JoinChannel(this)

private class JoinChannel<E : Job>(jobs: Iterable<E>) : LinkedListChannel<E>() {
    /**
     * Pending jobs count. May be negative in initialization phase
     */
    val _pendingJobCount = atomic(0)

    @Volatile
    var _disposableHandles: List<DisposableHandle>?

    init {
        val disposableHandles =
            jobs.mapNotNull { job ->
                if (job.isCompleted) {
                    offer(job)
                    null
                } else {
                    job.invokeOnCompletion { onJobCompletion(job) }
                }
            }
        if (disposableHandles.isEmpty()) {
            _disposableHandles = null
            close()
        } else {
            _disposableHandles = disposableHandles
            val pending = _pendingJobCount.addAndGet(disposableHandles.size)
            if (pending == 0) close()
        }
    }

    fun onJobCompletion(job: E) {
        try {
            offer(job)
            val pending = _pendingJobCount.decrementAndGet()
            if (pending == 0) {
                _disposableHandles = null
                close()
            }
        } catch (_: Exception) {
            // ignore exception if output channel is closed
        }
    }

    override fun cleanupSendQueueOnCancel() {
        _disposableHandles?.apply {
            _disposableHandles = null
            forEach(DisposableHandle::dispose)
        }
    }
}
Could you please elaborate on why one needs this operator?
Good catch @qwwdfsad
Should I do it? I prefer a different approach for this problem, i.e. using a regular map/reduce.
suspend fun build(vararg parts: Part): Result =
    parts
        .map { async { buildPart(it) } }
        .joinChannel()
        .map { it.await() }
        .reduce { acc, result -> acc + result }
It is possible to implement this function using something like
I am documenting my use case for future reference. I have to download a page from n hosts (often two or three). The page is the same on every host, so any copy will do, preferably the one that is fastest to retrieve. I handle the timeout elsewhere, so I only have to pick the fastest page. A solution is:
fun main(args: Array<String>) = runBlocking {
    val job = Job(coroutineContext[Job])
    try {
        val context = coroutineContext + job
        val loaders =
            args
                .mapAsync(context) { fetch(it) }
                .toMutableList()
        while (loaders.isNotEmpty()) {
            val data: String? = select {
                for (verifier in loaders) {
                    verifier.onJoin {
                        loaders.remove(verifier)
                        if (verifier.isCompletedExceptionally) null
                        else verifier.getCompleted()
                    }
                }
            }
            if (data != null) println("Found $data")
        }
        println("Data not found")
    } finally {
        job.cancel()
    }
}

fun <T, R> Array<out T>.mapAsync(context: CoroutineContext, transform: suspend CoroutineScope.(T) -> R): List<Deferred<R>> =
    map { async(context) { transform(it) } }
using
fun main(args: Array<String>) = runBlocking {
    val job = Job(coroutineContext[Job])
    try {
        val context = coroutineContext + job
        val loaders = args.mapAsync(context) { fetch(it) }
        val data = loaders
            .joinChannel()
            .filterNotNull()
            .firstOrNull()
        if (data != null) println("Found $data")
        else println("Data not found")
    } finally {
        job.cancel()
    }
}
If you have a better way to solve this problem, please give me a tip.
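One possible way to express the "fastest page wins, skip the hosts that fail" use case above is a `select` loop over `onAwait` clauses inside a `coroutineScope`. This is only a minimal sketch, assuming a recent kotlinx.coroutines release; `fetchSketch` and `firstSuccessfulFetch` are hypothetical names made up for illustration, and the delays merely simulate hosts of different speed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical stand-in for the real download: "bad" fails quickly,
// "fast" answers after 50 ms, every other host after 200 ms.
suspend fun fetchSketch(host: String): String {
    delay(if (host == "bad") 10L else if (host == "fast") 50L else 200L)
    if (host == "bad") throw IllegalStateException("cannot reach $host")
    return "page from $host"
}

// Returns the first page that downloads successfully, or null if every host fails.
suspend fun firstSuccessfulFetch(hosts: List<String>): String? = coroutineScope {
    val pending = hosts.map { host ->
        async {
            try {
                fetchSketch(host)
            } catch (e: CancellationException) {
                throw e // never swallow cancellation
            } catch (e: Exception) {
                null // a failed host simply yields no page
            }
        }
    }.toMutableList()

    var found: String? = null
    while (found == null && pending.isNotEmpty()) {
        found = select<String?> {
            // Iterate over a snapshot so the clause body may mutate `pending`.
            for (loader in pending.toList()) {
                loader.onAwait { page ->
                    pending.remove(loader)
                    page
                }
            }
        }
    }
    pending.forEach { it.cancel() } // stop the downloads that lost the race
    found
}

fun main() = runBlocking {
    val data = firstSuccessfulFetch(listOf("bad", "slow", "fast"))
    if (data != null) println("Found $data") else println("Data not found")
}
```

Failures are turned into `null` inside each `async`, so a broken host does not cancel its siblings, and the remaining downloads are cancelled as soon as one page arrives.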
How about using
Hi @elizarov
suspend fun <E : Job> Iterable<E>.joinFirst(): E = select {
    for (job in this@joinFirst) {
        job.onJoin { job }
    }
}

suspend fun <E : Deferred<R>, R> Iterable<E>.awaitFirst(): R = joinFirst().getCompleted()
I rethought my solution, and my first proposal is inefficient for my own use case. So my proposal is something like a [...] I suspect that this issue is related to #172, but I have to work with already created [...]
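For readers on a current kotlinx.coroutines release, the `joinFirst`/`awaitFirst` idea can also be sketched with `select` and `onAwait`, which returns the value directly instead of calling `getCompleted()`. This is a sketch under assumptions, not the implementation discussed in this thread: `awaitFirstSketch` and `raceDemo` are hypothetical names, the receiver is assumed to be non-empty, and if the first coroutine to finish failed, its exception is rethrown.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical helper: suspends until the first Deferred completes and returns its value.
// Assumes a non-empty receiver; a failure of the first finisher is rethrown to the caller.
suspend fun <T> Iterable<Deferred<T>>.awaitFirstSketch(): T = select<T> {
    for (deferred in this@awaitFirstSketch) {
        deferred.onAwait { it }
    }
}

// Races two simulated jobs and cancels the loser before returning.
suspend fun raceDemo(): String = coroutineScope {
    val slow = async { delay(500); "slow" }
    val fast = async { delay(50); "fast" }
    val winner = listOf(slow, fast).awaitFirstSketch()
    coroutineContext.cancelChildren() // the caller stays responsible for the remaining jobs
    winner
}

fun main() = runBlocking {
    println("The winner is ${raceDemo()}")
}
```

Wrapping the race in `coroutineScope` and cancelling the children is one way to answer the earlier question of who cleans up the jobs that did not win.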
@fvasco So, you are basically looking for the following extension:
This implementation is |
Yes @elizarov, that is correct. I solved my problem this way: five days ago I wrote a very similar ad hoc implementation, which I named [...] Eleven days ago I proposed an untested POC with complexity [...] However, I consider this issue closely linked to #172. My recent question, #172 (comment), is about the link between these issues.
val urls: List<String> = TODO()
val data = urls
    .asReceiveChannel()
    .mapNotNull(parallelism = urls.size, preserveOrder = false) { fetch(it) }
    .first()
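The "run every fetch in parallel, ignore ordering, take the first result" pipeline above relies on a proposed `mapNotNull(parallelism, preserveOrder)` operator. As a rough sketch only, a similar effect can be approximated with the Flow API found in later kotlinx.coroutines releases, using `channelFlow` and `first()`. `fetchDemo` and `firstFetched` are hypothetical names, and unlike `mapNotNull` this version does not drop null results.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first

// Hypothetical stand-in for the real fetch: longer URLs take longer to "download".
suspend fun fetchDemo(url: String): String {
    delay(url.length * 20L)
    return "data:$url"
}

// Starts one coroutine per URL and returns whichever result is emitted first.
// Collecting with first() cancels the flow, which cancels the fetches still running.
suspend fun firstFetched(urls: List<String>): String =
    channelFlow {
        for (url in urls) {
            launch { send(fetchDemo(url)) }
        }
    }.first()

fun main() = runBlocking {
    println(firstFetched(listOf("a-much-longer-url", "u")))
}
```

Note that in this sketch a failing fetch fails the whole flow, and an empty list makes `first()` throw `NoSuchElementException`; both would need handling in real code.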
@fvasco Thanks. It is indeed related. I wonder if we should really blow up the list of parameters for all the different operators or, maybe, provide a single extension so that you can write something like:
Good. Dart's Future has wait-any and wait-all methods. Dart's async*, yield*, and await for are also far more ergonomic. For instance, in Dart:
await for (var request in requestServer) {
    handleRequest(request);
}
maybe equivalent in Kotlin:
for (var request.await() in requestServer) {
    handleRequest(request)
}
Or like in Scala:
for (var request in requestServer) {
    val x = handleRequest(request);
} yield x
@SolomonSun2010 We've designed Kotlin coroutines for more ergonomics. In Kotlin these examples look like this:
It is cleaner and with less boiler-plate in Kotlin (no need to explicitly write |
Many, if not most, people treat Kotlin as a better Java. I appreciate Kotlin's focus on ergonomics.
Closing as obsolete
The core library lacks an equivalent of Promise.race. I propose a little POC of joinFirst/awaitFirst, or a more generic selectFirst. What do you think?