From 14bc9f1cac531cc94f5a371e3322560593dffaee Mon Sep 17 00:00:00 2001 From: Louis Wasserman Date: Tue, 10 May 2022 14:21:19 -0700 Subject: [PATCH 1/5] Add a Flow.iterate method, allowing an Iterator-style traversal of flows. --- .../common/src/flow/terminal/Iterate.kt | 141 ++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt new file mode 100644 index 0000000000..1001d6c62b --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt @@ -0,0 +1,141 @@ +/* + * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* + +/** + * Iterator for [Flow]. Instances of this interface are only usable within calls to `flow.iterate`. + * They are not thread-safe and should not be used from concurrent coroutines. + */ +interface FlowIterator { + /** + * Returns `true` if there is another element in the flow, or `false` if the flow completes normally. + * If the flow fails exceptionally, throws that exception. + * + * This function suspends until the backing flow either emits an element or completes. + */ + operator suspend fun hasNext(): Boolean + + /** + * Returns the next element in the flow, or throws `NoSuchElementException` if the flow completed normally without + * emitting another element. If the flow failed exceptionally, throws that exception. + * + * This function does not suspend if `hasNext()` has already been called since the last call to `next`. + * Otherwise, it suspends until the backing flow either emits an element or completes. + */ + operator suspend fun next(): T +} + +/** + * Collects this flow, allowing it to be iterated through one element at a time. For example, + * instead of writing + * ``` + * var even = true + * flow.collect { + * if (even) { + * handleEven(it) + * } else { + * handleOdd(it) + * } + * even = !even + * } + * ``` + * + * you might write + * ``` + * flow.iterate { iter -> + * while (iter.hasNext()) { + * handleEven(iter.next()) + * if (!iter.hasNext()) break + * handleOdd(iter.next()) + * } + * } + * ``` + * + * Flow collection is cancelled as soon as [block] returns a value: + * ``` + * suspend fun Flow.all(predicate: (T) -> Boolean): Boolean = iterate { iter -> + * while (iter.hasNext()) { + * if (!predicate(iter.next())) return@iterate false // stops collecting the flow + * } + * true + * } + * ``` + * + * The `FlowIterator` available to [block] is only usable within [block], and has undefined behavior + * if used anywhere outside [block]. Additionally, the `FlowIterator` cannot be used concurrently + * by multiple coroutines. + */ +suspend fun Flow.iterate(block: FlowIterator.() -> R): R = coroutineScope { + val channel = Channel>>(capacity = 1) + val collectorJob = launch { + var cont = channel.receive() + onCompletion { thrown -> + cont.resume(ChannelResult.closed(thrown)) + }.collect { elem -> + cont.resume(ChannelResult.success(elem)) + cont = channel.receive() + } + } + var usable = true + val itr = object : FlowIterator { + private var next = ChannelResult.failure() + + private fun checkValid() { + check(usable) { "FlowIterator is only usable within the body of the corresponding iterate call" } + } + + override suspend fun hasNext(): Boolean { + checkValid() + if (next.isFailure && !next.isClosed) { + next = suspendCancellableCoroutine { cont -> + channel + .trySend(cont) + .onFailure { + throw AssertionError( + "Unexpected behavior in iterate. Perhaps the iterator is being used" + + " concurrently, which is unsupported.", + it + ) + } + } + } + + // next should never be failed now + return if (next.isSuccess) { + true + } else { + val ex = next.exceptionOrNull() + if (ex == null) { + false + } else { + throw ex + } + } + } + + override suspend fun next(): T { + if (!hasNext()) { + throw NoSuchElementException("No next element") + } + return try { + next.getOrThrow() + } finally { + next = ChannelResult.failure() + } + } + } + try { + block(itr) + } finally { + usable = false + collectorJob.cancel(CancellationException("early return from Flow.iterate")) + // we don't actually want to close the channel, just let it die from leaving scope + } +} \ No newline at end of file From 5071746c874eb26ec71b5e9d592e4118458fae66 Mon Sep 17 00:00:00 2001 From: Louis Wasserman Date: Tue, 10 May 2022 14:22:46 -0700 Subject: [PATCH 2/5] Inline checkValid. --- kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt index 1001d6c62b..3ebd64972e 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt @@ -87,12 +87,8 @@ suspend fun Flow.iterate(block: FlowIterator.() -> R): R = coroutin val itr = object : FlowIterator { private var next = ChannelResult.failure() - private fun checkValid() { - check(usable) { "FlowIterator is only usable within the body of the corresponding iterate call" } - } - override suspend fun hasNext(): Boolean { - checkValid() + check(usable) { "FlowIterator is only usable within the body of the corresponding iterate call" } if (next.isFailure && !next.isClosed) { next = suspendCancellableCoroutine { cont -> channel From 392d76d6d9edcd205d225e79825d4dda59f46b61 Mon Sep 17 00:00:00 2001 From: Louis Wasserman Date: Mon, 23 May 2022 11:32:00 -0700 Subject: [PATCH 3/5] Convert Flow.iterate to a continuation-based style. --- .../common/src/flow/terminal/Iterate.kt | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt index 3ebd64972e..860f35ca9e 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt @@ -73,34 +73,40 @@ interface FlowIterator { * by multiple coroutines. */ suspend fun Flow.iterate(block: FlowIterator.() -> R): R = coroutineScope { - val channel = Channel>>(capacity = 1) + // Instead of a channel-based approach, we pass continuations back and forth between the collector and the + // iterator. + + val firstCont = CompletableDeferred>>() val collectorJob = launch { - var cont = channel.receive() + var tokenCont = firstCont.await() onCompletion { thrown -> - cont.resume(ChannelResult.closed(thrown)) + suspendCancellableContinuation { collectionCont -> + tokenCont.resume(ContToken(ChannelResult.closed(thrown), collectionCont)) + } }.collect { elem -> - cont.resume(ChannelResult.success(elem)) - cont = channel.receive() + tokenCont = suspendCancellableContinuation { collectionCont -> + tokenCont.resume(ContToken(ChannelResult.success(elem), collectionCont)) + } } } var usable = true val itr = object : FlowIterator { private var next = ChannelResult.failure() + private var collectionCont: CancellableContinuation>? = null override suspend fun hasNext(): Boolean { check(usable) { "FlowIterator is only usable within the body of the corresponding iterate call" } if (next.isFailure && !next.isClosed) { - next = suspendCancellableCoroutine { cont -> - channel - .trySend(cont) - .onFailure { - throw AssertionError( - "Unexpected behavior in iterate. Perhaps the iterator is being used" + - " concurrently, which is unsupported.", - it - ) - } + val (theNext, theCollectionCont) = suspendCancellableCoroutine { tokenCont -> + // collectorJob is waiting for tokenCont. Pass tokenCont to it and suspend until it replies + // with a ChannelResult/element-or-termination and a continuation. + when (val theCollectionCont = collectionCont) { + null -> firstCont.complete(tokenCont) + else -> theCollectionCont.resume(tokenCont) + } } + next = theNext + collectionCont = theCollectionCont } // next should never be failed now @@ -120,11 +126,7 @@ suspend fun Flow.iterate(block: FlowIterator.() -> R): R = coroutin if (!hasNext()) { throw NoSuchElementException("No next element") } - return try { - next.getOrThrow() - } finally { - next = ChannelResult.failure() - } + return next.getOrThrow().also { next = ChannelResult.failure()} } } try { @@ -134,4 +136,6 @@ suspend fun Flow.iterate(block: FlowIterator.() -> R): R = coroutin collectorJob.cancel(CancellationException("early return from Flow.iterate")) // we don't actually want to close the channel, just let it die from leaving scope } -} \ No newline at end of file +} + +private class ContToken(val nextValue: ChannelResult, val resumption: CancellableContinuation>>) \ No newline at end of file From e1c3bfdf157967d37502d2b9efe2dcb0b3c93a56 Mon Sep 17 00:00:00 2001 From: Louis Wasserman Date: Mon, 23 May 2022 16:34:54 -0700 Subject: [PATCH 4/5] Simplify iterate. Add a first test. --- .../common/src/flow/terminal/Iterate.kt | 59 +++++++++++-------- .../common/test/flow/terminal/IterateTest.kt | 27 +++++++++ 2 files changed, 61 insertions(+), 25 deletions(-) create mode 100644 kotlinx-coroutines-core/common/test/flow/terminal/IterateTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt index 860f35ca9e..723a59567d 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt @@ -7,20 +7,23 @@ package kotlinx.coroutines.flow +import kotlin.coroutines.* +import kotlin.jvm.* import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* /** * Iterator for [Flow]. Instances of this interface are only usable within calls to `flow.iterate`. * They are not thread-safe and should not be used from concurrent coroutines. */ -interface FlowIterator { +public interface FlowIterator { /** * Returns `true` if there is another element in the flow, or `false` if the flow completes normally. * If the flow fails exceptionally, throws that exception. * * This function suspends until the backing flow either emits an element or completes. */ - operator suspend fun hasNext(): Boolean + public operator suspend fun hasNext(): Boolean /** * Returns the next element in the flow, or throws `NoSuchElementException` if the flow completed normally without @@ -29,7 +32,7 @@ interface FlowIterator { * This function does not suspend if `hasNext()` has already been called since the last call to `next`. * Otherwise, it suspends until the backing flow either emits an element or completes. */ - operator suspend fun next(): T + public operator suspend fun next(): T } /** @@ -72,36 +75,38 @@ interface FlowIterator { * if used anywhere outside [block]. Additionally, the `FlowIterator` cannot be used concurrently * by multiple coroutines. */ -suspend fun Flow.iterate(block: FlowIterator.() -> R): R = coroutineScope { +public suspend fun Flow.iterate(block: suspend FlowIterator.() -> R): R = coroutineScope { // Instead of a channel-based approach, we pass continuations back and forth between the collector and the // iterator. - - val firstCont = CompletableDeferred>>() - val collectorJob = launch { - var tokenCont = firstCont.await() - onCompletion { thrown -> - suspendCancellableContinuation { collectionCont -> - tokenCont.resume(ContToken(ChannelResult.closed(thrown), collectionCont)) - } - }.collect { elem -> - tokenCont = suspendCancellableContinuation { collectionCont -> - tokenCont.resume(ContToken(ChannelResult.success(elem), collectionCont)) - } - } - } var usable = true val itr = object : FlowIterator { private var next = ChannelResult.failure() - private var collectionCont: CancellableContinuation>? = null + private var collectionCont: CancellableContinuation>>? = null + var collectorJob: Job? = null override suspend fun hasNext(): Boolean { - check(usable) { "FlowIterator is only usable within the body of the corresponding iterate call" } + check(usable) { "FlowIterator is only usable ithin the body of the corresponding iterate call" } if (next.isFailure && !next.isClosed) { - val (theNext, theCollectionCont) = suspendCancellableCoroutine { tokenCont -> + val (theNext, theCollectionCont) = suspendCancellableCoroutine> { tokenCont -> // collectorJob is waiting for tokenCont. Pass tokenCont to it and suspend until it replies // with a ChannelResult/element-or-termination and a continuation. + when (val theCollectionCont = collectionCont) { - null -> firstCont.complete(tokenCont) + null -> { + collectorJob = launch { + var currentTokenCont = tokenCont + onCompletion { thrown -> + // should never get used + currentTokenCont = suspendCancellableCoroutine { collectionCont -> + currentTokenCont.resume(ContToken(ChannelResult.closed(thrown), collectionCont)) + } + }.collect { elem -> + currentTokenCont = suspendCancellableCoroutine { collectionCont -> + currentTokenCont.resume(ContToken(ChannelResult.success(elem), collectionCont)) + } + } + } + } else -> theCollectionCont.resume(tokenCont) } } @@ -126,16 +131,20 @@ suspend fun Flow.iterate(block: FlowIterator.() -> R): R = coroutin if (!hasNext()) { throw NoSuchElementException("No next element") } - return next.getOrThrow().also { next = ChannelResult.failure()} + return next.getOrThrow().also { next = ChannelResult.failure() } } } try { block(itr) } finally { usable = false - collectorJob.cancel(CancellationException("early return from Flow.iterate")) + itr.collectorJob?.cancel(CancellationException("early return from Flow.iterate")) // we don't actually want to close the channel, just let it die from leaving scope } } -private class ContToken(val nextValue: ChannelResult, val resumption: CancellableContinuation>>) \ No newline at end of file +/** Pair of a [ChannelResult] and a continuation of a continuation. */ +private data class ContToken( + val nextValue: ChannelResult, + val resumption: CancellableContinuation>> +) \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/IterateTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/IterateTest.kt new file mode 100644 index 0000000000..2acd0789f7 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/terminal/IterateTest.kt @@ -0,0 +1,27 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class IterateTest : TestBase() { + @Test + fun testIterateToList() = runTest { + val flow = flow { + emit(1) + emit(2) + } + val list = flow.iterate { + val mutableList = mutableListOf() + while (hasNext()) { + mutableList.add(next()) + } + mutableList + } + assertEquals(listOf(1, 2), list) + } +} From 18acc1c7a0f455765d425cee9563c04ce4387949 Mon Sep 17 00:00:00 2001 From: Louis Wasserman Date: Tue, 24 May 2022 12:52:16 -0700 Subject: [PATCH 5/5] Add some tests. --- .../common/src/flow/terminal/Iterate.kt | 67 +++++++++++++------ .../common/test/flow/terminal/IterateTest.kt | 41 ++++++++++++ 2 files changed, 86 insertions(+), 22 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt index 723a59567d..83154e7367 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Iterate.kt @@ -77,36 +77,35 @@ public interface FlowIterator { */ public suspend fun Flow.iterate(block: suspend FlowIterator.() -> R): R = coroutineScope { // Instead of a channel-based approach, we pass continuations back and forth between the collector and the - // iterator. + // iterator. This requires some nested continuations, but probably improves performance. var usable = true val itr = object : FlowIterator { + /** + * A pure failure indicates the next value hasn't been determined yet. + */ private var next = ChannelResult.failure() + + /** + * The continuation to use to resume collection, passing it the continuation to resume the iterator + * functions after the next element (or closure) is ready. + */ private var collectionCont: CancellableContinuation>>? = null var collectorJob: Job? = null + private var iteratorJob: Job? = null override suspend fun hasNext(): Boolean { check(usable) { "FlowIterator is only usable ithin the body of the corresponding iterate call" } if (next.isFailure && !next.isClosed) { + if (iteratorJob == null) { + iteratorJob = coroutineContext[Job] + } else { + check(iteratorJob === coroutineContext[Job]) { + "FlowIterator is not thread-safe and cannot be used from multiple coroutines." + } + } val (theNext, theCollectionCont) = suspendCancellableCoroutine> { tokenCont -> - // collectorJob is waiting for tokenCont. Pass tokenCont to it and suspend until it replies - // with a ChannelResult/element-or-termination and a continuation. - when (val theCollectionCont = collectionCont) { - null -> { - collectorJob = launch { - var currentTokenCont = tokenCont - onCompletion { thrown -> - // should never get used - currentTokenCont = suspendCancellableCoroutine { collectionCont -> - currentTokenCont.resume(ContToken(ChannelResult.closed(thrown), collectionCont)) - } - }.collect { elem -> - currentTokenCont = suspendCancellableCoroutine { collectionCont -> - currentTokenCont.resume(ContToken(ChannelResult.success(elem), collectionCont)) - } - } - } - } + null -> collectorJob = launch { doCollect(tokenCont) } else -> theCollectionCont.resume(tokenCont) } } @@ -114,10 +113,10 @@ public suspend fun Flow.iterate(block: suspend FlowIterator.() -> R collectionCont = theCollectionCont } - // next should never be failed now return if (next.isSuccess) { true } else { + // assert(next.isClosed) val ex = next.exceptionOrNull() if (ex == null) { false @@ -127,18 +126,42 @@ public suspend fun Flow.iterate(block: suspend FlowIterator.() -> R } } + private suspend fun doCollect(firstTokenCont: CancellableContinuation>) { + var tokenCont = firstTokenCont + onCompletion { thrown -> + if (thrown !is CancellationException) { + // should never get used + tokenCont = suspendCancellableCoroutine { collectionCont -> + tokenCont.resume( + ContToken( + ChannelResult.closed(thrown), + collectionCont + ) + ) + } + } + }.collect { elem -> + tokenCont = suspendCancellableCoroutine { collectionCont -> + tokenCont.resume(ContToken(ChannelResult.success(elem), collectionCont)) + } + } + } + override suspend fun next(): T { if (!hasNext()) { throw NoSuchElementException("No next element") } - return next.getOrThrow().also { next = ChannelResult.failure() } + // getOrThrow should always succeed at this point + return next.getOrThrow().also { + next = ChannelResult.failure() + } } } try { block(itr) } finally { usable = false - itr.collectorJob?.cancel(CancellationException("early return from Flow.iterate")) + itr.collectorJob?.cancel("early return from Flow.iterate") // we don't actually want to close the channel, just let it die from leaving scope } } diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/IterateTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/IterateTest.kt index 2acd0789f7..ff9268dd58 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/IterateTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/IterateTest.kt @@ -24,4 +24,45 @@ class IterateTest : TestBase() { } assertEquals(listOf(1, 2), list) } + + @Test + fun testCancelsAfterDone() = runTest { + val flow = flow { + emit(1) + error("Should not be executed") + } + val result = flow.iterate { next() } + assertEquals(1, result) + } + + @Test + fun testDoesNotRace() = runTest { + val flow = flow { + emit(1) + error("Should not be executed") + } + val result = flow.iterate { + next().also { + yield() + // not obvious if this results in a deterministic test? + // advanceUntilIdle would make this clearly deterministic + } + } + assertEquals(1, result) + } + + @Test + fun testBackingFlowFailure() = runTest { + val flow = flow { + emit(1) + throw IllegalStateException() + } + assertFailsWith { + flow.iterate { + while (hasNext()) { + next() + } + } + } + } }