Skip to content

Commit 80e5fc9

Browse files
committed
Introduce Mono<T>.singleOrNull
1 parent 68d8fe0 commit 80e5fc9

File tree

7 files changed

+202
-24
lines changed

7 files changed

+202
-24
lines changed

reactive/kotlinx-coroutines-reactive/src/Await.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
9494
* @throws IllegalArgumentException if the publisher emits more than one value
9595
*/
9696
@Deprecated(
97-
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior",
97+
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
98+
"Please consider using awaitFirstOrDefault().",
9899
level = DeprecationLevel.WARNING
99100
)
100101
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)
@@ -112,7 +113,8 @@ public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitO
112113
*/
113114
@Deprecated(
114115
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
115-
"There is a specialized version for Reactor's Mono, please use that where applicable.",
116+
"There is a specialized version for Reactor's Mono, please use that where applicable. " +
117+
"Alternatively, please consider using awaitFirstOrNull().",
116118
level = DeprecationLevel.WARNING,
117119
replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor")
118120
)
@@ -130,7 +132,8 @@ public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = awaitOne(Mode.SING
130132
* @throws IllegalArgumentException if the publisher emits more than one value
131133
*/
132134
@Deprecated(
133-
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior",
135+
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
136+
"Please consider using awaitFirstOrElse().",
134137
level = DeprecationLevel.WARNING
135138
)
136139
public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T =

reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api

+7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ public final class kotlinx/coroutines/reactor/FluxKt {
1717
}
1818

1919
public final class kotlinx/coroutines/reactor/MonoKt {
20+
public static final fun awaitFirst (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
21+
public static final fun awaitFirstOrDefault (Lreactor/core/publisher/Mono;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
22+
public static final fun awaitFirstOrElse (Lreactor/core/publisher/Mono;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
23+
public static final fun awaitFirstOrNull (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
24+
public static final fun awaitLast (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
25+
public static final fun awaitSingle (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
26+
public static final fun awaitSingleOrNull (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
2027
public static final fun mono (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
2128
public static final synthetic fun mono (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
2229
public static synthetic fun mono$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+129-2
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@
77
package kotlinx.coroutines.reactor
88

99
import kotlinx.coroutines.*
10-
import kotlinx.coroutines.reactive.*
1110
import org.reactivestreams.*
1211
import reactor.core.*
1312
import reactor.core.publisher.*
1413
import kotlin.coroutines.*
15-
import kotlin.internal.*
1614
import kotlinx.coroutines.internal.*
1715

1816
/**
@@ -35,6 +33,50 @@ public fun <T> mono(
3533
return monoInternal(GlobalScope, context, block)
3634
}
3735

36+
/**
37+
* Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or, if
38+
* this publisher has produced an error, throws the corresponding exception. If the Mono completed without a value,
39+
* `null` is returned.
40+
*
41+
* This suspending function is cancellable.
42+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
43+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
44+
*/
45+
public suspend fun <T> Mono<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
46+
subscribe(object : Subscriber<T> {
47+
private var seenValue = false
48+
49+
override fun onSubscribe(s: Subscription) {
50+
cont.invokeOnCancellation { s.cancel() }
51+
s.request(Long.MAX_VALUE)
52+
}
53+
54+
override fun onComplete() {
55+
if (!seenValue)
56+
cont.resume(null)
57+
}
58+
59+
override fun onNext(t: T) {
60+
seenValue = true
61+
cont.resume(t)
62+
}
63+
64+
override fun onError(error: Throwable) { cont.resumeWithException(error) }
65+
})
66+
}
67+
68+
/**
69+
* Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or,
70+
* if this Mono has produced an error, throws the corresponding exception.
71+
*
72+
* This suspending function is cancellable.
73+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
74+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
75+
*
76+
* @throws NoSuchElementException if the Mono does not emit any value
77+
*/
78+
public suspend fun <T> Mono<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
79+
3880
private fun <T> monoInternal(
3981
scope: CoroutineScope, // support for legacy mono in scope
4082
context: CoroutineContext,
@@ -92,3 +134,88 @@ public fun <T> CoroutineScope.mono(
92134
block: suspend CoroutineScope.() -> T?
93135
): Mono<T> = monoInternal(this, context, block)
94136

137+
/**
138+
* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
139+
* the publisher has produced an error, throws the corresponding exception.
140+
*
141+
* This suspending function is cancellable.
142+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
143+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
144+
*
145+
* @throws NoSuchElementException if the publisher does not emit any value
146+
*/
147+
@Deprecated(
148+
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
149+
"Please use awaitSingle() instead.",
150+
level = DeprecationLevel.WARNING,
151+
replaceWith = ReplaceWith("this.awaitSingle()")
152+
)
153+
public suspend fun <T> Mono<T>.awaitFirst(): T = awaitSingle()
154+
155+
/**
156+
* Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking
157+
* the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
158+
* exception.
159+
*
160+
* This suspending function is cancellable.
161+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
162+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
163+
*/
164+
@Deprecated(
165+
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
166+
"Please use awaitSingleOrNull() instead.",
167+
level = DeprecationLevel.WARNING,
168+
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
169+
)
170+
public suspend fun <T> Mono<T>.awaitFirstOrDefault(default: T): T = awaitSingleOrNull() ?: default
171+
172+
/**
173+
* Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread,
174+
* and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
175+
*
176+
* This suspending function is cancellable.
177+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
178+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
179+
*/
180+
@Deprecated(
181+
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
182+
"Please use awaitSingleOrNull() instead.",
183+
level = DeprecationLevel.WARNING,
184+
replaceWith = ReplaceWith("this.awaitSingleOrNull()")
185+
)
186+
public suspend fun <T> Mono<T>.awaitFirstOrNull(): T? = awaitSingleOrNull()
187+
188+
/**
189+
* Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without
190+
* blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
191+
* corresponding exception.
192+
*
193+
* This suspending function is cancellable.
194+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
195+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
196+
*/
197+
@Deprecated(
198+
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
199+
"Please use awaitSingleOrNull() instead.",
200+
level = DeprecationLevel.WARNING,
201+
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: defaultValue()")
202+
)
203+
public suspend fun <T> Mono<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitSingleOrNull() ?: defaultValue()
204+
205+
/**
206+
* Awaits the last value from the given publisher without blocking the thread and
207+
* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
208+
*
209+
* This suspending function is cancellable.
210+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
211+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
212+
*
213+
* @throws NoSuchElementException if the publisher does not emit any value
214+
*/
215+
@Deprecated(
216+
message = "Mono produces at most one value, so the last element is the same as the first. " +
217+
"Please use awaitSingle() instead.",
218+
level = DeprecationLevel.WARNING,
219+
replaceWith = ReplaceWith("this.awaitSingle()")
220+
)
221+
public suspend fun <T> Mono<T>.awaitLast(): T = awaitSingle()

reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import kotlinx.coroutines.reactive.*
1616
*
1717
* This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux],
1818
* [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
19-
* Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][awaitFirst]) also propagate [ReactorContext] to the
20-
* subscriber's [Context].
19+
* Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst])
20+
* also propagate the [ReactorContext] to the subscriber's [Context].
2121
**
2222
* ### Examples of Reactive context integration.
2323
*

reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import kotlin.test.*
1414
class FlowAsFluxTest : TestBase() {
1515
@Test
1616
fun testFlowAsFluxContextPropagation() {
17-
val flux = flow<String> {
18-
(1..4).forEach { i -> emit(createMono(i).awaitFirst()) }
17+
val flux = flow {
18+
(1..4).forEach { i -> emit(createMono(i).awaitSingle()) }
1919
}
2020
.asFlux()
2121
.contextWrite(Context.of(1, "1"))

reactive/kotlinx-coroutines-reactor/test/MonoTest.kt

+52-7
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import org.junit.Test
1313
import org.reactivestreams.*
1414
import reactor.core.publisher.*
1515
import reactor.util.context.*
16-
import java.time.*
1716
import java.time.Duration.*
1817
import java.util.function.*
1918
import kotlin.test.*
@@ -115,6 +114,52 @@ class MonoTest : TestBase() {
115114
@Test
116115
fun testMonoAwait() = runBlocking {
117116
assertEquals("OK", Mono.just("O").awaitSingle() + "K")
117+
assertEquals("OK", Mono.just("O").awaitSingleOrNull() + "K")
118+
assertFailsWith<NoSuchElementException>{ Mono.empty<String>().awaitSingle() }
119+
assertNull(Mono.empty<Int>().awaitSingleOrNull())
120+
}
121+
122+
/** Tests that the versions of the await methods specialized for Mono for deprecation behave correctly and we don't
123+
* break any code by introducing them. */
124+
@Test
125+
@Suppress("DEPRECATION")
126+
fun testDeprecatedAwaitMethods() = runBlocking {
127+
val filledMono = mono<String> { "OK" }
128+
assertEquals("OK", filledMono.awaitFirst())
129+
assertEquals("OK", filledMono.awaitFirstOrDefault("!"))
130+
assertEquals("OK", filledMono.awaitFirstOrNull())
131+
assertEquals("OK", filledMono.awaitFirstOrElse { "ELSE" })
132+
assertEquals("OK", filledMono.awaitLast())
133+
assertEquals("OK", filledMono.awaitSingleOrDefault("!"))
134+
assertEquals("OK", filledMono.awaitSingleOrElse { "ELSE" })
135+
val emptyMono = mono<String> { null }
136+
assertFailsWith<NoSuchElementException> { emptyMono.awaitFirst() }
137+
assertEquals("OK", emptyMono.awaitFirstOrDefault("OK"))
138+
assertNull(emptyMono.awaitFirstOrNull())
139+
assertEquals("ELSE", emptyMono.awaitFirstOrElse { "ELSE" })
140+
assertFailsWith<NoSuchElementException> { emptyMono.awaitLast() }
141+
assertEquals("OK", emptyMono.awaitSingleOrDefault("OK"))
142+
assertEquals("ELSE", emptyMono.awaitSingleOrElse { "ELSE" })
143+
}
144+
145+
/** Tests that calls to [awaitSingleOrNull] (and, thus, to the rest of such functions) throw [CancellationException]
146+
* and unsubscribe from the publisher when their [Job] is cancelled. */
147+
@Test
148+
fun testAwaitCancellation() = runTest {
149+
expect(1)
150+
val mono = mono { delay(Long.MAX_VALUE) }.doOnSubscribe { expect(3) }.doOnCancel { expect(5) }
151+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
152+
try {
153+
expect(2)
154+
mono.awaitSingleOrNull()
155+
} catch (e: CancellationException) {
156+
expect(6)
157+
throw e
158+
}
159+
}
160+
expect(4)
161+
job.cancelAndJoin()
162+
finish(7)
118163
}
119164

120165
@Test
@@ -264,7 +309,7 @@ class MonoTest : TestBase() {
264309
.interval(ofMillis(1))
265310
.switchMap {
266311
mono(coroutineContext) {
267-
timeBomb().awaitFirst()
312+
timeBomb().awaitSingle()
268313
}
269314
}
270315
.onErrorReturn({
@@ -275,14 +320,14 @@ class MonoTest : TestBase() {
275320
finish(2)
276321
}
277322

278-
private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }
323+
private fun timeBomb() = Mono.delay(ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }
279324

280325
@Test
281326
fun testLeakedException() = runBlocking {
282327
// Test exception is not reported to global handler
283328
val flow = mono<Unit> { throw TestException() }.toFlux().asFlow()
284329
repeat(10000) {
285-
combine(flow, flow) { _, _ -> Unit }
330+
combine(flow, flow) { _, _ -> }
286331
.catch {}
287332
.collect { }
288333
}
@@ -373,13 +418,13 @@ class MonoTest : TestBase() {
373418
Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
374419
}
375420

376-
/** Run the given [Publisher], cancel it, wait for the cancellation handler to finish, and return only then.
421+
/** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and return only then.
377422
*
378423
* Will not work in the general case, but here, when the publisher uses [Dispatchers.Unconfined], this seems to
379424
* ensure that the cancellation handler will have nowhere to execute but serially with the cancellation. */
380-
private suspend fun <T> Publisher<T>.awaitCancelAndJoin() = coroutineScope {
425+
private suspend fun <T> Mono<T>.awaitCancelAndJoin() = coroutineScope {
381426
async(start = CoroutineStart.UNDISPATCHED) {
382-
awaitFirstOrNull()
427+
awaitSingleOrNull()
383428
}.cancelAndJoin()
384429
}
385430
}

reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt

+4-8
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class ReactorContextTest : TestBase() {
2020
}
2121
} .contextWrite(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
2222
.contextWrite { ctx -> ctx.put(6, "6") }
23-
assertEquals(mono.awaitFirst(), "1234567")
23+
assertEquals(mono.awaitSingle(), "1234567")
2424
}
2525

2626
@Test
@@ -43,22 +43,18 @@ class ReactorContextTest : TestBase() {
4343
(1..3).forEach { append(ctx.getOrDefault(it, "noValue")) }
4444
}
4545
} .contextWrite(Context.of(2, "2"))
46-
.awaitFirst()
46+
.awaitSingle()
4747
assertEquals(result, "123")
4848
}
4949

5050
@Test
5151
fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) {
52-
assertEquals(createMono().awaitFirst(), "7")
53-
assertEquals(createMono().awaitFirstOrDefault("noValue"), "7")
54-
assertEquals(createMono().awaitFirstOrNull(), "7")
55-
assertEquals(createMono().awaitFirstOrElse { "noValue" }, "7")
56-
assertEquals(createMono().awaitLast(), "7")
5752
assertEquals(createMono().awaitSingle(), "7")
53+
assertEquals(createMono().awaitSingleOrNull(), "7")
5854
}
5955

6056
@Test
61-
fun testFluxAwaitContextPropagation() = runBlocking<Unit>(
57+
fun testFluxAwaitContextPropagation() = runBlocking(
6258
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
6359
) {
6460
assertEquals(createFlux().awaitFirst(), "1")

0 commit comments

Comments
 (0)