diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api index 3a1c8b7d31..fcf3eb9e94 100644 --- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api +++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api @@ -17,6 +17,7 @@ public final class kotlinx/coroutines/reactor/FluxKt { } public final class kotlinx/coroutines/reactor/MonoKt { + public static final fun await (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun mono (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono; public static final synthetic fun mono (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono; public static synthetic fun mono$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono; diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 307ec2278c..036cb35d4c 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -8,11 +8,9 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* import kotlinx.coroutines.reactive.* -import org.reactivestreams.* import reactor.core.* import reactor.core.publisher.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result. @@ -34,6 +32,17 @@ public fun mono( return monoInternal(GlobalScope, context, block) } +/** + * Awaits the completion of the [Mono] without blocking the thread. + * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this + * [Mono] has produced an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and cancels its subscription. + */ +public suspend fun Mono.await(): T? = awaitFirstOrNull() + private fun monoInternal( scope: CoroutineScope, // support for legacy mono in scope context: CoroutineContext, diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt index a98c514f19..5b8c9acdf6 100644 --- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt @@ -13,7 +13,6 @@ import org.junit.Test import org.reactivestreams.* import reactor.core.publisher.* import reactor.util.context.* -import java.time.* import java.time.Duration.* import java.util.function.* import kotlin.test.* @@ -114,13 +113,13 @@ class MonoTest : TestBase() { @Test fun testMonoAwait() = runBlocking { - assertEquals("OK", Mono.just("O").awaitSingle() + "K") + assertEquals("OK", Mono.just("O").await() + "K") } @Test fun testMonoEmitAndAwait() { val mono = mono { - Mono.just("O").awaitSingle() + "K" + Mono.just("O").await() + "K" } checkMonoValue(mono) { @@ -275,19 +274,24 @@ class MonoTest : TestBase() { finish(2) } - private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") } + private fun timeBomb() = Mono.delay(ofMillis(1)).doOnSuccess { throw Exception("something went wrong") } @Test fun testLeakedException() = runBlocking { // Test exception is not reported to global handler val flow = mono { throw TestException() }.toFlux().asFlow() repeat(10000) { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect { } } } + @Test + fun testMonoAwaitNull() = runBlocking { + assertNull(Mono.empty().await()) + } + /** Test that cancelling a [mono] due to a timeout does throw an exception. */ @Test fun testTimeout() {