From 2ed7a1dea0912f1fedc17ee3c6999eb8201f1911 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 30 Mar 2021 16:05:26 +0300 Subject: [PATCH 1/3] Breaking change: make `publish {...}` require Judging by https://github.com/ReactiveX/RxJava/issues/4644, the requirement 2.13 from the reactive spec (https://github.com/reactive-streams/reactive-streams-jvm#2.13) does mean that null values can't be passed to `onNext`. Thus, it is incorrect for `publish {...}` to accept nullable values. --- reactive/kotlinx-coroutines-jdk9/src/Publish.kt | 2 +- reactive/kotlinx-coroutines-reactive/src/Convert.kt | 12 ++++++++---- reactive/kotlinx-coroutines-reactive/src/Publish.kt | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/reactive/kotlinx-coroutines-jdk9/src/Publish.kt b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt index 3db0d5da4c..0753adf39e 100644 --- a/reactive/kotlinx-coroutines-jdk9/src/Publish.kt +++ b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt @@ -29,7 +29,7 @@ import org.reactivestreams.FlowAdapters * to cancellation and error handling may change in the future. */ @ExperimentalCoroutinesApi // Since 1.3.x -public fun flowPublish( +public fun flowPublish( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit ): Flow.Publisher { diff --git a/reactive/kotlinx-coroutines-reactive/src/Convert.kt b/reactive/kotlinx-coroutines-reactive/src/Convert.kt index 8f4b26d377..9836307180 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Convert.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.reactive +import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.reactivestreams.* import kotlin.coroutines.* @@ -18,7 +19,10 @@ import kotlin.coroutines.* @Deprecated(message = "Deprecated in the favour of consumeAsFlow()", level = DeprecationLevel.WARNING, // Error in 1.4 replaceWith = ReplaceWith("this.consumeAsFlow().asPublisher()")) -public fun ReceiveChannel.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher = publish(context) { - for (t in this@asPublisher) - send(t) -} +public fun ReceiveChannel.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher = + // we call the deprecated version here because the non-deprecated one requires now + @Suppress("DEPRECATION_ERROR") + GlobalScope.publish(context) { + for (t in this@asPublisher) + send(t) + } diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 6bb02ef192..1a8fb579a7 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -34,7 +34,7 @@ import kotlin.internal.* * to cancellation and error handling may change in the future. */ @ExperimentalCoroutinesApi -public fun publish( +public fun publish( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit ): Publisher { From d36568d4236cb28490f0c3aaf9fac451e144d30e Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 6 Apr 2021 10:39:07 +0300 Subject: [PATCH 2/3] Add the missing boundaries to the Reactor integration `flux` and `mono` now have . `flux`, because it's illegal to emit `null` values, and `mono`, because it can be confusing for the user to be able to write `mono` to create a `Mono` that emits either `String` or no value at all. --- reactive/kotlinx-coroutines-reactor/src/Convert.kt | 13 ++++++++----- reactive/kotlinx-coroutines-reactor/src/Flux.kt | 2 +- reactive/kotlinx-coroutines-reactor/src/Mono.kt | 2 +- .../kotlinx-coroutines-reactor/src/ReactorFlow.kt | 2 +- .../test/FluxSingleTest.kt | 4 ++-- .../test/ReactorContextTest.kt | 2 +- 6 files changed, 14 insertions(+), 11 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactor/src/Convert.kt b/reactive/kotlinx-coroutines-reactor/src/Convert.kt index 7807549ce1..fbc7290bab 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Convert.kt @@ -36,7 +36,7 @@ public fun Job.asMono(context: CoroutineContext): Mono = mono(context) { t * @param context -- the coroutine context from which the resulting mono is going to be signalled */ @ExperimentalCoroutinesApi -public fun Deferred.asMono(context: CoroutineContext): Mono = mono(context) { this@asMono.await() } +public fun Deferred.asMono(context: CoroutineContext): Mono = mono(context) { this@asMono.await() } /** * Converts a stream of elements received from the channel to the hot reactive flux. @@ -48,7 +48,10 @@ public fun Deferred.asMono(context: CoroutineContext): Mono = mono(co @Deprecated(message = "Deprecated in the favour of consumeAsFlow()", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.consumeAsFlow().asFlux()")) -public fun ReceiveChannel.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = flux(context) { - for (t in this@asFlux) - send(t) -} +public fun ReceiveChannel.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = + // use the deprecated version of `flux` here because the proper one has the boundary now + @Suppress("DEPRECATION_ERROR") + GlobalScope.flux(context) { + for (t in this@asFlux) + send(t) + } diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 8d4f9cc969..f75a8c2614 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -34,7 +34,7 @@ import kotlin.internal.* * to cancellation and error handling may change in the future. */ @ExperimentalCoroutinesApi -public fun flux( +public fun flux( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit ): Flux { diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index e146dca9f6..8eb766b8d2 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -23,7 +23,7 @@ import kotlin.internal.* * * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -public fun mono( +public fun mono( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T? ): Mono { diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt index 0fc743f937..b2c793d456 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt @@ -25,7 +25,7 @@ import kotlin.coroutines.* * is used, so calls are performed from an arbitrary thread. */ @JvmOverloads // binary compatibility -public fun Flow.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = +public fun Flow.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = FlowAsFlux(this, Dispatchers.Unconfined + context) private class FlowAsFlux( diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt index cc336ba6b5..f188041c61 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt @@ -92,7 +92,7 @@ class FluxSingleTest : TestBase() { @Test fun testAwaitSingleOrNull() { - val flux = flux { + val flux = flux { send(Flux.empty().awaitSingleOrNull() ?: "OK") } @@ -169,7 +169,7 @@ class FluxSingleTest : TestBase() { @Test fun testAwaitFirstOrNull() { - val flux = flux { + val flux = flux { send(Flux.empty().awaitFirstOrNull() ?: "OK") } diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt index aff29241c9..f0a7167f26 100644 --- a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt @@ -74,7 +74,7 @@ class ReactorContextTest : TestBase() { } - private fun createFlux(): Flux = flux { + private fun createFlux(): Flux = flux { val ctx = reactorContext() (1..3).forEach { send(ctx.getOrDefault(it, "noValue")) } } From 27c878ad3068f0b5f40a5b14ef6f64cfd9d653a3 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 6 Apr 2021 11:16:14 +0300 Subject: [PATCH 3/3] Add the type boundary for rxMaybe This does not affect the range of representable programs, but, without the type boundary, the users could be confused into thinking that they could emit `null` as a value. --- reactive/kotlinx-coroutines-rx2/src/RxConvert.kt | 2 +- reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt | 2 +- reactive/kotlinx-coroutines-rx3/src/RxConvert.kt | 2 +- reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 14c24942ac..9cd655297a 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -44,7 +44,7 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet * @param context -- the coroutine context from which the resulting maybe is going to be signalled */ @ExperimentalCoroutinesApi -public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMaybe(context) { +public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMaybe(context) { this@asMaybe.await() } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index c4ca54cf48..420647e8a4 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -20,7 +20,7 @@ import kotlin.internal.* * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -public fun rxMaybe( +public fun rxMaybe( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T? ): Maybe { diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index 63e30f2617..c6a5c58b4a 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -44,7 +44,7 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet * @param context -- the coroutine context from which the resulting maybe is going to be signalled */ @ExperimentalCoroutinesApi -public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMaybe(context) { +public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMaybe(context) { this@asMaybe.await() } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt index 1c10266470..56fc77dcfb 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt @@ -17,7 +17,7 @@ import kotlin.coroutines.* * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -public fun rxMaybe( +public fun rxMaybe( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T? ): Maybe {