Skip to content

Commit 90cb996

Browse files
committed
Context passing between coroutines and Reactor Mono/Flux
Fixes #284
1 parent 8273a75 commit 90cb996

File tree

8 files changed

+168
-7
lines changed

8 files changed

+168
-7
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt

+17
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,20 @@ public final class kotlinx/coroutines/reactive/PublishKt {
2323
public static synthetic fun publish$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
2424
}
2525

26+
public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coroutines/AbstractCoroutine, kotlinx/coroutines/channels/ProducerScope, kotlinx/coroutines/selects/SelectClause2, org/reactivestreams/Subscription {
27+
public fun <init> (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;)V
28+
public fun cancel ()V
29+
public fun close (Ljava/lang/Throwable;)Z
30+
public fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
31+
public fun getOnSend ()Lkotlinx/coroutines/selects/SelectClause2;
32+
public fun invokeOnClose (Lkotlin/jvm/functions/Function1;)Ljava/lang/Void;
33+
public synthetic fun invokeOnClose (Lkotlin/jvm/functions/Function1;)V
34+
public fun isClosedForSend ()Z
35+
public fun isFull ()Z
36+
public fun offer (Ljava/lang/Object;)Z
37+
public synthetic fun onCompleted (Ljava/lang/Object;)V
38+
public fun registerSelectClause2 (Lkotlinx/coroutines/selects/SelectInstance;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
39+
public fun request (J)V
40+
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
41+
}
42+

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt

+13
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,19 @@ public final class kotlinx/coroutines/reactor/MonoKt {
1515
public static synthetic fun mono$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;
1616
}
1717

18+
public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement {
19+
public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key;
20+
public fun <init> (Lreactor/util/context/Context;)V
21+
public final fun getContext ()Lreactor/util/context/Context;
22+
}
23+
24+
public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
25+
}
26+
27+
public final class kotlinx/coroutines/reactor/ReactorContextKt {
28+
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlin/coroutines/CoroutineContext;
29+
}
30+
1831
public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay {
1932
public fun <init> (Lreactor/core/scheduler/Scheduler;)V
2033
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ private const val CLOSED = -1L // closed, but have not signalled onCompleted/
5454
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
5555

5656
@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
57-
private class PublisherCoroutine<in T>(
57+
@InternalCoroutinesApi
58+
public class PublisherCoroutine<in T>(
5859
parentContext: CoroutineContext,
5960
private val subscriber: Subscriber<T>
6061
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {

reactive/kotlinx-coroutines-reactor/build.gradle

+8
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,12 @@ tasks.withType(dokka.getClass()) {
1111
externalDocumentationLink {
1212
url = new URL('https://projectreactor.io/docs/core/3.2.5.RELEASE/api/')
1313
}
14+
}
15+
16+
compileTestKotlin {
17+
kotlinOptions.jvmTarget = "1.8"
18+
}
19+
20+
compileKotlin {
21+
kotlinOptions.jvmTarget = "1.8"
1422
}

reactive/kotlinx-coroutines-reactor/src/Flux.kt

+21-3
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44

55
package kotlinx.coroutines.reactor
66

7+
import kotlin.coroutines.*
78
import kotlinx.coroutines.*
89
import kotlinx.coroutines.channels.*
910
import kotlinx.coroutines.reactive.*
11+
import org.reactivestreams.*
1012
import reactor.core.publisher.*
11-
import kotlin.coroutines.*
13+
import reactor.core.*
1214

1315
/**
1416
* Creates cold reactive [Flux] that runs a given [block] in a coroutine.
@@ -32,9 +34,25 @@ import kotlin.coroutines.*
3234
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
3335
* to cancellation and error handling may change in the future.
3436
*/
37+
3538
@ExperimentalCoroutinesApi
3639
fun <T> CoroutineScope.flux(
3740
context: CoroutineContext = EmptyCoroutineContext,
3841
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
39-
): Flux<T> =
40-
Flux.from(publish(newCoroutineContext(context), block = block))
42+
): Flux<T> = Flux.from(reactorPublish(newCoroutineContext(context), block = block))
43+
44+
45+
private fun <T> CoroutineScope.reactorPublish(
46+
context: CoroutineContext = EmptyCoroutineContext,
47+
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
48+
): Publisher<T> = Publisher { subscriber ->
49+
// specification requires NPE on null subscriber
50+
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
51+
require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
52+
val currentContext = subscriber.currentContext()
53+
val reactorContext = (coroutineContext[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
54+
val newContext = newCoroutineContext(context + reactorContext)
55+
val coroutine = PublisherCoroutine(newContext, subscriber)
56+
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
57+
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
58+
}

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ fun <T> CoroutineScope.mono(
3232
context: CoroutineContext = EmptyCoroutineContext,
3333
block: suspend CoroutineScope.() -> T?
3434
): Mono<T> = Mono.create { sink ->
35-
val newContext = newCoroutineContext(context)
35+
val reactorContext = (coroutineContext[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
36+
val newContext = newCoroutineContext(context + reactorContext)
3637
val coroutine = MonoCoroutine(newContext, sink)
3738
sink.onDispose(coroutine)
3839
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
@@ -53,12 +54,11 @@ private class MonoCoroutine<in T>(
5354
override fun onCompletedExceptionally(exception: Throwable) {
5455
if (!disposed) sink.error(exception)
5556
}
56-
57+
5758
override fun dispose() {
5859
disposed = true
5960
cancel()
6061
}
6162

6263
override fun isDisposed(): Boolean = disposed
6364
}
64-
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package kotlinx.coroutines.reactor
2+
3+
import kotlinx.coroutines.ExperimentalCoroutinesApi
4+
import reactor.util.context.Context
5+
import kotlin.coroutines.*
6+
7+
/**
8+
* Marks coroutine context element that contains Reactor's [Context] elements in [context] for seamless integration
9+
* between [CoroutineContext] and Reactor's [Context].
10+
*
11+
* [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext].
12+
*
13+
* Reactor builders: [mono], [flux] can extract the reactor context from their coroutine context and
14+
* pass it on. Modifications of reactor context can be retrieved by `coroutineContext[ReactorContext]`.
15+
*
16+
* Example usage:
17+
*
18+
* Passing reactor context from coroutine builder to reactor entity:
19+
*
20+
* ```
21+
* launch(Context.of("key", "value").asCoroutineContext()) {
22+
* mono {
23+
* assertEquals(coroutineContext[ReactorContext]!!.context.get("key"), "value")
24+
* }.subscribe()
25+
* }
26+
* ```
27+
*
28+
* Accessing modified reactor context enriched from downstream via coroutine context:
29+
*
30+
* ```
31+
* launch {
32+
* mono {
33+
* assertEquals(coroutineContext[ReactorContext]!!.context.get("key"), "value")
34+
* }.subscriberContext(Context.of("key", "value"))
35+
* .subscribe()
36+
* }
37+
* ```
38+
*/
39+
@ExperimentalCoroutinesApi
40+
public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
41+
companion object Key : CoroutineContext.Key<ReactorContext>
42+
}
43+
44+
45+
/**
46+
* Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
47+
* and later retrieved via `coroutineContext[ReactorContext]`.
48+
*/
49+
@ExperimentalCoroutinesApi
50+
public fun Context.asCoroutineContext(): CoroutineContext = ReactorContext(this)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package kotlinx.coroutines.reactor
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.reactive.*
5+
import org.junit.Test
6+
import reactor.util.context.Context
7+
import kotlin.test.assertEquals
8+
9+
class ReactorContextTest {
10+
@Test
11+
fun testMonoHookedContext() = runBlocking(Context.of(1, "1", 7, "7").asCoroutineContext()) {
12+
val mono = mono {
13+
val ctx = coroutineContext[ReactorContext]?.context
14+
buildString {
15+
(1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) }
16+
} }
17+
.subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
18+
.subscriberContext { ctx -> ctx.put(6, "6") }
19+
assertEquals(mono.awaitFirst(), "1234567")
20+
}
21+
22+
@Test
23+
fun testFluxContext() = runBlocking<Unit>(Context.of(1, "1", 7, "7").asCoroutineContext()) {
24+
val flux = flux<String?> {
25+
val ctx = coroutineContext[ReactorContext]!!.context
26+
(1..7).forEach { send(ctx.getOrDefault(it, "noValue")) }
27+
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
28+
.subscriberContext { ctx -> ctx.put(6, "6") }
29+
var i = 0
30+
flux.subscribe { str -> i++; assertEquals(str, i.toString()) }
31+
}
32+
33+
@Test
34+
fun testContextFromCoroutineBuilderToReactor() = runBlocking<Unit> {
35+
launch(Context.of(1, "1").asCoroutineContext()) {
36+
val mono = mono {
37+
assertEquals(coroutineContext[ReactorContext]!!.context[1], "1")
38+
}
39+
mono.subscribe()
40+
}
41+
}
42+
43+
@Test
44+
fun testContextFromReactorToCoroutine() = runBlocking<Unit>(Context.of(1, "global 1").asCoroutineContext()) {
45+
mono {
46+
assertEquals(coroutineContext[ReactorContext]!!.context[2], "2")
47+
assertEquals(coroutineContext[ReactorContext]!!.context[1], "global 1")
48+
}.subscriberContext(Context.of(2, "2", 3, "3"))
49+
.subscribe()
50+
mono {
51+
assertEquals(coroutineContext[ReactorContext]!!.context.getOrDefault(1, "no value"), "global 1")
52+
}.subscribe()
53+
}
54+
}

0 commit comments

Comments
 (0)