Skip to content

Commit 421cab7

Browse files
committed
Flow.first operators family (without firstOrNull and firstOrDefault support)
Fixes #1078
1 parent 33befbf commit 421cab7

File tree

3 files changed

+133
-2
lines changed

3 files changed

+133
-2
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+2
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
827827
public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
828828
public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
829829
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
830+
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
831+
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
830832
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
831833
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
832834
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt

+45-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
@file:JvmMultifileClass
66
@file:JvmName("FlowKt")
7+
@file:Suppress("UNCHECKED_CAST")
78

89
package kotlinx.coroutines.flow
910

@@ -50,7 +51,7 @@ public suspend inline fun <T, R> Flow<T>.fold(
5051
}
5152

5253
/**
53-
* Terminal operator, that awaits for one and only one value to be published.
54+
* The terminal operator, that awaits for one and only one value to be published.
5455
* Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow
5556
* that contains more than one element.
5657
*/
@@ -68,7 +69,7 @@ public suspend fun <T> Flow<T>.single(): T {
6869
}
6970

7071
/**
71-
* Terminal operator, that awaits for one and only one value to be published.
72+
* The terminal operator, that awaits for one and only one value to be published.
7273
* Throws [IllegalStateException] for flow that contains more than one element.
7374
*/
7475
@FlowPreview
@@ -81,3 +82,45 @@ public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {
8182

8283
return result
8384
}
85+
86+
/**
87+
* The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
88+
* Throws [NoSuchElementException] if the flow was empty.
89+
*/
90+
@FlowPreview
91+
public suspend fun <T> Flow<T>.first(): T {
92+
var result: Any? = NullSurrogate
93+
try {
94+
collect { value ->
95+
result = value
96+
throw AbortFlowException()
97+
}
98+
} catch (e: AbortFlowException) {
99+
// Do nothing
100+
}
101+
102+
if (result === NullSurrogate) throw NoSuchElementException("Expected at least one element")
103+
return result as T
104+
}
105+
106+
/**
107+
* The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
108+
* Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate].
109+
*/
110+
@FlowPreview
111+
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
112+
var result: Any? = NullSurrogate
113+
try {
114+
collect { value ->
115+
if (predicate(value)) {
116+
result = value
117+
throw AbortFlowException()
118+
}
119+
}
120+
} catch (e: AbortFlowException) {
121+
// Do nothing
122+
}
123+
124+
if (result === NullSurrogate) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
125+
return result as T
126+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import kotlin.test.*
10+
11+
class FirstTest : TestBase() {
12+
@Test
13+
fun testFirst() = runTest {
14+
val flow = flowOf(1, 2, 3)
15+
assertEquals(1, flow.first())
16+
}
17+
18+
@Test
19+
fun testNulls() = runTest {
20+
val flow = flowOf(null, 1)
21+
assertNull(flow.first())
22+
assertNull(flow.first { it == null })
23+
assertEquals(1, flow.first { it != null })
24+
}
25+
26+
@Test
27+
fun testFirstWithPredicate() = runTest {
28+
val flow = flowOf(1, 2, 3)
29+
assertEquals(1, flow.first { it > 0 })
30+
assertEquals(2, flow.first { it > 1 })
31+
assertFailsWith<NoSuchElementException> { flow.first { it > 3 } }
32+
}
33+
34+
@Test
35+
fun testFirstCancellation() = runTest {
36+
val latch = Channel<Unit>()
37+
val flow = flow {
38+
coroutineScope {
39+
launch {
40+
latch.send(Unit)
41+
hang { expect(1) }
42+
}
43+
emit(1)
44+
emit(2)
45+
}
46+
}
47+
48+
49+
val result = flow.first {
50+
latch.receive()
51+
true
52+
}
53+
assertEquals(1, result)
54+
finish(2)
55+
}
56+
57+
@Test
58+
fun testEmptyFlow() = runTest {
59+
assertFailsWith<NoSuchElementException> { emptyFlow<Int>().first() }
60+
assertFailsWith<NoSuchElementException> { emptyFlow<Int>().first { true } }
61+
}
62+
63+
@Test
64+
fun testErrorCancelsUpstream() = runTest {
65+
val latch = Channel<Unit>()
66+
val flow = flow {
67+
coroutineScope {
68+
launch {
69+
latch.send(Unit)
70+
hang { expect(1) }
71+
}
72+
emit(1)
73+
}
74+
}
75+
76+
assertFailsWith<TestException> {
77+
flow.first {
78+
latch.receive()
79+
throw TestException()
80+
}
81+
}
82+
83+
assertEquals(1, flow.first())
84+
finish(2)
85+
}
86+
}

0 commit comments

Comments
 (0)