diff --git a/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api b/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api index 5783edeaa1..961fdbe238 100644 --- a/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api +++ b/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api @@ -5,6 +5,9 @@ public final class kotlinx/coroutines/reactive/AwaitKt { public static final fun awaitFirstOrNull (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitLast (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitSingle (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingleOrDefault (Lorg/reactivestreams/Publisher;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingleOrElse (Lorg/reactivestreams/Publisher;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingleOrNull (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class kotlinx/coroutines/reactive/ChannelKt { diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index 9ea2e3c50e..7956c26010 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -80,13 +80,53 @@ public suspend fun Publisher.awaitLast(): T = awaitOne(Mode.LAST) */ public suspend fun Publisher.awaitSingle(): T = awaitOne(Mode.SINGLE) +/** + * Awaits for the single value from the given publisher or the [default] value if none is emitted without blocking a thread and + * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * immediately resumes with [CancellationException]. + * + * @throws NoSuchElementException if publisher does not emit any value + * @throws IllegalArgumentException if publisher emits more than one value + */ +public suspend fun Publisher.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default) + +/** + * Awaits for the single value from the given publisher or `null` value if none is emitted without blocking a thread and + * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * immediately resumes with [CancellationException]. + * + * @throws NoSuchElementException if publisher does not emit any value + * @throws IllegalArgumentException if publisher emits more than one value + */ +public suspend fun Publisher.awaitSingleOrNull(): T = awaitOne(Mode.SINGLE_OR_DEFAULT) + +/** + * Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and + * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * immediately resumes with [CancellationException]. + * + * @throws NoSuchElementException if publisher does not emit any value + * @throws IllegalArgumentException if publisher emits more than one value + */ +public suspend fun Publisher.awaitSingleOrElse(defaultValue: () -> T): T = awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue() + // ------------------------ private ------------------------ private enum class Mode(val s: String) { FIRST("awaitFirst"), FIRST_OR_DEFAULT("awaitFirstOrDefault"), LAST("awaitLast"), - SINGLE("awaitSingle"); + SINGLE("awaitSingle"), + SINGLE_OR_DEFAULT("awaitSingleOrDefault"); override fun toString(): String = s } @@ -114,8 +154,8 @@ private suspend fun Publisher.awaitOne( cont.resume(t) } } - Mode.LAST, Mode.SINGLE -> { - if (mode == Mode.SINGLE && seenValue) { + Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> { + if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) { subscription.cancel() if (cont.isActive) cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode")) @@ -134,7 +174,7 @@ private suspend fun Publisher.awaitOne( return } when { - mode == Mode.FIRST_OR_DEFAULT -> { + (mode == Mode.FIRST_OR_DEFAULT || mode == Mode.SINGLE_OR_DEFAULT) -> { cont.resume(default as T) } cont.isActive -> { diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt index 6f7d98480b..18cd012d16 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt @@ -48,6 +48,9 @@ class IntegrationTest( assertEquals("ELSE", pub.awaitFirstOrElse { "ELSE" }) assertFailsWith { pub.awaitLast() } assertFailsWith { pub.awaitSingle() } + assertEquals("OK", pub.awaitSingleOrDefault("OK")) + assertNull(pub.awaitSingleOrNull()) + assertEquals("ELSE", pub.awaitSingleOrElse { "ELSE" }) var cnt = 0 pub.collect { cnt++ } assertEquals(0, cnt) @@ -65,6 +68,9 @@ class IntegrationTest( assertEquals("OK", pub.awaitFirstOrElse { "ELSE" }) assertEquals("OK", pub.awaitLast()) assertEquals("OK", pub.awaitSingle()) + assertEquals("OK", pub.awaitSingleOrDefault("!")) + assertEquals("OK", pub.awaitSingleOrNull()) + assertEquals("OK", pub.awaitSingleOrElse { "ELSE" }) var cnt = 0 pub.collect { assertEquals("OK", it) @@ -84,10 +90,13 @@ class IntegrationTest( } assertEquals(1, pub.awaitFirst()) assertEquals(1, pub.awaitFirstOrDefault(0)) - assertEquals(n, pub.awaitLast()) assertEquals(1, pub.awaitFirstOrNull()) assertEquals(1, pub.awaitFirstOrElse { 0 }) + assertEquals(n, pub.awaitLast()) assertFailsWith { pub.awaitSingle() } + assertFailsWith { pub.awaitSingleOrDefault(0) } + assertFailsWith { pub.awaitSingleOrNull() } + assertFailsWith { pub.awaitSingleOrElse { 0 } } checkNumbers(n, pub) val channel = pub.openSubscription() checkNumbers(n, channel.asPublisher(ctx(coroutineContext))) diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt index 3879c62c71..cc336ba6b5 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt @@ -68,6 +68,72 @@ class FluxSingleTest : TestBase() { } } + @Test + fun testAwaitSingleOrDefault() { + val flux = flux { + send(Flux.empty().awaitSingleOrDefault("O") + "K") + } + + checkSingleValue(flux) { + assertEquals("OK", it) + } + } + + @Test + fun testAwaitSingleOrDefaultException() { + val flux = flux { + send(Flux.just("O", "#").awaitSingleOrDefault("!") + "K") + } + + checkErroneous(flux) { + assert(it is IllegalArgumentException) + } + } + + @Test + fun testAwaitSingleOrNull() { + val flux = flux { + send(Flux.empty().awaitSingleOrNull() ?: "OK") + } + + checkSingleValue(flux) { + assertEquals("OK", it) + } + } + + @Test + fun testAwaitSingleOrNullException() { + val flux = flux { + send((Flux.just("O", "#").awaitSingleOrNull() ?: "!") + "K") + } + + checkErroneous(flux) { + assert(it is IllegalArgumentException) + } + } + + @Test + fun testAwaitSingleOrElse() { + val flux = flux { + send(Flux.empty().awaitSingleOrElse { "O" } + "K") + } + + checkSingleValue(flux) { + assertEquals("OK", it) + } + } + + @Test + fun testAwaitSingleOrElseException() { + val flux = flux { + send(Flux.just("O", "#").awaitSingleOrElse { "!" } + "K") + } + + checkErroneous(flux) { + assert(it is IllegalArgumentException) + } + } + @Test fun testAwaitFirst() { val flux = flux {