diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt index dc3b709a5f..2c37e24162 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* +import kotlin.coroutines.* import kotlin.jvm.* import kotlinx.coroutines.flow.flow as safeFlow import kotlinx.coroutines.flow.internal.unsafeFlow as flow @@ -133,5 +134,7 @@ internal suspend inline fun Flow.collectWhile(crossinline predicate: susp collect(collector) } catch (e: AbortFlowException) { e.checkOwnership(collector) + // The task might have been cancelled before AbortFlowException was thrown. + coroutineContext.ensureActive() } } diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt index 74336262b8..cdb36bd8f8 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt @@ -2,9 +2,11 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.testing.* import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineStart.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlin.test.* +import kotlin.time.* class FirstTest : TestBase() { @Test @@ -173,4 +175,21 @@ class FirstTest : TestBase() { assertFailsWith { flow.first() } } -} + + @Test + fun testFirstThrowOnCancellation() = runTest { + val job = launch(start = UNDISPATCHED) { + flow { + try { + emit(Unit) + } finally { + runCatching { yield() } + finish(2) + } + }.first() + expectUnreached() + } + expect(1) + job.cancel() + } +} \ No newline at end of file