From 3b7e256c80a2029a2e63460635ea5c66a3664be9 Mon Sep 17 00:00:00 2001 From: Francesco Tescari Date: Mon, 6 Jan 2025 16:36:59 +0100 Subject: [PATCH 1/5] fix: rethrow CE when calling Flow.stateIn on cancelled scope --- .../common/src/flow/operators/Share.kt | 2 +- .../common/test/flow/sharing/StateInTest.kt | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt index 5ab761a16e..299b266b90 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt @@ -320,7 +320,7 @@ public fun Flow.stateIn( */ public suspend fun Flow.stateIn(scope: CoroutineScope): StateFlow { val config = configureSharing(1) - val result = CompletableDeferred>() + val result = CompletableDeferred>(scope.coroutineContext[Job]) scope.launchSharingDeferred(config.context, config.upstream, result) return result.await() } diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt index 75a5e4b972..c0bc439f51 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt @@ -3,6 +3,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlin.coroutines.* import kotlin.test.* /** @@ -88,4 +89,13 @@ class StateInTest : TestBase() { fun testSubscriptionByFirstSuspensionInStateFlow() = runTest { testSubscriptionByFirstSuspensionInCollect(flowOf(1).stateIn(this@runTest)) { } } + + @Test + fun testRethrowsCEOnCancelledScope() = runTest { + val cancelledScope = CoroutineScope(EmptyCoroutineContext).apply { cancel("CancelMessageToken") } + val flow = flowOf(1, 2, 3) + assertFailsWith("CancelMessageToken") { + flow.stateIn(cancelledScope) + } + } } From 80b114c005bcfb222fec47eb23ed87f37bef8cdf Mon Sep 17 00:00:00 2001 From: Francesco Tescari Date: Sat, 11 Jan 2025 00:12:15 +0100 Subject: [PATCH 2/5] fix: throw NoSuchElementException on empty flow --- .../common/src/flow/operators/Share.kt | 4 ++++ .../common/test/flow/sharing/StateInTest.kt | 8 ++++++++ 2 files changed, 12 insertions(+) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt index 299b266b90..a69846ee57 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt @@ -317,6 +317,7 @@ public fun Flow.stateIn( * with multiple downstream subscribers. See the [StateFlow] documentation for the general concepts of state flows. * * @param scope the coroutine scope in which sharing is started. + * @throws NoSuchElementException if the upstream flow does not emit any value. */ public suspend fun Flow.stateIn(scope: CoroutineScope): StateFlow { val config = configureSharing(1) @@ -340,6 +341,9 @@ private fun CoroutineScope.launchSharingDeferred( } } } + if (state == null) { + result.completeExceptionally(NoSuchElementException("Flow is empty")) + } } catch (e: Throwable) { // Notify the waiter that the flow has failed result.completeExceptionally(e) diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt index c0bc439f51..a8f4de42a2 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt @@ -98,4 +98,12 @@ class StateInTest : TestBase() { flow.stateIn(cancelledScope) } } + + @Test + fun testThrowsNoSuchElementExceptionOnEmptyFlow() = runTest { + val flow = emptyFlow() + assertFailsWith { + flow.stateIn(GlobalScope) + } + } } From 67a2166f7e68217a57c9492737786956610f8d4d Mon Sep 17 00:00:00 2001 From: Francesco Tescari Date: Tue, 11 Feb 2025 20:59:18 +0100 Subject: [PATCH 3/5] fix: avoid cancelling scope on empty flow --- .../common/src/flow/operators/Share.kt | 10 +++++----- .../common/test/flow/sharing/StateInTest.kt | 4 +++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt index a69846ee57..da656f8307 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt @@ -321,15 +321,15 @@ public fun Flow.stateIn( */ public suspend fun Flow.stateIn(scope: CoroutineScope): StateFlow { val config = configureSharing(1) - val result = CompletableDeferred>(scope.coroutineContext[Job]) + val result = CompletableDeferred>>(scope.coroutineContext[Job]) scope.launchSharingDeferred(config.context, config.upstream, result) - return result.await() + return result.await().getOrThrow() } private fun CoroutineScope.launchSharingDeferred( context: CoroutineContext, upstream: Flow, - result: CompletableDeferred> + result: CompletableDeferred>>, ) { launch(context) { try { @@ -337,12 +337,12 @@ private fun CoroutineScope.launchSharingDeferred( upstream.collect { value -> state?.let { it.value = value } ?: run { state = MutableStateFlow(value).also { - result.complete(ReadonlyStateFlow(it, coroutineContext.job)) + result.complete(Result.success(ReadonlyStateFlow(it, coroutineContext.job))) } } } if (state == null) { - result.completeExceptionally(NoSuchElementException("Flow is empty")) + result.complete(Result.failure(NoSuchElementException("Flow is empty"))) } } catch (e: Throwable) { // Notify the waiter that the flow has failed diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt index a8f4de42a2..bb2139c4a8 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt @@ -103,7 +103,9 @@ class StateInTest : TestBase() { fun testThrowsNoSuchElementExceptionOnEmptyFlow() = runTest { val flow = emptyFlow() assertFailsWith { - flow.stateIn(GlobalScope) + flow.stateIn(this) } + // Ensure that the collecting scope is not cancelled by the NoSuchElementException + assertEquals(true, coroutineContext[Job]?.isActive) } } From bcb1e14c2e9c20c903628ac9564589d59a96a323 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 12 Feb 2025 13:32:08 +0100 Subject: [PATCH 4/5] Update kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt --- kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt index bb2139c4a8..c187bae0d8 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt @@ -106,6 +106,6 @@ class StateInTest : TestBase() { flow.stateIn(this) } // Ensure that the collecting scope is not cancelled by the NoSuchElementException - assertEquals(true, coroutineContext[Job]?.isActive) + assertTrue(coroutineContext[Job]?.isActive) } } From ee11623935f80b439ca3eee1e63803ee4cc0b969 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 12 Feb 2025 15:17:12 +0100 Subject: [PATCH 5/5] Undo the incorrect edit --- kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt index c187bae0d8..bb2139c4a8 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt @@ -106,6 +106,6 @@ class StateInTest : TestBase() { flow.stateIn(this) } // Ensure that the collecting scope is not cancelled by the NoSuchElementException - assertTrue(coroutineContext[Job]?.isActive) + assertEquals(true, coroutineContext[Job]?.isActive) } }