From 55b133f4afae946b4d69c8f98d69982d89c37b93 Mon Sep 17 00:00:00 2001 From: Nicklas Ansman Giertz Date: Fri, 4 Sep 2020 14:20:55 -0400 Subject: [PATCH 1/5] Allow nullable types in Flow's firstOrNull extension This closes #2229 --- kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt | 6 +++--- .../common/test/flow/terminal/FirstTest.kt | 6 ++++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index d36e1bbf7b..675e5ccdf3 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -112,7 +112,7 @@ public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. * Returns `null` if the flow was empty. */ -public suspend fun Flow.firstOrNull(): T? { +public suspend fun Flow.firstOrNull(): T? { var result: T? = null collectWhile { result = it @@ -122,10 +122,10 @@ public suspend fun Flow.firstOrNull(): T? { } /** - * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection. + * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection. * Returns `null` if the flow did not contain an element matching the [predicate]. */ -public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boolean): T? { +public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boolean): T? { var result: T? = null collectWhile { if (predicate(it)) { diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt index edb9f00fa6..fa7fc9cb6c 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt @@ -128,6 +128,12 @@ class FirstTest : TestBase() { assertNull(emptyFlow().firstOrNull { true }) } + @Test + fun testFirstOrNullWithNullElement() = runTest { + assertNull(flowOf(null).firstOrNull()) + assertNull(flowOf(null).firstOrNull { true }) + } + @Test fun testFirstOrNullWhenErrorCancelsUpstream() = runTest { val latch = Channel() From 1479bf43144f0a1d40e61a28e94901e3f4b1a2fe Mon Sep 17 00:00:00 2001 From: Nicklas Ansman Giertz Date: Thu, 24 Sep 2020 12:19:46 -0400 Subject: [PATCH 2/5] Remove the type constraint for Flow.singleOrNull --- kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt | 2 +- .../common/test/flow/terminal/SingleTest.kt | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index 675e5ccdf3..1a5b3de100 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -67,7 +67,7 @@ public suspend fun Flow.single(): T { * The terminal operator, that awaits for one and only one value to be published. * Throws [IllegalStateException] for flow that contains more than one element. */ -public suspend fun Flow.singleOrNull(): T? { +public suspend fun Flow.singleOrNull(): T? { var result: T? = null collect { value -> if (result != null) error("Expected only one element") diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt index 4e89b93bd7..9c92b790d6 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt @@ -61,6 +61,10 @@ class SingleTest : TestBase() { assertEquals(1, flowOf(1).single()) assertNull(flowOf(null).single()) assertFailsWith { flowOf().single() } + + assertEquals(1, flowOf(1).singleOrNull()) + assertNull(flowOf(null).singleOrNull()) + assertFailsWith { flowOf().singleOrNull() } } @Test From 7d863420e6ca4290e5d4be5fa0cfc25debcc5b40 Mon Sep 17 00:00:00 2001 From: Nicklas Ansman Giertz Date: Mon, 5 Oct 2020 13:17:18 -0400 Subject: [PATCH 3/5] Fix the broken test --- .../common/test/flow/terminal/SingleTest.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt index 9c92b790d6..dd9857c7aa 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt @@ -7,7 +7,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlin.test.* -class SingleTest : TestBase() { +class SingleTest : TestBase() { @Test fun testSingle() = runTest { @@ -64,7 +64,7 @@ class SingleTest : TestBase() { assertEquals(1, flowOf(1).singleOrNull()) assertNull(flowOf(null).singleOrNull()) - assertFailsWith { flowOf().singleOrNull() } + assertNull(flowOf().singleOrNull()) } @Test From f68190254e39e3fc86296706808f0a9fdbc38c32 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 7 Oct 2020 17:56:56 +0300 Subject: [PATCH 4/5] Align Flow.single and Flow.singleOrNull with Kotlin standard library Fixes #2289 --- .../common/src/flow/terminal/Reduce.kt | 31 +++++++++++++------ .../test/flow/operators/OnCompletionTest.kt | 4 +-- .../common/test/flow/terminal/SingleTest.kt | 4 +-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index 1a5b3de100..ef46ff04cf 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -9,6 +9,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.internal.Symbol import kotlin.jvm.* /** @@ -47,33 +48,43 @@ public suspend inline fun Flow.fold( } /** - * The terminal operator, that awaits for one and only one value to be published. + * The terminal operator that awaits for one and only one value to be emitted. * Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow * that contains more than one element. */ public suspend fun Flow.single(): T { var result: Any? = NULL collect { value -> - if (result !== NULL) error("Expected only one element") + require(result == NULL) { "Flow has more than one element" } result = value } - if (result === NULL) throw NoSuchElementException("Expected at least one element") - @Suppress("UNCHECKED_CAST") + if (result === NULL) throw NoSuchElementException("Flow is empty") return result as T } /** - * The terminal operator, that awaits for one and only one value to be published. - * Throws [IllegalStateException] for flow that contains more than one element. + * The terminal operator that awaits for one and only one value to be emitted. + * Returns the single value or `null`, if the flow was empty or emitted more than one value. */ public suspend fun Flow.singleOrNull(): T? { - var result: T? = null + var result: Any? = NULL collect { value -> - if (result != null) error("Expected only one element") - result = value + /* + * result === NULL -> first value + * result === user value -> we already had first value and second one arrived + * result === DONE -> we've seen more than one value, time to return it + * as well. + */ + result = if (result === NULL) { + value + } else { + // Indicator that more than one value was observed + DONE + } } - return result + // Symbols are never leaked, so it's one comparison versus two + return if (result is Symbol) null else result as T } /** diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt index 7f0c548ca6..f55e8beeb2 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt @@ -231,7 +231,7 @@ class OnCompletionTest : TestBase() { @Test fun testSingle() = runTest { - assertFailsWith { + assertFailsWith { flowOf(239).onCompletion { assertNull(it) expect(1) @@ -240,7 +240,7 @@ class OnCompletionTest : TestBase() { expectUnreached() } catch (e: Throwable) { // Second emit -- failure - assertTrue { e is IllegalStateException } + assertTrue { e is IllegalArgumentException } throw e } }.single() diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt index dd9857c7aa..5865029893 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt @@ -25,8 +25,8 @@ class SingleTest : TestBase() { emit(239L) emit(240L) } - assertFailsWith { flow.single() } - assertFailsWith { flow.singleOrNull() } + assertFailsWith { flow.single() } + assertNull(flow.singleOrNull()) } @Test From 1c3680a31096ce9933eee383c27d2838b3e356c1 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 8 Oct 2020 12:02:04 +0300 Subject: [PATCH 5/5] Rewrite singleOrNull using collectWhile to cancel the flow when the second element is encountered --- .../common/src/flow/terminal/Reduce.kt | 24 ++++++++----------- .../common/test/flow/terminal/SingleTest.kt | 17 +++++++++++++ 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index ef46ff04cf..83f5498e4d 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -55,7 +55,7 @@ public suspend inline fun Flow.fold( public suspend fun Flow.single(): T { var result: Any? = NULL collect { value -> - require(result == NULL) { "Flow has more than one element" } + require(result === NULL) { "Flow has more than one element" } result = value } @@ -69,22 +69,18 @@ public suspend fun Flow.single(): T { */ public suspend fun Flow.singleOrNull(): T? { var result: Any? = NULL - collect { value -> - /* - * result === NULL -> first value - * result === user value -> we already had first value and second one arrived - * result === DONE -> we've seen more than one value, time to return it - * as well. - */ - result = if (result === NULL) { - value + collectWhile { + // No values yet, update result + if (result === NULL) { + result = it + true } else { - // Indicator that more than one value was observed - DONE + // Second value, reset result and bail out + result = NULL + false } } - // Symbols are never leaked, so it's one comparison versus two - return if (result is Symbol) null else result as T + return if (result === NULL) null else result as T } /** diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt index 5865029893..2c1277b1e1 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt @@ -73,5 +73,22 @@ class SingleTest : TestBase() { val flow = flowOf(instance) assertSame(instance, flow.single()) assertSame(instance, flow.singleOrNull()) + + val flow2 = flow { + emit(BadClass()) + emit(BadClass()) + } + assertFailsWith { flow2.single() } + } + + @Test + fun testSingleNoWait() = runTest { + val flow = flow { + emit(1) + emit(2) + awaitCancellation() + } + + assertNull(flow.singleOrNull()) } }