Skip to content

Add Publisher.awaitSingleOrDefault|Null|Else extensions #2260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ public final class kotlinx/coroutines/reactive/AwaitKt {
public static final fun awaitFirstOrNull (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitLast (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingle (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingleOrDefault (Lorg/reactivestreams/Publisher;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingleOrElse (Lorg/reactivestreams/Publisher;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingleOrNull (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/reactive/ChannelKt {
Expand Down
48 changes: 44 additions & 4 deletions reactive/kotlinx-coroutines-reactive/src/Await.kt
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,53 @@ public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
*/
public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)

/**
* Awaits for the single value from the given publisher or the [default] value if none is emitted without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws IllegalArgumentException if publisher emits more than one value
*/
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)

/**
* Awaits for the single value from the given publisher or `null` value if none is emitted without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws IllegalArgumentException if publisher emits more than one value
*/
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T = awaitOne(Mode.SINGLE_OR_DEFAULT)

/**
* Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws IllegalArgumentException if publisher emits more than one value
*/
public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T = awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue()

// ------------------------ private ------------------------

private enum class Mode(val s: String) {
FIRST("awaitFirst"),
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
LAST("awaitLast"),
SINGLE("awaitSingle");
SINGLE("awaitSingle"),
SINGLE_OR_DEFAULT("awaitSingleOrDefault");
override fun toString(): String = s
}

Expand Down Expand Up @@ -114,8 +154,8 @@ private suspend fun <T> Publisher<T>.awaitOne(
cont.resume(t)
}
}
Mode.LAST, Mode.SINGLE -> {
if (mode == Mode.SINGLE && seenValue) {
Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> {
if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) {
subscription.cancel()
if (cont.isActive)
cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
Expand All @@ -134,7 +174,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
return
}
when {
mode == Mode.FIRST_OR_DEFAULT -> {
(mode == Mode.FIRST_OR_DEFAULT || mode == Mode.SINGLE_OR_DEFAULT) -> {
cont.resume(default as T)
}
cont.isActive -> {
Expand Down
11 changes: 10 additions & 1 deletion reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class IntegrationTest(
assertEquals("ELSE", pub.awaitFirstOrElse { "ELSE" })
assertFailsWith<NoSuchElementException> { pub.awaitLast() }
assertFailsWith<NoSuchElementException> { pub.awaitSingle() }
assertEquals("OK", pub.awaitSingleOrDefault("OK"))
assertNull(pub.awaitSingleOrNull())
assertEquals("ELSE", pub.awaitSingleOrElse { "ELSE" })
var cnt = 0
pub.collect { cnt++ }
assertEquals(0, cnt)
Expand All @@ -65,6 +68,9 @@ class IntegrationTest(
assertEquals("OK", pub.awaitFirstOrElse { "ELSE" })
assertEquals("OK", pub.awaitLast())
assertEquals("OK", pub.awaitSingle())
assertEquals("OK", pub.awaitSingleOrDefault("!"))
assertEquals("OK", pub.awaitSingleOrNull())
assertEquals("OK", pub.awaitSingleOrElse { "ELSE" })
var cnt = 0
pub.collect {
assertEquals("OK", it)
Expand All @@ -84,10 +90,13 @@ class IntegrationTest(
}
assertEquals(1, pub.awaitFirst())
assertEquals(1, pub.awaitFirstOrDefault(0))
assertEquals(n, pub.awaitLast())
assertEquals(1, pub.awaitFirstOrNull())
assertEquals(1, pub.awaitFirstOrElse { 0 })
assertEquals(n, pub.awaitLast())
assertFailsWith<IllegalArgumentException> { pub.awaitSingle() }
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrDefault(0) }
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrNull() }
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrElse { 0 } }
checkNumbers(n, pub)
val channel = pub.openSubscription()
checkNumbers(n, channel.asPublisher(ctx(coroutineContext)))
Expand Down
66 changes: 66 additions & 0 deletions reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,72 @@ class FluxSingleTest : TestBase() {
}
}

@Test
fun testAwaitSingleOrDefault() {
val flux = flux {
send(Flux.empty<String>().awaitSingleOrDefault("O") + "K")
}

checkSingleValue(flux) {
assertEquals("OK", it)
}
}

@Test
fun testAwaitSingleOrDefaultException() {
val flux = flux {
send(Flux.just("O", "#").awaitSingleOrDefault("!") + "K")
}

checkErroneous(flux) {
assert(it is IllegalArgumentException)
}
}

@Test
fun testAwaitSingleOrNull() {
val flux = flux<String?> {
send(Flux.empty<String>().awaitSingleOrNull() ?: "OK")
}

checkSingleValue(flux) {
assertEquals("OK", it)
}
}

@Test
fun testAwaitSingleOrNullException() {
val flux = flux {
send((Flux.just("O", "#").awaitSingleOrNull() ?: "!") + "K")
}

checkErroneous(flux) {
assert(it is IllegalArgumentException)
}
}

@Test
fun testAwaitSingleOrElse() {
val flux = flux {
send(Flux.empty<String>().awaitSingleOrElse { "O" } + "K")
}

checkSingleValue(flux) {
assertEquals("OK", it)
}
}

@Test
fun testAwaitSingleOrElseException() {
val flux = flux {
send(Flux.just("O", "#").awaitSingleOrElse { "!" } + "K")
}

checkErroneous(flux) {
assert(it is IllegalArgumentException)
}
}

@Test
fun testAwaitFirst() {
val flux = flux {
Expand Down