From 6fdb8c55648fdcc1cc7458847acb749738bf5e95 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 27 Jan 2020 16:34:56 +0300 Subject: [PATCH] Request elements in batches in ReactiveFlow to avoid requesting elements one by one in a default configuration Also, partially masks #1766 --- .../src/ReactiveFlow.kt | 13 ++++-- .../test/PublisherAsFlowTest.kt | 40 ++++++++++++++++++- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 5accbf243c..3a81c0696d 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -18,9 +18,9 @@ import kotlin.coroutines.* * Transforms the given reactive [Publisher] into [Flow]. * Use [buffer] operator on the resulting flow to specify the size of the backpressure. * More precisely, it specifies the value of the subscription's [request][Subscription.request]. - * `1` is used by default. + * [buffer] default capacity is used by default. * - * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements + * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements * are discarded. * * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module, @@ -40,16 +40,23 @@ public fun Flow.asPublisher(): Publisher = FlowAsPublisher(this) private class PublisherAsFlow( private val publisher: Publisher, context: CoroutineContext = EmptyCoroutineContext, - capacity: Int = 1 + capacity: Int = Channel.BUFFERED ) : ChannelFlow(context, capacity) { override fun create(context: CoroutineContext, capacity: Int): ChannelFlow = PublisherAsFlow(publisher, context, capacity) + /* + * Suppress for Channel.CHANNEL_DEFAULT_CAPACITY. + * It's too counter-intuitive to be public and moving it to Flow companion + * will also create undesired effect. + */ + @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") private val requestSize: Long get() = when (capacity) { Channel.CONFLATED -> Long.MAX_VALUE // request all and conflate incoming Channel.RENDEZVOUS -> 1L // need to request at least one anyway Channel.UNLIMITED -> Long.MAX_VALUE // reactive streams way to say "give all" must be Long.MAX_VALUE + Channel.BUFFERED -> Channel.CHANNEL_DEFAULT_CAPACITY.toLong() else -> capacity.toLong().also { check(it >= 1) } } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt index e458c918a5..61f88f6af3 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt @@ -56,7 +56,43 @@ class PublisherAsFlowTest : TestBase() { expect(6) } + publisher.asFlow().buffer(1).collect { + expect(it) + } + + finish(8) + } + + @Test + fun testBufferSizeDefault() = runTest { + val publisher = publish(currentDispatcher()) { + repeat(64) { + send(it + 1) + expect(it + 1) + } + assertFalse { offer(-1) } + } + publisher.asFlow().collect { + expect(64 + it) + } + + finish(129) + } + + @Test + fun testDefaultCapacityIsProperlyOverwritten() = runTest { + val publisher = publish(currentDispatcher()) { + expect(1) + send(3) + expect(2) + send(5) + expect(4) + send(7) + expect(6) + } + + publisher.asFlow().flowOn(wrapperDispatcher()).buffer(1).collect { expect(it) } @@ -126,7 +162,7 @@ class PublisherAsFlowTest : TestBase() { else -> expectUnreached() } } - }.asFlow() + }.asFlow().buffer(1) assertFailsWith { coroutineScope { expect(2) @@ -145,4 +181,4 @@ class PublisherAsFlowTest : TestBase() { } finish(6) } -} \ No newline at end of file +}