Propose shortcut for Iterable<Deferred<T>> → Flow<T> e.g. map { it::await.asFlow() }.merge() #2752


Open
jablko opened this issue Jun 8, 2021 · 5 comments

@jablko

jablko commented Jun 8, 2021

What do you think about adding the following to the library:

/**
 * Creates a flow from a collection of deferreds. Elements are emitted in the order they complete.
 */
fun <T> Iterable<Deferred<T>>.race(): Flow<T> = map { it::await.asFlow() }.merge()

Call it:

Currently I use deferreds.map { it::await.asFlow() }.merge().filterNotNull().firstOrNull() to get the first in a collection of deferreds to complete with a non-null value.
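As a minimal, runnable sketch of the idiom above: `race()` is the extension proposed in this issue (it is not part of kotlinx.coroutines), and `firstNonNull()` with its delays and values is a hypothetical demo, not code from the project being discussed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// The extension proposed in this issue; NOT an existing library function.
@OptIn(FlowPreview::class)
fun <T> Iterable<Deferred<T>>.race(): Flow<T> = map { it::await.asFlow() }.merge()

// Hypothetical demo: three deferreds finishing at different times, one with null.
suspend fun firstNonNull(): String? = coroutineScope {
    val deferreds = listOf(
        async { delay(300); "slow" },
        async { delay(100); null },   // completes first, but is filtered out
        async { delay(200); "fast" },
    )
    // Results arrive in completion order; take the first non-null one.
    deferreds.race().filterNotNull().firstOrNull()
}

fun main() = runBlocking {
    println(firstNonNull()) // prints "fast"
}
```

Note that `coroutineScope` still waits for the remaining deferred to finish before returning; the flow only stops listening to it.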

The proposed shortcut isn't much shorter, but:

  • I think a shortcut for Iterable<Deferred<T>> → Flow<T> has general utility
  • it's analogous to functions in other libraries, e.g. Python's concurrent.futures.as_completed() and JavaScript's Promise.race()
  • I think naming this idiom (map { it::await.asFlow() }.merge()) makes it clearer and easier to find and use?

Unless there's already a better way of doing the same thing?

Previously proposed during the discussion of #171

@lowasser
Contributor

lowasser commented Jun 8, 2021

Why not do this as

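fun <T> Iterable<Deferred<T>>.race(): Flow<T> = channelFlow {
  // Inside channelFlow, `this` is the ProducerScope, so the receiver must be qualified.
  for (deferred in this@race) {
    // One child coroutine per deferred; each sends its result as soon as it completes.
    launch { send(deferred.await()) }
  }
}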

@jablko
Author

jablko commented Jun 8, 2021

Sounds good. I don't know the relative merits of map { it::await.asFlow() }.merge() vs. channelFlow { ... }; presumably the latter is more efficient?

Another point in favor of a canonical shortcut?
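A rough side-by-side sketch of the two variants discussed so far, to show that they yield the same elements in the same completion order. The names `raceMerge`, `raceChannel`, and `compare` are hypothetical, and this says nothing about relative efficiency, which would need measuring.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Variant A: wrap each await in a single-element flow, then merge the flows.
@OptIn(FlowPreview::class)
fun <T> Iterable<Deferred<T>>.raceMerge(): Flow<T> = map { it::await.asFlow() }.merge()

// Variant B: one child coroutine per deferred, all sending into one channel.
fun <T> Iterable<Deferred<T>>.raceChannel(): Flow<T> = channelFlow {
    for (deferred in this@raceChannel) {
        launch { send(deferred.await()) }
    }
}

// Hypothetical comparison: run both variants over freshly started deferreds.
suspend fun compare(): Pair<List<Int>, List<Int>> = coroutineScope {
    fun start() = listOf(300L, 100L, 200L).map { ms -> async { delay(ms); ms.toInt() } }
    start().raceMerge().toList() to start().raceChannel().toList()
}

fun main() = runBlocking {
    val (viaMerge, viaChannel) = compare()
    println(viaMerge)   // [100, 200, 300]
    println(viaChannel) // [100, 200, 300]
}
```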

@qwwdfsad qwwdfsad added the flow label Jun 11, 2021
@qwwdfsad
Collaborator

Could you please elaborate on the usage of such API? E.g. what exact problem you are solving with that?
Could be the case that #1147 or #172 are more relevant

@pacher

pacher commented Jun 12, 2021

If it gets implemented, I think it would be more idiomatic to call it awaitAny(), to complement the existing awaitAll().
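For illustration, one way a single-result `awaitAny()` might look, assuming it is built on the existing `select` builder and the `onAwait` clause. The function itself is hypothetical; only `awaitAll()` exists in the library. Unlike the flow-based proposal, this sketch returns just the first completed value, does not skip nulls, and rethrows if the first deferred to complete has failed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical awaitAny(): resumes with the value of whichever deferred completes first.
suspend fun <T> Collection<Deferred<T>>.awaitAny(): T {
    // select with no clauses would suspend forever, so reject empty input.
    require(isNotEmpty()) { "awaitAny() needs at least one deferred" }
    return select<T> {
        // Register an onAwait clause for every deferred; the first to complete wins.
        forEach { deferred -> deferred.onAwait { it } }
    }
}

fun main() = runBlocking {
    val winner = listOf(
        async { delay(200); "b" },
        async { delay(50); "a" },
    ).awaitAny()
    println(winner) // prints "a"
}
```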

@jablko
Author

jablko commented Jun 12, 2021

Could you please elaborate on the usage of such API? E.g. what exact problem you are solving with that?

I have a collection of deferreds and I need to get the first one to complete with a non-null value. Backing up, I start with a collection of objects (tax forms) which I evaluate concurrently and I need to get the first occurrence of a specified tax line. Each form returns null if that line wasn't found.

My code looks something like the following (eliding most of it):

class EvalVisitor(file: File, val scope: CoroutineScope) {
  // ...

  // Look up a single line number
  fun FormParser.RangesContext.single(): Deferred<Any?>? {
    val (line, part, form) = visit(this).single()
    // ...
    // First occurrence in the listed forms of the specified line number
    return listOf(yourReturn, schedule1, form428).get(line, part)
  }

  fun Iterable<EvalVisitor?>.get(line: String, part: String? = null): Deferred<Any?> {
    // Evaluate each form concurrently
    val bound = mapNotNull { it?.get(line, part, this@EvalVisitor) }
    // First non-null result
    return scope.async { bound.map { it::await.asFlow() }.merge().filterNotNull().firstOrNull() }
  }

  fun get(line: String, part: String?, caller: EvalVisitor): Deferred<Any?>? {
    // ...
  }
}

In the absence of a shortcut I use bound.map { it::await.asFlow() }.merge().filterNotNull().firstOrNull() to get the first deferred to complete with a non-null value, or alternatively:

  channelFlow {
    for (deferred in bound) {
      launch { send(deferred.await()) }
    }
  }
    .filterNotNull()
    .firstOrNull()

or

  channelFlow {
    for (deferred in bound) {
      // Only send non-null results; a lambda is used because send is a suspend function.
      launch { deferred.await()?.let { send(it) } }
    }
  }
    .firstOrNull()

I propose e.g. bound.race().filterNotNull().firstOrNull() modulo what you name the method.

Could be the case that #1147 or #172 are more relevant

Ultimately I start with an Iterable<EvalVisitor?> (a list of forms) and need a Deferred<Any?> (the first non-null result). Using flows in the middle seemed like the right approach because composable operators such as .filterNotNull() and .firstOrNull() already exist. If needed, deferreds.race().firstOrNull(predicate) would return the first result that satisfies an arbitrary predicate.
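A small sketch of the predicate form mentioned above. `race()` is again the extension proposed in this issue, and `firstMatch()` with its values is hypothetical. It also illustrates one point worth keeping in mind: taking the first match stops the flow, but the deferreds that have not finished keep running unless they are cancelled explicitly.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// The extension proposed in this issue; NOT an existing library function.
@OptIn(FlowPreview::class)
fun <T> Iterable<Deferred<T>>.race(): Flow<T> = map { it::await.asFlow() }.merge()

// Hypothetical demo: returns the first result above 10 and whether any deferred was still active.
suspend fun firstMatch(): Pair<Int?, Boolean> = coroutineScope {
    val deferreds = listOf(
        async { delay(100); 3 },
        async { delay(200); 42 },
        async { delay(5_000); 99 },
    )
    val hit = deferreds.race().firstOrNull { it > 10 }
    // The flow has stopped, but the slowest deferred is still running at this point.
    val stillActive = deferreds.any { it.isActive }
    // Cancel leftovers so coroutineScope does not wait for them.
    deferreds.forEach { it.cancel() }
    hit to stillActive
}

fun main() = runBlocking {
    println(firstMatch()) // prints "(42, true)"
}
```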

Staying with flows, I could introduce them before evaluating the forms concurrently (flowOf(yourReturn, schedule1, form428)) rather than after (map { it::await.asFlow() }), but I don't think that adds any clarity or efficiency:

  flowOf(yourReturn, schedule1, form428)
      // Like in https://github.com/Kotlin/kotlinx.coroutines/issues/1147
      .concurrent { flow -> flow.mapNotNull { it?.get(line, part, this@EvalVisitor) } }
      .merge()
      .filterNotNull()
      .firstOrNull()

vs.

  listOf(yourReturn, schedule1, form428)
      .mapNotNull { it?.get(line, part, this@EvalVisitor) }
      .race()
      .filterNotNull()
      .firstOrNull()

4 participants