Skip to content

Commit 51b5ff7

Browse files
committed
Introduce Mono<T>.singleOrNull
1 parent a01d0bd commit 51b5ff7

File tree

7 files changed

+202
-24
lines changed

7 files changed

+202
-24
lines changed

reactive/kotlinx-coroutines-reactive/src/Await.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
9494
* @throws IllegalArgumentException if the publisher emits more than one value
9595
*/
9696
@Deprecated(
97-
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior",
97+
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
98+
"Please consider using awaitFirstOrDefault().",
9899
level = DeprecationLevel.WARNING
99100
)
100101
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)
@@ -112,7 +113,8 @@ public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitO
112113
*/
113114
@Deprecated(
114115
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
115-
"There is a specialized version for Reactor's Mono, please use that where applicable.",
116+
"There is a specialized version for Reactor's Mono, please use that where applicable. " +
117+
"Alternatively, please consider using awaitFirstOrNull().",
116118
level = DeprecationLevel.WARNING,
117119
replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor")
118120
)
@@ -130,7 +132,8 @@ public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = awaitOne(Mode.SING
130132
* @throws IllegalArgumentException if the publisher emits more than one value
131133
*/
132134
@Deprecated(
133-
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior",
135+
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
136+
"Please consider using awaitFirstOrElse().",
134137
level = DeprecationLevel.WARNING
135138
)
136139
public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T =

reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api

+7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ public final class kotlinx/coroutines/reactor/FluxKt {
1717
}
1818

1919
public final class kotlinx/coroutines/reactor/MonoKt {
20+
public static final fun awaitFirst (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
21+
public static final fun awaitFirstOrDefault (Lreactor/core/publisher/Mono;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
22+
public static final fun awaitFirstOrElse (Lreactor/core/publisher/Mono;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
23+
public static final fun awaitFirstOrNull (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
24+
public static final fun awaitLast (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
25+
public static final fun awaitSingle (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
26+
public static final fun awaitSingleOrNull (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
2027
public static final fun mono (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
2128
public static final synthetic fun mono (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
2229
public static synthetic fun mono$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+129-2
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@
77
package kotlinx.coroutines.reactor
88

99
import kotlinx.coroutines.*
10-
import kotlinx.coroutines.reactive.*
1110
import org.reactivestreams.*
1211
import reactor.core.*
1312
import reactor.core.publisher.*
1413
import kotlin.coroutines.*
15-
import kotlin.internal.*
1614

1715
/**
1816
* Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result.
@@ -34,6 +32,50 @@ public fun <T> mono(
3432
return monoInternal(GlobalScope, context, block)
3533
}
3634

35+
/**
36+
* Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or, if
37+
* this publisher has produced an error, throws the corresponding exception. If the Mono completed without a value,
38+
* `null` is returned.
39+
*
40+
* This suspending function is cancellable.
41+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
42+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
43+
*/
44+
public suspend fun <T> Mono<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
45+
subscribe(object : Subscriber<T> {
46+
private var seenValue = false
47+
48+
override fun onSubscribe(s: Subscription) {
49+
cont.invokeOnCancellation { s.cancel() }
50+
s.request(Long.MAX_VALUE)
51+
}
52+
53+
override fun onComplete() {
54+
if (!seenValue)
55+
cont.resume(null)
56+
}
57+
58+
override fun onNext(t: T) {
59+
seenValue = true
60+
cont.resume(t)
61+
}
62+
63+
override fun onError(error: Throwable) { cont.resumeWithException(error) }
64+
})
65+
}
66+
67+
/**
68+
* Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or,
69+
* if this Mono has produced an error, throws the corresponding exception.
70+
*
71+
* This suspending function is cancellable.
72+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
73+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
74+
*
75+
* @throws NoSuchElementException if the Mono does not emit any value
76+
*/
77+
public suspend fun <T> Mono<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
78+
3779
private fun <T> monoInternal(
3880
scope: CoroutineScope, // support for legacy mono in scope
3981
context: CoroutineContext,
@@ -89,3 +131,88 @@ public fun <T> CoroutineScope.mono(
89131
block: suspend CoroutineScope.() -> T?
90132
): Mono<T> = monoInternal(this, context, block)
91133

134+
/**
135+
* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
136+
* the publisher has produced an error, throws the corresponding exception.
137+
*
138+
* This suspending function is cancellable.
139+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
140+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
141+
*
142+
* @throws NoSuchElementException if the publisher does not emit any value
143+
*/
144+
@Deprecated(
145+
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
146+
"Please use awaitSingle() instead.",
147+
level = DeprecationLevel.WARNING,
148+
replaceWith = ReplaceWith("this.awaitSingle()")
149+
)
150+
public suspend fun <T> Mono<T>.awaitFirst(): T = awaitSingle()
151+
152+
/**
153+
* Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking
154+
* the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
155+
* exception.
156+
*
157+
* This suspending function is cancellable.
158+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
159+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
160+
*/
161+
@Deprecated(
162+
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
163+
"Please use awaitSingleOrNull() instead.",
164+
level = DeprecationLevel.WARNING,
165+
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
166+
)
167+
public suspend fun <T> Mono<T>.awaitFirstOrDefault(default: T): T = awaitSingleOrNull() ?: default
168+
169+
/**
170+
* Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread,
171+
* and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
172+
*
173+
* This suspending function is cancellable.
174+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
175+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
176+
*/
177+
@Deprecated(
178+
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
179+
"Please use awaitSingleOrNull() instead.",
180+
level = DeprecationLevel.WARNING,
181+
replaceWith = ReplaceWith("this.awaitSingleOrNull()")
182+
)
183+
public suspend fun <T> Mono<T>.awaitFirstOrNull(): T? = awaitSingleOrNull()
184+
185+
/**
186+
* Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without
187+
* blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
188+
* corresponding exception.
189+
*
190+
* This suspending function is cancellable.
191+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
192+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
193+
*/
194+
@Deprecated(
195+
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
196+
"Please use awaitSingleOrNull() instead.",
197+
level = DeprecationLevel.WARNING,
198+
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: defaultValue()")
199+
)
200+
public suspend fun <T> Mono<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitSingleOrNull() ?: defaultValue()
201+
202+
/**
203+
* Awaits the last value from the given publisher without blocking the thread and
204+
* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
205+
*
206+
* This suspending function is cancellable.
207+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
208+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
209+
*
210+
* @throws NoSuchElementException if the publisher does not emit any value
211+
*/
212+
@Deprecated(
213+
message = "Mono produces at most one value, so the last element is the same as the first. " +
214+
"Please use awaitSingle() instead.",
215+
level = DeprecationLevel.WARNING,
216+
replaceWith = ReplaceWith("this.awaitSingle()")
217+
)
218+
public suspend fun <T> Mono<T>.awaitLast(): T = awaitSingle()

reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import kotlinx.coroutines.reactive.*
1616
*
1717
* This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux],
1818
* [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
19-
* Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][awaitFirst]) also propagate [ReactorContext] to the
20-
* subscriber's [Context].
19+
* Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst])
20+
* also propagate the [ReactorContext] to the subscriber's [Context].
2121
**
2222
* ### Examples of Reactive context integration.
2323
*

reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import kotlin.test.*
1414
class FlowAsFluxTest : TestBase() {
1515
@Test
1616
fun testFlowAsFluxContextPropagation() {
17-
val flux = flow<String> {
18-
(1..4).forEach { i -> emit(createMono(i).awaitFirst()) }
17+
val flux = flow {
18+
(1..4).forEach { i -> emit(createMono(i).awaitSingle()) }
1919
}
2020
.asFlux()
2121
.contextWrite(Context.of(1, "1"))

reactive/kotlinx-coroutines-reactor/test/MonoTest.kt

+52-7
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import org.junit.Test
1313
import org.reactivestreams.*
1414
import reactor.core.publisher.*
1515
import reactor.util.context.*
16-
import java.time.*
1716
import java.time.Duration.*
1817
import java.util.function.*
1918
import kotlin.test.*
@@ -115,6 +114,52 @@ class MonoTest : TestBase() {
115114
@Test
116115
fun testMonoAwait() = runBlocking {
117116
assertEquals("OK", Mono.just("O").awaitSingle() + "K")
117+
assertEquals("OK", Mono.just("O").awaitSingleOrNull() + "K")
118+
assertFailsWith<NoSuchElementException>{ Mono.empty<String>().awaitSingle() }
119+
assertNull(Mono.empty<Int>().awaitSingleOrNull())
120+
}
121+
122+
/** Tests that the versions of the await methods specialized for Mono for deprecation behave correctly and we don't
123+
* break any code by introducing them. */
124+
@Test
125+
@Suppress("DEPRECATION")
126+
fun testDeprecatedAwaitMethods() = runBlocking {
127+
val filledMono = mono<String> { "OK" }
128+
assertEquals("OK", filledMono.awaitFirst())
129+
assertEquals("OK", filledMono.awaitFirstOrDefault("!"))
130+
assertEquals("OK", filledMono.awaitFirstOrNull())
131+
assertEquals("OK", filledMono.awaitFirstOrElse { "ELSE" })
132+
assertEquals("OK", filledMono.awaitLast())
133+
assertEquals("OK", filledMono.awaitSingleOrDefault("!"))
134+
assertEquals("OK", filledMono.awaitSingleOrElse { "ELSE" })
135+
val emptyMono = mono<String> { null }
136+
assertFailsWith<NoSuchElementException> { emptyMono.awaitFirst() }
137+
assertEquals("OK", emptyMono.awaitFirstOrDefault("OK"))
138+
assertNull(emptyMono.awaitFirstOrNull())
139+
assertEquals("ELSE", emptyMono.awaitFirstOrElse { "ELSE" })
140+
assertFailsWith<NoSuchElementException> { emptyMono.awaitLast() }
141+
assertEquals("OK", emptyMono.awaitSingleOrDefault("OK"))
142+
assertEquals("ELSE", emptyMono.awaitSingleOrElse { "ELSE" })
143+
}
144+
145+
/** Tests that calls to [awaitSingleOrNull] (and, thus, to the rest of such functions) throw [CancellationException]
146+
* and unsubscribe from the publisher when their [Job] is cancelled. */
147+
@Test
148+
fun testAwaitCancellation() = runTest {
149+
expect(1)
150+
val mono = mono { delay(Long.MAX_VALUE) }.doOnSubscribe { expect(3) }.doOnCancel { expect(5) }
151+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
152+
try {
153+
expect(2)
154+
mono.awaitSingleOrNull()
155+
} catch (e: CancellationException) {
156+
expect(6)
157+
throw e
158+
}
159+
}
160+
expect(4)
161+
job.cancelAndJoin()
162+
finish(7)
118163
}
119164

120165
@Test
@@ -264,7 +309,7 @@ class MonoTest : TestBase() {
264309
.interval(ofMillis(1))
265310
.switchMap {
266311
mono(coroutineContext) {
267-
timeBomb().awaitFirst()
312+
timeBomb().awaitSingle()
268313
}
269314
}
270315
.onErrorReturn({
@@ -275,14 +320,14 @@ class MonoTest : TestBase() {
275320
finish(2)
276321
}
277322

278-
private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }
323+
private fun timeBomb() = Mono.delay(ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }
279324

280325
@Test
281326
fun testLeakedException() = runBlocking {
282327
// Test exception is not reported to global handler
283328
val flow = mono<Unit> { throw TestException() }.toFlux().asFlow()
284329
repeat(10000) {
285-
combine(flow, flow) { _, _ -> Unit }
330+
combine(flow, flow) { _, _ -> }
286331
.catch {}
287332
.collect { }
288333
}
@@ -373,13 +418,13 @@ class MonoTest : TestBase() {
373418
Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
374419
}
375420

376-
/** Run the given [Publisher], cancel it, wait for the cancellation handler to finish, and return only then.
421+
/** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and return only then.
377422
*
378423
* Will not work in the general case, but here, when the publisher uses [Dispatchers.Unconfined], this seems to
379424
* ensure that the cancellation handler will have nowhere to execute but serially with the cancellation. */
380-
private suspend fun <T> Publisher<T>.awaitCancelAndJoin() = coroutineScope {
425+
private suspend fun <T> Mono<T>.awaitCancelAndJoin() = coroutineScope {
381426
async(start = CoroutineStart.UNDISPATCHED) {
382-
awaitFirstOrNull()
427+
awaitSingleOrNull()
383428
}.cancelAndJoin()
384429
}
385430
}

reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt

+4-8
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class ReactorContextTest : TestBase() {
2020
}
2121
} .contextWrite(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
2222
.contextWrite { ctx -> ctx.put(6, "6") }
23-
assertEquals(mono.awaitFirst(), "1234567")
23+
assertEquals(mono.awaitSingle(), "1234567")
2424
}
2525

2626
@Test
@@ -43,22 +43,18 @@ class ReactorContextTest : TestBase() {
4343
(1..3).forEach { append(ctx.getOrDefault(it, "noValue")) }
4444
}
4545
} .contextWrite(Context.of(2, "2"))
46-
.awaitFirst()
46+
.awaitSingle()
4747
assertEquals(result, "123")
4848
}
4949

5050
@Test
5151
fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) {
52-
assertEquals(createMono().awaitFirst(), "7")
53-
assertEquals(createMono().awaitFirstOrDefault("noValue"), "7")
54-
assertEquals(createMono().awaitFirstOrNull(), "7")
55-
assertEquals(createMono().awaitFirstOrElse { "noValue" }, "7")
56-
assertEquals(createMono().awaitLast(), "7")
5752
assertEquals(createMono().awaitSingle(), "7")
53+
assertEquals(createMono().awaitSingleOrNull(), "7")
5854
}
5955

6056
@Test
61-
fun testFluxAwaitContextPropagation() = runBlocking<Unit>(
57+
fun testFluxAwaitContextPropagation() = runBlocking(
6258
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
6359
) {
6460
assertEquals(createFlux().awaitFirst(), "1")

0 commit comments

Comments
 (0)