Skip to content

Commit fa7b73f

Browse files
qwwdfsadpablobaxter
authored andcommitted
Fail-fast in emitAll implementation from onCompletion (Kotlin#2700)
It is helpful to prevent bugs like KT-46013 and potential deadlocks or delayed cancellations
1 parent c30e18c commit fa7b73f

File tree

4 files changed

+36
-4
lines changed

4 files changed

+36
-4
lines changed

kotlinx-coroutines-core/common/src/flow/Channels.kt

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Uni
3030
emitAllImpl(channel, consume = true)
3131

3232
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
33+
ensureActive()
3334
// Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveCatching".
3435
// It has smaller and more efficient spilled state which also allows to implement a manual kludge to
3536
// fix retention of the last emitted value.

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

+9-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,15 @@ public fun <T> Flow<T>.onEmpty(
194194
}
195195
}
196196

197-
private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> {
197+
/*
198+
* 'emitAll' methods call this to fail-fast before starting to collect
199+
* their sources (that may not have any elements for a long time).
200+
*/
201+
internal fun FlowCollector<*>.ensureActive() {
202+
if (this is ThrowingCollector) throw e
203+
}
204+
205+
internal class ThrowingCollector(@JvmField val e: Throwable) : FlowCollector<Any?> {
198206
override suspend fun emit(value: Any?) {
199207
throw e
200208
}

kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -127,5 +127,7 @@ public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
127127
* Collects all the values from the given [flow] and emits them to the collector.
128128
* It is a shorthand for `flow.collect { value -> emit(value) }`.
129129
*/
130-
@BuilderInference
131-
public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>): Unit = flow.collect(this)
130+
public suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) {
131+
ensureActive()
132+
flow.collect(this)
133+
}

kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt

+22-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
/*
2-
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
89
import kotlinx.coroutines.flow.internal.*
910
import kotlin.test.*
1011

@@ -290,4 +291,24 @@ class OnCompletionTest : TestBase() {
290291
val expected = (1..5).toList() + (-1)
291292
assertEquals(expected, result)
292293
}
294+
295+
@Test
296+
fun testCancelledEmitAllFlow() = runTest {
297+
// emitAll does not call 'collect' on onCompletion collector
298+
// if the target flow is empty
299+
flowOf(1, 2, 3)
300+
.onCompletion { emitAll(MutableSharedFlow()) }
301+
.take(1)
302+
.collect()
303+
}
304+
305+
@Test
306+
fun testCancelledEmitAllChannel() = runTest {
307+
// emitAll does not call 'collect' on onCompletion collector
308+
// if the target channel is empty
309+
flowOf(1, 2, 3)
310+
.onCompletion { emitAll(Channel()) }
311+
.take(1)
312+
.collect()
313+
}
293314
}

0 commit comments

Comments
 (0)