Skip to content

Commit 975ed04

Browse files
committed
fixup! Reactor coroutine context propagation in more places
1 parent 4c069fc commit 975ed04

File tree

4 files changed

+36
-26
lines changed

4 files changed

+36
-26
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt

-5
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,6 @@ public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines
3232
public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
3333
}
3434

35-
public final class kotlinx/coroutines/reactor/ReactorContextInjector : kotlinx/coroutines/reactive/ContextInjector {
36-
public fun <init> ()V
37-
public fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
38-
}
39-
4035
public final class kotlinx/coroutines/reactor/ReactorContextKt {
4136
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
4237
}

reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ private class PublisherAsFlow<T : Any>(
4747
// use another channel for conflation (cannot do openSubscription)
4848
if (capacity < 0) return super.produceImpl(scope)
4949
// Open subscription channel directly
50-
val channel = publisher.openSubscription(capacity)
50+
val channel = publisher
51+
.injectCoroutineContext(scope.coroutineContext)
52+
.openSubscription(capacity)
5153
val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
5254
channel.cancel(cause?.let {
5355
it as? CancellationException ?: CancellationException("Job was cancelled", it)

reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt

+6-10
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
package kotlinx.coroutines.reactor
22

3-
import kotlinx.coroutines.InternalCoroutinesApi
4-
import kotlinx.coroutines.reactive.ContextInjector
5-
import org.reactivestreams.Publisher
6-
import reactor.core.publisher.Flux
7-
import reactor.core.publisher.Mono
8-
import reactor.util.context.Context
9-
import kotlin.coroutines.CoroutineContext
3+
import kotlinx.coroutines.reactive.*
4+
import org.reactivestreams.*
5+
import reactor.core.publisher.*
6+
import reactor.util.context.*
7+
import kotlin.coroutines.*
108

11-
/** @suppress */
12-
@InternalCoroutinesApi
13-
class ReactorContextInjector : ContextInjector {
9+
internal class ReactorContextInjector : ContextInjector {
1410
/**
1511
* Injects all values from the [ReactorContext] entry of the given coroutine context
1612
* into the downstream [Context] of Reactor's [Publisher] instances of [Mono] or [Flux].

reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt

+27-10
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package kotlinx.coroutines.reactor
22

33
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.channels.*
5+
import kotlinx.coroutines.flow.*
46
import kotlinx.coroutines.reactive.*
57
import org.junit.Test
68
import reactor.core.publisher.*
7-
import reactor.util.context.Context
8-
import kotlin.test.assertEquals
9-
import kotlinx.coroutines.flow.*
9+
import reactor.util.context.*
10+
import kotlin.test.*
1011

1112
class ReactorContextTest {
1213
@Test
@@ -55,7 +56,9 @@ class ReactorContextTest {
5556
}
5657

5758
@Test
58-
fun testFluxAwaitContextPropagation() = runBlocking<Unit>(Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()) {
59+
fun testFluxAwaitContextPropagation() = runBlocking<Unit>(
60+
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
61+
) {
5962
assertEquals(f().awaitFirst(), "1")
6063
assertEquals(f().awaitFirstOrDefault("noValue"), "1")
6164
assertEquals(f().awaitFirstOrNull(), "1")
@@ -77,18 +80,32 @@ class ReactorContextTest {
7780
}
7881

7982
@Test
80-
fun testFlowToFluxContextPropagation() = runBlocking(Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()) {
83+
fun testFlowToFluxContextPropagation() = runBlocking(
84+
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
85+
) {
8186
var i = 0
87+
// call "collect" on the converted Flow
8288
bar().collect { str ->
8389
i++; assertEquals(str, i.toString())
8490
}
8591
assertEquals(i, 3)
8692
}
8793

88-
suspend fun bar(): Flow<String> {
89-
return flux {
90-
val ctx = coroutineContext[ReactorContext]!!.context
91-
(1..3).forEach { send(ctx.getOrDefault(it, "noValue")) }
92-
}.asFlow()
94+
@Test
95+
fun testFlowToFluxDirectContextPropagation() = runBlocking(
96+
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
97+
) {
98+
var i = 0
99+
// convert resulting flow to channel using "produceIn"
100+
val channel = bar().produceIn(this)
101+
channel.consumeEach { str ->
102+
i++; assertEquals(str, i.toString())
103+
}
104+
assertEquals(i, 3)
93105
}
106+
107+
private fun bar(): Flow<String> = flux {
108+
val ctx = coroutineContext[ReactorContext]!!.context
109+
(1..3).forEach { send(ctx.getOrDefault(it, "noValue")) }
110+
}.asFlow()
94111
}

0 commit comments

Comments
 (0)