Skip to content

Commit 5dd94a3

Browse files
authored
Add Publisher.awaitSingleOrDefault|Null|Else extensions (#2260)
This commit adds awaitSingle variants similar to awaitFirst ones, but always emitting the value during onComplete(). Fixes #1993
1 parent 85b1a2b commit 5dd94a3

File tree

4 files changed

+123
-5
lines changed

4 files changed

+123
-5
lines changed

reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ public final class kotlinx/coroutines/reactive/AwaitKt {
55
public static final fun awaitFirstOrNull (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
66
public static final fun awaitLast (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
77
public static final fun awaitSingle (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8+
public static final fun awaitSingleOrDefault (Lorg/reactivestreams/Publisher;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
9+
public static final fun awaitSingleOrElse (Lorg/reactivestreams/Publisher;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10+
public static final fun awaitSingleOrNull (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
811
}
912

1013
public final class kotlinx/coroutines/reactive/ChannelKt {

reactive/kotlinx-coroutines-reactive/src/Await.kt

+44-4
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,53 @@ public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
8080
*/
8181
public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
8282

83+
/**
84+
* Awaits for the single value from the given publisher or the [default] value if none is emitted without blocking a thread and
85+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
86+
*
87+
* This suspending function is cancellable.
88+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
89+
* immediately resumes with [CancellationException].
90+
*
91+
* @throws NoSuchElementException if publisher does not emit any value
92+
* @throws IllegalArgumentException if publisher emits more than one value
93+
*/
94+
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)
95+
96+
/**
97+
* Awaits for the single value from the given publisher or `null` value if none is emitted without blocking a thread and
98+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
99+
*
100+
* This suspending function is cancellable.
101+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
102+
* immediately resumes with [CancellationException].
103+
*
104+
* @throws NoSuchElementException if publisher does not emit any value
105+
* @throws IllegalArgumentException if publisher emits more than one value
106+
*/
107+
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T = awaitOne(Mode.SINGLE_OR_DEFAULT)
108+
109+
/**
110+
* Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and
111+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
112+
*
113+
* This suspending function is cancellable.
114+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
115+
* immediately resumes with [CancellationException].
116+
*
117+
* @throws NoSuchElementException if publisher does not emit any value
118+
* @throws IllegalArgumentException if publisher emits more than one value
119+
*/
120+
public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T = awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue()
121+
83122
// ------------------------ private ------------------------
84123

85124
private enum class Mode(val s: String) {
86125
FIRST("awaitFirst"),
87126
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
88127
LAST("awaitLast"),
89-
SINGLE("awaitSingle");
128+
SINGLE("awaitSingle"),
129+
SINGLE_OR_DEFAULT("awaitSingleOrDefault");
90130
override fun toString(): String = s
91131
}
92132

@@ -114,8 +154,8 @@ private suspend fun <T> Publisher<T>.awaitOne(
114154
cont.resume(t)
115155
}
116156
}
117-
Mode.LAST, Mode.SINGLE -> {
118-
if (mode == Mode.SINGLE && seenValue) {
157+
Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> {
158+
if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) {
119159
subscription.cancel()
120160
if (cont.isActive)
121161
cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
@@ -134,7 +174,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
134174
return
135175
}
136176
when {
137-
mode == Mode.FIRST_OR_DEFAULT -> {
177+
(mode == Mode.FIRST_OR_DEFAULT || mode == Mode.SINGLE_OR_DEFAULT) -> {
138178
cont.resume(default as T)
139179
}
140180
cont.isActive -> {

reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt

+10-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ class IntegrationTest(
4848
assertEquals("ELSE", pub.awaitFirstOrElse { "ELSE" })
4949
assertFailsWith<NoSuchElementException> { pub.awaitLast() }
5050
assertFailsWith<NoSuchElementException> { pub.awaitSingle() }
51+
assertEquals("OK", pub.awaitSingleOrDefault("OK"))
52+
assertNull(pub.awaitSingleOrNull())
53+
assertEquals("ELSE", pub.awaitSingleOrElse { "ELSE" })
5154
var cnt = 0
5255
pub.collect { cnt++ }
5356
assertEquals(0, cnt)
@@ -65,6 +68,9 @@ class IntegrationTest(
6568
assertEquals("OK", pub.awaitFirstOrElse { "ELSE" })
6669
assertEquals("OK", pub.awaitLast())
6770
assertEquals("OK", pub.awaitSingle())
71+
assertEquals("OK", pub.awaitSingleOrDefault("!"))
72+
assertEquals("OK", pub.awaitSingleOrNull())
73+
assertEquals("OK", pub.awaitSingleOrElse { "ELSE" })
6874
var cnt = 0
6975
pub.collect {
7076
assertEquals("OK", it)
@@ -84,10 +90,13 @@ class IntegrationTest(
8490
}
8591
assertEquals(1, pub.awaitFirst())
8692
assertEquals(1, pub.awaitFirstOrDefault(0))
87-
assertEquals(n, pub.awaitLast())
8893
assertEquals(1, pub.awaitFirstOrNull())
8994
assertEquals(1, pub.awaitFirstOrElse { 0 })
95+
assertEquals(n, pub.awaitLast())
9096
assertFailsWith<IllegalArgumentException> { pub.awaitSingle() }
97+
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrDefault(0) }
98+
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrNull() }
99+
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrElse { 0 } }
91100
checkNumbers(n, pub)
92101
val channel = pub.openSubscription()
93102
checkNumbers(n, channel.asPublisher(ctx(coroutineContext)))

reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt

+66
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,72 @@ class FluxSingleTest : TestBase() {
6868
}
6969
}
7070

71+
@Test
72+
fun testAwaitSingleOrDefault() {
73+
val flux = flux {
74+
send(Flux.empty<String>().awaitSingleOrDefault("O") + "K")
75+
}
76+
77+
checkSingleValue(flux) {
78+
assertEquals("OK", it)
79+
}
80+
}
81+
82+
@Test
83+
fun testAwaitSingleOrDefaultException() {
84+
val flux = flux {
85+
send(Flux.just("O", "#").awaitSingleOrDefault("!") + "K")
86+
}
87+
88+
checkErroneous(flux) {
89+
assert(it is IllegalArgumentException)
90+
}
91+
}
92+
93+
@Test
94+
fun testAwaitSingleOrNull() {
95+
val flux = flux<String?> {
96+
send(Flux.empty<String>().awaitSingleOrNull() ?: "OK")
97+
}
98+
99+
checkSingleValue(flux) {
100+
assertEquals("OK", it)
101+
}
102+
}
103+
104+
@Test
105+
fun testAwaitSingleOrNullException() {
106+
val flux = flux {
107+
send((Flux.just("O", "#").awaitSingleOrNull() ?: "!") + "K")
108+
}
109+
110+
checkErroneous(flux) {
111+
assert(it is IllegalArgumentException)
112+
}
113+
}
114+
115+
@Test
116+
fun testAwaitSingleOrElse() {
117+
val flux = flux {
118+
send(Flux.empty<String>().awaitSingleOrElse { "O" } + "K")
119+
}
120+
121+
checkSingleValue(flux) {
122+
assertEquals("OK", it)
123+
}
124+
}
125+
126+
@Test
127+
fun testAwaitSingleOrElseException() {
128+
val flux = flux {
129+
send(Flux.just("O", "#").awaitSingleOrElse { "!" } + "K")
130+
}
131+
132+
checkErroneous(flux) {
133+
assert(it is IllegalArgumentException)
134+
}
135+
}
136+
71137
@Test
72138
fun testAwaitFirst() {
73139
val flux = flux {

0 commit comments

Comments
 (0)