Skip to content

Commit a44037a

Browse files
committed
Fixes
1 parent e551129 commit a44037a

File tree

2 files changed

+4
-19
lines changed

2 files changed

+4
-19
lines changed

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+3-18
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,23 @@
77
package kotlinx.coroutines.reactor
88

99
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.reactive.*
1011
import org.reactivestreams.*
1112
import reactor.core.*
1213
import reactor.core.publisher.*
1314
import kotlin.coroutines.*
1415
import kotlin.internal.*
1516

1617
/**
17-
* Creates cold [mono][Mono] that runs a given [block] in a coroutine and emits its result.
18+
* Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result.
1819
* Every time the returned mono is subscribed, it starts a new coroutine.
1920
* If the result of [block] is `null`, [MonoSink.success] is invoked without a value.
2021
* Unsubscribing cancels the running coroutine.
2122
*
2223
* Coroutine context can be specified with [context] argument.
2324
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
2425
*
25-
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
26+
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
2627
*/
2728
public fun <T> mono(
2829
context: CoroutineContext = EmptyCoroutineContext,
@@ -33,22 +34,6 @@ public fun <T> mono(
3334
return monoInternal(GlobalScope, context, block)
3435
}
3536

36-
@Suppress("UNCHECKED_CAST")
37-
internal suspend fun <T> Mono<T>.await(): T? = (this as Mono<T?>).awaitOrDefault(null)
38-
39-
internal suspend fun <T> Mono<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
40-
subscribe(object: CoreSubscriber<T> {
41-
override fun onSubscribe(s: Subscription) {
42-
cont.invokeOnCancellation { s.cancel() }
43-
s.request(1)
44-
}
45-
override fun onNext(t: T) { cont.resume(t) }
46-
override fun onError(t: Throwable) { cont.resumeWithException(t) }
47-
override fun onComplete() { cont.resume(default) }
48-
49-
})
50-
}
51-
5237
@Deprecated(
5338
message = "CoroutineScope.mono is deprecated in favour of top-level mono",
5439
level = DeprecationLevel.ERROR,

reactive/kotlinx-coroutines-reactor/test/MonoTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ class MonoTest : TestBase() {
333333
newSingleThreadContext("testDownstreamCancellationDoesNotThrow").use { pool ->
334334
val job = launch(pool, start = CoroutineStart.UNDISPATCHED) {
335335
expect(1)
336-
mono.await()
336+
mono.awaitFirstOrNull()
337337
}
338338
launch(pool) {
339339
job.cancelAndJoin()

0 commit comments

Comments
 (0)