Skip to content

Commit 693142c

Browse files
SokolovaMariaqwwdfsad
authored andcommitted
Context passing between coroutines and Reactor Mono/Flux (#1138)
Fixes #284
1 parent 63b6e27 commit 693142c

File tree

8 files changed

+141
-7
lines changed

8 files changed

+141
-7
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt

+17
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,23 @@ public final class kotlinx/coroutines/reactive/PublishKt {
2727
public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
2828
}
2929

30+
public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coroutines/AbstractCoroutine, kotlinx/coroutines/channels/ProducerScope, kotlinx/coroutines/selects/SelectClause2, org/reactivestreams/Subscription {
31+
public fun <init> (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;)V
32+
public fun cancel ()V
33+
public fun close (Ljava/lang/Throwable;)Z
34+
public fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
35+
public fun getOnSend ()Lkotlinx/coroutines/selects/SelectClause2;
36+
public fun invokeOnClose (Lkotlin/jvm/functions/Function1;)Ljava/lang/Void;
37+
public synthetic fun invokeOnClose (Lkotlin/jvm/functions/Function1;)V
38+
public fun isClosedForSend ()Z
39+
public fun isFull ()Z
40+
public fun offer (Ljava/lang/Object;)Z
41+
public synthetic fun onCompleted (Ljava/lang/Object;)V
42+
public fun registerSelectClause2 (Lkotlinx/coroutines/selects/SelectInstance;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
43+
public fun request (J)V
44+
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
45+
}
46+
3047
public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
3148
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
3249
}

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt

+13
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,19 @@ public final class kotlinx/coroutines/reactor/MonoKt {
1919
public static synthetic fun mono$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;
2020
}
2121

22+
public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement {
23+
public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key;
24+
public fun <init> (Lreactor/util/context/Context;)V
25+
public final fun getContext ()Lreactor/util/context/Context;
26+
}
27+
28+
public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
29+
}
30+
31+
public final class kotlinx/coroutines/reactor/ReactorContextKt {
32+
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
33+
}
34+
2235
public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay {
2336
public fun <init> (Lreactor/core/scheduler/Scheduler;)V
2437
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ private const val CLOSED = -1L // closed, but have not signalled onCompleted/
7676
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
7777

7878
@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
79-
private class PublisherCoroutine<in T>(
79+
@InternalCoroutinesApi
80+
public class PublisherCoroutine<in T>(
8081
parentContext: CoroutineContext,
8182
private val subscriber: Subscriber<T>
8283
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {

reactive/kotlinx-coroutines-reactor/build.gradle

+8
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,12 @@ tasks.withType(dokka.getClass()) {
1212
url = new URL("https://projectreactor.io/docs/core/$reactor_vesion/api/")
1313
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
1414
}
15+
}
16+
17+
compileTestKotlin {
18+
kotlinOptions.jvmTarget = "1.8"
19+
}
20+
21+
compileKotlin {
22+
kotlinOptions.jvmTarget = "1.8"
1523
}

reactive/kotlinx-coroutines-reactor/src/Flux.kt

+22-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
/*
23
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
34
*/
@@ -9,6 +10,8 @@ package kotlinx.coroutines.reactor
910
import kotlinx.coroutines.*
1011
import kotlinx.coroutines.channels.*
1112
import kotlinx.coroutines.reactive.*
13+
import org.reactivestreams.Publisher
14+
import reactor.core.CoreSubscriber
1215
import reactor.core.publisher.*
1316
import kotlin.coroutines.*
1417
import kotlin.internal.LowPriorityInOverloadResolution
@@ -41,8 +44,8 @@ public fun <T> flux(
4144
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
4245
): Flux<T> {
4346
require(context[Job] === null) { "Flux context cannot contain job in it." +
44-
"Its lifecycle should be managed via Disposable handle. Had $context" }
45-
return Flux.from(publishInternal(GlobalScope, context, block))
47+
"Its lifecycle should be managed via Disposable handle. Had $context" }
48+
return Flux.from(reactorPublish(GlobalScope, context, block))
4649
}
4750

4851
@Deprecated(
@@ -55,4 +58,20 @@ public fun <T> CoroutineScope.flux(
5558
context: CoroutineContext = EmptyCoroutineContext,
5659
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
5760
): Flux<T> =
58-
Flux.from(publishInternal(this, context, block))
61+
Flux.from(reactorPublish(this, context, block))
62+
63+
private fun <T> reactorPublish(
64+
scope: CoroutineScope,
65+
context: CoroutineContext = EmptyCoroutineContext,
66+
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
67+
): Publisher<T> = Publisher { subscriber ->
68+
// specification requires NPE on null subscriber
69+
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
70+
require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
71+
val currentContext = subscriber.currentContext()
72+
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
73+
val newContext = scope.newCoroutineContext(context + reactorContext)
74+
val coroutine = PublisherCoroutine(newContext, subscriber)
75+
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
76+
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
77+
}

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ private fun <T> monoInternal(
5353
context: CoroutineContext,
5454
block: suspend CoroutineScope.() -> T?
5555
): Mono<T> = Mono.create { sink ->
56-
val newContext = scope.newCoroutineContext(context)
56+
val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
57+
val newContext = scope.newCoroutineContext(context + reactorContext)
5758
val coroutine = MonoCoroutine(newContext, sink)
5859
sink.onDispose(coroutine)
5960
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
@@ -78,12 +79,11 @@ private class MonoCoroutine<in T>(
7879
handleCoroutineException(context, cause)
7980
}
8081
}
81-
82+
8283
override fun dispose() {
8384
disposed = true
8485
cancel()
8586
}
8687

8788
override fun isDisposed(): Boolean = disposed
8889
}
89-
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package kotlinx.coroutines.reactor
2+
3+
import kotlinx.coroutines.ExperimentalCoroutinesApi
4+
import reactor.util.context.Context
5+
import kotlin.coroutines.*
6+
7+
/**
8+
* Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines.
9+
*
10+
* [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext].
11+
*
12+
* Reactor builders [mono] and [flux] use this context element to enhance the resulting `subscriberContext`.
13+
*
14+
* ### Usages
15+
* Passing reactor context from coroutine builder to reactor entity:
16+
* ```
17+
* launch(Context.of("key", "value").asCoroutineContext()) {
18+
* mono {
19+
* println(coroutineContext[ReactorContext]) // Prints { "key": "value" }
20+
* }.subscribe()
21+
* }
22+
* ```
23+
*
24+
* Accessing modified reactor context enriched from the downstream:
25+
* ```
26+
* launch {
27+
* mono {
28+
* println(coroutineContext[ReactorContext]) // Prints { "key": "value" }
29+
* }.subscriberContext(Context.of("key", "value"))
30+
* .subscribe()
31+
* }
32+
* ```
33+
*/
34+
@ExperimentalCoroutinesApi
35+
public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
36+
companion object Key : CoroutineContext.Key<ReactorContext>
37+
}
38+
39+
/**
40+
* Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
41+
* and later used via `coroutineContext[ReactorContext]`.
42+
*/
43+
@ExperimentalCoroutinesApi
44+
public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package kotlinx.coroutines.reactor
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.reactive.*
5+
import org.junit.Test
6+
import reactor.util.context.Context
7+
import kotlin.test.assertEquals
8+
9+
class ReactorContextTest {
10+
@Test
11+
fun testMonoHookedContext() = runBlocking {
12+
val mono = mono(Context.of(1, "1", 7, "7").asCoroutineContext()) {
13+
val ctx = coroutineContext[ReactorContext]?.context
14+
buildString {
15+
(1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) }
16+
}
17+
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
18+
.subscriberContext { ctx -> ctx.put(6, "6") }
19+
assertEquals(mono.awaitFirst(), "1234567")
20+
}
21+
22+
@Test
23+
fun testFluxContext() = runBlocking<Unit> {
24+
val flux = flux(Context.of(1, "1", 7, "7").asCoroutineContext()) {
25+
val ctx = coroutineContext[ReactorContext]!!.context
26+
(1..7).forEach { send(ctx.getOrDefault(it, "noValue")) }
27+
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
28+
.subscriberContext { ctx -> ctx.put(6, "6") }
29+
var i = 0
30+
flux.subscribe { str -> i++; assertEquals(str, i.toString()) }
31+
}
32+
}

0 commit comments

Comments
 (0)