Skip to content

Commit 4a036fe

Browse files
committed
Remove context injection from JDK9 Flow integration
1 parent 0cc349d commit 4a036fe

File tree

4 files changed

+5
-43
lines changed

4 files changed

+5
-43
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-jdk9.txt

-4
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,6 @@ public final class kotlinx/coroutines/jdk9/AwaitKt {
77
public static final fun awaitSingle (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
88
}
99

10-
public abstract interface class kotlinx/coroutines/jdk9/ContextInjector {
11-
public abstract fun injectCoroutineContext (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/CoroutineContext;)Ljava/util/concurrent/Flow$Publisher;
12-
}
13-
1410
public final class kotlinx/coroutines/jdk9/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, java/util/concurrent/Flow$Subscription {
1511
public final field flow Lkotlinx/coroutines/flow/Flow;
1612
public final field subscriber Ljava/util/concurrent/Flow$Subscriber;

reactive/kotlinx-coroutines-jdk9/src/Await.kt

+1-11
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,6 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
8282

8383
// ------------------------ private ------------------------
8484

85-
// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
86-
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
87-
private val contextInjectors: Array<ContextInjector> =
88-
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).iterator().asSequence().toList().toTypedArray() // R8 opto
89-
90-
private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
91-
contextInjectors.fold(this) { pub, contextInjector ->
92-
contextInjector.injectCoroutineContext(pub, coroutineContext)
93-
}
94-
9585
private enum class Mode(val s: String) {
9686
FIRST("awaitFirst"),
9787
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
@@ -104,7 +94,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
10494
mode: Mode,
10595
default: T? = null
10696
): T = suspendCancellableCoroutine { cont ->
107-
injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
97+
subscribe(object : Subscriber<T> {
10898
private lateinit var subscription: Subscription
10999
private var value: T? = null
110100
private var seenValue = false

reactive/kotlinx-coroutines-jdk9/src/ContextInjector.kt

-15
This file was deleted.

reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt

+4-13
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ private class PublisherAsFlow<T : Any>(
6565
val newDispatcher = context[ContinuationInterceptor]
6666
if (newDispatcher == null || newDispatcher == collectContext[ContinuationInterceptor]) {
6767
// fast path -- subscribe directly in this dispatcher
68-
return collectImpl(collectContext + context, collector)
68+
return collectImpl(collector)
6969
}
7070
// slow path -- produce in a separate dispatcher
7171
collectSlowPath(collector)
@@ -77,10 +77,10 @@ private class PublisherAsFlow<T : Any>(
7777
}
7878
}
7979

80-
private suspend fun collectImpl(injectContext: CoroutineContext, collector: FlowCollector<T>) {
80+
private suspend fun collectImpl(collector: FlowCollector<T>) {
8181
val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
8282
// inject subscribe context into publisher
83-
publisher.injectCoroutineContext(injectContext).subscribe(subscriber)
83+
publisher.subscribe(subscriber)
8484
try {
8585
var consumed = 0L
8686
while (true) {
@@ -98,7 +98,7 @@ private class PublisherAsFlow<T : Any>(
9898

9999
// The second channel here is used for produceIn/broadcastIn and slow-path (dispatcher change)
100100
override suspend fun collectTo(scope: ProducerScope<T>) =
101-
collectImpl(scope.coroutineContext, SendingCollector(scope.channel))
101+
collectImpl(SendingCollector(scope.channel))
102102
}
103103

104104
@Suppress("SubscriberImplementation")
@@ -138,15 +138,6 @@ private class ReactiveSubscriber<T : Any>(
138138
}
139139
}
140140

141-
// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
142-
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
143-
private val contextInjectors: List<ContextInjector> =
144-
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList()
145-
146-
private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
147-
contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }
148-
149-
150141
/**
151142
* Adapter that transforms [Flow] into TCK-complaint [Publisher].
152143
* [cancel] invocation cancels the original flow.

0 commit comments

Comments
 (0)