Skip to content

Commit b8647a7

Browse files
committed
Flow firstOrNull support
1 parent c170373 commit b8647a7

File tree

3 files changed

+113
-0
lines changed

3 files changed

+113
-0
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+2
Original file line numberDiff line numberDiff line change
@@ -900,6 +900,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
900900
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
901901
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
902902
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
903+
public static final fun firstOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
904+
public static final fun firstOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
903905
public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
904906
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
905907
public static final fun flatMapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt

+36
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,39 @@ public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
120120
if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
121121
return result as T
122122
}
123+
124+
/**
125+
* The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
126+
* Returns [null] if the flow was empty.
127+
*/
128+
public suspend fun <T> Flow<T>.firstOrNull(): T? {
129+
var result: Any? = NULL
130+
try {
131+
collect { value ->
132+
result = value
133+
throw AbortFlowException(NopCollector)
134+
}
135+
} catch (e: AbortFlowException) {
136+
// Do nothing
137+
}
138+
return result.takeUnless { it == NULL } as T?
139+
}
140+
141+
/**
142+
* The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
143+
* Returns [null] if the flow did not contain an element matching the [predicate].
144+
*/
145+
public suspend fun <T> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
146+
var result: Any? = NULL
147+
try {
148+
collect { value ->
149+
if (predicate(value)) {
150+
result = value
151+
throw AbortFlowException(NopCollector)
152+
}
153+
}
154+
} catch (e: AbortFlowException) {
155+
// Do nothing
156+
}
157+
return result.takeUnless { it == NULL } as T?
158+
}

kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt

+75
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,79 @@ class FirstTest : TestBase() {
8383
assertEquals(1, flow.first())
8484
finish(2)
8585
}
86+
87+
@Test
88+
fun testFirstOrNull() = runTest {
89+
val flow = flowOf(1, 2, 3)
90+
assertEquals(1, flow.firstOrNull())
91+
}
92+
93+
@Test
94+
fun testFirstOrNullWithNulls() = runTest {
95+
val flow = flowOf(null, 1)
96+
assertNull(flow.firstOrNull())
97+
assertNull(flow.firstOrNull { it == null })
98+
assertEquals(1, flow.firstOrNull { it != null })
99+
}
100+
101+
@Test
102+
fun testFirstOrNullWithPredicate() = runTest {
103+
val flow = flowOf(1, 2, 3)
104+
assertEquals(1, flow.firstOrNull { it > 0 })
105+
assertEquals(2, flow.firstOrNull { it > 1 })
106+
assertNull(flow.firstOrNull { it > 3 })
107+
}
108+
109+
@Test
110+
fun testFirstOrNullCancellation() = runTest {
111+
val latch = Channel<Unit>()
112+
val flow = flow {
113+
coroutineScope {
114+
launch {
115+
latch.send(Unit)
116+
hang { expect(1) }
117+
}
118+
emit(1)
119+
emit(2)
120+
}
121+
}
122+
123+
124+
val result = flow.firstOrNull {
125+
latch.receive()
126+
true
127+
}
128+
assertEquals(1, result)
129+
finish(2)
130+
}
131+
132+
@Test
133+
fun testFirstOrNullWithEmptyFlow() = runTest {
134+
assertNull(emptyFlow<Int>().firstOrNull())
135+
assertNull(emptyFlow<Int>().firstOrNull { true })
136+
}
137+
138+
@Test
139+
fun testFirstOrNullWhenErrorCancelsUpstream() = runTest {
140+
val latch = Channel<Unit>()
141+
val flow = flow {
142+
coroutineScope {
143+
launch {
144+
latch.send(Unit)
145+
hang { expect(1) }
146+
}
147+
emit(1)
148+
}
149+
}
150+
151+
assertFailsWith<TestException> {
152+
flow.firstOrNull {
153+
latch.receive()
154+
throw TestException()
155+
}
156+
}
157+
158+
assertEquals(1, flow.firstOrNull())
159+
finish(2)
160+
}
86161
}

0 commit comments

Comments
 (0)