Skip to content

Add Mono<T>.await #2640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public final class kotlinx/coroutines/reactor/FluxKt {
}

public final class kotlinx/coroutines/reactor/MonoKt {
public static final fun await (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun mono (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
public static final synthetic fun mono (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
public static synthetic fun mono$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;
Expand Down
13 changes: 11 additions & 2 deletions reactive/kotlinx-coroutines-reactor/src/Mono.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import reactor.core.*
import reactor.core.publisher.*
import kotlin.coroutines.*
import kotlin.internal.*

/**
* Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result.
Expand All @@ -34,6 +32,17 @@ public fun <T> mono(
return monoInternal(GlobalScope, context, block)
}

/**
* Awaits the completion of the [Mono] without blocking the thread.
* Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
* [Mono] has produced an error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
* function immediately resumes with [CancellationException] and cancels its subscription.
*/
public suspend fun <T> Mono<T>.await(): T? = awaitFirstOrNull()

private fun <T> monoInternal(
scope: CoroutineScope, // support for legacy mono in scope
context: CoroutineContext,
Expand Down
14 changes: 9 additions & 5 deletions reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import org.junit.Test
import org.reactivestreams.*
import reactor.core.publisher.*
import reactor.util.context.*
import java.time.*
import java.time.Duration.*
import java.util.function.*
import kotlin.test.*
Expand Down Expand Up @@ -114,13 +113,13 @@ class MonoTest : TestBase() {

@Test
fun testMonoAwait() = runBlocking {
assertEquals("OK", Mono.just("O").awaitSingle() + "K")
assertEquals("OK", Mono.just("O").await() + "K")
}

@Test
fun testMonoEmitAndAwait() {
val mono = mono {
Mono.just("O").awaitSingle() + "K"
Mono.just("O").await() + "K"
}

checkMonoValue(mono) {
Expand Down Expand Up @@ -275,19 +274,24 @@ class MonoTest : TestBase() {
finish(2)
}

private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }
private fun timeBomb() = Mono.delay(ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }

@Test
fun testLeakedException() = runBlocking {
// Test exception is not reported to global handler
val flow = mono<Unit> { throw TestException() }.toFlux().asFlow()
repeat(10000) {
combine(flow, flow) { _, _ -> Unit }
combine(flow, flow) { _, _ -> }
.catch {}
.collect { }
}
}

@Test
fun testMonoAwaitNull() = runBlocking {
assertNull(Mono.empty<String>().await())
}

/** Test that cancelling a [mono] due to a timeout does throw an exception. */
@Test
fun testTimeout() {
Expand Down