From 40de24a29857e2dfd812c765a4aee8bc6e237f07 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 2 Apr 2021 12:16:32 +0300 Subject: [PATCH 1/4] Reword docs for the Reactor integration --- .../kotlinx-coroutines-reactor/src/Convert.kt | 4 ++-- .../kotlinx-coroutines-reactor/src/Flux.kt | 19 ++++++++++--------- .../src/ReactorContext.kt | 16 +++++++++------- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactor/src/Convert.kt b/reactive/kotlinx-coroutines-reactor/src/Convert.kt index 3b08bd6639..002baa6185 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Convert.kt @@ -41,8 +41,8 @@ public fun Deferred.asMono(context: CoroutineContext): Mono = mono(co /** * Converts a stream of elements received from the channel to the hot reactive flux. * - * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers, - * they'll receive values in round-robin way. + * Every subscriber receives values from this channel in a **fan-out** fashion. If the are multiple subscribers, + * they'll receive values in a round-robin way. * @param context -- the coroutine context from which the resulting flux is going to be signalled */ @Deprecated(message = "Deprecated in the favour of consumeAsFlow()", diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 806f5bd5bc..fb46aadede 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -11,23 +11,24 @@ import org.reactivestreams.* import reactor.core.* import reactor.core.publisher.* import reactor.util.context.* +import java.lang.IllegalArgumentException import kotlin.coroutines.* /** - * Creates cold reactive [Flux] that runs a given [block] in a coroutine. + * Creates a cold reactive [Flux] that runs a given [block] in a coroutine. * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. - * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) - * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) - * if coroutine throws an exception or closes channel with a cause. - * Unsubscribing cancels running coroutine. + * Coroutine emits ([Subscriber.onNext]) values with [send][ProducerScope.send], completes ([Subscriber.onComplete]) + * when the coroutine completes, or the channel is explicitly closed and emits an error ([Subscriber.onError]) + * if the coroutine throws an exception or closes the channel with a cause. + * Unsubscribing cancels the running coroutine. * - * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that - * `onNext` is not invoked concurrently. - * - * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. + * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to + * ensure that [onNext][Subscriber.onNext] is not invoked concurrently. * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect * to cancellation and error handling may change in the future. + * + * @throws IllegalArgumentException if the provided [context] contains a [Job] instance. */ @ExperimentalCoroutinesApi public fun flux( diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index 333f056d97..08a02fdd57 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -10,14 +10,16 @@ import kotlin.coroutines.* import kotlinx.coroutines.reactive.* /** - * Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines. - * [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext]. - * Coroutine context element that propagates information about Reactor's [Context] through coroutines. + * Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between + * Reactor and kotlinx.coroutines. + * [Context.asCoroutineContext] is defined to place Reactor's [Context] elements into a [CoroutineContext], + * which can be used to propagate the information about Reactor's [Context] through coroutines. * - * This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux], - * [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux]. - * Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]) - * also propagate the [ReactorContext] to the subscriber's [Context]. + * This context element is implicitly propagated through subscriber's context by all Reactive integrations, + * such as [mono], [flux], [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux]. + * Functions that subscribe to the reactive stream + * (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]), too, propagate [ReactorContext] + * to the subscriber's [Context]. ** * ### Examples of Reactive context integration. * From 5cbef7b67aed7b6260b35833cb7d1fb0a96e219c Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 2 Apr 2021 12:17:40 +0300 Subject: [PATCH 2/4] Fix reactorPublish's incompatibility with the RS spec --- .../kotlinx-coroutines-reactor/src/Flux.kt | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index fb46aadede..12b5f7110c 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -44,10 +44,11 @@ private fun reactorPublish( scope: CoroutineScope, context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit -): Publisher = Publisher { subscriber -> - // specification requires NPE on null subscriber - if (subscriber == null) throw NullPointerException("Subscriber cannot be null") - require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." } +): Publisher = Publisher onSubscribe@{ subscriber: Subscriber? -> + if (subscriber !is CoreSubscriber) { + subscriber.reject(IllegalArgumentException("Subscriber is not an instance of CoreSubscriber, context can not be extracted.")) + return@onSubscribe + } val currentContext = subscriber.currentContext() val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext() val newContext = scope.newCoroutineContext(context + reactorContext) @@ -67,6 +68,23 @@ private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ct } } +/** The proper way to reject the subscriber, according to + * [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9) + */ +private fun Subscriber?.reject(t: Throwable) { + if (this == null) + throw NullPointerException("The subscriber can not be null") + onSubscribe(object: Subscription { + override fun request(n: Long) { + // intentionally left blank + } + override fun cancel() { + // intentionally left blank + } + }) + onError(t) +} + @Deprecated( message = "CoroutineScope.flux is deprecated in favour of top-level flux", level = DeprecationLevel.HIDDEN, From 4c5e3af10a64b49a63f729813af2e4c7d66305cf Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 2 Apr 2021 13:08:27 +0300 Subject: [PATCH 3/4] Add support for ContextView in Reactor integration --- .../api/kotlinx-coroutines-reactor.api | 4 +++- .../kotlinx-coroutines-reactor/src/Flux.kt | 2 +- .../kotlinx-coroutines-reactor/src/Mono.kt | 2 +- .../src/ReactorContext.kt | 23 ++++++++++++++++--- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api index 0a10aa12a9..b69bb334d7 100644 --- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api +++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api @@ -33,6 +33,7 @@ public final class kotlinx/coroutines/reactor/MonoKt { public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement { public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key; public fun (Lreactor/util/context/Context;)V + public fun (Lreactor/util/context/ContextView;)V public final fun getContext ()Lreactor/util/context/Context; } @@ -40,7 +41,8 @@ public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/corout } public final class kotlinx/coroutines/reactor/ReactorContextKt { - public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext; + public static final synthetic fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext; + public static final fun asCoroutineContext (Lreactor/util/context/ContextView;)Lkotlinx/coroutines/reactor/ReactorContext; } public final class kotlinx/coroutines/reactor/ReactorFlowKt { diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 12b5f7110c..f8d76918ca 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -50,7 +50,7 @@ private fun reactorPublish( return@onSubscribe } val currentContext = subscriber.currentContext() - val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext() + val reactorContext = context.extendReactorContext(currentContext) val newContext = scope.newCoroutineContext(context + reactorContext) val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER) subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 6a4a38f379..67c1baa02d 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -83,7 +83,7 @@ private fun monoInternal( context: CoroutineContext, block: suspend CoroutineScope.() -> T? ): Mono = Mono.create { sink -> - val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext() + val reactorContext = context.extendReactorContext(sink.currentContext()) val newContext = scope.newCoroutineContext(context + reactorContext) val coroutine = MonoCoroutine(newContext, sink) sink.onDispose(coroutine) diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index 08a02fdd57..8d4cee401d 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -5,9 +5,9 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.ExperimentalCoroutinesApi -import reactor.util.context.Context import kotlin.coroutines.* import kotlinx.coroutines.reactive.* +import reactor.util.context.* /** * Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between @@ -51,12 +51,29 @@ import kotlinx.coroutines.reactive.* */ @ExperimentalCoroutinesApi public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) { + + public constructor(contextView: ContextView): this(Context.of(contextView)) + public companion object Key : CoroutineContext.Key } /** - * Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context + * Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context + * and later used via `coroutineContext[ReactorContext]`. + */ +@ExperimentalCoroutinesApi +public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this) + +/** + * Wraps the given [Context] into [ReactorContext], so it can be added to the coroutine's context * and later used via `coroutineContext[ReactorContext]`. */ @ExperimentalCoroutinesApi -public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this) +@Deprecated("Use the more general version for ContextView instead", level = DeprecationLevel.HIDDEN) +public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext() + +/** + * Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values. + */ +internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext = + (this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext() \ No newline at end of file From 1d4e7e4b496886dc937184dfe4b07ac079918e9b Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 21 Apr 2021 10:43:22 +0300 Subject: [PATCH 4/4] Small fixes --- reactive/kotlinx-coroutines-reactor/src/Flux.kt | 9 ++++----- .../kotlinx-coroutines-reactor/src/ReactorContext.kt | 11 ++++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index f8d76918ca..df5f64f262 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -11,15 +11,14 @@ import org.reactivestreams.* import reactor.core.* import reactor.core.publisher.* import reactor.util.context.* -import java.lang.IllegalArgumentException import kotlin.coroutines.* /** - * Creates a cold reactive [Flux] that runs a given [block] in a coroutine. + * Creates a cold reactive [Flux] that runs the given [block] in a coroutine. * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. - * Coroutine emits ([Subscriber.onNext]) values with [send][ProducerScope.send], completes ([Subscriber.onComplete]) - * when the coroutine completes, or the channel is explicitly closed and emits an error ([Subscriber.onError]) - * if the coroutine throws an exception or closes the channel with a cause. + * The coroutine emits ([Subscriber.onNext]) values with [send][ProducerScope.send], completes ([Subscriber.onComplete]) + * when the coroutine completes, or, in case the coroutine throws an exception or the channel is closed, + * emits the error ([Subscriber.onError]) and closes the channel with the cause. * Unsubscribing cancels the running coroutine. * * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index 8d4cee401d..8969662adc 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -12,12 +12,12 @@ import reactor.util.context.* /** * Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between * Reactor and kotlinx.coroutines. - * [Context.asCoroutineContext] is defined to place Reactor's [Context] elements into a [CoroutineContext], + * [Context.asCoroutineContext] puts Reactor's [Context] elements into a [CoroutineContext], * which can be used to propagate the information about Reactor's [Context] through coroutines. * - * This context element is implicitly propagated through subscriber's context by all Reactive integrations, + * This context element is implicitly propagated through subscribers' context by all Reactive integrations, * such as [mono], [flux], [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux]. - * Functions that subscribe to the reactive stream + * Functions that subscribe to a reactive stream * (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]), too, propagate [ReactorContext] * to the subscriber's [Context]. ** @@ -52,6 +52,7 @@ import reactor.util.context.* @ExperimentalCoroutinesApi public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) { + // `Context.of` is zero-cost if the argument is a `Context` public constructor(contextView: ContextView): this(Context.of(contextView)) public companion object Key : CoroutineContext.Key @@ -69,8 +70,8 @@ public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(thi * and later used via `coroutineContext[ReactorContext]`. */ @ExperimentalCoroutinesApi -@Deprecated("Use the more general version for ContextView instead", level = DeprecationLevel.HIDDEN) -public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext() +@Deprecated("The more general version for ContextView should be used instead", level = DeprecationLevel.HIDDEN) +public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext() // `readOnly()` is zero-cost. /** * Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values.