diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 6df47e179f..f0e337a1bd 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -900,6 +900,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun firstOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun firstOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun flatMapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index eb3ce288a2..b9bb72c02f 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -120,3 +120,39 @@ public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate") return result as T } + +/** + * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. + * Returns [null] if the flow was empty. + */ +public suspend fun Flow.firstOrNull(): T? { + var result: Any? = NULL + try { + collect { value -> + result = value + throw AbortFlowException(NopCollector) + } + } catch (e: AbortFlowException) { + // Do nothing + } + return result.takeUnless { it == NULL } as T? +} + +/** + * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. + * Returns [null] if the flow did not contain an element matching the [predicate]. + */ +public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boolean): T? { + var result: Any? = NULL + try { + collect { value -> + if (predicate(value)) { + result = value + throw AbortFlowException(NopCollector) + } + } + } catch (e: AbortFlowException) { + // Do nothing + } + return result.takeUnless { it == NULL } as T? +} diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt index e84d4c7b77..b7c908c21a 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt @@ -83,4 +83,71 @@ class FirstTest : TestBase() { assertEquals(1, flow.first()) finish(2) } + + @Test + fun testFirstOrNull() = runTest { + val flow = flowOf(1, 2, 3) + assertEquals(1, flow.firstOrNull()) + } + + @Test + fun testFirstOrNullWithPredicate() = runTest { + val flow = flowOf(1, 2, 3) + assertEquals(1, flow.firstOrNull { it > 0 }) + assertEquals(2, flow.firstOrNull { it > 1 }) + assertNull(flow.firstOrNull { it > 3 }) + } + + @Test + fun testFirstOrNullCancellation() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(1) } + } + emit(1) + emit(2) + } + } + + + val result = flow.firstOrNull { + latch.receive() + true + } + assertEquals(1, result) + finish(2) + } + + @Test + fun testFirstOrNullWithEmptyFlow() = runTest { + assertNull(emptyFlow().firstOrNull()) + assertNull(emptyFlow().firstOrNull { true }) + } + + @Test + fun testFirstOrNullWhenErrorCancelsUpstream() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(1) } + } + emit(1) + } + } + + assertFailsWith { + flow.firstOrNull { + latch.receive() + throw TestException() + } + } + + assertEquals(1, flow.firstOrNull()) + finish(2) + } }