Skip to content

Commit 245f9b6

Browse files
committed
Properly check identity of caught AbortFlowException in Flow.first operator.
It fixes two problems: * NoSuchElementException can be thrown during cancellation sequence (see FirstJvmTest that reproduces this problem with explanation) * Cancellation can be accidentally suppressed and flow activity can be prolonged Fixes #2051
1 parent 17248c8 commit 245f9b6

File tree

3 files changed

+70
-18
lines changed

3 files changed

+70
-18
lines changed

kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt

+32-18
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,18 @@ public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {
8484
*/
8585
public suspend fun <T> Flow<T>.first(): T {
8686
var result: Any? = NULL
87-
try {
88-
collect { value ->
87+
val collector = object : FlowCollector<T> {
88+
override suspend fun emit(value: T) {
8989
result = value
90-
throw AbortFlowException(NopCollector)
90+
throw AbortFlowException(this)
9191
}
92+
}
93+
try {
94+
collect(collector)
9295
} catch (e: AbortFlowException) {
93-
// Do nothing
96+
// Do not suppress other cancellation sources
97+
e.checkOwnership(collector)
9498
}
95-
9699
if (result === NULL) throw NoSuchElementException("Expected at least one element")
97100
return result as T
98101
}
@@ -103,17 +106,20 @@ public suspend fun <T> Flow<T>.first(): T {
103106
*/
104107
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
105108
var result: Any? = NULL
106-
try {
107-
collect { value ->
109+
val collector = object : FlowCollector<T> {
110+
override suspend fun emit(value: T) {
108111
if (predicate(value)) {
109112
result = value
110-
throw AbortFlowException(NopCollector)
113+
throw AbortFlowException(this)
111114
}
112115
}
116+
}
117+
try {
118+
collect(collector)
113119
} catch (e: AbortFlowException) {
114-
// Do nothing
120+
// Do not suppress other cancellation sources
121+
e.checkOwnership(collector)
115122
}
116-
117123
if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
118124
return result as T
119125
}
@@ -124,13 +130,17 @@ public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
124130
*/
125131
public suspend fun <T : Any> Flow<T>.firstOrNull(): T? {
126132
var result: T? = null
127-
try {
128-
collect { value ->
133+
val collector = object : FlowCollector<T> {
134+
override suspend fun emit(value: T) {
129135
result = value
130-
throw AbortFlowException(NopCollector)
136+
throw AbortFlowException(this)
131137
}
138+
}
139+
try {
140+
collect(collector)
132141
} catch (e: AbortFlowException) {
133-
// Do nothing
142+
// Do not suppress other cancellation sources
143+
e.checkOwnership(collector)
134144
}
135145
return result
136146
}
@@ -141,15 +151,19 @@ public suspend fun <T : Any> Flow<T>.firstOrNull(): T? {
141151
*/
142152
public suspend fun <T : Any> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
143153
var result: T? = null
144-
try {
145-
collect { value ->
154+
val collector = object : FlowCollector<T> {
155+
override suspend fun emit(value: T) {
146156
if (predicate(value)) {
147157
result = value
148-
throw AbortFlowException(NopCollector)
158+
throw AbortFlowException(this)
149159
}
150160
}
161+
}
162+
try {
163+
collect(collector)
151164
} catch (e: AbortFlowException) {
152-
// Do nothing
165+
// Do not suppress other cancellation sources
166+
e.checkOwnership(collector)
153167
}
154168
return result
155169
}

kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt

+10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
9+
import kotlinx.coroutines.flow.internal.*
910
import kotlin.test.*
1011

1112
class FirstTest : TestBase() {
@@ -160,4 +161,13 @@ class FirstTest : TestBase() {
160161
assertSame(instance, flow.first { true })
161162
assertSame(instance, flow.firstOrNull { true })
162163
}
164+
165+
@Test
166+
fun testAbortFlowException() = runTest {
167+
val flow = flow<Int> {
168+
throw AbortFlowException(NopCollector) // Emulate cancellation
169+
}
170+
171+
assertFailsWith<CancellationException> { flow.first() }
172+
}
163173
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import org.junit.Test
9+
import kotlin.test.*
10+
11+
class FirstJvmTest : TestBase() {
12+
13+
@Test
14+
fun testTakeInterference() = runBlocking(Dispatchers.Default) {
15+
/*
16+
* This test tests a racy situation when outer channelFlow is being cancelled,
17+
* inner flow starts atomically in "CANCELLING" state, sends one element and completes
18+
* (=> cancels and drops element away), triggering NSEE in Flow.first operator
19+
*/
20+
val values = (0..10000).asFlow().flatMapMerge(Int.MAX_VALUE) {
21+
channelFlow {
22+
val value = channelFlow { send(1) }.first()
23+
send(value)
24+
}
25+
}.take(1).toList()
26+
assertEquals(listOf(1), values)
27+
}
28+
}

0 commit comments

Comments
 (0)