diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 04a4e99c9a..76a0bed462 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -746,10 +746,10 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli public final class kotlinx/coroutines/channels/ProduceKt { public static final fun awaitClose (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun awaitClose$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; - public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel; public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel; - public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; + public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel; public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; + public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; } public abstract interface class kotlinx/coroutines/channels/ProducerScope : kotlinx/coroutines/CoroutineScope, kotlinx/coroutines/channels/SendChannel { diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt index 26bd544811..24fd399bb7 100644 --- a/kotlinx-coroutines-core/common/src/channels/Produce.kt +++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt @@ -115,6 +115,7 @@ public fun CoroutineScope.produce( public fun CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, + start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, @BuilderInference block: suspend ProducerScope.() -> Unit ): ReceiveChannel { @@ -122,7 +123,7 @@ public fun CoroutineScope.produce( val newContext = newCoroutineContext(context) val coroutine = ProducerCoroutine(newContext, channel) if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion) - coroutine.start(CoroutineStart.DEFAULT, coroutine, block) + coroutine.start(start, coroutine, block) return coroutine } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index 2b62be44f4..8a18bff30e 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -68,8 +68,16 @@ public abstract class ChannelFlow( open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel = scope.broadcast(context, produceCapacity, start, block = collectToFun) + /** + * Here we use ATOMIC start for a reason (#1825). + * NB: [produceImpl] is used for [flowOn]. + * For non-atomic start it is possible to observe the situation, + * where the pipeline after the [flowOn] call successfully executes (mostly, its `onCompletion`) + * handlers, while the pipeline before does not, because it was cancelled during its dispatch. + * Thus `onCompletion` and `finally` blocks won't be executed and it may lead to a different kinds of memory leaks. + */ open fun produceImpl(scope: CoroutineScope): ReceiveChannel = - scope.produce(context, produceCapacity, block = collectToFun) + scope.produce(context, produceCapacity, start = CoroutineStart.ATOMIC, block = collectToFun) override suspend fun collect(collector: FlowCollector) = coroutineScope { diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt index 36d7a3a20d..ea077cc42d 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt @@ -60,7 +60,7 @@ internal fun CoroutineScope.flowProduce( val channel = Channel(capacity) val newContext = newCoroutineContext(context) val coroutine = FlowProduceCoroutine(newContext, channel) - coroutine.start(CoroutineStart.DEFAULT, coroutine, block) + coroutine.start(CoroutineStart.ATOMIC, coroutine, block) return coroutine } diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt index 8dd6e3c8a7..f93d039933 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt @@ -265,4 +265,15 @@ class ChannelBuildersFlowTest : TestBase() { fail() } } + + @Test + fun testProduceInAtomicity() = runTest { + val flow = flowOf(1).onCompletion { expect(2) } + val scope = CoroutineScope(wrapperDispatcher()) + flow.produceIn(scope) + expect(1) + scope.cancel() + scope.coroutineContext[Job]?.join() + finish(3) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt index 511a003a8e..684923c861 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt @@ -37,6 +37,35 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() { finish(3) } + @Test + fun testAtomicStart() = runTest { + try { + coroutineScope { + val job = coroutineContext[Job]!! + val flow = flow { + expect(3) + emit(1) + } + .onCompletion { expect(5) } + .flatMapMerge { + expect(4) + flowOf(it).onCompletion { expectUnreached() } } + .onCompletion { expect(6) } + + launch { + expect(1) + flow.collect() + } + launch { + expect(2) + job.cancel() + } + } + } catch (e: CancellationException) { + finish(7) + } + } + @Test fun testCancellationExceptionDownstream() = runTest { val flow = flow { diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt index 34c0476ef6..f8350ff584 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt @@ -276,6 +276,33 @@ class FlowOnTest : TestBase() { assertEquals(listOf(1, 2), result) } + @Test + fun testAtomicStart() = runTest { + try { + coroutineScope { + val job = coroutineContext[Job]!! + val flow = flow { + expect(3) + emit(1) + } + .onCompletion { expect(4) } + .flowOn(wrapperDispatcher()) + .onCompletion { expect(5) } + + launch { + expect(1) + flow.collect() + } + launch { + expect(2) + job.cancel() + } + } + } catch (e: CancellationException) { + finish(6) + } + } + @Test fun testException() = runTest { val flow = flow {