Skip to content

Commit 6947e0d

Browse files
committed
Propagation of the coroutine context of await calls into Mono/Flux builders
Fixes #284
1 parent 1156e1c commit 6947e0d

File tree

8 files changed

+135
-8
lines changed

8 files changed

+135
-8
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt

+4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ public final class kotlinx/coroutines/reactive/ChannelKt {
1414
public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
1515
}
1616

17+
public abstract interface class kotlinx/coroutines/reactive/ContextInjector {
18+
public abstract fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
19+
}
20+
1721
public final class kotlinx/coroutines/reactive/ConvertKt {
1822
public static final fun asPublisher (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
1923
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt

+5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines
2828
public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
2929
}
3030

31+
public final class kotlinx/coroutines/reactor/ReactorContextInjector : kotlinx/coroutines/reactive/ContextInjector {
32+
public fun <init> ()V
33+
public fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
34+
}
35+
3136
public final class kotlinx/coroutines/reactor/ReactorContextKt {
3237
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
3338
}

reactive/kotlinx-coroutines-reactive/src/Await.kt

+20-6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
1010
import org.reactivestreams.Publisher
1111
import org.reactivestreams.Subscriber
1212
import org.reactivestreams.Subscription
13+
import java.util.*
1314
import kotlin.coroutines.*
1415

1516
/**
@@ -22,7 +23,7 @@ import kotlin.coroutines.*
2223
*
2324
* @throws NoSuchElementException if publisher does not emit any value
2425
*/
25-
public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
26+
public suspend fun <T> Publisher<T>.awaitFirst(): T = injectCoroutineContext().awaitOne(Mode.FIRST)
2627

2728
/**
2829
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
@@ -32,7 +33,8 @@ public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
3233
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
3334
* immediately resumes with [CancellationException].
3435
*/
35-
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
36+
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T =
37+
injectCoroutineContext().awaitOne(Mode.FIRST_OR_DEFAULT, default)
3638

3739
/**
3840
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
@@ -42,7 +44,7 @@ public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOn
4244
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
4345
* immediately resumes with [CancellationException].
4446
*/
45-
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
47+
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = injectCoroutineContext().awaitOne(Mode.FIRST_OR_DEFAULT)
4648

4749
/**
4850
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
@@ -52,7 +54,8 @@ public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST
5254
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
5355
* immediately resumes with [CancellationException].
5456
*/
55-
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
57+
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T =
58+
injectCoroutineContext().awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
5659

5760
/**
5861
* Awaits for the last value from the given publisher without blocking a thread and
@@ -64,7 +67,7 @@ public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T =
6467
*
6568
* @throws NoSuchElementException if publisher does not emit any value
6669
*/
67-
public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
70+
public suspend fun <T> Publisher<T>.awaitLast(): T = injectCoroutineContext().awaitOne(Mode.LAST)
6871

6972
/**
7073
* Awaits for the single value from the given publisher without blocking a thread and
@@ -77,10 +80,21 @@ public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
7780
* @throws NoSuchElementException if publisher does not emit any value
7881
* @throws IllegalArgumentException if publisher emits more than one value
7982
*/
80-
public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
83+
public suspend fun <T> Publisher<T>.awaitSingle(): T = injectCoroutineContext().awaitOne(Mode.SINGLE)
8184

8285
// ------------------------ private ------------------------
8386

87+
// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
88+
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
89+
private val contextInjectors: List<ContextInjector> =
90+
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList()
91+
92+
private suspend fun <T> Publisher<T>.injectCoroutineContext(): Publisher<T> {
93+
return if (contextInjectors.isNotEmpty()) {
94+
contextInjectors[0].injectCoroutineContext(this, coroutineContext)
95+
} else this
96+
}
97+
8498
private enum class Mode(val s: String) {
8599
FIRST("awaitFirst"),
86100
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package kotlinx.coroutines.reactive
2+
3+
import kotlinx.coroutines.InternalCoroutinesApi
4+
import org.reactivestreams.Publisher
5+
import kotlin.coroutines.CoroutineContext
6+
7+
/** @suppress */
8+
@InternalCoroutinesApi
9+
public interface ContextInjector {
10+
/**
11+
* Injects the coroutine context into the context of the publisher.
12+
*/
13+
public fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
kotlinx.coroutines.reactor.ReactorContextInjector

reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt

+16
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,22 @@ import kotlin.coroutines.*
3030
* .subscribe()
3131
* }
3232
* ```
33+
*
34+
* [CoroutineContext] of a suspendable function that awaits a value from [Mono] or [Flux] instance
35+
* is propagated into [mono] and [flux] Reactor builders.
36+
*
37+
* Example usage:
38+
*
39+
* ```
40+
* launch(Context.of("key", "value").asCoroutineContext()) {
41+
* assertEquals(bar().awaitFirst(), "value")
42+
* }
43+
*
44+
* fun bar(): Mono<String> = mono {
45+
* coroutineContext[ReactorContext]!!.context.get("key")
46+
* }
47+
* ```
48+
}
3349
*/
3450
@ExperimentalCoroutinesApi
3551
public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package kotlinx.coroutines.reactor
2+
3+
import kotlinx.coroutines.InternalCoroutinesApi
4+
import kotlinx.coroutines.reactive.ContextInjector
5+
import org.reactivestreams.Publisher
6+
import reactor.core.publisher.Flux
7+
import reactor.core.publisher.Mono
8+
import reactor.util.context.Context
9+
import kotlin.coroutines.CoroutineContext
10+
11+
/** @suppress */
12+
@InternalCoroutinesApi
13+
class ReactorContextInjector : ContextInjector {
14+
/**
15+
* Injects all values from the [ReactorContext] entry of the given coroutine context
16+
* into the downstream [Context] of Reactor's [Publisher] instances of [Mono] or [Flux].
17+
*/
18+
override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
19+
val reactorContext = coroutineContext[ReactorContext]?.context
20+
return when(publisher) {
21+
is Mono ->
22+
publisher.subscriberContext(reactorContext ?: Context.empty())
23+
is Flux ->
24+
publisher.subscriberContext(reactorContext ?: Context.empty())
25+
else -> publisher
26+
}
27+
}
28+
}

reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt

+47-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kotlinx.coroutines.reactor
33
import kotlinx.coroutines.*
44
import kotlinx.coroutines.reactive.*
55
import org.junit.Test
6+
import reactor.core.publisher.*
67
import reactor.util.context.Context
78
import kotlin.test.assertEquals
89

@@ -14,8 +15,8 @@ class ReactorContextTest {
1415
buildString {
1516
(1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) }
1617
}
17-
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
18-
.subscriberContext { ctx -> ctx.put(6, "6") }
18+
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
19+
.subscriberContext { ctx -> ctx.put(6, "6") }
1920
assertEquals(mono.awaitFirst(), "1234567")
2021
}
2122

@@ -29,4 +30,48 @@ class ReactorContextTest {
2930
var i = 0
3031
flux.subscribe { str -> i++; assertEquals(str, i.toString()) }
3132
}
33+
34+
@Test
35+
fun testAwait() = runBlocking(Context.of(3, "3").asCoroutineContext()) {
36+
val result = mono(Context.of(1, "1").asCoroutineContext()) {
37+
val ctx = coroutineContext[ReactorContext]?.context
38+
buildString {
39+
(1..3).forEach { append(ctx?.getOrDefault(it, "noValue")) }
40+
}
41+
} .subscriberContext(Context.of(2, "2"))
42+
.awaitFirst()
43+
assertEquals(result, "123")
44+
}
45+
46+
@Test
47+
fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) {
48+
assertEquals(m().awaitFirst(), "7")
49+
assertEquals(m().awaitFirstOrDefault("noValue"), "7")
50+
assertEquals(m().awaitFirstOrNull(), "7")
51+
assertEquals(m().awaitFirstOrElse { "noValue" }, "7")
52+
assertEquals(m().awaitLast(), "7")
53+
assertEquals(m().awaitSingle(), "7")
54+
}
55+
56+
@Test
57+
fun testFluxAwaitContextPropagation() = runBlocking<Unit>(Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()) {
58+
assertEquals(f().awaitFirst(), "1")
59+
assertEquals(f().awaitFirstOrDefault("noValue"), "1")
60+
assertEquals(f().awaitFirstOrNull(), "1")
61+
assertEquals(f().awaitFirstOrElse { "noValue" }, "1")
62+
assertEquals(f().awaitLast(), "3")
63+
var i = 0
64+
f().subscribe { str -> i++; assertEquals(str, i.toString()) }
65+
}
66+
67+
private fun m(): Mono<String> = mono {
68+
val ctx = coroutineContext[ReactorContext]?.context
69+
ctx?.getOrDefault(7, "noValue")
70+
}
71+
72+
73+
private fun f(): Flux<String?> = flux {
74+
val ctx = coroutineContext[ReactorContext]?.context
75+
(1..3).forEach { send(ctx?.getOrDefault(it, "noValue")) }
76+
}
3277
}

0 commit comments

Comments
 (0)