Skip to content

Commit d5d257d

Browse files
committed
Check for cancellation in concurrent flow merge on each element
* Implementation detail (launch on each value) is leaking into upstream behaviour * The overhead is negligible compared to launching a new coroutines and sending to channel, but it provides a much approachable mental model when no suspension in the upstream flow happens (note: upstream never sends elements to the channel) Fixes #1392
1 parent ace5899 commit d5d257d

File tree

4 files changed

+47
-1
lines changed

4 files changed

+47
-1
lines changed

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,9 @@ private class ChannelFlowMerge<T>(
151151
// The actual merge implementation with concurrency limit
152152
private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector<T>) {
153153
val semaphore = Semaphore(concurrency)
154-
@Suppress("UNCHECKED_CAST")
154+
val job: Job? = coroutineContext[Job]
155155
flow.collect { inner ->
156+
job?.ensureActive()
156157
semaphore.acquire() // Acquire concurrency permit
157158
scope.launch {
158159
try {

kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt

+14
Original file line numberDiff line numberDiff line change
@@ -183,5 +183,19 @@ class BufferTest : TestBase() {
183183
}
184184
finish(n + 4)
185185
}
186+
187+
@Test
188+
fun testCancellation() = runTest {
189+
val result = flow {
190+
emit(1)
191+
emit(2)
192+
emit(3)
193+
expectUnreached()
194+
emit(4)
195+
}.buffer(0)
196+
.take(2)
197+
.toList()
198+
assertEquals(listOf(1, 2), result)
199+
}
186200
}
187201

kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt

+16
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,20 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() {
7373
assertFailsWith<CancellationException>(flow)
7474
finish(5)
7575
}
76+
77+
@Test
78+
fun testCancellation() = runTest {
79+
val result = flow {
80+
emit(1)
81+
emit(2)
82+
emit(3)
83+
emit(4)
84+
expectUnreached() // Cancelled by take
85+
emit(5)
86+
}.flatMapMerge(2) { v -> flow { emit(v) } }
87+
.buffer(0)
88+
.take(2)
89+
.toList()
90+
assertEquals(listOf(1, 2), result)
91+
}
7692
}

kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt

+15
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,21 @@ class FlowOnTest : TestBase() {
261261
finish(3)
262262
}
263263

264+
@Test
265+
fun testCancellation() = runTest {
266+
val result = flow {
267+
emit(1)
268+
emit(2)
269+
emit(3)
270+
expectUnreached()
271+
emit(4)
272+
}.flowOn(wrapperDispatcher())
273+
.buffer(0)
274+
.take(2)
275+
.toList()
276+
assertEquals(listOf(1, 2), result)
277+
}
278+
264279
private inner class Source(private val value: Int) {
265280
public var contextName: String = "unknown"
266281

0 commit comments

Comments
 (0)