Skip to content

Commit ebe310b

Browse files
committed
Support context in Flow.asPublisher and similar methods
Fixes #2155
1 parent 716d21c commit ebe310b

File tree

16 files changed

+557
-19
lines changed

16 files changed

+557
-19
lines changed

reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public final class kotlinx/coroutines/jdk9/PublishKt {
1515
public final class kotlinx/coroutines/jdk9/ReactiveFlowKt {
1616
public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow;
1717
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher;
18+
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Ljava/util/concurrent/Flow$Publisher;
19+
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher;
1820
public static final fun collect (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1921
}
2022

reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt

+11-3
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44

55
package kotlinx.coroutines.jdk9
66

7+
import kotlinx.coroutines.*
78
import kotlinx.coroutines.flow.*
89
import kotlinx.coroutines.reactive.asFlow
910
import kotlinx.coroutines.reactive.asPublisher
1011
import kotlinx.coroutines.reactive.collect
12+
import org.reactivestreams.*
13+
import kotlin.coroutines.*
1114
import java.util.concurrent.Flow as JFlow
12-
import org.reactivestreams.FlowAdapters
1315

1416
/**
1517
* Transforms the given reactive [Publisher] into [Flow].
@@ -25,9 +27,15 @@ public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =
2527

2628
/**
2729
* Transforms the given flow to a reactive specification compliant [Publisher].
30+
*
31+
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
32+
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
33+
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
34+
* is used, so calls are performed from an arbitrary thread.
2835
*/
29-
public fun <T : Any> Flow<T>.asPublisher(): JFlow.Publisher<T> {
30-
val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>()
36+
@JvmOverloads // binary compatibility
37+
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> {
38+
val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>(context)
3139
return FlowAdapters.toFlowPublisher(reactivePublisher)
3240
}
3341

reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api

+3-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public final class kotlinx/coroutines/reactive/FlowKt {
3232
public final class kotlinx/coroutines/reactive/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, org/reactivestreams/Subscription {
3333
public final field flow Lkotlinx/coroutines/flow/Flow;
3434
public final field subscriber Lorg/reactivestreams/Subscriber;
35-
public fun <init> (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V
35+
public fun <init> (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;Lkotlin/coroutines/CoroutineContext;)V
3636
public fun cancel ()V
3737
public fun request (J)V
3838
}
@@ -65,5 +65,7 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro
6565
public final class kotlinx/coroutines/reactive/ReactiveFlowKt {
6666
public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
6767
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
68+
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
69+
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
6870
}
6971

reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt

+16-5
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,15 @@ public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
3434
*
3535
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
3636
* see its documentation for additional details.
37+
*
38+
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
39+
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
40+
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
41+
* is used, so calls are performed from an arbitrary thread.
3742
*/
38-
public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
43+
@JvmOverloads // binary compatibility
44+
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> =
45+
FlowAsPublisher(this, Dispatchers.Unconfined + context)
3946

4047
private class PublisherAsFlow<T : Any>(
4148
private val publisher: Publisher<T>,
@@ -154,19 +161,23 @@ internal fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: Coroutine
154161
* [cancel] invocation cancels the original flow.
155162
*/
156163
@Suppress("PublisherImplementation")
157-
private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
164+
private class FlowAsPublisher<T : Any>(
165+
private val flow: Flow<T>,
166+
private val context: CoroutineContext
167+
) : Publisher<T> {
158168
override fun subscribe(subscriber: Subscriber<in T>?) {
159169
if (subscriber == null) throw NullPointerException()
160-
subscriber.onSubscribe(FlowSubscription(flow, subscriber))
170+
subscriber.onSubscribe(FlowSubscription(flow, subscriber, context))
161171
}
162172
}
163173

164174
/** @suppress */
165175
@InternalCoroutinesApi
166176
public class FlowSubscription<T>(
167177
@JvmField public val flow: Flow<T>,
168-
@JvmField public val subscriber: Subscriber<in T>
169-
) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, true) {
178+
@JvmField public val subscriber: Subscriber<in T>,
179+
context: CoroutineContext
180+
) : Subscription, AbstractCoroutine<Unit>(context, true) {
170181
private val requested = atomic(0L)
171182
private val producer = atomic<Continuation<Unit>?>(createInitialContinuation())
172183

reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt

+75-1
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.flow.*
99
import org.junit.Test
1010
import org.reactivestreams.*
11+
import java.util.concurrent.*
1112
import kotlin.test.*
1213

1314
class FlowAsPublisherTest : TestBase() {
14-
1515
@Test
1616
fun testErrorOnCancellationIsReported() {
1717
expect(1)
@@ -75,4 +75,78 @@ class FlowAsPublisherTest : TestBase() {
7575
})
7676
finish(4)
7777
}
78+
79+
@Test
80+
fun testUnconfinedDefaultContext() {
81+
expect(1)
82+
val thread = Thread.currentThread()
83+
fun checkThread() {
84+
assertSame(thread, Thread.currentThread())
85+
}
86+
flowOf(42).asPublisher().subscribe(object : Subscriber<Int> {
87+
private lateinit var subscription: Subscription
88+
89+
override fun onSubscribe(s: Subscription) {
90+
expect(2)
91+
subscription = s
92+
subscription.request(2)
93+
}
94+
95+
override fun onNext(t: Int) {
96+
checkThread()
97+
expect(3)
98+
assertEquals(42, t)
99+
}
100+
101+
override fun onComplete() {
102+
checkThread()
103+
expect(4)
104+
}
105+
106+
override fun onError(t: Throwable?) {
107+
expectUnreached()
108+
}
109+
})
110+
finish(5)
111+
}
112+
113+
@Test
114+
fun testConfinedContext() {
115+
expect(1)
116+
val threadName = "FlowAsPublisherTest.testConfinedContext"
117+
fun checkThread() {
118+
val currentThread = Thread.currentThread()
119+
assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
120+
}
121+
val completed = CountDownLatch(1)
122+
newSingleThreadContext(threadName).use { dispatcher ->
123+
flowOf(42).asPublisher(dispatcher).subscribe(object : Subscriber<Int> {
124+
private lateinit var subscription: Subscription
125+
126+
override fun onSubscribe(s: Subscription) {
127+
expect(2)
128+
subscription = s
129+
subscription.request(2)
130+
}
131+
132+
override fun onNext(t: Int) {
133+
checkThread()
134+
expect(3)
135+
assertEquals(42, t)
136+
}
137+
138+
override fun onComplete() {
139+
checkThread()
140+
expect(4)
141+
completed.countDown()
142+
}
143+
144+
override fun onError(t: Throwable?) {
145+
expectUnreached()
146+
}
147+
})
148+
completed.await()
149+
}
150+
finish(5)
151+
}
78152
}

reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public final class kotlinx/coroutines/reactor/ReactorContextKt {
3838

3939
public final class kotlinx/coroutines/reactor/ReactorFlowKt {
4040
public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux;
41+
public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Flux;
42+
public static synthetic fun asFlux$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lreactor/core/publisher/Flux;
4143
}
4244

4345
public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay {

reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt

+16-3
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,38 @@
44

55
package kotlinx.coroutines.reactor
66

7+
import kotlinx.coroutines.*
78
import kotlinx.coroutines.flow.Flow
89
import kotlinx.coroutines.flow.flowOn
910
import kotlinx.coroutines.reactive.FlowSubscription
11+
import org.reactivestreams.*
1012
import reactor.core.CoreSubscriber
1113
import reactor.core.publisher.Flux
14+
import kotlin.coroutines.*
1215

1316
/**
1417
* Converts the given flow to a cold flux.
1518
* The original flow is cancelled when the flux subscriber is disposed.
1619
*
1720
* This function is integrated with [ReactorContext], see its documentation for additional details.
21+
*
22+
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
23+
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
24+
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
25+
* is used, so calls are performed from an arbitrary thread.
1826
*/
19-
public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
27+
@JvmOverloads // binary compatibility
28+
public fun <T: Any> Flow<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> =
29+
FlowAsFlux(this, Dispatchers.Unconfined + context)
2030

21-
private class FlowAsFlux<T : Any>(private val flow: Flow<T>) : Flux<T>() {
31+
private class FlowAsFlux<T : Any>(
32+
private val flow: Flow<T>,
33+
private val context: CoroutineContext
34+
) : Flux<T>() {
2235
override fun subscribe(subscriber: CoreSubscriber<in T>?) {
2336
if (subscriber == null) throw NullPointerException()
2437
val hasContext = !subscriber.currentContext().isEmpty
2538
val source = if (hasContext) flow.flowOn(subscriber.currentContext().asCoroutineContext()) else flow
26-
subscriber.onSubscribe(FlowSubscription(source, subscriber))
39+
subscriber.onSubscribe(FlowSubscription(source, subscriber, context))
2740
}
2841
}

reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt

+76
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import kotlinx.coroutines.*
44
import kotlinx.coroutines.flow.*
55
import kotlinx.coroutines.reactive.*
66
import org.junit.Test
7+
import org.reactivestreams.*
78
import reactor.core.publisher.*
89
import reactor.util.context.Context
10+
import java.util.concurrent.*
911
import kotlin.test.*
1012

1113
class FlowAsFluxTest : TestBase() {
@@ -68,4 +70,78 @@ class FlowAsFluxTest : TestBase() {
6870
}
6971
finish(4)
7072
}
73+
74+
@Test
75+
fun testUnconfinedDefaultContext() {
76+
expect(1)
77+
val thread = Thread.currentThread()
78+
fun checkThread() {
79+
assertSame(thread, Thread.currentThread())
80+
}
81+
flowOf(42).asFlux().subscribe(object : Subscriber<Int> {
82+
private lateinit var subscription: Subscription
83+
84+
override fun onSubscribe(s: Subscription) {
85+
expect(2)
86+
subscription = s
87+
subscription.request(2)
88+
}
89+
90+
override fun onNext(t: Int) {
91+
checkThread()
92+
expect(3)
93+
assertEquals(42, t)
94+
}
95+
96+
override fun onComplete() {
97+
checkThread()
98+
expect(4)
99+
}
100+
101+
override fun onError(t: Throwable?) {
102+
expectUnreached()
103+
}
104+
})
105+
finish(5)
106+
}
107+
108+
@Test
109+
fun testConfinedContext() {
110+
expect(1)
111+
val threadName = "FlowAsFluxTest.testConfinedContext"
112+
fun checkThread() {
113+
val currentThread = Thread.currentThread()
114+
assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
115+
}
116+
val completed = CountDownLatch(1)
117+
newSingleThreadContext(threadName).use { dispatcher ->
118+
flowOf(42).asFlux(dispatcher).subscribe(object : Subscriber<Int> {
119+
private lateinit var subscription: Subscription
120+
121+
override fun onSubscribe(s: Subscription) {
122+
expect(2)
123+
subscription = s
124+
subscription.request(2)
125+
}
126+
127+
override fun onNext(t: Int) {
128+
checkThread()
129+
expect(3)
130+
assertEquals(42, t)
131+
}
132+
133+
override fun onComplete() {
134+
checkThread()
135+
expect(4)
136+
completed.countDown()
137+
}
138+
139+
override fun onError(t: Throwable?) {
140+
expectUnreached()
141+
}
142+
})
143+
completed.await()
144+
}
145+
finish(5)
146+
}
71147
}

reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api

+4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ public final class kotlinx/coroutines/rx2/RxConvertKt {
3535
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
3636
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
3737
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
38+
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
39+
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
40+
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
41+
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
3842
}
3943

4044
public final class kotlinx/coroutines/rx2/RxFlowableKt {

reactive/kotlinx-coroutines-rx2/src/RxConvert.kt

+17-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import kotlinx.coroutines.*
1010
import kotlinx.coroutines.channels.*
1111
import kotlinx.coroutines.flow.*
1212
import kotlinx.coroutines.reactive.*
13+
import org.reactivestreams.*
1314
import java.util.concurrent.atomic.*
1415
import kotlin.coroutines.*
1516

@@ -106,15 +107,21 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
106107
/**
107108
* Converts the given flow to a cold observable.
108109
* The original flow is cancelled when the observable subscriber is disposed.
110+
*
111+
* An optional [context] can be specified to control the execution context of calls to [Observer] methods.
112+
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
113+
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
114+
* is used, so calls are performed from an arbitrary thread.
109115
*/
116+
@JvmOverloads // binary compatibility
110117
@JvmName("from")
111118
@ExperimentalCoroutinesApi
112-
public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create { emitter ->
119+
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
113120
/*
114121
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
115122
* asObservable is already invoked from unconfined
116123
*/
117-
val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
124+
val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) {
118125
try {
119126
collect { value -> emitter.onNext(value) }
120127
emitter.onComplete()
@@ -135,7 +142,14 @@ public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create {
135142
/**
136143
* Converts the given flow to a cold flowable.
137144
* The original flow is cancelled when the flowable subscriber is disposed.
145+
*
146+
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
147+
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
148+
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
149+
* is used, so calls are performed from an arbitrary thread.
138150
*/
151+
@JvmOverloads // binary compatibility
139152
@JvmName("from")
140153
@ExperimentalCoroutinesApi
141-
public fun <T: Any> Flow<T>.asFlowable(): Flowable<T> = Flowable.fromPublisher(asPublisher())
154+
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
155+
Flowable.fromPublisher(asPublisher(context))

0 commit comments

Comments
 (0)