Skip to content

Commit ecbfa6d

Browse files
committed
Properly distinguish AbortFlowExceptions from different non-terminal operators
Fixes #1610
1 parent 3affb90 commit ecbfa6d

File tree

10 files changed

+48
-26
lines changed

10 files changed

+48
-26
lines changed

kotlinx-coroutines-core/common/src/flow/internal/Combine.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
114114
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
115115
*/
116116
(second as SendChannel<*>).invokeOnClose {
117-
if (!first.isClosedForReceive) first.cancel(AbortFlowException())
117+
if (!first.isClosedForReceive) first.cancel(AbortFlowException(this@unsafeFlow))
118118
}
119119

120120
val otherIterator = second.iterator()
@@ -126,9 +126,9 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
126126
emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
127127
}
128128
} catch (e: AbortFlowException) {
129-
// complete
129+
e.checkOwnership(owner = this@unsafeFlow)
130130
} finally {
131-
if (!second.isClosedForReceive) second.cancel(AbortFlowException())
131+
if (!second.isClosedForReceive) second.cancel(AbortFlowException(this@unsafeFlow))
132132
}
133133
}
134134
}

kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt

+10-1
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,21 @@
55
package kotlinx.coroutines.flow.internal
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
89

910
/**
1011
* This exception is thrown when operator need no more elements from the flow.
1112
* This exception should never escape outside of operator's implementation.
13+
* This exception can be safely ignored by non-terminal flow operator if and only if it was caught by its owner
14+
* (see usages of [checkOwnership]).
1215
*/
13-
internal expect class AbortFlowException() : CancellationException
16+
internal expect class AbortFlowException(owner: FlowCollector<*>) : CancellationException {
17+
public val owner: FlowCollector<*>
18+
}
19+
20+
internal fun AbortFlowException.checkOwnership(owner: FlowCollector<*>) {
21+
if (this.owner !== owner) throw this
22+
}
1423

1524
/**
1625
* Exception used to cancel child of [scopedFlow] without cancelling the whole scope.

kotlinx-coroutines-core/common/src/flow/operators/Limit.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,14 @@ public fun <T> Flow<T>.take(count: Int): Flow<T> {
6262
}
6363
}
6464
} catch (e: AbortFlowException) {
65-
// Nothing, bail out
65+
e.checkOwnership(owner = this)
6666
}
6767
}
6868
}
6969

7070
private suspend fun <T> FlowCollector<T>.emitAbort(value: T) {
7171
emit(value)
72-
throw AbortFlowException()
72+
throw AbortFlowException(this)
7373
}
7474

7575
/**
@@ -80,9 +80,9 @@ public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = f
8080
try {
8181
collect { value ->
8282
if (predicate(value)) emit(value)
83-
else throw AbortFlowException()
83+
else throw AbortFlowException(this)
8484
}
8585
} catch (e: AbortFlowException) {
86-
// Nothing, bail out
86+
e.checkOwnership(owner = this)
8787
}
8888
}

kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt

+1
Original file line numberDiff line numberDiff line change
@@ -130,5 +130,6 @@ public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
130130
* Collects all the values from the given [flow] and emits them to the collector.
131131
* It is a shorthand for `flow.collect { value -> emit(value) }`.
132132
*/
133+
@BuilderInference
133134
@ExperimentalCoroutinesApi
134135
public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) = flow.collect(this)

kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public suspend fun <T> Flow<T>.first(): T {
9090
try {
9191
collect { value ->
9292
result = value
93-
throw AbortFlowException()
93+
throw AbortFlowException(NopCollector)
9494
}
9595
} catch (e: AbortFlowException) {
9696
// Do nothing
@@ -110,7 +110,7 @@ public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
110110
collect { value ->
111111
if (predicate(value)) {
112112
result = value
113-
throw AbortFlowException()
113+
throw AbortFlowException(NopCollector)
114114
}
115115
}
116116
} catch (e: AbortFlowException) {

kotlinx-coroutines-core/common/test/flow/operators/TakeTest.kt

+14
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,18 @@ class TakeTest : TestBase() {
131131
}
132132
finish(2)
133133
}
134+
135+
@Test
136+
fun testNestedTake() = runTest {
137+
val inner = flow {
138+
emit(1)
139+
expectUnreached()
140+
}.take(1)
141+
val outer = flow {
142+
while(true) {
143+
emitAll(inner)
144+
}
145+
}
146+
assertEquals(listOf(1, 1, 1), outer.take(3).toList())
147+
}
134148
}

kotlinx-coroutines-core/common/test/flow/operators/TransformLatestTest.kt

+1-13
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class TransformLatestTest : TestBase() {
4444
}
4545

4646
@Test
47-
fun testSwitchRendevouzBuffer() = runTest {
47+
fun testSwitchRendezvousBuffer() = runTest {
4848
val flow = flowOf(1, 2, 3, 4, 5)
4949
flow.transformLatest {
5050
emit(it)
@@ -157,16 +157,4 @@ class TransformLatestTest : TestBase() {
157157
val flow = flowOf(1, 2, 3, 4, 5).transformLatest { emit(it) }
158158
assertEquals(listOf(1), flow.take(1).toList())
159159
}
160-
161-
@Test
162-
@Ignore // TODO separate branch and/or discuss
163-
fun testTakeUpstreamCancellation() = runTest {
164-
val flow = flow {
165-
emit(1)
166-
expectUnreached()
167-
emit(2)
168-
emit(3)
169-
}.transformLatest { emit(it) }
170-
assertEquals(listOf(1), flow.take(1).toList())
171-
}
172160
}

kotlinx-coroutines-core/js/src/flow/internal/FlowExceptions.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
package kotlinx.coroutines.flow.internal
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
89

9-
internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed")
10+
internal actual class AbortFlowException actual constructor(
11+
actual val owner: FlowCollector<*>
12+
) : CancellationException("Flow was aborted, no more elements needed")
1013
internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")

kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@
55
package kotlinx.coroutines.flow.internal
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
10+
internal actual class AbortFlowException actual constructor(
11+
actual val owner: FlowCollector<*>
12+
) : CancellationException("Flow was aborted, no more elements needed") {
813

9-
internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") {
1014
override fun fillInStackTrace(): Throwable {
1115
if (DEBUG) super.fillInStackTrace()
1216
return this

kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
package kotlinx.coroutines.flow.internal
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
89

9-
internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed")
10+
internal actual class AbortFlowException actual constructor(
11+
actual val owner: FlowCollector<*>
12+
) : CancellationException("Flow was aborted, no more elements needed")
1013
internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")
1114

0 commit comments

Comments
 (0)