diff --git a/reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api b/reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api index d4bc1698ef..1f5bdec7d0 100644 --- a/reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api +++ b/reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api @@ -15,6 +15,8 @@ public final class kotlinx/coroutines/jdk9/PublishKt { public final class kotlinx/coroutines/jdk9/ReactiveFlowKt { public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow; public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher; + public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Ljava/util/concurrent/Flow$Publisher; + public static synthetic fun asPublisher$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher; public static final fun collect (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } diff --git a/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt index 89caf82c54..5d546dffd3 100644 --- a/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt @@ -4,12 +4,14 @@ package kotlinx.coroutines.jdk9 +import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.asPublisher import kotlinx.coroutines.reactive.collect +import org.reactivestreams.* +import kotlin.coroutines.* import java.util.concurrent.Flow as JFlow -import org.reactivestreams.FlowAdapters /** * Transforms the given reactive [Publisher] into [Flow]. @@ -25,9 +27,15 @@ public fun JFlow.Publisher.asFlow(): Flow = /** * Transforms the given flow to a reactive specification compliant [Publisher]. + * + * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. + * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher + * is used, so calls are performed from an arbitrary thread. */ -public fun Flow.asPublisher(): JFlow.Publisher { - val reactivePublisher : org.reactivestreams.Publisher = this.asPublisher() +@JvmOverloads // binary compatibility +public fun Flow.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher { + val reactivePublisher : org.reactivestreams.Publisher = this.asPublisher(context) return FlowAdapters.toFlowPublisher(reactivePublisher) } diff --git a/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api b/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api index bed065d582..5783edeaa1 100644 --- a/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api +++ b/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api @@ -32,7 +32,7 @@ public final class kotlinx/coroutines/reactive/FlowKt { public final class kotlinx/coroutines/reactive/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, org/reactivestreams/Subscription { public final field flow Lkotlinx/coroutines/flow/Flow; public final field subscriber Lorg/reactivestreams/Subscriber; - public fun (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V + public fun (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;Lkotlin/coroutines/CoroutineContext;)V public fun cancel ()V public fun request (J)V } @@ -65,5 +65,7 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro public final class kotlinx/coroutines/reactive/ReactiveFlowKt { public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow; public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher; + public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher; + public static synthetic fun asPublisher$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher; } diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index efa9c9c9f1..a51f583b77 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -34,8 +34,15 @@ public fun Publisher.asFlow(): Flow = * * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module, * see its documentation for additional details. + * + * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. + * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher + * is used, so calls are performed from an arbitrary thread. */ -public fun Flow.asPublisher(): Publisher = FlowAsPublisher(this) +@JvmOverloads // binary compatibility +public fun Flow.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher = + FlowAsPublisher(this, Dispatchers.Unconfined + context) private class PublisherAsFlow( private val publisher: Publisher, @@ -153,11 +160,14 @@ internal fun Publisher.injectCoroutineContext(coroutineContext: Coroutine * Adapter that transforms [Flow] into TCK-complaint [Publisher]. * [cancel] invocation cancels the original flow. */ -@Suppress("PublisherImplementation") -private class FlowAsPublisher(private val flow: Flow) : Publisher { +@Suppress("ReactiveStreamsPublisherImplementation") +private class FlowAsPublisher( + private val flow: Flow, + private val context: CoroutineContext +) : Publisher { override fun subscribe(subscriber: Subscriber?) { if (subscriber == null) throw NullPointerException() - subscriber.onSubscribe(FlowSubscription(flow, subscriber)) + subscriber.onSubscribe(FlowSubscription(flow, subscriber, context)) } } @@ -165,8 +175,9 @@ private class FlowAsPublisher(private val flow: Flow) : Publisher @InternalCoroutinesApi public class FlowSubscription( @JvmField public val flow: Flow, - @JvmField public val subscriber: Subscriber -) : Subscription, AbstractCoroutine(Dispatchers.Unconfined, true) { + @JvmField public val subscriber: Subscriber, + context: CoroutineContext +) : Subscription, AbstractCoroutine(context, true) { private val requested = atomic(0L) private val producer = atomic?>(createInitialContinuation()) diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt index c044d92725..e7b8cb17ae 100644 --- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt @@ -8,10 +8,10 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import org.junit.Test import org.reactivestreams.* +import java.util.concurrent.* import kotlin.test.* class FlowAsPublisherTest : TestBase() { - @Test fun testErrorOnCancellationIsReported() { expect(1) @@ -75,4 +75,78 @@ class FlowAsPublisherTest : TestBase() { }) finish(4) } + + @Test + fun testUnconfinedDefaultContext() { + expect(1) + val thread = Thread.currentThread() + fun checkThread() { + assertSame(thread, Thread.currentThread()) + } + flowOf(42).asPublisher().subscribe(object : Subscriber { + private lateinit var subscription: Subscription + + override fun onSubscribe(s: Subscription) { + expect(2) + subscription = s + subscription.request(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + } + + override fun onError(t: Throwable?) { + expectUnreached() + } + }) + finish(5) + } + + @Test + fun testConfinedContext() { + expect(1) + val threadName = "FlowAsPublisherTest.testConfinedContext" + fun checkThread() { + val currentThread = Thread.currentThread() + assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread") + } + val completed = CountDownLatch(1) + newSingleThreadContext(threadName).use { dispatcher -> + flowOf(42).asPublisher(dispatcher).subscribe(object : Subscriber { + private lateinit var subscription: Subscription + + override fun onSubscribe(s: Subscription) { + expect(2) + subscription = s + subscription.request(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + completed.countDown() + } + + override fun onError(t: Throwable?) { + expectUnreached() + } + }) + completed.await() + } + finish(5) + } } diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api index 422f36b1ea..3b5c6b9522 100644 --- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api +++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api @@ -38,6 +38,8 @@ public final class kotlinx/coroutines/reactor/ReactorContextKt { public final class kotlinx/coroutines/reactor/ReactorFlowKt { public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux; + public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Flux; + public static synthetic fun asFlux$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lreactor/core/publisher/Flux; } public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay { diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt index d665c88d35..a478ab1ef8 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt @@ -4,25 +4,38 @@ package kotlinx.coroutines.reactor +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.reactive.FlowSubscription +import org.reactivestreams.* import reactor.core.CoreSubscriber import reactor.core.publisher.Flux +import kotlin.coroutines.* /** * Converts the given flow to a cold flux. * The original flow is cancelled when the flux subscriber is disposed. * * This function is integrated with [ReactorContext], see its documentation for additional details. + * + * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. + * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher + * is used, so calls are performed from an arbitrary thread. */ -public fun Flow.asFlux(): Flux = FlowAsFlux(this) +@JvmOverloads // binary compatibility +public fun Flow.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = + FlowAsFlux(this, Dispatchers.Unconfined + context) -private class FlowAsFlux(private val flow: Flow) : Flux() { +private class FlowAsFlux( + private val flow: Flow, + private val context: CoroutineContext +) : Flux() { override fun subscribe(subscriber: CoreSubscriber?) { if (subscriber == null) throw NullPointerException() val hasContext = !subscriber.currentContext().isEmpty val source = if (hasContext) flow.flowOn(subscriber.currentContext().asCoroutineContext()) else flow - subscriber.onSubscribe(FlowSubscription(source, subscriber)) + subscriber.onSubscribe(FlowSubscription(source, subscriber, context)) } } diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt index e4bd8b315b..cecc89592e 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt @@ -4,10 +4,13 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.junit.Test +import org.reactivestreams.* import reactor.core.publisher.* import reactor.util.context.Context +import java.util.concurrent.* import kotlin.test.* +@Suppress("ReactiveStreamsSubscriberImplementation") class FlowAsFluxTest : TestBase() { @Test fun testFlowAsFluxContextPropagation() { @@ -68,4 +71,78 @@ class FlowAsFluxTest : TestBase() { } finish(4) } -} \ No newline at end of file + + @Test + fun testUnconfinedDefaultContext() { + expect(1) + val thread = Thread.currentThread() + fun checkThread() { + assertSame(thread, Thread.currentThread()) + } + flowOf(42).asFlux().subscribe(object : Subscriber { + private lateinit var subscription: Subscription + + override fun onSubscribe(s: Subscription) { + expect(2) + subscription = s + subscription.request(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + } + + override fun onError(t: Throwable?) { + expectUnreached() + } + }) + finish(5) + } + + @Test + fun testConfinedContext() { + expect(1) + val threadName = "FlowAsFluxTest.testConfinedContext" + fun checkThread() { + val currentThread = Thread.currentThread() + assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread") + } + val completed = CountDownLatch(1) + newSingleThreadContext(threadName).use { dispatcher -> + flowOf(42).asFlux(dispatcher).subscribe(object : Subscriber { + private lateinit var subscription: Subscription + + override fun onSubscribe(s: Subscription) { + expect(2) + subscription = s + subscription.request(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + completed.countDown() + } + + override fun onError(t: Throwable?) { + expectUnreached() + } + }) + completed.await() + } + finish(5) + } +} diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api index 22f40384f0..06ddb68e9c 100644 --- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api +++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api @@ -35,6 +35,10 @@ public final class kotlinx/coroutines/rx2/RxConvertKt { public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single; public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable; public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable; + public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable; + public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable; + public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable; + public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable; } public final class kotlinx/coroutines/rx2/RxFlowableKt { diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 0be606ffc2..264cdad43c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* +import org.reactivestreams.* import java.util.concurrent.atomic.* import kotlin.coroutines.* @@ -106,15 +107,21 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { /** * Converts the given flow to a cold observable. * The original flow is cancelled when the observable subscriber is disposed. + * + * An optional [context] can be specified to control the execution context of calls to [Observer] methods. + * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher + * is used, so calls are performed from an arbitrary thread. */ +@JvmOverloads // binary compatibility @JvmName("from") @ExperimentalCoroutinesApi -public fun Flow.asObservable() : Observable = Observable.create { emitter -> +public fun Flow.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable = Observable.create { emitter -> /* * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if * asObservable is already invoked from unconfined */ - val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) { + val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) { try { collect { value -> emitter.onNext(value) } emitter.onComplete() @@ -135,7 +142,14 @@ public fun Flow.asObservable() : Observable = Observable.create { /** * Converts the given flow to a cold flowable. * The original flow is cancelled when the flowable subscriber is disposed. + * + * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. + * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher + * is used, so calls are performed from an arbitrary thread. */ +@JvmOverloads // binary compatibility @JvmName("from") @ExperimentalCoroutinesApi -public fun Flow.asFlowable(): Flowable = Flowable.fromPublisher(asPublisher()) +public fun Flow.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable = + Flowable.fromPublisher(asPublisher(context)) diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowAsFlowableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowAsFlowableTest.kt new file mode 100644 index 0000000000..1cbded6dc3 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/FlowAsFlowableTest.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.Test +import org.reactivestreams.* +import java.util.concurrent.* +import kotlin.test.* + +@Suppress("ReactiveStreamsSubscriberImplementation") +class FlowAsFlowableTest : TestBase() { + @Test + fun testUnconfinedDefaultContext() { + expect(1) + val thread = Thread.currentThread() + fun checkThread() { + assertSame(thread, Thread.currentThread()) + } + flowOf(42).asFlowable().subscribe(object : Subscriber { + private lateinit var subscription: Subscription + + override fun onSubscribe(s: Subscription) { + expect(2) + subscription = s + subscription.request(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + } + + override fun onError(t: Throwable?) { + expectUnreached() + } + }) + finish(5) + } + + @Test + fun testConfinedContext() { + expect(1) + val threadName = "FlowAsFlowableTest.testConfinedContext" + fun checkThread() { + val currentThread = Thread.currentThread() + assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread") + } + val completed = CountDownLatch(1) + newSingleThreadContext(threadName).use { dispatcher -> + flowOf(42).asFlowable(dispatcher).subscribe(object : Subscriber { + private lateinit var subscription: Subscription + + override fun onSubscribe(s: Subscription) { + expect(2) + subscription = s + subscription.request(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + completed.countDown() + } + + override fun onError(t: Throwable?) { + expectUnreached() + } + }) + completed.await() + } + finish(5) + } +} diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt index 0908b34cf2..3cde182260 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt @@ -4,9 +4,12 @@ package kotlinx.coroutines.rx2 +import io.reactivex.* +import io.reactivex.disposables.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import org.junit.Test +import java.util.concurrent.* import kotlin.test.* class FlowAsObservableTest : TestBase() { @@ -139,4 +142,70 @@ class FlowAsObservableTest : TestBase() { observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) }) } + + @Test + fun testUnconfinedDefaultContext() { + expect(1) + val thread = Thread.currentThread() + fun checkThread() { + assertSame(thread, Thread.currentThread()) + } + flowOf(42).asObservable().subscribe(object : Observer { + override fun onSubscribe(d: Disposable) { + expect(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + } + + override fun onError(t: Throwable) { + expectUnreached() + } + }) + finish(5) + } + + @Test + fun testConfinedContext() { + expect(1) + val threadName = "FlowAsObservableTest.testConfinedContext" + fun checkThread() { + val currentThread = Thread.currentThread() + assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread") + } + val completed = CountDownLatch(1) + newSingleThreadContext(threadName).use { dispatcher -> + flowOf(42).asObservable(dispatcher).subscribe(object : Observer { + override fun onSubscribe(d: Disposable) { + expect(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + completed.countDown() + } + + override fun onError(e: Throwable) { + expectUnreached() + } + }) + completed.await() + } + finish(5) + } } diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api index 27c3d3dfa0..4f15eda7d4 100644 --- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api +++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api @@ -30,6 +30,10 @@ public final class kotlinx/coroutines/rx3/RxConvertKt { public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Single; public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable; public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable; + public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable; + public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable; + public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable; + public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable; } public final class kotlinx/coroutines/rx3/RxFlowableKt { diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index f9e2e2158f..c7ab237cea 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* +import org.reactivestreams.* import java.util.concurrent.atomic.* import kotlin.coroutines.* @@ -91,15 +92,21 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { /** * Converts the given flow to a cold observable. * The original flow is cancelled when the observable subscriber is disposed. + * + * An optional [context] can be specified to control the execution context of calls to [Observer] methods. + * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher + * is used, so calls are performed from an arbitrary thread. */ +@JvmOverloads // binary compatibility @JvmName("from") @ExperimentalCoroutinesApi -public fun Flow.asObservable() : Observable = Observable.create { emitter -> +public fun Flow.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable = Observable.create { emitter -> /* * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if * asObservable is already invoked from unconfined */ - val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) { + val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) { try { collect { value -> emitter.onNext(value) } emitter.onComplete() @@ -120,7 +127,14 @@ public fun Flow.asObservable() : Observable = Observable.create { /** * Converts the given flow to a cold flowable. * The original flow is cancelled when the flowable subscriber is disposed. + * + * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. + * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher + * is used, so calls are performed from an arbitrary thread. */ +@JvmOverloads // binary compatibility @JvmName("from") @ExperimentalCoroutinesApi -public fun Flow.asFlowable(): Flowable = Flowable.fromPublisher(asPublisher()) +public fun Flow.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable = + Flowable.fromPublisher(asPublisher(context)) diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowAsFlowableTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowAsFlowableTest.kt new file mode 100644 index 0000000000..a73fee469e --- /dev/null +++ b/reactive/kotlinx-coroutines-rx3/test/FlowAsFlowableTest.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx3 + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.Test +import org.reactivestreams.* +import java.util.concurrent.* +import kotlin.test.* + +@Suppress("ReactiveStreamsSubscriberImplementation") +class FlowAsFlowableTest : TestBase() { + @Test + fun testUnconfinedDefaultContext() { + expect(1) + val thread = Thread.currentThread() + fun checkThread() { + assertSame(thread, Thread.currentThread()) + } + flowOf(42).asFlowable().subscribe(object : Subscriber { + private lateinit var subscription: Subscription + + override fun onSubscribe(s: Subscription) { + expect(2) + subscription = s + subscription.request(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + } + + override fun onError(t: Throwable?) { + expectUnreached() + } + }) + finish(5) + } + + @Test + fun testConfinedContext() { + expect(1) + val threadName = "FlowAsFlowableTest.testConfinedContext" + fun checkThread() { + val currentThread = Thread.currentThread() + assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread") + } + val completed = CountDownLatch(1) + newSingleThreadContext(threadName).use { dispatcher -> + flowOf(42).asFlowable(dispatcher).subscribe(object : Subscriber { + private lateinit var subscription: Subscription + + override fun onSubscribe(s: Subscription) { + expect(2) + subscription = s + subscription.request(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + completed.countDown() + } + + override fun onError(t: Throwable?) { + expectUnreached() + } + }) + completed.await() + } + finish(5) + } +} diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt index 50c4ae7dad..5759f9f426 100644 --- a/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt @@ -4,9 +4,12 @@ package kotlinx.coroutines.rx3 +import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.disposables.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import org.junit.Test +import java.util.concurrent.* import kotlin.test.* class FlowAsObservableTest : TestBase() { @@ -139,4 +142,70 @@ class FlowAsObservableTest : TestBase() { observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) }) } + + @Test + fun testUnconfinedDefaultContext() { + expect(1) + val thread = Thread.currentThread() + fun checkThread() { + assertSame(thread, Thread.currentThread()) + } + flowOf(42).asObservable().subscribe(object : Observer { + override fun onSubscribe(d: Disposable) { + expect(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + } + + override fun onError(t: Throwable) { + expectUnreached() + } + }) + finish(5) + } + + @Test + fun testConfinedContext() { + expect(1) + val threadName = "FlowAsObservableTest.testConfinedContext" + fun checkThread() { + val currentThread = Thread.currentThread() + assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread") + } + val completed = CountDownLatch(1) + newSingleThreadContext(threadName).use { dispatcher -> + flowOf(42).asObservable(dispatcher).subscribe(object : Observer { + override fun onSubscribe(d: Disposable) { + expect(2) + } + + override fun onNext(t: Int) { + checkThread() + expect(3) + assertEquals(42, t) + } + + override fun onComplete() { + checkThread() + expect(4) + completed.countDown() + } + + override fun onError(e: Throwable) { + expectUnreached() + } + }) + completed.await() + } + finish(5) + } }