File tree 4 files changed +9
-4
lines changed
kotlinx-coroutines-reactor
kotlinx-coroutines-rx2/api
kotlinx-coroutines-rx3/api
4 files changed +9
-4
lines changed Original file line number Diff line number Diff line change 7
7
package kotlinx.coroutines.reactor
8
8
9
9
import kotlinx.coroutines.*
10
+ import kotlinx.coroutines.reactive.*
10
11
import org.reactivestreams.*
11
12
import reactor.core.*
12
13
import reactor.core.publisher.*
@@ -43,7 +44,7 @@ public fun <T> mono(
43
44
* function immediately cancels its [Subscription] and resumes with [CancellationException].
44
45
*/
45
46
public suspend fun <T > Mono<T>.awaitSingleOrNull (): T ? = suspendCancellableCoroutine { cont ->
46
- subscribe(object : Subscriber <T > {
47
+ injectCoroutineContext(cont.context). subscribe(object : Subscriber <T > {
47
48
private var seenValue = false
48
49
49
50
override fun onSubscribe (s : Subscription ) {
Original file line number Diff line number Diff line change @@ -104,11 +104,11 @@ class FluxSingleTest : TestBase() {
104
104
@Test
105
105
fun testAwaitSingleOrNullException () {
106
106
val flux = flux {
107
- send((Flux .just(" ! " , " #" ).awaitSingleOrNull() ? : " O " ) + " K" )
107
+ send((Flux .just(" O " , " #" ).awaitSingleOrNull() ? : " ! " ) + " K" )
108
108
}
109
109
110
- checkSingleValue (flux) {
111
- assertEquals( " OK " , it )
110
+ checkErroneous (flux) {
111
+ assert (it is IllegalArgumentException )
112
112
}
113
113
}
114
114
Original file line number Diff line number Diff line change @@ -8,7 +8,9 @@ public final class kotlinx/coroutines/rx2/RxAwaitKt {
8
8
public static final fun awaitFirstOrNull (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
9
9
public static final fun awaitLast (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10
10
public static final fun awaitOrDefault (Lio/reactivex/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
11
+ public static final fun awaitSingle (Lio/reactivex/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
11
12
public static final fun awaitSingle (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
13
+ public static final fun awaitSingleOrNull (Lio/reactivex/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
12
14
}
13
15
14
16
public final class kotlinx/coroutines/rx2/RxChannelKt {
Original file line number Diff line number Diff line change @@ -8,7 +8,9 @@ public final class kotlinx/coroutines/rx3/RxAwaitKt {
8
8
public static final fun awaitFirstOrNull (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
9
9
public static final fun awaitLast (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10
10
public static final fun awaitOrDefault (Lio/reactivex/rxjava3/core/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
11
+ public static final fun awaitSingle (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
11
12
public static final fun awaitSingle (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
13
+ public static final fun awaitSingleOrNull (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
12
14
}
13
15
14
16
public final class kotlinx/coroutines/rx3/RxChannelKt {
You can’t perform that action at this time.
0 commit comments