Skip to content

Commit 15112a6

Browse files
committed
Atomically start coroutines in intermediate Flow operators in order to ensure proper termination, including finally blocks and onCompletion operators
Fixes #1825
1 parent 3651276 commit 15112a6

File tree

7 files changed

+81
-5
lines changed

7 files changed

+81
-5
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+2-2
Original file line numberDiff line numberDiff line change
@@ -746,10 +746,10 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli
746746
public final class kotlinx/coroutines/channels/ProduceKt {
747747
public static final fun awaitClose (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
748748
public static synthetic fun awaitClose$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
749-
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
750749
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
751-
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
750+
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
752751
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
752+
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
753753
}
754754

755755
public abstract interface class kotlinx/coroutines/channels/ProducerScope : kotlinx/coroutines/CoroutineScope, kotlinx/coroutines/channels/SendChannel {

kotlinx-coroutines-core/common/src/channels/Produce.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -115,14 +115,15 @@ public fun <E> CoroutineScope.produce(
115115
public fun <E> CoroutineScope.produce(
116116
context: CoroutineContext = EmptyCoroutineContext,
117117
capacity: Int = 0,
118+
start: CoroutineStart = CoroutineStart.DEFAULT,
118119
onCompletion: CompletionHandler? = null,
119120
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
120121
): ReceiveChannel<E> {
121122
val channel = Channel<E>(capacity)
122123
val newContext = newCoroutineContext(context)
123124
val coroutine = ProducerCoroutine(newContext, channel)
124125
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
125-
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
126+
coroutine.start(start, coroutine, block)
126127
return coroutine
127128
}
128129

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

+9-1
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,16 @@ public abstract class ChannelFlow<T>(
6868
open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> =
6969
scope.broadcast(context, produceCapacity, start, block = collectToFun)
7070

71+
/**
72+
* Here we use ATOMIC start for a reason (#1825).
73+
* NB: [produceImpl] is used for [flowOn].
74+
* For non-atomic start it is possible to observe the situation,
75+
* where the pipeline after the [flowOn] call successfully executes (mostly, its `onCompletion`)
76+
* handlers, while the pipeline before does not, because it was cancelled during its dispatch.
77+
* Thus `onCompletion` and `finally` blocks won't be executed and it may lead to a different kinds of memory leaks.
78+
*/
7179
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
72-
scope.produce(context, produceCapacity, block = collectToFun)
80+
scope.produce(context, produceCapacity, start = CoroutineStart.ATOMIC, block = collectToFun)
7381

7482
override suspend fun collect(collector: FlowCollector<T>) =
7583
coroutineScope {

kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ internal fun <T> CoroutineScope.flowProduce(
6060
val channel = Channel<T>(capacity)
6161
val newContext = newCoroutineContext(context)
6262
val coroutine = FlowProduceCoroutine(newContext, channel)
63-
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
63+
coroutine.start(CoroutineStart.ATOMIC, coroutine, block)
6464
return coroutine
6565
}
6666

kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt

+11
Original file line numberDiff line numberDiff line change
@@ -265,4 +265,15 @@ class ChannelBuildersFlowTest : TestBase() {
265265
fail()
266266
}
267267
}
268+
269+
@Test
270+
fun testProduceInAtomicity() = runTest {
271+
val flow = flowOf(1).onCompletion { expect(2) }
272+
val scope = CoroutineScope(wrapperDispatcher())
273+
flow.produceIn(scope)
274+
expect(1)
275+
scope.cancel()
276+
scope.coroutineContext[Job]?.join()
277+
finish(3)
278+
}
268279
}

kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt

+29
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,35 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() {
3737
finish(3)
3838
}
3939

40+
@Test
41+
fun testAtomicStart() = runTest {
42+
try {
43+
coroutineScope {
44+
val job = coroutineContext[Job]!!
45+
val flow = flow {
46+
expect(3)
47+
emit(1)
48+
}
49+
.onCompletion { expect(5) }
50+
.flatMapMerge {
51+
expect(4)
52+
flowOf(it).onCompletion { expectUnreached() } }
53+
.onCompletion { expect(6) }
54+
55+
launch {
56+
expect(1)
57+
flow.collect()
58+
}
59+
launch {
60+
expect(2)
61+
job.cancel()
62+
}
63+
}
64+
} catch (e: CancellationException) {
65+
finish(7)
66+
}
67+
}
68+
4069
@Test
4170
fun testCancellationExceptionDownstream() = runTest {
4271
val flow = flow {

kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt

+27
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,33 @@ class FlowOnTest : TestBase() {
276276
assertEquals(listOf(1, 2), result)
277277
}
278278

279+
@Test
280+
fun testAtomicStart() = runTest {
281+
try {
282+
coroutineScope {
283+
val job = coroutineContext[Job]!!
284+
val flow = flow {
285+
expect(3)
286+
emit(1)
287+
}
288+
.onCompletion { expect(4) }
289+
.flowOn(wrapperDispatcher())
290+
.onCompletion { expect(5) }
291+
292+
launch {
293+
expect(1)
294+
flow.collect()
295+
}
296+
launch {
297+
expect(2)
298+
job.cancel()
299+
}
300+
}
301+
} catch (e: CancellationException) {
302+
finish(6)
303+
}
304+
}
305+
279306
@Test
280307
fun testException() = runTest {
281308
val flow = flow {

0 commit comments

Comments
 (0)