From 4c069fc8f365d59102e964912e64787a8e3158b5 Mon Sep 17 00:00:00 2001 From: sokolova Date: Wed, 24 Jul 2019 17:52:11 +0300 Subject: [PATCH 1/7] Reactor coroutine context propagation in more places * Propagation of the coroutine context of await calls into Mono/Flux builder * Publisher.asFlow propagates coroutine context from `collect` call to the Publisher * Flow.asFlux transform Fixes #284 --- .../kotlinx-coroutines-reactive.txt | 27 +++-- .../kotlinx-coroutines-reactor.txt | 9 ++ .../kotlinx-coroutines-reactive/src/Await.kt | 13 ++- .../src/ContextInjector.kt | 14 +++ .../src/FlowAsPublisher.kt | 109 ++++++++++++++++++ .../src/{flow => }/PublisherAsFlow.kt | 19 ++- .../src/flow/FlowAsPublisher.kt | 103 ----------------- .../test/{flow => }/IterableFlowTckTest.kt | 2 +- .../test/{flow => }/PublisherAsFlowTest.kt | 3 +- .../{flow => }/RangePublisherBufferedTest.kt | 2 +- .../test/{flow => }/RangePublisherTest.kt | 2 +- .../UnboundedIntegerIncrementPublisherTest.kt | 2 +- ...otlinx.coroutines.reactive.ContextInjector | 1 + .../src/FlowAsFlux.kt | 29 +++++ .../kotlinx-coroutines-reactor/src/Flux.kt | 2 +- .../src/ReactorContext.kt | 14 +++ .../src/ReactorContextInjector.kt | 26 +++++ .../test/BackpressureTest.kt | 1 - .../test/FlowAsFluxTest.kt | 27 +++++ .../test/ReactorContextTest.kt | 66 ++++++++++- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 9 +- .../test/BackpressureTest.kt | 1 - 22 files changed, 347 insertions(+), 134 deletions(-) create mode 100644 reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt create mode 100644 reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt rename reactive/kotlinx-coroutines-reactive/src/{flow => }/PublisherAsFlow.kt (85%) delete mode 100644 reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt rename reactive/kotlinx-coroutines-reactive/test/{flow => }/IterableFlowTckTest.kt (98%) rename reactive/kotlinx-coroutines-reactive/test/{flow => }/PublisherAsFlowTest.kt (98%) rename reactive/kotlinx-coroutines-reactive/test/{flow => }/RangePublisherBufferedTest.kt (95%) rename reactive/kotlinx-coroutines-reactive/test/{flow => }/RangePublisherTest.kt (97%) rename reactive/kotlinx-coroutines-reactive/test/{flow => }/UnboundedIntegerIncrementPublisherTest.kt (97%) create mode 100644 reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector create mode 100644 reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt create mode 100644 reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt create mode 100644 reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt index 643f64170d..1b35578255 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt @@ -14,11 +14,29 @@ public final class kotlinx/coroutines/reactive/ChannelKt { public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; } +public abstract interface class kotlinx/coroutines/reactive/ContextInjector { + public abstract fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher; +} + public final class kotlinx/coroutines/reactive/ConvertKt { public static final fun asPublisher (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher; public static synthetic fun asPublisher$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher; } +public final class kotlinx/coroutines/reactive/FlowKt { + public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow; + public static final fun asFlow (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow; + public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher; +} + +public final class kotlinx/coroutines/reactive/FlowSubscription : org/reactivestreams/Subscription { + public final field flow Lkotlinx/coroutines/flow/Flow; + public final field subscriber Lorg/reactivestreams/Subscriber; + public fun (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V + public fun cancel ()V + public fun request (J)V +} + public final class kotlinx/coroutines/reactive/PublishKt { public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher; public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher; @@ -44,12 +62,3 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } -public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt { - public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher; -} - -public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt { - public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow; - public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow; -} - diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt index 46b35ed71f..9051a49d56 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt @@ -5,6 +5,10 @@ public final class kotlinx/coroutines/reactor/ConvertKt { public static final fun asMono (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Mono; } +public final class kotlinx/coroutines/reactor/FlowKt { + public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux; +} + public final class kotlinx/coroutines/reactor/FluxKt { public static final fun flux (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux; public static final fun flux (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux; @@ -28,6 +32,11 @@ public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key { } +public final class kotlinx/coroutines/reactor/ReactorContextInjector : kotlinx/coroutines/reactive/ContextInjector { + public fun ()V + public fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher; +} + public final class kotlinx/coroutines/reactor/ReactorContextKt { public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext; } diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index d12a6280eb..f7d08e7e2f 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription +import java.util.* import kotlin.coroutines.* /** @@ -81,6 +82,16 @@ public suspend fun Publisher.awaitSingle(): T = awaitOne(Mode.SINGLE) // ------------------------ private ------------------------ +// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only. +// If `kotlinx-coroutines-reactor` module is not included, the list is empty. +private val contextInjectors: Array = + ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList().toTypedArray() + +private fun Publisher.injectCoroutineContext(coroutineContext: CoroutineContext) = + contextInjectors.fold(this) { pub, contextInjector -> + contextInjector.injectCoroutineContext(pub, coroutineContext) + } + private enum class Mode(val s: String) { FIRST("awaitFirst"), FIRST_OR_DEFAULT("awaitFirstOrDefault"), @@ -93,7 +104,7 @@ private suspend fun Publisher.awaitOne( mode: Mode, default: T? = null ): T = suspendCancellableCoroutine { cont -> - subscribe(object : Subscriber { + injectCoroutineContext(cont.context).subscribe(object : Subscriber { private lateinit var subscription: Subscription private var value: T? = null private var seenValue = false diff --git a/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt new file mode 100644 index 0000000000..77181ba2ba --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt @@ -0,0 +1,14 @@ +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.InternalCoroutinesApi +import org.reactivestreams.Publisher +import kotlin.coroutines.CoroutineContext + +/** @suppress */ +@InternalCoroutinesApi +public interface ContextInjector { + /** + * Injects the coroutine context into the context of the publisher. + */ + public fun injectCoroutineContext(publisher: Publisher, coroutineContext: CoroutineContext): Publisher +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt new file mode 100644 index 0000000000..429977a575 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt @@ -0,0 +1,109 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.reactivestreams.* +import java.util.concurrent.atomic.* +import kotlin.coroutines.* + +/** + * Transforms the given flow to a spec-compliant [Publisher]. + */ +@ExperimentalCoroutinesApi +public fun Flow.asPublisher(): Publisher = FlowAsPublisher(this) + +/** + * Adapter that transforms [Flow] into TCK-complaint [Publisher]. + * [cancel] invocation cancels the original flow. + */ +@Suppress("PublisherImplementation") +private class FlowAsPublisher(private val flow: Flow) : Publisher { + override fun subscribe(subscriber: Subscriber?) { + if (subscriber == null) throw NullPointerException() + subscriber.onSubscribe(FlowSubscription(flow, subscriber)) + } +} + +/** @suppress */ +@InternalCoroutinesApi +public class FlowSubscription( + @JvmField val flow: Flow, + @JvmField val subscriber: Subscriber +) : Subscription { + @Volatile + private var canceled: Boolean = false + private val requested = AtomicLong(0L) + private val producer: AtomicReference?> = AtomicReference() + + // This is actually optimizable + private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) { + try { + consumeFlow() + subscriber.onComplete() + } catch (e: Throwable) { + // Failed with real exception, not due to cancellation + if (!coroutineContext[Job]!!.isCancelled) { + subscriber.onError(e) + } + } + } + + private suspend fun consumeFlow() { + flow.collect { value -> + if (!coroutineContext.isActive) { + subscriber.onComplete() + coroutineContext.ensureActive() + } + + if (requested.get() == 0L) { + suspendCancellableCoroutine { + producer.set(it) + if (requested.get() != 0L) it.resumeSafely() + } + } + + requested.decrementAndGet() + subscriber.onNext(value) + } + } + + override fun cancel() { + canceled = true + job.cancel() + } + + override fun request(n: Long) { + if (n <= 0) { + return + } + + if (canceled) return + + job.start() + var snapshot: Long + var newValue: Long + do { + snapshot = requested.get() + newValue = snapshot + n + if (newValue <= 0L) newValue = Long.MAX_VALUE + } while (!requested.compareAndSet(snapshot, newValue)) + + val prev = producer.get() + if (prev == null || !producer.compareAndSet(prev, null)) return + prev.resumeSafely() + } + + private fun CancellableContinuation.resumeSafely() { + val token = tryResume(Unit) + if (token != null) { + completeResume(token) + } + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt similarity index 85% rename from reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt rename to reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt index 50338de605..a8a4b6873c 100644 --- a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt @@ -2,14 +2,17 @@ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package kotlinx.coroutines.reactive.flow +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.reactive import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.internal.* -import kotlinx.coroutines.reactive.* import org.reactivestreams.* +import java.util.* import kotlin.coroutines.* /** @@ -21,13 +24,11 @@ import kotlin.coroutines.* * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements * are discarded. */ -@JvmName("from") @ExperimentalCoroutinesApi public fun Publisher.asFlow(): Flow = PublisherAsFlow(this, 1) @FlowPreview -@JvmName("from") @Deprecated( message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure", level = DeprecationLevel.ERROR, @@ -70,7 +71,7 @@ private class PublisherAsFlow( override suspend fun collect(collector: FlowCollector) { val subscriber = ReactiveSubscriber(capacity, requestSize) - publisher.subscribe(subscriber) + publisher.injectCoroutineContext(coroutineContext).subscribe(subscriber) try { var consumed = 0L while (true) { @@ -127,3 +128,11 @@ private class ReactiveSubscriber( subscription.cancel() } } + +// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only. +// If `kotlinx-coroutines-reactor` module is not included, the list is empty. +private val contextInjectors: List = + ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList() + +private fun Publisher.injectCoroutineContext(coroutineContext: CoroutineContext) = + contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt deleted file mode 100644 index 05f2391e36..0000000000 --- a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.reactive.flow - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import org.reactivestreams.* -import java.util.concurrent.atomic.* -import kotlin.coroutines.* - -/** - * Transforms the given flow to a spec-compliant [Publisher]. - */ -@JvmName("from") -@ExperimentalCoroutinesApi -public fun Flow.asPublisher(): Publisher = FlowAsPublisher(this) - -/** - * Adapter that transforms [Flow] into TCK-complaint [Publisher]. - * [cancel] invocation cancels the original flow. - */ -@Suppress("PublisherImplementation") -private class FlowAsPublisher(private val flow: Flow) : Publisher { - - override fun subscribe(subscriber: Subscriber?) { - if (subscriber == null) throw NullPointerException() - subscriber.onSubscribe(FlowSubscription(flow, subscriber)) - } - - private class FlowSubscription(val flow: Flow, val subscriber: Subscriber) : Subscription { - @Volatile - internal var canceled: Boolean = false - private val requested = AtomicLong(0L) - private val producer: AtomicReference?> = AtomicReference() - - // This is actually optimizable - private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) { - try { - consumeFlow() - subscriber.onComplete() - } catch (e: Throwable) { - // Failed with real exception, not due to cancellation - if (!coroutineContext[Job]!!.isCancelled) { - subscriber.onError(e) - } - } - } - - private suspend fun consumeFlow() { - flow.collect { value -> - if (!coroutineContext.isActive) { - subscriber.onComplete() - coroutineContext.ensureActive() - } - - if (requested.get() == 0L) { - suspendCancellableCoroutine { - producer.set(it) - if (requested.get() != 0L) it.resumeSafely() - } - } - - requested.decrementAndGet() - subscriber.onNext(value) - } - } - - override fun cancel() { - canceled = true - job.cancel() - } - - override fun request(n: Long) { - if (n <= 0) { - return - } - - if (canceled) return - - job.start() - var snapshot: Long - var newValue: Long - do { - snapshot = requested.get() - newValue = snapshot + n - if (newValue <= 0L) newValue = Long.MAX_VALUE - } while (!requested.compareAndSet(snapshot, newValue)) - - val prev = producer.get() - if (prev == null || !producer.compareAndSet(prev, null)) return - prev.resumeSafely() - } - - private fun CancellableContinuation.resumeSafely() { - val token = tryResume(Unit) - if (token != null) { - completeResume(token) - } - } - } -} diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt similarity index 98% rename from reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt rename to reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt index 31c5a3c489..5dfd9d537d 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt @@ -4,7 +4,7 @@ @file:Suppress("UNCHECKED_CAST") -package kotlinx.coroutines.reactive.flow +package kotlinx.coroutines.reactive import kotlinx.coroutines.flow.* import org.junit.* diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt similarity index 98% rename from reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt rename to reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt index 3f33b33c8b..a37719de64 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt @@ -2,12 +2,11 @@ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package kotlinx.coroutines.reactive.flow +package kotlinx.coroutines.reactive import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* -import kotlinx.coroutines.reactive.* import kotlin.test.* class PublisherAsFlowTest : TestBase() { diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt b/reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt similarity index 95% rename from reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt rename to reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt index 2ff96eb176..b710c59064 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt @@ -2,7 +2,7 @@ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package kotlinx.coroutines.reactive.flow +package kotlinx.coroutines.reactive import kotlinx.coroutines.flow.* import org.junit.* diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt similarity index 97% rename from reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt rename to reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt index 1b37ee9974..72d5de5e82 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt @@ -2,7 +2,7 @@ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package kotlinx.coroutines.reactive.flow +package kotlinx.coroutines.reactive import org.junit.* import org.reactivestreams.* diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt similarity index 97% rename from reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt rename to reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt index 9e611008c2..63d444c19e 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt @@ -2,7 +2,7 @@ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package kotlinx.coroutines.reactive.flow +package kotlinx.coroutines.reactive import org.junit.* import org.reactivestreams.example.unicast.AsyncIterablePublisher diff --git a/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector b/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector new file mode 100644 index 0000000000..0097ec3539 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector @@ -0,0 +1 @@ +kotlinx.coroutines.reactor.ReactorContextInjector \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt new file mode 100644 index 0000000000..cb16c38763 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt @@ -0,0 +1,29 @@ +@file:JvmName("FlowKt") + +package kotlinx.coroutines.reactor + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.reactive.FlowSubscription +import reactor.core.CoreSubscriber +import reactor.core.publisher.Flux + +/** + * Converts the given flow to a cold flux. + * The original flow is cancelled when the flux subscriber is disposed. + */ +@ExperimentalCoroutinesApi +public fun Flow.asFlux(): Flux = FlowAsFlux(this) + +private class FlowAsFlux(private val flow: Flow) : Flux() { + override fun subscribe(subscriber: CoreSubscriber?) { + if (subscriber == null) throw NullPointerException() + subscriber.onSubscribe( + FlowSubscription( + flow.flowOn(subscriber.currentContext().asCoroutineContext()), + subscriber + ) + ) + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 316146b578..18b84ac117 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -74,4 +74,4 @@ private fun reactorPublish( val coroutine = PublisherCoroutine(newContext, subscriber) subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions coroutine.start(CoroutineStart.DEFAULT, coroutine, block) -} +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index 5a4ccd040e..61d75c17a7 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -30,6 +30,20 @@ import kotlin.coroutines.* * .subscribe() * } * ``` + * + * [CoroutineContext] of a suspendable function that awaits a value from [Mono] or [Flux] instance + * is propagated into [mono] and [flux] Reactor builders: + * + * ``` + * launch(Context.of("key", "value").asCoroutineContext()) { + * assertEquals(bar().awaitFirst(), "value") + * } + * + * fun bar(): Mono = mono { + * coroutineContext[ReactorContext]!!.context.get("key") + * } + * ``` +} */ @ExperimentalCoroutinesApi public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) { diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt new file mode 100644 index 0000000000..b7212b979a --- /dev/null +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt @@ -0,0 +1,26 @@ +package kotlinx.coroutines.reactor + +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.reactive.ContextInjector +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.util.context.Context +import kotlin.coroutines.CoroutineContext + +/** @suppress */ +@InternalCoroutinesApi +class ReactorContextInjector : ContextInjector { + /** + * Injects all values from the [ReactorContext] entry of the given coroutine context + * into the downstream [Context] of Reactor's [Publisher] instances of [Mono] or [Flux]. + */ + override fun injectCoroutineContext(publisher: Publisher, coroutineContext: CoroutineContext): Publisher { + val reactorContext = coroutineContext[ReactorContext]?.context ?: return publisher + return when(publisher) { + is Mono -> publisher.subscriberContext(reactorContext) + is Flux -> publisher.subscriberContext(reactorContext) + else -> publisher + } + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt index 120cd72ba9..80feaeb865 100644 --- a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt @@ -7,7 +7,6 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* -import kotlinx.coroutines.reactive.flow.* import org.junit.Test import reactor.core.publisher.* import kotlin.test.* diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt new file mode 100644 index 0000000000..2f8ce9ac42 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt @@ -0,0 +1,27 @@ +package kotlinx.coroutines.reactor + +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.* +import kotlinx.coroutines.runBlocking +import org.junit.Test +import reactor.core.publisher.Mono +import reactor.util.context.Context +import kotlin.test.assertEquals + +class FlowAsFluxTest { + @Test + fun testFlowToFluxContextPropagation() = runBlocking { + val flux = flow { + (1..4).forEach { i -> emit(m(i).awaitFirst()) } + } .asFlux() + .subscriberContext(Context.of(1, "1")) + .subscriberContext(Context.of(2, "2", 3, "3", 4, "4")) + var i = 0 + flux.subscribe { str -> i++; println(str); assertEquals(str, i.toString()) } + } + + private fun m(i: Int): Mono = mono { + val ctx = coroutineContext[ReactorContext]?.context + ctx?.getOrDefault(i, "noValue") + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt index 1fb4f0bb64..9e91b4337e 100644 --- a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt @@ -3,8 +3,10 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* import kotlinx.coroutines.reactive.* import org.junit.Test +import reactor.core.publisher.* import reactor.util.context.Context import kotlin.test.assertEquals +import kotlinx.coroutines.flow.* class ReactorContextTest { @Test @@ -14,8 +16,8 @@ class ReactorContextTest { buildString { (1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) } } - } .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) - .subscriberContext { ctx -> ctx.put(6, "6") } + } .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) + .subscriberContext { ctx -> ctx.put(6, "6") } assertEquals(mono.awaitFirst(), "1234567") } @@ -29,4 +31,64 @@ class ReactorContextTest { var i = 0 flux.subscribe { str -> i++; assertEquals(str, i.toString()) } } + + @Test + fun testAwait() = runBlocking(Context.of(3, "3").asCoroutineContext()) { + val result = mono(Context.of(1, "1").asCoroutineContext()) { + val ctx = coroutineContext[ReactorContext]?.context + buildString { + (1..3).forEach { append(ctx?.getOrDefault(it, "noValue")) } + } + } .subscriberContext(Context.of(2, "2")) + .awaitFirst() + assertEquals(result, "123") + } + + @Test + fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) { + assertEquals(m().awaitFirst(), "7") + assertEquals(m().awaitFirstOrDefault("noValue"), "7") + assertEquals(m().awaitFirstOrNull(), "7") + assertEquals(m().awaitFirstOrElse { "noValue" }, "7") + assertEquals(m().awaitLast(), "7") + assertEquals(m().awaitSingle(), "7") + } + + @Test + fun testFluxAwaitContextPropagation() = runBlocking(Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()) { + assertEquals(f().awaitFirst(), "1") + assertEquals(f().awaitFirstOrDefault("noValue"), "1") + assertEquals(f().awaitFirstOrNull(), "1") + assertEquals(f().awaitFirstOrElse { "noValue" }, "1") + assertEquals(f().awaitLast(), "3") + var i = 0 + f().subscribe { str -> i++; assertEquals(str, i.toString()) } + } + + private fun m(): Mono = mono { + val ctx = coroutineContext[ReactorContext]?.context + ctx?.getOrDefault(7, "noValue") + } + + + private fun f(): Flux = flux { + val ctx = coroutineContext[ReactorContext]?.context + (1..3).forEach { send(ctx?.getOrDefault(it, "noValue")) } + } + + @Test + fun testFlowToFluxContextPropagation() = runBlocking(Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()) { + var i = 0 + bar().collect { str -> + i++; assertEquals(str, i.toString()) + } + assertEquals(i, 3) + } + + suspend fun bar(): Flow { + return flux { + val ctx = coroutineContext[ReactorContext]!!.context + (1..3).forEach { send(ctx.getOrDefault(it, "noValue")) } + }.asFlow() + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index d5678de921..4b12127189 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -8,8 +8,7 @@ import io.reactivex.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* -import kotlinx.coroutines.reactive.flow.* -import org.reactivestreams.* +import kotlinx.coroutines.reactive.* import kotlin.coroutines.* /** @@ -82,7 +81,7 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): /** * Converts the given flow to a cold observable. - * The original flow is cancelled if the observable subscriber was disposed. + * The original flow is cancelled when the observable subscriber is disposed. */ @JvmName("from") @ExperimentalCoroutinesApi @@ -106,8 +105,8 @@ public fun Flow.asObservable() : Observable = Observable.create { } /** - * Converts the given flow to a cold observable. - * The original flow is cancelled if the flowable subscriber was disposed. + * Converts the given flow to a cold flowable. + * The original flow is cancelled when the flowable subscriber is disposed. */ @JvmName("from") @ExperimentalCoroutinesApi diff --git a/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt index 1904334144..ed0bc369c0 100644 --- a/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt @@ -8,7 +8,6 @@ import io.reactivex.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* -import kotlinx.coroutines.reactive.flow.* import org.junit.Test import kotlin.test.* From 975ed04e65919d467cd6e37730453fc0bf01d764 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Wed, 31 Jul 2019 12:24:56 +0300 Subject: [PATCH 2/7] fixup! Reactor coroutine context propagation in more places --- .../kotlinx-coroutines-reactor.txt | 5 --- .../src/PublisherAsFlow.kt | 4 +- .../src/ReactorContextInjector.kt | 16 +++----- .../test/ReactorContextTest.kt | 37 ++++++++++++++----- 4 files changed, 36 insertions(+), 26 deletions(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt index 9051a49d56..20e20baad0 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt @@ -32,11 +32,6 @@ public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key { } -public final class kotlinx/coroutines/reactor/ReactorContextInjector : kotlinx/coroutines/reactive/ContextInjector { - public fun ()V - public fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher; -} - public final class kotlinx/coroutines/reactor/ReactorContextKt { public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext; } diff --git a/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt index a8a4b6873c..8da106e537 100644 --- a/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt @@ -47,7 +47,9 @@ private class PublisherAsFlow( // use another channel for conflation (cannot do openSubscription) if (capacity < 0) return super.produceImpl(scope) // Open subscription channel directly - val channel = publisher.openSubscription(capacity) + val channel = publisher + .injectCoroutineContext(scope.coroutineContext) + .openSubscription(capacity) val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause -> channel.cancel(cause?.let { it as? CancellationException ?: CancellationException("Job was cancelled", it) diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt index b7212b979a..68309bbcdb 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt @@ -1,16 +1,12 @@ package kotlinx.coroutines.reactor -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.reactive.ContextInjector -import org.reactivestreams.Publisher -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.util.context.Context -import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.reactive.* +import org.reactivestreams.* +import reactor.core.publisher.* +import reactor.util.context.* +import kotlin.coroutines.* -/** @suppress */ -@InternalCoroutinesApi -class ReactorContextInjector : ContextInjector { +internal class ReactorContextInjector : ContextInjector { /** * Injects all values from the [ReactorContext] entry of the given coroutine context * into the downstream [Context] of Reactor's [Publisher] instances of [Mono] or [Flux]. diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt index 9e91b4337e..e9ac200f49 100644 --- a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt @@ -1,12 +1,13 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.junit.Test import reactor.core.publisher.* -import reactor.util.context.Context -import kotlin.test.assertEquals -import kotlinx.coroutines.flow.* +import reactor.util.context.* +import kotlin.test.* class ReactorContextTest { @Test @@ -55,7 +56,9 @@ class ReactorContextTest { } @Test - fun testFluxAwaitContextPropagation() = runBlocking(Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()) { + fun testFluxAwaitContextPropagation() = runBlocking( + Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext() + ) { assertEquals(f().awaitFirst(), "1") assertEquals(f().awaitFirstOrDefault("noValue"), "1") assertEquals(f().awaitFirstOrNull(), "1") @@ -77,18 +80,32 @@ class ReactorContextTest { } @Test - fun testFlowToFluxContextPropagation() = runBlocking(Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()) { + fun testFlowToFluxContextPropagation() = runBlocking( + Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext() + ) { var i = 0 + // call "collect" on the converted Flow bar().collect { str -> i++; assertEquals(str, i.toString()) } assertEquals(i, 3) } - suspend fun bar(): Flow { - return flux { - val ctx = coroutineContext[ReactorContext]!!.context - (1..3).forEach { send(ctx.getOrDefault(it, "noValue")) } - }.asFlow() + @Test + fun testFlowToFluxDirectContextPropagation() = runBlocking( + Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext() + ) { + var i = 0 + // convert resulting flow to channel using "produceIn" + val channel = bar().produceIn(this) + channel.consumeEach { str -> + i++; assertEquals(str, i.toString()) + } + assertEquals(i, 3) } + + private fun bar(): Flow = flux { + val ctx = coroutineContext[ReactorContext]!!.context + (1..3).forEach { send(ctx.getOrDefault(it, "noValue")) } + }.asFlow() } \ No newline at end of file From 595bf11d1b827326282968a79c030dda53069e6f Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 6 Aug 2019 12:49:41 +0300 Subject: [PATCH 3/7] ~Properly handle cancellation in FlowSubscription, handle exceptions from onComplete and onError, make context injector R8-friendly, do not use flowOn for empty context --- .../kotlinx-coroutines-reactive/src/Await.kt | 2 +- .../src/ContextInjector.kt | 3 +- .../src/FlowAsPublisher.kt | 39 ++++----- .../test/FlowAsPublisherTest.kt | 79 +++++++++++++++++++ .../src/FlowAsFlux.kt | 9 +-- .../src/ReactorContext.kt | 2 - 6 files changed, 106 insertions(+), 28 deletions(-) create mode 100644 reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index f7d08e7e2f..072773a4fe 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -85,7 +85,7 @@ public suspend fun Publisher.awaitSingle(): T = awaitOne(Mode.SINGLE) // ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only. // If `kotlinx-coroutines-reactor` module is not included, the list is empty. private val contextInjectors: Array = - ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList().toTypedArray() + ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).iterator().asSequence().toList().toTypedArray() // R8 opto private fun Publisher.injectCoroutineContext(coroutineContext: CoroutineContext) = contextInjectors.fold(this) { pub, contextInjector -> diff --git a/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt index 77181ba2ba..45f6553093 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt @@ -8,7 +8,8 @@ import kotlin.coroutines.CoroutineContext @InternalCoroutinesApi public interface ContextInjector { /** - * Injects the coroutine context into the context of the publisher. + * Injects `ReactorContext` element from the given context into the `SubscriberContext` of the publisher. + * This API used as an indirection layer between `reactive` and `reactor` modules. */ public fun injectCoroutineContext(publisher: Publisher, coroutineContext: CoroutineContext): Publisher } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt index 429977a575..1755acd468 100644 --- a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt +++ b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt @@ -7,10 +7,11 @@ package kotlinx.coroutines.reactive +import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import org.reactivestreams.* -import java.util.concurrent.atomic.* +import java.util.concurrent.atomic.AtomicLong import kotlin.coroutines.* /** @@ -40,7 +41,7 @@ public class FlowSubscription( @Volatile private var canceled: Boolean = false private val requested = AtomicLong(0L) - private val producer: AtomicReference?> = AtomicReference() + private val producer = atomic?>(null) // This is actually optimizable private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) { @@ -48,27 +49,32 @@ public class FlowSubscription( consumeFlow() subscriber.onComplete() } catch (e: Throwable) { - // Failed with real exception, not due to cancellation - if (!coroutineContext[Job]!!.isCancelled) { - subscriber.onError(e) + try { + if (e is CancellationException) { + subscriber.onComplete() + } else { + subscriber.onError(e) + } + } catch (e: Throwable) { + // Last ditch report + handleCoroutineException(coroutineContext, e) } } } private suspend fun consumeFlow() { flow.collect { value -> - if (!coroutineContext.isActive) { - subscriber.onComplete() - coroutineContext.ensureActive() - } - + /* + * Flow is scopeless, thus if it's not active, its subscription was cancelled. + * No intermediate "child failed, but flow coroutine is not" states are allowed. + */ + coroutineContext.ensureActive() if (requested.get() == 0L) { suspendCancellableCoroutine { - producer.set(it) + producer.value = it if (requested.get() != 0L) it.resumeSafely() } } - requested.decrementAndGet() subscriber.onNext(value) } @@ -80,12 +86,9 @@ public class FlowSubscription( } override fun request(n: Long) { - if (n <= 0) { + if (n <= 0 || canceled) { return } - - if (canceled) return - job.start() var snapshot: Long var newValue: Long @@ -95,7 +98,7 @@ public class FlowSubscription( if (newValue <= 0L) newValue = Long.MAX_VALUE } while (!requested.compareAndSet(snapshot, newValue)) - val prev = producer.get() + val prev = producer.value if (prev == null || !producer.compareAndSet(prev, null)) return prev.resumeSafely() } @@ -106,4 +109,4 @@ public class FlowSubscription( completeResume(token) } } -} \ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt new file mode 100644 index 0000000000..cb11f86a9f --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt @@ -0,0 +1,79 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.Test +import org.reactivestreams.* +import kotlin.test.* + +class FlowAsPublisherTest : TestBase() { + + @Test + fun testErrorOnCancellationIsReported() { + expect(1) + flow { + emit(2) + try { + hang { expect(3) } + } finally { + throw TestException() + } + }.asPublisher().subscribe(object : Subscriber { + private lateinit var subscription: Subscription + + override fun onComplete() { + expectUnreached() + } + + override fun onSubscribe(s: Subscription?) { + subscription = s!! + subscription.request(2) + } + + override fun onNext(t: Int) { + expect(t) + subscription.cancel() + } + + override fun onError(t: Throwable?) { + assertTrue(t is TestException) + expect(4) + } + }) + finish(5) + } + + @Test + fun testCancellationIsNotReported() { + expect(1) + flow { + emit(2) + hang { expect(3) } + }.asPublisher().subscribe(object : Subscriber { + private lateinit var subscription: Subscription + + override fun onComplete() { + expect(4) + } + + override fun onSubscribe(s: Subscription?) { + subscription = s!! + subscription.request(2) + } + + override fun onNext(t: Int) { + expect(t) + subscription.cancel() + } + + override fun onError(t: Throwable?) { + expectUnreached() + } + }) + finish(5) + } +} diff --git a/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt index cb16c38763..7c6182bf53 100644 --- a/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt @@ -19,11 +19,8 @@ public fun Flow.asFlux(): Flux = FlowAsFlux(this) private class FlowAsFlux(private val flow: Flow) : Flux() { override fun subscribe(subscriber: CoreSubscriber?) { if (subscriber == null) throw NullPointerException() - subscriber.onSubscribe( - FlowSubscription( - flow.flowOn(subscriber.currentContext().asCoroutineContext()), - subscriber - ) - ) + val hasContext = subscriber.currentContext().isEmpty + val source = if (hasContext) flow.flowOn(subscriber.currentContext().asCoroutineContext()) else flow + subscriber.onSubscribe(FlowSubscription(source, subscriber)) } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index 61d75c17a7..942ba7b66c 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -33,7 +33,6 @@ import kotlin.coroutines.* * * [CoroutineContext] of a suspendable function that awaits a value from [Mono] or [Flux] instance * is propagated into [mono] and [flux] Reactor builders: - * * ``` * launch(Context.of("key", "value").asCoroutineContext()) { * assertEquals(bar().awaitFirst(), "value") @@ -43,7 +42,6 @@ import kotlin.coroutines.* * coroutineContext[ReactorContext]!!.context.get("key") * } * ``` -} */ @ExperimentalCoroutinesApi public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) { From 151b54cb1533e150132471d22216198280b32568 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 6 Aug 2019 12:55:21 +0300 Subject: [PATCH 4/7] ~Use atomicfu instead of AtomicLong for requested counter --- .../kotlinx-coroutines-reactive/src/FlowAsPublisher.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt index 1755acd468..6eee3464f1 100644 --- a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt +++ b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt @@ -40,7 +40,7 @@ public class FlowSubscription( ) : Subscription { @Volatile private var canceled: Boolean = false - private val requested = AtomicLong(0L) + private val requested = atomic(0L) private val producer = atomic?>(null) // This is actually optimizable @@ -69,10 +69,10 @@ public class FlowSubscription( * No intermediate "child failed, but flow coroutine is not" states are allowed. */ coroutineContext.ensureActive() - if (requested.get() == 0L) { + if (requested.value == 0L) { suspendCancellableCoroutine { producer.value = it - if (requested.get() != 0L) it.resumeSafely() + if (requested.value != 0L) it.resumeSafely() } } requested.decrementAndGet() @@ -93,7 +93,7 @@ public class FlowSubscription( var snapshot: Long var newValue: Long do { - snapshot = requested.get() + snapshot = requested.value newValue = snapshot + n if (newValue <= 0L) newValue = Long.MAX_VALUE } while (!requested.compareAndSet(snapshot, newValue)) From 3b406ed2e579268cfa6c935101ad877929b82583 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 6 Aug 2019 13:06:01 +0300 Subject: [PATCH 5/7] ~Get rid of cancelled flag --- .../kotlinx-coroutines-reactive/src/FlowAsPublisher.kt | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt index 6eee3464f1..9fc7350f27 100644 --- a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt +++ b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt @@ -38,13 +38,11 @@ public class FlowSubscription( @JvmField val flow: Flow, @JvmField val subscriber: Subscriber ) : Subscription { - @Volatile - private var canceled: Boolean = false private val requested = atomic(0L) private val producer = atomic?>(null) // This is actually optimizable - private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) { + private val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) { try { consumeFlow() subscriber.onComplete() @@ -81,12 +79,11 @@ public class FlowSubscription( } override fun cancel() { - canceled = true job.cancel() } override fun request(n: Long) { - if (n <= 0 || canceled) { + if (n <= 0) { return } job.start() From 629cb04e92fa482b19aa2c9fbb937e6db07c613a Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 6 Aug 2019 14:42:57 +0300 Subject: [PATCH 6/7] Optimize FlowSubscription by making it abstract coroutine --- .../kotlinx-coroutines-core.txt | 4 ++++ .../kotlinx-coroutines-reactive.txt | 2 +- .../common/src/intrinsics/Cancellable.kt | 3 ++- .../src/FlowAsPublisher.kt | 21 ++++++++++++------- .../test/FlowAsPublisherTest.kt | 2 +- 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 3e20e88bba..3634ab63b0 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -980,6 +980,10 @@ public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/c public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } +public final class kotlinx/coroutines/intrinsics/CancellableKt { + public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)V +} + public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher { public synthetic fun (II)V public synthetic fun (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt index 1b35578255..fb24c874f9 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt @@ -29,7 +29,7 @@ public final class kotlinx/coroutines/reactive/FlowKt { public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher; } -public final class kotlinx/coroutines/reactive/FlowSubscription : org/reactivestreams/Subscription { +public final class kotlinx/coroutines/reactive/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, org/reactivestreams/Subscription { public final field flow Lkotlinx/coroutines/flow/Flow; public final field subscriber Lorg/reactivestreams/Subscriber; public fun (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt index c442c95a3a..246ae2c2f8 100644 --- a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt +++ b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt @@ -12,7 +12,8 @@ import kotlin.coroutines.intrinsics.* * Use this function to start coroutine in a cancellable way, so that it can be cancelled * while waiting to be dispatched. */ -internal fun (suspend () -> T).startCoroutineCancellable(completion: Continuation) = runSafely(completion) { +@InternalCoroutinesApi +public fun (suspend () -> T).startCoroutineCancellable(completion: Continuation) = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit) } diff --git a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt index 9fc7350f27..3cd24e7e00 100644 --- a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt +++ b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt @@ -11,8 +11,7 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import org.reactivestreams.* -import java.util.concurrent.atomic.AtomicLong -import kotlin.coroutines.* +import kotlinx.coroutines.intrinsics.* /** * Transforms the given flow to a spec-compliant [Publisher]. @@ -37,12 +36,15 @@ private class FlowAsPublisher(private val flow: Flow) : Publisher public class FlowSubscription( @JvmField val flow: Flow, @JvmField val subscriber: Subscriber -) : Subscription { +) : Subscription, AbstractCoroutine(Dispatchers.Unconfined, false) { private val requested = atomic(0L) private val producer = atomic?>(null) - // This is actually optimizable - private val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) { + override fun onStart() { + ::flowProcessing.startCoroutineCancellable(this) + } + + private suspend fun flowProcessing() { try { consumeFlow() subscriber.onComplete() @@ -60,6 +62,9 @@ public class FlowSubscription( } } + /* + * This method has at most one caller at any time (triggered from the `request` method) + */ private suspend fun consumeFlow() { flow.collect { value -> /* @@ -67,7 +72,7 @@ public class FlowSubscription( * No intermediate "child failed, but flow coroutine is not" states are allowed. */ coroutineContext.ensureActive() - if (requested.value == 0L) { + if (requested.value <= 0L) { suspendCancellableCoroutine { producer.value = it if (requested.value != 0L) it.resumeSafely() @@ -79,14 +84,14 @@ public class FlowSubscription( } override fun cancel() { - job.cancel() + cancel(null) } override fun request(n: Long) { if (n <= 0) { return } - job.start() + start() var snapshot: Long var newValue: Long do { diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt index cb11f86a9f..8633492810 100644 --- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt @@ -50,7 +50,7 @@ class FlowAsPublisherTest : TestBase() { @Test fun testCancellationIsNotReported() { expect(1) - flow { + flow { emit(2) hang { expect(3) } }.asPublisher().subscribe(object : Subscriber { From 07611bda0ce5e963527aa23f6fed947aa6c1541e Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 7 Aug 2019 14:39:12 +0300 Subject: [PATCH 7/7] ~Use AFU primitives instead of handrolled loops --- .../src/FlowAsPublisher.kt | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt index 3cd24e7e00..387c8e7750 100644 --- a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt +++ b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt @@ -92,17 +92,12 @@ public class FlowSubscription( return } start() - var snapshot: Long - var newValue: Long - do { - snapshot = requested.value - newValue = snapshot + n - if (newValue <= 0L) newValue = Long.MAX_VALUE - } while (!requested.compareAndSet(snapshot, newValue)) - - val prev = producer.value - if (prev == null || !producer.compareAndSet(prev, null)) return - prev.resumeSafely() + requested.update { value -> + val newValue = value + n + if (newValue <= 0L) Long.MAX_VALUE else newValue + } + val producer = producer.getAndSet(null) ?: return + producer.resumeSafely() } private fun CancellableContinuation.resumeSafely() {