Skip to content

Flow.take() of outer flow not respected with presence of take() on inner flow #1610

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
bnorm opened this issue Oct 11, 2019 · 0 comments
Closed
Assignees

Comments

@bnorm
Copy link
Member

bnorm commented Oct 11, 2019

Example code for 1.3.2:

suspend fun main() {
  println(flow<Int> {
    repeat(4) { i ->
      emitAll(flow {
        repeat(4) { j ->
          emit(i * 10 + j)
        }
      }.take(2))
    }
  }.take(4).toList().joinToString(","))
}

Should print 0,1,10,11 but actually prints 0,1,10,11,20,21,30,31. Changing the outer take(4) to take(1) produces 0,10,11,20,21,30,31.

I believe the AbortFlowException of the outer take(4) is being swallowed by the inner take(2). I rewrote the take operator locally and had it track its specific AbortFlowException and rethrow if it did not match and that corrected the problem.

fun <T> Flow<T>.take(count: Int): Flow<T> {
  require(count > 0) { "Requested element count $count should be positive" }
  return flow {
    // Creating the exception eagerly is obviously expensive but it was easy for testing
    val ours = AbortFlowException()
    var consumed = 0
    try {
      collect { value ->
        emit(value)
        if (++consumed == count) {
          throw ours
        }
      }
    } catch (e: AbortFlowException) {
      if (e != ours) throw e
    }
  }
}

There might be errors when combining other operators that use AbortFlowException as it seems to be a common pattern to just ignore this exception when thrown.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants