Skip to content

Atomically start coroutines in intermediate Flow operators in order t… #1829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -746,10 +746,10 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli
public final class kotlinx/coroutines/channels/ProduceKt {
public static final fun awaitClose (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun awaitClose$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
}

public abstract interface class kotlinx/coroutines/channels/ProducerScope : kotlinx/coroutines/CoroutineScope, kotlinx/coroutines/channels/SendChannel {
Expand Down
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/common/src/channels/Produce.kt
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,15 @@ public fun <E> CoroutineScope.produce(
public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
val channel = Channel<E>(capacity)
val newContext = newCoroutineContext(context)
val coroutine = ProducerCoroutine(newContext, channel)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
coroutine.start(start, coroutine, block)
return coroutine
}

Expand Down
10 changes: 9 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,16 @@ public abstract class ChannelFlow<T>(
open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> =
scope.broadcast(context, produceCapacity, start, block = collectToFun)

/**
* Here we use ATOMIC start for a reason (#1825).
* NB: [produceImpl] is used for [flowOn].
* For non-atomic start it is possible to observe the situation,
* where the pipeline after the [flowOn] call successfully executes (mostly, its `onCompletion`)
* handlers, while the pipeline before does not, because it was cancelled during its dispatch.
* Thus `onCompletion` and `finally` blocks won't be executed and it may lead to a different kinds of memory leaks.
*/
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, block = collectToFun)
scope.produce(context, produceCapacity, start = CoroutineStart.ATOMIC, block = collectToFun)

override suspend fun collect(collector: FlowCollector<T>) =
coroutineScope {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ internal fun <T> CoroutineScope.flowProduce(
val channel = Channel<T>(capacity)
val newContext = newCoroutineContext(context)
val coroutine = FlowProduceCoroutine(newContext, channel)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
coroutine.start(CoroutineStart.ATOMIC, coroutine, block)
return coroutine
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,15 @@ class ChannelBuildersFlowTest : TestBase() {
fail()
}
}

@Test
fun testProduceInAtomicity() = runTest {
val flow = flowOf(1).onCompletion { expect(2) }
val scope = CoroutineScope(wrapperDispatcher())
flow.produceIn(scope)
expect(1)
scope.cancel()
scope.coroutineContext[Job]?.join()
finish(3)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,35 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() {
finish(3)
}

@Test
fun testAtomicStart() = runTest {
try {
coroutineScope {
val job = coroutineContext[Job]!!
val flow = flow {
expect(3)
emit(1)
}
.onCompletion { expect(5) }
.flatMapMerge {
expect(4)
flowOf(it).onCompletion { expectUnreached() } }
.onCompletion { expect(6) }

launch {
expect(1)
flow.collect()
}
launch {
expect(2)
job.cancel()
}
}
} catch (e: CancellationException) {
finish(7)
}
}

@Test
fun testCancellationExceptionDownstream() = runTest {
val flow = flow {
Expand Down
27 changes: 27 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,33 @@ class FlowOnTest : TestBase() {
assertEquals(listOf(1, 2), result)
}

@Test
fun testAtomicStart() = runTest {
try {
coroutineScope {
val job = coroutineContext[Job]!!
val flow = flow {
expect(3)
emit(1)
}
.onCompletion { expect(4) }
.flowOn(wrapperDispatcher())
.onCompletion { expect(5) }

launch {
expect(1)
flow.collect()
}
launch {
expect(2)
job.cancel()
}
}
} catch (e: CancellationException) {
finish(6)
}
}

@Test
fun testException() = runTest {
val flow = flow {
Expand Down