From bd0034fc87aff6823a6e7bb6aa3a89b935efe71a Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 19 Jun 2019 15:50:13 +0300 Subject: [PATCH 1/3] Decouple asFlow from batchSize and move it to buffer instead, promote it to experimental --- .../kotlinx-coroutines-core.txt | 15 +++ .../kotlinx-coroutines-reactive.txt | 1 - .../common/src/flow/internal/ChannelFlow.kt | 3 +- .../src/flow/PublisherAsFlow.kt | 105 +++++++++++++----- .../test/flow/PublisherAsFlowTest.kt | 57 ++++++++++ .../test/flow/RangePublisherBufferedTest.kt | 30 +++++ 6 files changed, 179 insertions(+), 32 deletions(-) create mode 100644 reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 365df9bd55..ff95a0d1ec 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -909,6 +909,21 @@ public final class kotlinx/coroutines/flow/MigrationKt { public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V } +public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow { + public final field capacity I + public final field context Lkotlin/coroutines/CoroutineContext; + public fun (Lkotlin/coroutines/CoroutineContext;I)V + public fun additionalToStringProps ()Ljava/lang/String; + public final fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel; + public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow; + public final fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel; + public fun toString ()Ljava/lang/String; + public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow; + public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow; +} + public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector { public fun (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt index 7191ca10a3..2afa3136cc 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt @@ -31,6 +31,5 @@ public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt { public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt { public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow; public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow; - public static synthetic fun from$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index 57a0132ff5..30a3522fde 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -16,7 +16,8 @@ internal fun Flow.asChannelFlow(): ChannelFlow = this as? ChannelFlow ?: ChannelFlowOperatorImpl(this) // Operators that use channels extend this ChannelFlow and are always fused with each other -internal abstract class ChannelFlow( +@InternalCoroutinesApi +public abstract class ChannelFlow( // upstream context @JvmField val context: CoroutineContext, // buffer capacity between upstream and downstream context diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt b/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt index e2b23c98b5..93a5c70117 100644 --- a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt @@ -7,66 +7,111 @@ package kotlinx.coroutines.reactive.flow import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.internal.* import org.reactivestreams.* +import kotlin.coroutines.* /** * Transforms the given reactive [Publisher] into [Flow]. - * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements - * and [Subscription.request] size. + * Use [buffer] operator on the resulting flow to specify the size of the backpressure. + * More precisely, to it specifies the value of the subscription's [request][Subscription.request]. + * `1` is used by default. * * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements * are discarded. */ +@JvmName("from") +@ExperimentalCoroutinesApi +public fun Publisher.asFlow(): Flow = + PublisherAsFlow(this, 1) + @FlowPreview -@JvmOverloads // For nice Java API @JvmName("from") -public fun Publisher.asFlow(batchSize: Int = 1): Flow = - PublisherAsFlow(this, batchSize) +@Deprecated( + message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure", + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("asFlow().buffer(batchSize)", imports = ["kotlinx.coroutines.flow.*"]) +) +public fun Publisher.asFlow(batchSize: Int): Flow = asFlow().buffer(batchSize) -private class PublisherAsFlow(private val publisher: Publisher, private val batchSize: Int) : Flow { + +private class PublisherAsFlow( + private val publisher: Publisher, capacity: Int +) : ChannelFlow( + EmptyCoroutineContext, + capacity +) { + + override fun create(context: CoroutineContext, capacity: Int): ChannelFlow { + return PublisherAsFlow(publisher, capacity) + } + + /* + * It's possible to get rid of the second channel here, but it requires intrusive changes in ChannelFlow. + * Do it after API stabilization (including produceIn/broadcastIn). + */ + override suspend fun collectTo(scope: ProducerScope) = collect { scope.send(it) } override suspend fun collect(collector: FlowCollector) { - val channel = Channel(batchSize) - val subscriber = ReactiveSubscriber(channel, batchSize) + val channel = Channel(capacity) + val subscriber = ReactiveSubscriber(channel, capacity) publisher.subscribe(subscriber) try { var consumed = 0 for (i in channel) { collector.emit(i) - if (++consumed == batchSize) { + if (++consumed == capacity) { + consumed = 0 + subscriber.subscription.request(capacity.toLong()) + } + } + } finally { + subscriber.subscription.cancel() + } + } + + private suspend inline fun collect(emit: (element: T) -> Unit) { + val channel = Channel(capacity) + val subscriber = ReactiveSubscriber(channel, capacity) + publisher.subscribe(subscriber) + try { + var consumed = 0 + for (i in channel) { + emit(i) + if (++consumed == capacity) { consumed = 0 - subscriber.subscription.request(batchSize.toLong()) + subscriber.subscription.request(capacity.toLong()) } } } finally { subscriber.subscription.cancel() } } +} - @Suppress("SubscriberImplementation") - private class ReactiveSubscriber( - private val channel: Channel, - private val batchSize: Int - ) : Subscriber { +@Suppress("SubscriberImplementation") +private class ReactiveSubscriber( + private val channel: Channel, + private val requestSize: Int +) : Subscriber { - lateinit var subscription: Subscription + lateinit var subscription: Subscription - override fun onComplete() { - channel.close() - } + override fun onComplete() { + channel.close() + } - override fun onSubscribe(s: Subscription) { - subscription = s - s.request(batchSize.toLong()) - } + override fun onSubscribe(s: Subscription) { + subscription = s + s.request(requestSize.toLong()) + } - override fun onNext(t: T) { - // Controlled by batch-size - require(channel.offer(t)) { "Element $t was not added to channel because it was full, $channel" } - } + override fun onNext(t: T) { + // Controlled by requestSize + require(channel.offer(t)) { "Element $t was not added to channel because it was full, $channel" } + } - override fun onError(t: Throwable?) { - channel.close(t) - } + override fun onError(t: Throwable?) { + channel.close(t) } } diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt index 6c3501df1c..3e4d0f8e4b 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.reactive.flow import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import kotlin.test.* @@ -42,4 +43,60 @@ class PublisherAsFlowTest : TestBase() { assertEquals(1, onError) assertEquals(1, onCancelled) } + + @Test + fun testBufferSize1() = runTest { + val publisher = publish { + expect(1) + send(3) + + expect(2) + send(5) + + expect(4) + send(7) + expect(6) + } + + publisher.asFlow().collect { + expect(it) + } + + finish(8) + } + + @Test + fun testBufferSize10() = runTest { + val publisher = publish { + expect(1) + send(5) + + expect(2) + send(6) + + expect(3) + send(7) + expect(4) + } + + publisher.asFlow().buffer(10).collect { + expect(it) + } + + finish(8) + } + + @Test + fun testProduce() = runTest { + val flow = publish { repeat(10) { send(it) } }.asFlow() + check(flow.produceIn(this)) + check(flow.buffer(2).produceIn(this)) + check(flow.buffer(Channel.UNLIMITED).produceIn(this)) + } + + private suspend fun check(channel: ReceiveChannel) { + val result = ArrayList(10) + channel.consumeEach { result.add(it) } + assertEquals((0..9).toList(), result) + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt b/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt new file mode 100644 index 0000000000..24be62554f --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt @@ -0,0 +1,30 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactive.flow + +import kotlinx.coroutines.flow.* +import org.junit.* +import org.reactivestreams.* +import org.reactivestreams.example.unicast.* +import org.reactivestreams.tck.* + +class RangePublisherBufferedTest : PublisherVerification(TestEnvironment(50, 50)) { + + override fun createPublisher(elements: Long): Publisher { + return RangePublisher(1, elements.toInt()).asFlow().buffer(2).asPublisher() + } + + override fun createFailedPublisher(): Publisher? { + return null + } + + @Ignore + override fun required_spec309_requestZeroMustSignalIllegalArgumentException() { + } + + @Ignore + override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() { + } +} \ No newline at end of file From ac8c2799a59d9b0850df66c667b45cd342d695af Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Thu, 20 Jun 2019 00:01:05 -0700 Subject: [PATCH 2/3] Optimize Publisher.asFlow and fix conflation * Fixed the case of Publisher.asFlow().conflate(). * Fixed the cast of Publisher.asFlow().buffer(UNLIMITED) to request Long.MAX_VALUE items as a reactive-streams way to indicate "all" * Use channel.receiveOrNull instead of for loop iteration (it is more efficient) * Calling Publisher.asFlow().produceIn(...) uses a single channel and is implemented via Publisher.openSubscription() * Added tests for conflation and for produceIn cancellation --- .../kotlinx-coroutines-core.txt | 7 +- .../common/src/flow/internal/ChannelFlow.kt | 8 +- .../common/src/flow/internal/Concurrent.kt | 10 +- .../common/src/flow/internal/FlowCoroutine.kt | 2 - .../src/Channel.kt | 4 +- .../src/flow/PublisherAsFlow.kt | 111 ++++++++++-------- .../test/flow/PublisherAsFlowTest.kt | 59 +++++++++- .../test/flow/RangePublisherBufferedTest.kt | 5 +- 8 files changed, 140 insertions(+), 66 deletions(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index ff95a0d1ec..0fee45a185 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -918,7 +918,7 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow; - public final fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel; + public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel; public fun toString ()Ljava/lang/String; public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow; public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow; @@ -929,6 +929,11 @@ public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coro public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } +public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/coroutines/flow/internal/ConcurrentFlowCollector { + public fun (Lkotlinx/coroutines/channels/SendChannel;)V + public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher { public synthetic fun (II)V public synthetic fun (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index 30a3522fde..2839f65d09 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -15,7 +15,11 @@ import kotlin.jvm.* internal fun Flow.asChannelFlow(): ChannelFlow = this as? ChannelFlow ?: ChannelFlowOperatorImpl(this) -// Operators that use channels extend this ChannelFlow and are always fused with each other +/** + * Operators that use channels extend this ChannelFlow and are always fused with each other. + * + * @suppress **This an internal API and should not be used from general code.** + */ @InternalCoroutinesApi public abstract class ChannelFlow( // upstream context @@ -63,7 +67,7 @@ public abstract class ChannelFlow( fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel = scope.broadcast(context, produceCapacity, start, block = collectToFun) - fun produceImpl(scope: CoroutineScope): ReceiveChannel = + open fun produceImpl(scope: CoroutineScope): ReceiveChannel = scope.flowProduce(context, produceCapacity, block = collectToFun) override suspend fun collect(collector: FlowCollector) = diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt b/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt index 6119d3dbc1..f37cc1caec 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.flow.internal import kotlinx.atomicfu.* +import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.channels.ArrayChannel import kotlinx.coroutines.flow.* @@ -17,8 +18,13 @@ internal fun FlowCollector.asConcurrentFlowCollector(): ConcurrentFlowCol // Two basic implementations are here: SendingCollector and ConcurrentFlowCollector internal interface ConcurrentFlowCollector : FlowCollector -// Concurrent collector because it sends to a channel -internal class SendingCollector( +/** + * Collection that sends to channel. It is marked as [ConcurrentFlowCollector] because it can be used concurrently. + * + * @suppress **This an internal API and should not be used from general code.** + */ +@InternalCoroutinesApi +public class SendingCollector( private val channel: SendChannel ) : ConcurrentFlowCollector { override suspend fun emit(value: T) = channel.send(value) diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt index 98f5cec597..258869f135 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt @@ -70,7 +70,6 @@ private class FlowCoroutine( context: CoroutineContext, uCont: Continuation ) : ScopeCoroutine(context, uCont) { - public override fun childCancelled(cause: Throwable): Boolean { if (cause is ChildCancelledException) return true return cancelImpl(cause) @@ -81,7 +80,6 @@ private class FlowProduceCoroutine( parentContext: CoroutineContext, channel: Channel ) : ProducerCoroutine(parentContext, channel) { - public override fun childCancelled(cause: Throwable): Boolean { if (cause is ChildCancelledException) return true return cancelImpl(cause) diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt index d589a0d0c8..89487ec911 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt @@ -40,8 +40,8 @@ public suspend inline fun Publisher.consumeEach(action: (T) -> Unit) = public suspend inline fun Publisher.collect(action: (T) -> Unit) = openSubscription().consumeEach(action) -@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") -private class SubscriptionChannel( +@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation") +internal class SubscriptionChannel( private val request: Int ) : LinkedListChannel(), Subscriber { init { diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt b/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt index 93a5c70117..17a232f39c 100644 --- a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt @@ -8,6 +8,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.reactive.* import org.reactivestreams.* import kotlin.coroutines.* @@ -34,84 +35,96 @@ public fun Publisher.asFlow(): Flow = ) public fun Publisher.asFlow(batchSize: Int): Flow = asFlow().buffer(batchSize) - private class PublisherAsFlow( - private val publisher: Publisher, capacity: Int -) : ChannelFlow( - EmptyCoroutineContext, - capacity -) { - - override fun create(context: CoroutineContext, capacity: Int): ChannelFlow { - return PublisherAsFlow(publisher, capacity) + private val publisher: Publisher, + capacity: Int +) : ChannelFlow(EmptyCoroutineContext, capacity) { + override fun create(context: CoroutineContext, capacity: Int): ChannelFlow = + PublisherAsFlow(publisher, capacity) + + override fun produceImpl(scope: CoroutineScope): ReceiveChannel { + // use another channel for conflation (cannot do openSubscription) + if (capacity < 0) return super.produceImpl(scope) + // Open subscription channel directly + val channel = publisher.openSubscription(capacity) + val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause -> + channel.cancel(cause?.let { + it as? CancellationException ?: CancellationException("Job was cancelled", it) + }) + } + if (handle != null && handle !== NonDisposableHandle) { + @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + (channel as SubscriptionChannel<*>).invokeOnClose { + handle.dispose() + } + } + return channel } - /* - * It's possible to get rid of the second channel here, but it requires intrusive changes in ChannelFlow. - * Do it after API stabilization (including produceIn/broadcastIn). - */ - override suspend fun collectTo(scope: ProducerScope) = collect { scope.send(it) } + private val requestSize: Long + get() = when (capacity) { + Channel.CONFLATED -> Long.MAX_VALUE // request all and conflate incoming + Channel.RENDEZVOUS -> 1L // need to request at least one anyway + Channel.UNLIMITED -> Long.MAX_VALUE // reactive streams way to say "give all" must be Long.MAX_VALUE + else -> capacity.toLong().also { check(it >= 1) } + } override suspend fun collect(collector: FlowCollector) { - val channel = Channel(capacity) - val subscriber = ReactiveSubscriber(channel, capacity) + val subscriber = ReactiveSubscriber(capacity, requestSize) publisher.subscribe(subscriber) try { - var consumed = 0 - for (i in channel) { - collector.emit(i) - if (++consumed == capacity) { - consumed = 0 - subscriber.subscription.request(capacity.toLong()) + var consumed = 0L + while (true) { + val value = subscriber.takeNextOrNull() ?: break + collector.emit(value) + if (++consumed == requestSize) { + consumed = 0L + subscriber.makeRequest() } } } finally { - subscriber.subscription.cancel() + subscriber.cancel() } } - private suspend inline fun collect(emit: (element: T) -> Unit) { - val channel = Channel(capacity) - val subscriber = ReactiveSubscriber(channel, capacity) - publisher.subscribe(subscriber) - try { - var consumed = 0 - for (i in channel) { - emit(i) - if (++consumed == capacity) { - consumed = 0 - subscriber.subscription.request(capacity.toLong()) - } - } - } finally { - subscriber.subscription.cancel() - } - } + // The second channel here is used only for broadcast + override suspend fun collectTo(scope: ProducerScope) = + collect(SendingCollector(scope.channel)) } @Suppress("SubscriberImplementation") private class ReactiveSubscriber( - private val channel: Channel, - private val requestSize: Int + capacity: Int, + private val requestSize: Long ) : Subscriber { + private lateinit var subscription: Subscription + private val channel = Channel(capacity) + + suspend fun takeNextOrNull(): T? = channel.receiveOrNull() - lateinit var subscription: Subscription + override fun onNext(value: T) { + // Controlled by requestSize + require(channel.offer(value)) { "Element $value was not added to channel because it was full, $channel" } + } override fun onComplete() { channel.close() } + override fun onError(t: Throwable?) { + channel.close(t) + } + override fun onSubscribe(s: Subscription) { subscription = s - s.request(requestSize.toLong()) + makeRequest() } - override fun onNext(t: T) { - // Controlled by requestSize - require(channel.offer(t)) { "Element $t was not added to channel because it was full, $channel" } + fun makeRequest() { + subscription.request(requestSize) } - override fun onError(t: Throwable?) { - channel.close(t) + fun cancel() { + subscription.cancel() } } diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt index 3e4d0f8e4b..74f5914d20 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt @@ -11,7 +11,6 @@ import kotlinx.coroutines.reactive.* import kotlin.test.* class PublisherAsFlowTest : TestBase() { - @Test fun testCancellation() = runTest { var onNext = 0 @@ -86,17 +85,65 @@ class PublisherAsFlowTest : TestBase() { finish(8) } + @Test + fun testConflated() = runTest { + val publisher = publish { + for (i in 1..5) send(i) + } + val list = publisher.asFlow().conflate().toList() + assertEquals(listOf(1, 5), list) + } + @Test fun testProduce() = runTest { val flow = publish { repeat(10) { send(it) } }.asFlow() - check(flow.produceIn(this)) - check(flow.buffer(2).produceIn(this)) - check(flow.buffer(Channel.UNLIMITED).produceIn(this)) + check((0..9).toList(), flow.produceIn(this)) + check((0..9).toList(), flow.buffer(2).produceIn(this)) + check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) + check(listOf(0, 9), flow.conflate().produceIn(this)) } - private suspend fun check(channel: ReceiveChannel) { + private suspend fun check(expected: List, channel: ReceiveChannel) { val result = ArrayList(10) channel.consumeEach { result.add(it) } - assertEquals((0..9).toList(), result) + assertEquals(expected, result) + } + + @Test + fun testProduceCancellation() = runTest { + expect(1) + // publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled + val flow = publish { + expect(3) + repeat(10) { value -> + when (value) { + in 0..6 -> send(value) + 7 -> try { + send(value) + } catch (e: CancellationException) { + finish(6) + throw e + } + else -> expectUnreached() + } + } + }.asFlow() + assertFailsWith { + coroutineScope { + expect(2) + val channel = flow.produceIn(this) + channel.consumeEach { value -> + when (value) { + in 0..4 -> {} + 5 -> { + expect(4) + throw TestException() + } + else -> expectUnreached() + } + } + } + } + expect(5) } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt b/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt index 24be62554f..2ff96eb176 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt @@ -10,8 +10,9 @@ import org.reactivestreams.* import org.reactivestreams.example.unicast.* import org.reactivestreams.tck.* -class RangePublisherBufferedTest : PublisherVerification(TestEnvironment(50, 50)) { - +class RangePublisherBufferedTest : + PublisherVerification(TestEnvironment(50, 50)) +{ override fun createPublisher(elements: Long): Publisher { return RangePublisher(1, elements.toInt()).asFlow().buffer(2).asPublisher() } From 5f48e6a5ec5903b5d09dbbc20ce451b8a86c5cd9 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 25 Jun 2019 15:08:58 +0300 Subject: [PATCH 3/3] Make SubscriptionChannel private again --- reactive/kotlinx-coroutines-reactive/src/Channel.kt | 2 +- .../kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt index 89487ec911..88572a6243 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt @@ -41,7 +41,7 @@ public suspend inline fun Publisher.collect(action: (T) -> Unit) = openSubscription().consumeEach(action) @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation") -internal class SubscriptionChannel( +private class SubscriptionChannel( private val request: Int ) : LinkedListChannel(), Subscriber { init { diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt b/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt index 17a232f39c..50338de605 100644 --- a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt @@ -53,8 +53,7 @@ private class PublisherAsFlow( }) } if (handle != null && handle !== NonDisposableHandle) { - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - (channel as SubscriptionChannel<*>).invokeOnClose { + (channel as SendChannel<*>).invokeOnClose { handle.dispose() } }