Skip to content

Commit 1dcfd97

Browse files
SokolovaMariaqwwdfsad
authored andcommitted
Coroutine context propagation for Reactor to coroutines API migration (#1377)
* Propagation of the coroutine context of await calls into Mono/Flux builder * Publisher.asFlow propagates coroutine context from `collect` call to the Publisher * Flow.asFlux transform * Optimized FlowSubscription * kotlinx.coroutines.reactor.flow package is replaced with kotlinx.coroutines.reactor Fixes #284
1 parent c7e9b56 commit 1dcfd97

25 files changed

+441
-138
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+4
Original file line numberDiff line numberDiff line change
@@ -977,6 +977,10 @@ public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/c
977977
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
978978
}
979979

980+
public final class kotlinx/coroutines/intrinsics/CancellableKt {
981+
public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)V
982+
}
983+
980984
public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher {
981985
public synthetic fun <init> (II)V
982986
public synthetic fun <init> (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt

+18-9
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,29 @@ public final class kotlinx/coroutines/reactive/ChannelKt {
1414
public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
1515
}
1616

17+
public abstract interface class kotlinx/coroutines/reactive/ContextInjector {
18+
public abstract fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
19+
}
20+
1721
public final class kotlinx/coroutines/reactive/ConvertKt {
1822
public static final fun asPublisher (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
1923
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
2024
}
2125

26+
public final class kotlinx/coroutines/reactive/FlowKt {
27+
public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
28+
public static final fun asFlow (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
29+
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
30+
}
31+
32+
public final class kotlinx/coroutines/reactive/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, org/reactivestreams/Subscription {
33+
public final field flow Lkotlinx/coroutines/flow/Flow;
34+
public final field subscriber Lorg/reactivestreams/Subscriber;
35+
public fun <init> (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V
36+
public fun cancel ()V
37+
public fun request (J)V
38+
}
39+
2240
public final class kotlinx/coroutines/reactive/PublishKt {
2341
public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
2442
public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
@@ -44,12 +62,3 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro
4462
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
4563
}
4664

47-
public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
48-
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
49-
}
50-
51-
public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
52-
public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
53-
public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
54-
}
55-

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ public final class kotlinx/coroutines/reactor/ConvertKt {
55
public static final fun asMono (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Mono;
66
}
77

8+
public final class kotlinx/coroutines/reactor/FlowKt {
9+
public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux;
10+
}
11+
812
public final class kotlinx/coroutines/reactor/FluxKt {
913
public static final fun flux (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
1014
public static final fun flux (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;

kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ import kotlin.coroutines.intrinsics.*
1212
* Use this function to start coroutine in a cancellable way, so that it can be cancelled
1313
* while waiting to be dispatched.
1414
*/
15-
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
15+
@InternalCoroutinesApi
16+
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
1617
createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
1718
}
1819

reactive/kotlinx-coroutines-reactive/src/Await.kt

+12-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
1010
import org.reactivestreams.Publisher
1111
import org.reactivestreams.Subscriber
1212
import org.reactivestreams.Subscription
13+
import java.util.*
1314
import kotlin.coroutines.*
1415

1516
/**
@@ -81,6 +82,16 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
8182

8283
// ------------------------ private ------------------------
8384

85+
// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
86+
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
87+
private val contextInjectors: Array<ContextInjector> =
88+
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).iterator().asSequence().toList().toTypedArray() // R8 opto
89+
90+
private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
91+
contextInjectors.fold(this) { pub, contextInjector ->
92+
contextInjector.injectCoroutineContext(pub, coroutineContext)
93+
}
94+
8495
private enum class Mode(val s: String) {
8596
FIRST("awaitFirst"),
8697
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
@@ -93,7 +104,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
93104
mode: Mode,
94105
default: T? = null
95106
): T = suspendCancellableCoroutine { cont ->
96-
subscribe(object : Subscriber<T> {
107+
injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
97108
private lateinit var subscription: Subscription
98109
private var value: T? = null
99110
private var seenValue = false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package kotlinx.coroutines.reactive
2+
3+
import kotlinx.coroutines.InternalCoroutinesApi
4+
import org.reactivestreams.Publisher
5+
import kotlin.coroutines.CoroutineContext
6+
7+
/** @suppress */
8+
@InternalCoroutinesApi
9+
public interface ContextInjector {
10+
/**
11+
* Injects `ReactorContext` element from the given context into the `SubscriberContext` of the publisher.
12+
* This API used as an indirection layer between `reactive` and `reactor` modules.
13+
*/
14+
public fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
@file:JvmMultifileClass
6+
@file:JvmName("FlowKt")
7+
8+
package kotlinx.coroutines.reactive
9+
10+
import kotlinx.atomicfu.*
11+
import kotlinx.coroutines.*
12+
import kotlinx.coroutines.flow.*
13+
import org.reactivestreams.*
14+
import kotlinx.coroutines.intrinsics.*
15+
16+
/**
17+
* Transforms the given flow to a spec-compliant [Publisher].
18+
*/
19+
@ExperimentalCoroutinesApi
20+
public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
21+
22+
/**
23+
* Adapter that transforms [Flow] into TCK-complaint [Publisher].
24+
* [cancel] invocation cancels the original flow.
25+
*/
26+
@Suppress("PublisherImplementation")
27+
private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
28+
override fun subscribe(subscriber: Subscriber<in T>?) {
29+
if (subscriber == null) throw NullPointerException()
30+
subscriber.onSubscribe(FlowSubscription(flow, subscriber))
31+
}
32+
}
33+
34+
/** @suppress */
35+
@InternalCoroutinesApi
36+
public class FlowSubscription<T>(
37+
@JvmField val flow: Flow<T>,
38+
@JvmField val subscriber: Subscriber<in T>
39+
) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, false) {
40+
private val requested = atomic(0L)
41+
private val producer = atomic<CancellableContinuation<Unit>?>(null)
42+
43+
override fun onStart() {
44+
::flowProcessing.startCoroutineCancellable(this)
45+
}
46+
47+
private suspend fun flowProcessing() {
48+
try {
49+
consumeFlow()
50+
subscriber.onComplete()
51+
} catch (e: Throwable) {
52+
try {
53+
if (e is CancellationException) {
54+
subscriber.onComplete()
55+
} else {
56+
subscriber.onError(e)
57+
}
58+
} catch (e: Throwable) {
59+
// Last ditch report
60+
handleCoroutineException(coroutineContext, e)
61+
}
62+
}
63+
}
64+
65+
/*
66+
* This method has at most one caller at any time (triggered from the `request` method)
67+
*/
68+
private suspend fun consumeFlow() {
69+
flow.collect { value ->
70+
/*
71+
* Flow is scopeless, thus if it's not active, its subscription was cancelled.
72+
* No intermediate "child failed, but flow coroutine is not" states are allowed.
73+
*/
74+
coroutineContext.ensureActive()
75+
if (requested.value <= 0L) {
76+
suspendCancellableCoroutine<Unit> {
77+
producer.value = it
78+
if (requested.value != 0L) it.resumeSafely()
79+
}
80+
}
81+
requested.decrementAndGet()
82+
subscriber.onNext(value)
83+
}
84+
}
85+
86+
override fun cancel() {
87+
cancel(null)
88+
}
89+
90+
override fun request(n: Long) {
91+
if (n <= 0) {
92+
return
93+
}
94+
start()
95+
requested.update { value ->
96+
val newValue = value + n
97+
if (newValue <= 0L) Long.MAX_VALUE else newValue
98+
}
99+
val producer = producer.getAndSet(null) ?: return
100+
producer.resumeSafely()
101+
}
102+
103+
private fun CancellableContinuation<Unit>.resumeSafely() {
104+
val token = tryResume(Unit)
105+
if (token != null) {
106+
completeResume(token)
107+
}
108+
}
109+
}

reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt renamed to reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt

+17-6
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
package kotlinx.coroutines.reactive.flow
5+
@file:JvmMultifileClass
6+
@file:JvmName("FlowKt")
7+
8+
package kotlinx.coroutines.reactive
69

710
import kotlinx.coroutines.*
811
import kotlinx.coroutines.channels.*
912
import kotlinx.coroutines.flow.*
1013
import kotlinx.coroutines.flow.internal.*
11-
import kotlinx.coroutines.reactive.*
1214
import org.reactivestreams.*
15+
import java.util.*
1316
import kotlin.coroutines.*
1417

1518
/**
@@ -21,13 +24,11 @@ import kotlin.coroutines.*
2124
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
2225
* are discarded.
2326
*/
24-
@JvmName("from")
2527
@ExperimentalCoroutinesApi
2628
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
2729
PublisherAsFlow(this, 1)
2830

2931
@FlowPreview
30-
@JvmName("from")
3132
@Deprecated(
3233
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
3334
level = DeprecationLevel.ERROR,
@@ -46,7 +47,9 @@ private class PublisherAsFlow<T : Any>(
4647
// use another channel for conflation (cannot do openSubscription)
4748
if (capacity < 0) return super.produceImpl(scope)
4849
// Open subscription channel directly
49-
val channel = publisher.openSubscription(capacity)
50+
val channel = publisher
51+
.injectCoroutineContext(scope.coroutineContext)
52+
.openSubscription(capacity)
5053
val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
5154
channel.cancel(cause?.let {
5255
it as? CancellationException ?: CancellationException("Job was cancelled", it)
@@ -70,7 +73,7 @@ private class PublisherAsFlow<T : Any>(
7073

7174
override suspend fun collect(collector: FlowCollector<T>) {
7275
val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
73-
publisher.subscribe(subscriber)
76+
publisher.injectCoroutineContext(coroutineContext).subscribe(subscriber)
7477
try {
7578
var consumed = 0L
7679
while (true) {
@@ -127,3 +130,11 @@ private class ReactiveSubscriber<T : Any>(
127130
subscription.cancel()
128131
}
129132
}
133+
134+
// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
135+
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
136+
private val contextInjectors: List<ContextInjector> =
137+
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList()
138+
139+
private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
140+
contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }

0 commit comments

Comments
 (0)