Skip to content

Commit 329e13b

Browse files
committed
Make awaitSingleOrNull() consistent with singleOrNull()
Fixes #2591
1 parent 2021b5a commit 329e13b

File tree

3 files changed

+17
-13
lines changed

3 files changed

+17
-13
lines changed

reactive/kotlinx-coroutines-reactive/src/Await.kt

+13-9
Original file line numberDiff line numberDiff line change
@@ -94,17 +94,19 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
9494
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)
9595

9696
/**
97-
* Awaits for the single value from the given publisher or `null` value if none is emitted without blocking a thread and
98-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
97+
* Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, if
98+
* this observable has produced an error, throws the corresponding exception. If more than one value or none were
99+
* produced by the publisher, `null` is returned.
99100
*
100101
* This suspending function is cancellable.
101-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
102-
* immediately resumes with [CancellationException].
103-
*
104-
* @throws NoSuchElementException if publisher does not emit any value
105-
* @throws IllegalArgumentException if publisher emits more than one value
102+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
103+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
106104
*/
107-
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T = awaitOne(Mode.SINGLE_OR_DEFAULT)
105+
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = try {
106+
awaitOne(Mode.SINGLE_OR_DEFAULT)
107+
} catch (e: TooManyElementsException) {
108+
null
109+
}
108110

109111
/**
110112
* Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and
@@ -121,6 +123,8 @@ public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T
121123

122124
// ------------------------ private ------------------------
123125

126+
private class TooManyElementsException(message: String): IllegalArgumentException(message)
127+
124128
private enum class Mode(val s: String) {
125129
FIRST("awaitFirst"),
126130
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
@@ -158,7 +162,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
158162
if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) {
159163
subscription.cancel()
160164
if (cont.isActive)
161-
cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
165+
cont.resumeWithException(TooManyElementsException("More than one onNext value for $mode"))
162166
} else {
163167
value = t
164168
seenValue = true

reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@ class IntegrationTest(
9292
assertEquals(1, pub.awaitFirstOrDefault(0))
9393
assertEquals(1, pub.awaitFirstOrNull())
9494
assertEquals(1, pub.awaitFirstOrElse { 0 })
95+
assertEquals(null, pub.awaitSingleOrNull())
9596
assertEquals(n, pub.awaitLast())
9697
assertFailsWith<IllegalArgumentException> { pub.awaitSingle() }
9798
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrDefault(0) }
98-
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrNull() }
9999
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrElse { 0 } }
100100
checkNumbers(n, pub)
101101
val channel = pub.openSubscription()

reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,11 @@ class FluxSingleTest : TestBase() {
104104
@Test
105105
fun testAwaitSingleOrNullException() {
106106
val flux = flux {
107-
send((Flux.just("O", "#").awaitSingleOrNull() ?: "!") + "K")
107+
send((Flux.just("!", "#").awaitSingleOrNull() ?: "O") + "K")
108108
}
109109

110-
checkErroneous(flux) {
111-
assert(it is IllegalArgumentException)
110+
checkSingleValue(flux) {
111+
assertEquals("OK", it)
112112
}
113113
}
114114

0 commit comments

Comments
 (0)