Skip to content

Commit cd0d1e0

Browse files
committed
Add support for ContextView in Reactor integration
1 parent f1e457f commit cd0d1e0

File tree

5 files changed

+26
-7
lines changed

5 files changed

+26
-7
lines changed

reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api

+3-1
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,16 @@ public final class kotlinx/coroutines/reactor/MonoKt {
2626
public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement {
2727
public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key;
2828
public fun <init> (Lreactor/util/context/Context;)V
29+
public fun <init> (Lreactor/util/context/ContextView;)V
2930
public final fun getContext ()Lreactor/util/context/Context;
3031
}
3132

3233
public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
3334
}
3435

3536
public final class kotlinx/coroutines/reactor/ReactorContextKt {
36-
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
37+
public static final synthetic fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
38+
public static final fun asCoroutineContext (Lreactor/util/context/ContextView;)Lkotlinx/coroutines/reactor/ReactorContext;
3739
}
3840

3941
public final class kotlinx/coroutines/reactor/ReactorFlowKt {

reactive/kotlinx-coroutines-reactor/src/Flux.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ private fun <T> reactorPublish(
6666
return@onSubscribe
6767
}
6868
val currentContext = subscriber.currentContext()
69-
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
69+
val reactorContext = context.extendReactorContext(currentContext)
7070
val newContext = scope.newCoroutineContext(context + reactorContext)
7171
val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER)
7272
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ private fun <T> monoInternal(
4848
context: CoroutineContext,
4949
block: suspend CoroutineScope.() -> T?
5050
): Mono<T> = Mono.create { sink ->
51-
val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
51+
val reactorContext = context.extendReactorContext(sink.currentContext())
5252
val newContext = scope.newCoroutineContext(context + reactorContext)
5353
val coroutine = MonoCoroutine(newContext, sink)
5454
sink.onDispose(coroutine)

reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt

+20-3
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
package kotlinx.coroutines.reactor
66

77
import kotlinx.coroutines.ExperimentalCoroutinesApi
8-
import reactor.util.context.Context
98
import kotlin.coroutines.*
109
import kotlinx.coroutines.reactive.*
10+
import reactor.util.context.*
1111

1212
/**
1313
* Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between
@@ -50,12 +50,29 @@ import kotlinx.coroutines.reactive.*
5050
*/
5151
@ExperimentalCoroutinesApi
5252
public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
53+
54+
public constructor(contextView: ContextView): this(Context.of(contextView))
55+
5356
public companion object Key : CoroutineContext.Key<ReactorContext>
5457
}
5558

5659
/**
57-
* Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
60+
* Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context
61+
* and later used via `coroutineContext[ReactorContext]`.
62+
*/
63+
@ExperimentalCoroutinesApi
64+
public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this)
65+
66+
/**
67+
* Wraps the given [Context] into [ReactorContext], so it can be added to the coroutine's context
5868
* and later used via `coroutineContext[ReactorContext]`.
5969
*/
6070
@ExperimentalCoroutinesApi
61-
public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)
71+
@Deprecated("Use the more general version for ContextView instead", level = DeprecationLevel.HIDDEN)
72+
public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext()
73+
74+
/**
75+
* Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values.
76+
*/
77+
internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext =
78+
(this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext()

reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class ReactorContextTest : TestBase() {
5858
}
5959

6060
@Test
61-
fun testFluxAwaitContextPropagation() = runBlocking<Unit>(
61+
fun testFluxAwaitContextPropagation() = runBlocking(
6262
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
6363
) {
6464
assertEquals(createFlux().awaitFirst(), "1")

0 commit comments

Comments
 (0)