diff --git a/reactive/kotlinx-coroutines-jdk9/src/Publish.kt b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt index 3db0d5da4c..0753adf39e 100644 --- a/reactive/kotlinx-coroutines-jdk9/src/Publish.kt +++ b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt @@ -29,7 +29,7 @@ import org.reactivestreams.FlowAdapters * to cancellation and error handling may change in the future. */ @ExperimentalCoroutinesApi // Since 1.3.x -public fun flowPublish( +public fun flowPublish( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit ): Flow.Publisher { diff --git a/reactive/kotlinx-coroutines-reactive/src/Convert.kt b/reactive/kotlinx-coroutines-reactive/src/Convert.kt index 8f4b26d377..9836307180 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Convert.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.reactive +import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.reactivestreams.* import kotlin.coroutines.* @@ -18,7 +19,10 @@ import kotlin.coroutines.* @Deprecated(message = "Deprecated in the favour of consumeAsFlow()", level = DeprecationLevel.WARNING, // Error in 1.4 replaceWith = ReplaceWith("this.consumeAsFlow().asPublisher()")) -public fun ReceiveChannel.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher = publish(context) { - for (t in this@asPublisher) - send(t) -} +public fun ReceiveChannel.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher = + // we call the deprecated version here because the non-deprecated one requires now + @Suppress("DEPRECATION_ERROR") + GlobalScope.publish(context) { + for (t in this@asPublisher) + send(t) + } diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 6bb02ef192..1a8fb579a7 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -34,7 +34,7 @@ import kotlin.internal.* * to cancellation and error handling may change in the future. */ @ExperimentalCoroutinesApi -public fun publish( +public fun publish( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit ): Publisher { diff --git a/reactive/kotlinx-coroutines-reactor/src/Convert.kt b/reactive/kotlinx-coroutines-reactor/src/Convert.kt index 7807549ce1..fbc7290bab 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Convert.kt @@ -36,7 +36,7 @@ public fun Job.asMono(context: CoroutineContext): Mono = mono(context) { t * @param context -- the coroutine context from which the resulting mono is going to be signalled */ @ExperimentalCoroutinesApi -public fun Deferred.asMono(context: CoroutineContext): Mono = mono(context) { this@asMono.await() } +public fun Deferred.asMono(context: CoroutineContext): Mono = mono(context) { this@asMono.await() } /** * Converts a stream of elements received from the channel to the hot reactive flux. @@ -48,7 +48,10 @@ public fun Deferred.asMono(context: CoroutineContext): Mono = mono(co @Deprecated(message = "Deprecated in the favour of consumeAsFlow()", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.consumeAsFlow().asFlux()")) -public fun ReceiveChannel.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = flux(context) { - for (t in this@asFlux) - send(t) -} +public fun ReceiveChannel.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = + // use the deprecated version of `flux` here because the proper one has the boundary now + @Suppress("DEPRECATION_ERROR") + GlobalScope.flux(context) { + for (t in this@asFlux) + send(t) + } diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 8d4f9cc969..f75a8c2614 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -34,7 +34,7 @@ import kotlin.internal.* * to cancellation and error handling may change in the future. */ @ExperimentalCoroutinesApi -public fun flux( +public fun flux( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit ): Flux { diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index e146dca9f6..8eb766b8d2 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -23,7 +23,7 @@ import kotlin.internal.* * * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -public fun mono( +public fun mono( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T? ): Mono { diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt index 0fc743f937..b2c793d456 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt @@ -25,7 +25,7 @@ import kotlin.coroutines.* * is used, so calls are performed from an arbitrary thread. */ @JvmOverloads // binary compatibility -public fun Flow.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = +public fun Flow.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = FlowAsFlux(this, Dispatchers.Unconfined + context) private class FlowAsFlux( diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt index cc336ba6b5..f188041c61 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt @@ -92,7 +92,7 @@ class FluxSingleTest : TestBase() { @Test fun testAwaitSingleOrNull() { - val flux = flux { + val flux = flux { send(Flux.empty().awaitSingleOrNull() ?: "OK") } @@ -169,7 +169,7 @@ class FluxSingleTest : TestBase() { @Test fun testAwaitFirstOrNull() { - val flux = flux { + val flux = flux { send(Flux.empty().awaitFirstOrNull() ?: "OK") } diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt index aff29241c9..f0a7167f26 100644 --- a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt @@ -74,7 +74,7 @@ class ReactorContextTest : TestBase() { } - private fun createFlux(): Flux = flux { + private fun createFlux(): Flux = flux { val ctx = reactorContext() (1..3).forEach { send(ctx.getOrDefault(it, "noValue")) } } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 14c24942ac..9cd655297a 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -44,7 +44,7 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet * @param context -- the coroutine context from which the resulting maybe is going to be signalled */ @ExperimentalCoroutinesApi -public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMaybe(context) { +public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMaybe(context) { this@asMaybe.await() } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index c4ca54cf48..420647e8a4 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -20,7 +20,7 @@ import kotlin.internal.* * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -public fun rxMaybe( +public fun rxMaybe( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T? ): Maybe { diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index 63e30f2617..c6a5c58b4a 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -44,7 +44,7 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet * @param context -- the coroutine context from which the resulting maybe is going to be signalled */ @ExperimentalCoroutinesApi -public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMaybe(context) { +public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMaybe(context) { this@asMaybe.await() } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt index 1c10266470..56fc77dcfb 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt @@ -17,7 +17,7 @@ import kotlin.coroutines.* * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -public fun rxMaybe( +public fun rxMaybe( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T? ): Maybe {