Skip to content

Commit 781ca69

Browse files
committed
Make awaitSingleOrNull() consistent with singleOrNull()
Fixes #2591
1 parent c752d64 commit 781ca69

File tree

2 files changed

+16
-12
lines changed

2 files changed

+16
-12
lines changed

reactive/kotlinx-coroutines-reactive/src/Await.kt

+13-9
Original file line numberDiff line numberDiff line change
@@ -93,17 +93,19 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
9393
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)
9494

9595
/**
96-
* Awaits for the single value from the given publisher or `null` value if none is emitted without blocking a thread and
97-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
96+
* Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, if
97+
* this observable has produced an error, throws the corresponding exception. If more than one value or none were
98+
* produced by the publisher, `null` is returned.
9899
*
99100
* This suspending function is cancellable.
100-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
101-
* immediately resumes with [CancellationException].
102-
*
103-
* @throws NoSuchElementException if publisher does not emit any value
104-
* @throws IllegalArgumentException if publisher emits more than one value
101+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
102+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
105103
*/
106-
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T = awaitOne(Mode.SINGLE_OR_DEFAULT)
104+
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = try {
105+
awaitOne(Mode.SINGLE_OR_DEFAULT)
106+
} catch (e: TooManyElementsException) {
107+
null
108+
}
107109

108110
/**
109111
* Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and
@@ -120,6 +122,8 @@ public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T
120122

121123
// ------------------------ private ------------------------
122124

125+
private class TooManyElementsException(message: String): IllegalArgumentException(message)
126+
123127
private enum class Mode(val s: String) {
124128
FIRST("awaitFirst"),
125129
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
@@ -186,7 +190,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
186190
/* the check for `cont.isActive` is needed in case `sub.cancel() above calls `onComplete` or
187191
`onError` on its own. */
188192
if (cont.isActive) {
189-
cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
193+
cont.resumeWithException(TooManyElementsException("More than one onNext value for $mode"))
190194
}
191195
} else {
192196
value = t

reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,11 @@ class FluxSingleTest : TestBase() {
104104
@Test
105105
fun testAwaitSingleOrNullException() {
106106
val flux = flux {
107-
send((Flux.just("O", "#").awaitSingleOrNull() ?: "!") + "K")
107+
send((Flux.just("!", "#").awaitSingleOrNull() ?: "O") + "K")
108108
}
109109

110-
checkErroneous(flux) {
111-
assert(it is IllegalArgumentException)
110+
checkSingleValue(flux) {
111+
assertEquals("OK", it)
112112
}
113113
}
114114

0 commit comments

Comments
 (0)