Skip to content

Commit ff06d83

Browse files
committed
Context passing between coroutines and Reactor Mono/Flux
Fixes #284
1 parent f22604b commit ff06d83

File tree

8 files changed

+147
-7
lines changed

8 files changed

+147
-7
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt

+17
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,20 @@ public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
3636
public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
3737
}
3838

39+
public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coroutines/AbstractCoroutine, kotlinx/coroutines/channels/ProducerScope, kotlinx/coroutines/selects/SelectClause2, org/reactivestreams/Subscription {
40+
public fun <init> (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;)V
41+
public fun cancel ()V
42+
public fun close (Ljava/lang/Throwable;)Z
43+
public fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
44+
public fun getOnSend ()Lkotlinx/coroutines/selects/SelectClause2;
45+
public fun invokeOnClose (Lkotlin/jvm/functions/Function1;)Ljava/lang/Void;
46+
public synthetic fun invokeOnClose (Lkotlin/jvm/functions/Function1;)V
47+
public fun isClosedForSend ()Z
48+
public fun isFull ()Z
49+
public fun offer (Ljava/lang/Object;)Z
50+
public synthetic fun onCompleted (Ljava/lang/Object;)V
51+
public fun registerSelectClause2 (Lkotlinx/coroutines/selects/SelectInstance;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
52+
public fun request (J)V
53+
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
54+
}
55+

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt

+13
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,19 @@ public final class kotlinx/coroutines/reactor/MonoKt {
1919
public static synthetic fun mono$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;
2020
}
2121

22+
public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement {
23+
public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key;
24+
public fun <init> (Lreactor/util/context/Context;)V
25+
public final fun getContext ()Lreactor/util/context/Context;
26+
}
27+
28+
public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
29+
}
30+
31+
public final class kotlinx/coroutines/reactor/ReactorContextKt {
32+
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlin/coroutines/CoroutineContext;
33+
}
34+
2235
public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay {
2336
public fun <init> (Lreactor/core/scheduler/Scheduler;)V
2437
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ private const val CLOSED = -1L // closed, but have not signalled onCompleted/
7676
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
7777

7878
@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
79-
private class PublisherCoroutine<in T>(
79+
@InternalCoroutinesApi
80+
public class PublisherCoroutine<in T>(
8081
parentContext: CoroutineContext,
8182
private val subscriber: Subscriber<T>
8283
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {

reactive/kotlinx-coroutines-reactor/build.gradle

+8
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,12 @@ tasks.withType(dokka.getClass()) {
1212
url = new URL("https://projectreactor.io/docs/core/$reactor_vesion/api/")
1313
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
1414
}
15+
}
16+
17+
compileTestKotlin {
18+
kotlinOptions.jvmTarget = "1.8"
19+
}
20+
21+
compileKotlin {
22+
kotlinOptions.jvmTarget = "1.8"
1523
}

reactive/kotlinx-coroutines-reactor/src/Flux.kt

+22-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
/*
23
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
34
*/
@@ -9,6 +10,8 @@ package kotlinx.coroutines.reactor
910
import kotlinx.coroutines.*
1011
import kotlinx.coroutines.channels.*
1112
import kotlinx.coroutines.reactive.*
13+
import org.reactivestreams.Publisher
14+
import reactor.core.CoreSubscriber
1215
import reactor.core.publisher.*
1316
import kotlin.coroutines.*
1417
import kotlin.internal.LowPriorityInOverloadResolution
@@ -41,8 +44,8 @@ public fun <T> flux(
4144
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
4245
): Flux<T> {
4346
require(context[Job] === null) { "Flux context cannot contain job in it." +
44-
"Its lifecycle should be managed via Disposable handle. Had $context" }
45-
return Flux.from(publishInternal(GlobalScope, context, block))
47+
"Its lifecycle should be managed via Disposable handle. Had $context" }
48+
return Flux.from(reactorPublish(GlobalScope, context, block))
4649
}
4750

4851
@Deprecated(
@@ -55,4 +58,20 @@ public fun <T> CoroutineScope.flux(
5558
context: CoroutineContext = EmptyCoroutineContext,
5659
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
5760
): Flux<T> =
58-
Flux.from(publishInternal(this, context, block))
61+
Flux.from(reactorPublish(this, context, block))
62+
63+
private fun <T> reactorPublish(
64+
scope: CoroutineScope,
65+
context: CoroutineContext = EmptyCoroutineContext,
66+
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
67+
): Publisher<T> = Publisher { subscriber ->
68+
// specification requires NPE on null subscriber
69+
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
70+
require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
71+
val currentContext = subscriber.currentContext()
72+
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
73+
val newContext = scope.newCoroutineContext(context + reactorContext)
74+
val coroutine = PublisherCoroutine(newContext, subscriber)
75+
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
76+
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
77+
}

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ private fun <T> monoInternal(
5353
context: CoroutineContext,
5454
block: suspend CoroutineScope.() -> T?
5555
): Mono<T> = Mono.create { sink ->
56-
val newContext = scope.newCoroutineContext(context)
56+
val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
57+
val newContext = scope.newCoroutineContext(context + reactorContext)
5758
val coroutine = MonoCoroutine(newContext, sink)
5859
sink.onDispose(coroutine)
5960
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
@@ -78,12 +79,11 @@ private class MonoCoroutine<in T>(
7879
handleCoroutineException(context, cause)
7980
}
8081
}
81-
82+
8283
override fun dispose() {
8384
disposed = true
8485
cancel()
8586
}
8687

8788
override fun isDisposed(): Boolean = disposed
8889
}
89-
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package kotlinx.coroutines.reactor
2+
3+
import kotlinx.coroutines.ExperimentalCoroutinesApi
4+
import reactor.util.context.Context
5+
import kotlin.coroutines.*
6+
7+
/**
8+
* Marks coroutine context element that contains Reactor's [Context] elements in [context] for seamless integration
9+
* between [CoroutineContext] and Reactor's [Context].
10+
*
11+
* [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext].
12+
*
13+
* Reactor builders: [mono], [flux] can extract the reactor context from their coroutine context and
14+
* pass it on. Modifications of reactor context can be retrieved by `coroutineContext[ReactorContext]`.
15+
*
16+
* Example usage:
17+
*
18+
* Passing reactor context from coroutine builder to reactor entity:
19+
*
20+
* ```
21+
* launch(Context.of("key", "value").asCoroutineContext()) {
22+
* mono {
23+
* assertEquals(coroutineContext[ReactorContext]!!.context.get("key"), "value")
24+
* }.subscribe()
25+
* }
26+
* ```
27+
*
28+
* Accessing modified reactor context enriched from downstream via coroutine context:
29+
*
30+
* ```
31+
* launch {
32+
* mono {
33+
* assertEquals(coroutineContext[ReactorContext]!!.context.get("key"), "value")
34+
* }.subscriberContext(Context.of("key", "value"))
35+
* .subscribe()
36+
* }
37+
* ```
38+
*/
39+
@ExperimentalCoroutinesApi
40+
public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
41+
companion object Key : CoroutineContext.Key<ReactorContext>
42+
}
43+
44+
45+
/**
46+
* Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
47+
* and later retrieved via `coroutineContext[ReactorContext]`.
48+
*/
49+
@ExperimentalCoroutinesApi
50+
public fun Context.asCoroutineContext(): CoroutineContext = ReactorContext(this)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package kotlinx.coroutines.reactor
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.reactive.*
5+
import org.junit.Test
6+
import reactor.util.context.Context
7+
import kotlin.test.assertEquals
8+
9+
class ReactorContextTest {
10+
@Test
11+
fun testMonoHookedContext() = runBlocking {
12+
val mono = mono(Context.of(1, "1", 7, "7").asCoroutineContext()) {
13+
val ctx = coroutineContext[ReactorContext]?.context
14+
buildString {
15+
(1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) }
16+
}
17+
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
18+
.subscriberContext { ctx -> ctx.put(6, "6") }
19+
assertEquals(mono.awaitFirst(), "1234567")
20+
}
21+
22+
@Test
23+
fun testFluxContext() = runBlocking<Unit> {
24+
val flux = flux(Context.of(1, "1", 7, "7").asCoroutineContext()) {
25+
val ctx = coroutineContext[ReactorContext]!!.context
26+
(1..7).forEach { send(ctx.getOrDefault(it, "noValue")) }
27+
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
28+
.subscriberContext { ctx -> ctx.put(6, "6") }
29+
var i = 0
30+
flux.subscribe { str -> i++; assertEquals(str, i.toString()) }
31+
}
32+
}

0 commit comments

Comments
 (0)