Skip to content

Commit 4c5e3af

Browse files
committed
Add support for ContextView in Reactor integration
1 parent 5cbef7b commit 4c5e3af

File tree

4 files changed

+25
-6
lines changed

4 files changed

+25
-6
lines changed

reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api

+3-1
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,16 @@ public final class kotlinx/coroutines/reactor/MonoKt {
3333
public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement {
3434
public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key;
3535
public fun <init> (Lreactor/util/context/Context;)V
36+
public fun <init> (Lreactor/util/context/ContextView;)V
3637
public final fun getContext ()Lreactor/util/context/Context;
3738
}
3839

3940
public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
4041
}
4142

4243
public final class kotlinx/coroutines/reactor/ReactorContextKt {
43-
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
44+
public static final synthetic fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
45+
public static final fun asCoroutineContext (Lreactor/util/context/ContextView;)Lkotlinx/coroutines/reactor/ReactorContext;
4446
}
4547

4648
public final class kotlinx/coroutines/reactor/ReactorFlowKt {

reactive/kotlinx-coroutines-reactor/src/Flux.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ private fun <T> reactorPublish(
5050
return@onSubscribe
5151
}
5252
val currentContext = subscriber.currentContext()
53-
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
53+
val reactorContext = context.extendReactorContext(currentContext)
5454
val newContext = scope.newCoroutineContext(context + reactorContext)
5555
val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER)
5656
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ private fun <T> monoInternal(
8383
context: CoroutineContext,
8484
block: suspend CoroutineScope.() -> T?
8585
): Mono<T> = Mono.create { sink ->
86-
val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
86+
val reactorContext = context.extendReactorContext(sink.currentContext())
8787
val newContext = scope.newCoroutineContext(context + reactorContext)
8888
val coroutine = MonoCoroutine(newContext, sink)
8989
sink.onDispose(coroutine)

reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt

+20-3
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
package kotlinx.coroutines.reactor
66

77
import kotlinx.coroutines.ExperimentalCoroutinesApi
8-
import reactor.util.context.Context
98
import kotlin.coroutines.*
109
import kotlinx.coroutines.reactive.*
10+
import reactor.util.context.*
1111

1212
/**
1313
* Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between
@@ -51,12 +51,29 @@ import kotlinx.coroutines.reactive.*
5151
*/
5252
@ExperimentalCoroutinesApi
5353
public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
54+
55+
public constructor(contextView: ContextView): this(Context.of(contextView))
56+
5457
public companion object Key : CoroutineContext.Key<ReactorContext>
5558
}
5659

5760
/**
58-
* Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
61+
* Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context
62+
* and later used via `coroutineContext[ReactorContext]`.
63+
*/
64+
@ExperimentalCoroutinesApi
65+
public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this)
66+
67+
/**
68+
* Wraps the given [Context] into [ReactorContext], so it can be added to the coroutine's context
5969
* and later used via `coroutineContext[ReactorContext]`.
6070
*/
6171
@ExperimentalCoroutinesApi
62-
public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)
72+
@Deprecated("Use the more general version for ContextView instead", level = DeprecationLevel.HIDDEN)
73+
public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext()
74+
75+
/**
76+
* Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values.
77+
*/
78+
internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext =
79+
(this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext()

0 commit comments

Comments
 (0)