From 26e2d88bde705accf8a10f169c401c48632b67f3 Mon Sep 17 00:00:00 2001 From: Bradyn Poulsen Date: Mon, 10 Feb 2020 21:11:12 -0700 Subject: [PATCH 1/7] Flow firstOrNull support --- .../api/kotlinx-coroutines-core.api | 2 + .../common/src/flow/terminal/Reduce.kt | 36 +++++++++ .../common/test/flow/terminal/FirstTest.kt | 75 +++++++++++++++++++ 3 files changed, 113 insertions(+) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index db821a1e05..a6e5fd513e 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -907,6 +907,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun firstOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun firstOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun flatMapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index ccf8241f41..c3d02b13f5 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -120,3 +120,39 @@ public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate") return result as T } + +/** + * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. + * Returns [null] if the flow was empty. + */ +public suspend fun Flow.firstOrNull(): T? { + var result: Any? = NULL + try { + collect { value -> + result = value + throw AbortFlowException(NopCollector) + } + } catch (e: AbortFlowException) { + // Do nothing + } + return result.takeUnless { it == NULL } as T? +} + +/** + * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. + * Returns [null] if the flow did not contain an element matching the [predicate]. + */ +public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boolean): T? { + var result: Any? = NULL + try { + collect { value -> + if (predicate(value)) { + result = value + throw AbortFlowException(NopCollector) + } + } + } catch (e: AbortFlowException) { + // Do nothing + } + return result.takeUnless { it == NULL } as T? +} diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt index e84d4c7b77..91811ceb70 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt @@ -83,4 +83,79 @@ class FirstTest : TestBase() { assertEquals(1, flow.first()) finish(2) } + + @Test + fun testFirstOrNull() = runTest { + val flow = flowOf(1, 2, 3) + assertEquals(1, flow.firstOrNull()) + } + + @Test + fun testFirstOrNullWithNulls() = runTest { + val flow = flowOf(null, 1) + assertNull(flow.firstOrNull()) + assertNull(flow.firstOrNull { it == null }) + assertEquals(1, flow.firstOrNull { it != null }) + } + + @Test + fun testFirstOrNullWithPredicate() = runTest { + val flow = flowOf(1, 2, 3) + assertEquals(1, flow.firstOrNull { it > 0 }) + assertEquals(2, flow.firstOrNull { it > 1 }) + assertNull(flow.firstOrNull { it > 3 }) + } + + @Test + fun testFirstOrNullCancellation() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(1) } + } + emit(1) + emit(2) + } + } + + + val result = flow.firstOrNull { + latch.receive() + true + } + assertEquals(1, result) + finish(2) + } + + @Test + fun testFirstOrNullWithEmptyFlow() = runTest { + assertNull(emptyFlow().firstOrNull()) + assertNull(emptyFlow().firstOrNull { true }) + } + + @Test + fun testFirstOrNullWhenErrorCancelsUpstream() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(1) } + } + emit(1) + } + } + + assertFailsWith { + flow.firstOrNull { + latch.receive() + throw TestException() + } + } + + assertEquals(1, flow.firstOrNull()) + finish(2) + } } From f3bdd562b69ce4489f95af4067fb5e2527d0af46 Mon Sep 17 00:00:00 2001 From: Bradyn Poulsen Date: Thu, 27 Feb 2020 08:19:55 -0700 Subject: [PATCH 2/7] Updated firstOrNull with Any lower bound --- kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index c3d02b13f5..30dbc83b38 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -142,7 +142,7 @@ public suspend fun Flow.firstOrNull(): T? { * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. * Returns [null] if the flow did not contain an element matching the [predicate]. */ -public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boolean): T? { +public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boolean): T? { var result: Any? = NULL try { collect { value -> From af3932a148e7281f771765728e94fe3b8a6376eb Mon Sep 17 00:00:00 2001 From: Bradyn Poulsen Date: Thu, 27 Feb 2020 08:21:14 -0700 Subject: [PATCH 3/7] Updated firstOrNull w/o arguments with Any lower bound --- kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index 30dbc83b38..eaf3cbeb15 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -125,7 +125,7 @@ public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. * Returns [null] if the flow was empty. */ -public suspend fun Flow.firstOrNull(): T? { +public suspend fun Flow.firstOrNull(): T? { var result: Any? = NULL try { collect { value -> From a327f01253b82c719c61ddf8a2fd34e78c80902f Mon Sep 17 00:00:00 2001 From: Bradyn Poulsen Date: Thu, 27 Feb 2020 08:23:39 -0700 Subject: [PATCH 4/7] Removed firstOrNull tests with nullable types --- .../common/test/flow/terminal/FirstTest.kt | 8 -------- 1 file changed, 8 deletions(-) diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt index 91811ceb70..b7c908c21a 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt @@ -90,14 +90,6 @@ class FirstTest : TestBase() { assertEquals(1, flow.firstOrNull()) } - @Test - fun testFirstOrNullWithNulls() = runTest { - val flow = flowOf(null, 1) - assertNull(flow.firstOrNull()) - assertNull(flow.firstOrNull { it == null }) - assertEquals(1, flow.firstOrNull { it != null }) - } - @Test fun testFirstOrNullWithPredicate() = runTest { val flow = flowOf(1, 2, 3) From e1997db6ef8e1978788fbb44f150c95fcffee5dd Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 16 Mar 2020 14:37:45 +0300 Subject: [PATCH 5/7] Fix documentation, fix equality check, add tests --- .../common/src/flow/terminal/Reduce.kt | 10 +++++----- kotlinx-coroutines-core/common/test/AsyncTest.kt | 6 ------ kotlinx-coroutines-core/common/test/TestBase.common.kt | 5 +++++ .../common/test/WithTimeoutOrNullTest.kt | 6 ------ kotlinx-coroutines-core/common/test/WithTimeoutTest.kt | 6 ------ .../common/test/channels/RendezvousChannelTest.kt | 6 ------ .../common/test/flow/terminal/FirstTest.kt | 10 ++++++++++ .../common/test/flow/terminal/SingleTest.kt | 9 ++++++++- 8 files changed, 28 insertions(+), 30 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index eaf3cbeb15..06c60058f3 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -123,7 +123,7 @@ public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { /** * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. - * Returns [null] if the flow was empty. + * Returns `null` if the flow was empty. */ public suspend fun Flow.firstOrNull(): T? { var result: Any? = NULL @@ -135,12 +135,12 @@ public suspend fun Flow.firstOrNull(): T? { } catch (e: AbortFlowException) { // Do nothing } - return result.takeUnless { it == NULL } as T? + return result.takeUnless { it === NULL } as T? } /** - * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. - * Returns [null] if the flow did not contain an element matching the [predicate]. + * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection. + * Returns `null` if the flow did not contain an element matching the [predicate]. */ public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boolean): T? { var result: Any? = NULL @@ -154,5 +154,5 @@ public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boole } catch (e: AbortFlowException) { // Do nothing } - return result.takeUnless { it == NULL } as T? + return result.takeUnless { it === NULL } as T? } diff --git a/kotlinx-coroutines-core/common/test/AsyncTest.kt b/kotlinx-coroutines-core/common/test/AsyncTest.kt index 6fd4ebbe04..3019ddeab1 100644 --- a/kotlinx-coroutines-core/common/test/AsyncTest.kt +++ b/kotlinx-coroutines-core/common/test/AsyncTest.kt @@ -210,12 +210,6 @@ class AsyncTest : TestBase() { finish(13) } - class BadClass { - override fun equals(other: Any?): Boolean = error("equals") - override fun hashCode(): Int = error("hashCode") - override fun toString(): String = error("toString") - } - @Test fun testDeferBadClass() = runTest { val bad = BadClass() diff --git a/kotlinx-coroutines-core/common/test/TestBase.common.kt b/kotlinx-coroutines-core/common/test/TestBase.common.kt index a6119ee8a6..0ba80ee509 100644 --- a/kotlinx-coroutines-core/common/test/TestBase.common.kt +++ b/kotlinx-coroutines-core/common/test/TestBase.common.kt @@ -80,3 +80,8 @@ public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext { public suspend fun wrapperDispatcher(): CoroutineContext = wrapperDispatcher(coroutineContext) +class BadClass { + override fun equals(other: Any?): Boolean = error("equals") + override fun hashCode(): Int = error("hashCode") + override fun toString(): String = error("toString") +} diff --git a/kotlinx-coroutines-core/common/test/WithTimeoutOrNullTest.kt b/kotlinx-coroutines-core/common/test/WithTimeoutOrNullTest.kt index 3faf900cb9..40d2758daa 100644 --- a/kotlinx-coroutines-core/common/test/WithTimeoutOrNullTest.kt +++ b/kotlinx-coroutines-core/common/test/WithTimeoutOrNullTest.kt @@ -152,12 +152,6 @@ class WithTimeoutOrNullTest : TestBase() { assertSame(bad, result) } - class BadClass { - override fun equals(other: Any?): Boolean = error("Should not be called") - override fun hashCode(): Int = error("Should not be called") - override fun toString(): String = error("Should not be called") - } - @Test fun testNullOnTimeout() = runTest { expect(1) diff --git a/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt b/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt index ab61b9c8f3..8462c96953 100644 --- a/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt +++ b/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt @@ -107,12 +107,6 @@ class WithTimeoutTest : TestBase() { assertSame(bad, result) } - class BadClass { - override fun equals(other: Any?): Boolean = error("Should not be called") - override fun hashCode(): Int = error("Should not be called") - override fun toString(): String = error("Should not be called") - } - @Test fun testExceptionOnTimeout() = runTest { expect(1) diff --git a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt index d036af9395..4d20d71596 100644 --- a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt @@ -241,12 +241,6 @@ class RendezvousChannelTest : TestBase() { finish(12) } - class BadClass { - override fun equals(other: Any?): Boolean = error("equals") - override fun hashCode(): Int = error("hashCode") - override fun toString(): String = error("toString") - } - @Test fun testProduceBadClass() = runTest { val bad = BadClass() diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt index b7c908c21a..f737a1d0de 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt @@ -150,4 +150,14 @@ class FirstTest : TestBase() { assertEquals(1, flow.firstOrNull()) finish(2) } + + @Test + fun testBadClass() = runTest { + val instance = BadClass() + val flow = flowOf(instance) + assertSame(instance, flow.first()) + assertSame(instance, flow.firstOrNull()) + assertSame(instance, flow.first { true }) + assertSame(instance, flow.firstOrNull { true }) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt index f7205957d1..4e89b93bd7 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt @@ -17,7 +17,6 @@ class SingleTest : TestBase() { assertEquals(239L, flow.single()) assertEquals(239L, flow.singleOrNull()) - } @Test @@ -63,4 +62,12 @@ class SingleTest : TestBase() { assertNull(flowOf(null).single()) assertFailsWith { flowOf().single() } } + + @Test + fun testBadClass() = runTest { + val instance = BadClass() + val flow = flowOf(instance) + assertSame(instance, flow.single()) + assertSame(instance, flow.singleOrNull()) + } } From a6b74c714a2e37fbce3a55c40344c11d004c921a Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 16 Mar 2020 14:44:44 +0300 Subject: [PATCH 6/7] Get rid of NULL comparison because of T: Any upper bound in firstOrNull --- .../common/src/flow/terminal/Reduce.kt | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index 06c60058f3..b7b9509f72 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -77,7 +77,6 @@ public suspend fun Flow.singleOrNull(): T? { if (result != null) error("Expected only one element") result = value } - return result } @@ -126,7 +125,7 @@ public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { * Returns `null` if the flow was empty. */ public suspend fun Flow.firstOrNull(): T? { - var result: Any? = NULL + var result: Any? = null try { collect { value -> result = value @@ -135,7 +134,7 @@ public suspend fun Flow.firstOrNull(): T? { } catch (e: AbortFlowException) { // Do nothing } - return result.takeUnless { it === NULL } as T? + return result as? T } /** @@ -143,7 +142,7 @@ public suspend fun Flow.firstOrNull(): T? { * Returns `null` if the flow did not contain an element matching the [predicate]. */ public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boolean): T? { - var result: Any? = NULL + var result: Any? = null try { collect { value -> if (predicate(value)) { @@ -154,5 +153,5 @@ public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boole } catch (e: AbortFlowException) { // Do nothing } - return result.takeUnless { it === NULL } as T? + return result as? T } From a1312f65775c92cd417fbc9f23919286f79fe3ee Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 17 Mar 2020 11:17:34 +0300 Subject: [PATCH 7/7] Any? -> T? --- .../common/src/flow/terminal/Reduce.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index b7b9509f72..674f8322f2 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -125,7 +125,7 @@ public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { * Returns `null` if the flow was empty. */ public suspend fun Flow.firstOrNull(): T? { - var result: Any? = null + var result: T? = null try { collect { value -> result = value @@ -134,7 +134,7 @@ public suspend fun Flow.firstOrNull(): T? { } catch (e: AbortFlowException) { // Do nothing } - return result as? T + return result } /** @@ -142,7 +142,7 @@ public suspend fun Flow.firstOrNull(): T? { * Returns `null` if the flow did not contain an element matching the [predicate]. */ public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boolean): T? { - var result: Any? = null + var result: T? = null try { collect { value -> if (predicate(value)) { @@ -153,5 +153,5 @@ public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boole } catch (e: AbortFlowException) { // Do nothing } - return result as? T + return result }