Skip to content

Commit 505eea4

Browse files
committed
Add 'awaitSingleOr*' methods to the JDK9 Flow integration
1 parent 386f5d3 commit 505eea4

File tree

3 files changed

+55
-2
lines changed

3 files changed

+55
-2
lines changed

reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ public final class kotlinx/coroutines/jdk9/AwaitKt {
55
public static final fun awaitFirstOrNull (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
66
public static final fun awaitLast (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
77
public static final fun awaitSingle (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8+
public static final fun awaitSingleOrDefault (Ljava/util/concurrent/Flow$Publisher;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
9+
public static final fun awaitSingleOrElse (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10+
public static final fun awaitSingleOrNull (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
811
}
912

1013
public final class kotlinx/coroutines/jdk9/PublishKt {

reactive/kotlinx-coroutines-jdk9/src/Await.kt

+41
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,44 @@ public suspend fun <T> Flow.Publisher<T>.awaitLast(): T =
7979
*/
8080
public suspend fun <T> Flow.Publisher<T>.awaitSingle(): T =
8181
FlowAdapters.toPublisher(this).awaitSingle()
82+
83+
/**
84+
* Awaits the single value from the given observable, or returns the [default] value if none is emitted, without
85+
* blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
86+
* corresponding exception.
87+
*
88+
* This suspending function is cancellable.
89+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
90+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
91+
*
92+
* @throws NoSuchElementException if the publisher does not emit any value
93+
* @throws IllegalArgumentException if the publisher emits more than one value
94+
*/
95+
public suspend fun <T> Flow.Publisher<T>.awaitSingleOrDefault(default: T): T =
96+
FlowAdapters.toPublisher(this).awaitSingleOrDefault(default)
97+
98+
/**
99+
* Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, if
100+
* this observable has produced an error, throws the corresponding exception. If more than one value or none were
101+
* produced by the publisher, `null` is returned.
102+
*
103+
* This suspending function is cancellable.
104+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
105+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
106+
*/
107+
public suspend fun <T> Flow.Publisher<T>.awaitSingleOrNull(): T? =
108+
FlowAdapters.toPublisher(this).awaitSingleOrNull()
109+
110+
/**
111+
* Awaits the single value from the given observable, or calls [defaultValue] to get a value if none is emitted, without
112+
* blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
113+
* corresponding exception.
114+
*
115+
* This suspending function is cancellable.
116+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
117+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
118+
*
119+
* @throws IllegalArgumentException if the publisher emits more than one value
120+
*/
121+
public suspend fun <T> Flow.Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T =
122+
FlowAdapters.toPublisher(this).awaitSingleOrElse(defaultValue)

reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt

+11-2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ class IntegrationTest(
4949
assertEquals("ELSE", pub.awaitFirstOrElse { "ELSE" })
5050
assertFailsWith<NoSuchElementException> { pub.awaitLast() }
5151
assertFailsWith<NoSuchElementException> { pub.awaitSingle() }
52+
assertEquals("OK", pub.awaitSingleOrDefault("OK"))
53+
assertNull(pub.awaitSingleOrNull())
54+
assertEquals("ELSE", pub.awaitSingleOrElse { "ELSE" })
5255
var cnt = 0
5356
pub.collect { cnt++ }
5457
assertEquals(0, cnt)
@@ -66,6 +69,9 @@ class IntegrationTest(
6669
assertEquals("OK", pub.awaitFirstOrElse { "ELSE" })
6770
assertEquals("OK", pub.awaitLast())
6871
assertEquals("OK", pub.awaitSingle())
72+
assertEquals("OK", pub.awaitSingleOrDefault("!"))
73+
assertEquals("OK", pub.awaitSingleOrNull())
74+
assertEquals("OK", pub.awaitSingleOrElse { "ELSE" })
6975
var cnt = 0
7076
pub.collect {
7177
assertEquals("OK", it)
@@ -85,10 +91,13 @@ class IntegrationTest(
8591
}
8692
assertEquals(1, pub.awaitFirst())
8793
assertEquals(1, pub.awaitFirstOrDefault(0))
88-
assertEquals(n, pub.awaitLast())
8994
assertEquals(1, pub.awaitFirstOrNull())
9095
assertEquals(1, pub.awaitFirstOrElse { 0 })
96+
assertEquals(null, pub.awaitSingleOrNull())
97+
assertEquals(n, pub.awaitLast())
9198
assertFailsWith<IllegalArgumentException> { pub.awaitSingle() }
99+
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrDefault(0) }
100+
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrElse { 0 } }
92101
checkNumbers(n, pub)
93102
val flow = pub.asFlow()
94103
checkNumbers(n, flow.flowOn(ctx(coroutineContext)).asPublisher())
@@ -107,7 +116,7 @@ class IntegrationTest(
107116
}
108117

109118
@Test
110-
fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException}) {
119+
fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException }) {
111120
expect(1)
112121
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
113122
flowPublish<String> {

0 commit comments

Comments
 (0)