Skip to content

Fixes for the Reactor integration #2622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ public final class kotlinx/coroutines/reactor/MonoKt {
public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement {
public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key;
public fun <init> (Lreactor/util/context/Context;)V
public fun <init> (Lreactor/util/context/ContextView;)V
public final fun getContext ()Lreactor/util/context/Context;
}

public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
}

public final class kotlinx/coroutines/reactor/ReactorContextKt {
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
public static final synthetic fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
public static final fun asCoroutineContext (Lreactor/util/context/ContextView;)Lkotlinx/coroutines/reactor/ReactorContext;
}

public final class kotlinx/coroutines/reactor/ReactorFlowKt {
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-reactor/src/Convert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(co
/**
* Converts a stream of elements received from the channel to the hot reactive flux.
*
* Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
* they'll receive values in round-robin way.
* Every subscriber receives values from this channel in a **fan-out** fashion. If the are multiple subscribers,
* they'll receive values in a round-robin way.
* @param context -- the coroutine context from which the resulting flux is going to be signalled
*/
@Deprecated(message = "Deprecated in the favour of consumeAsFlow()",
Expand Down
46 changes: 32 additions & 14 deletions reactive/kotlinx-coroutines-reactor/src/Flux.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ import reactor.util.context.*
import kotlin.coroutines.*

/**
* Creates cold reactive [Flux] that runs a given [block] in a coroutine.
* Creates a cold reactive [Flux] that runs the given [block] in a coroutine.
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
* if coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels running coroutine.
* The coroutine emits ([Subscriber.onNext]) values with [send][ProducerScope.send], completes ([Subscriber.onComplete])
* when the coroutine completes, or, in case the coroutine throws an exception or the channel is closed,
* emits the error ([Subscriber.onError]) and closes the channel with the cause.
* Unsubscribing cancels the running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
* Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
* ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
*
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
*/
@ExperimentalCoroutinesApi
public fun <T> flux(
Expand All @@ -43,12 +43,13 @@ private fun <T> reactorPublish(
scope: CoroutineScope,
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
// specification requires NPE on null subscriber
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
): Publisher<T> = Publisher onSubscribe@{ subscriber: Subscriber<in T>? ->
if (subscriber !is CoreSubscriber) {
subscriber.reject(IllegalArgumentException("Subscriber is not an instance of CoreSubscriber, context can not be extracted."))
return@onSubscribe
}
val currentContext = subscriber.currentContext()
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
val reactorContext = context.extendReactorContext(currentContext)
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
Expand All @@ -66,6 +67,23 @@ private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ct
}
}

/** The proper way to reject the subscriber, according to
* [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9)
*/
private fun <T> Subscriber<T>?.reject(t: Throwable) {
if (this == null)
throw NullPointerException("The subscriber can not be null")
onSubscribe(object: Subscription {
override fun request(n: Long) {
// intentionally left blank
}
override fun cancel() {
// intentionally left blank
}
})
onError(t)
}

@Deprecated(
message = "CoroutineScope.flux is deprecated in favour of top-level flux",
level = DeprecationLevel.HIDDEN,
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-reactor/src/Mono.kt
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private fun <T> monoInternal(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
val reactorContext = context.extendReactorContext(sink.currentContext())
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
Expand Down
40 changes: 30 additions & 10 deletions reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.ExperimentalCoroutinesApi
import reactor.util.context.Context
import kotlin.coroutines.*
import kotlinx.coroutines.reactive.*
import reactor.util.context.*

/**
* Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines.
* [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext].
* Coroutine context element that propagates information about Reactor's [Context] through coroutines.
* Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between
* Reactor and kotlinx.coroutines.
* [Context.asCoroutineContext] puts Reactor's [Context] elements into a [CoroutineContext],
* which can be used to propagate the information about Reactor's [Context] through coroutines.
*
* This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux],
* [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
* Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst])
* also propagate the [ReactorContext] to the subscriber's [Context].
* This context element is implicitly propagated through subscribers' context by all Reactive integrations,
* such as [mono], [flux], [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
* Functions that subscribe to a reactive stream
* (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]), too, propagate [ReactorContext]
* to the subscriber's [Context].
**
* ### Examples of Reactive context integration.
*
Expand Down Expand Up @@ -49,12 +51,30 @@ import kotlinx.coroutines.reactive.*
*/
@ExperimentalCoroutinesApi
public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {

// `Context.of` is zero-cost if the argument is a `Context`
public constructor(contextView: ContextView): this(Context.of(contextView))

public companion object Key : CoroutineContext.Key<ReactorContext>
}

/**
* Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
* Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context
* and later used via `coroutineContext[ReactorContext]`.
*/
@ExperimentalCoroutinesApi
public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this)

/**
* Wraps the given [Context] into [ReactorContext], so it can be added to the coroutine's context
* and later used via `coroutineContext[ReactorContext]`.
*/
@ExperimentalCoroutinesApi
public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)
@Deprecated("The more general version for ContextView should be used instead", level = DeprecationLevel.HIDDEN)
public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext() // `readOnly()` is zero-cost.

/**
* Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values.
*/
internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext =
(this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext()