Skip to content

Commit 6810745

Browse files
committed
Request elements in batches in ReactiveFlow to avoid requesting elements one by one in a default configuration
Also, partially masks #1766
1 parent 642989e commit 6810745

File tree

2 files changed

+48
-5
lines changed

2 files changed

+48
-5
lines changed

Diff for: reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt

+10-3
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ import kotlin.coroutines.*
1818
* Transforms the given reactive [Publisher] into [Flow].
1919
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
2020
* More precisely, it specifies the value of the subscription's [request][Subscription.request].
21-
* `1` is used by default.
21+
* [buffer] default capacity is used by default.
2222
*
23-
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
23+
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
2424
* are discarded.
2525
*
2626
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
@@ -40,16 +40,23 @@ public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
4040
private class PublisherAsFlow<T : Any>(
4141
private val publisher: Publisher<T>,
4242
context: CoroutineContext = EmptyCoroutineContext,
43-
capacity: Int = 1
43+
capacity: Int = Channel.BUFFERED
4444
) : ChannelFlow<T>(context, capacity) {
4545
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
4646
PublisherAsFlow(publisher, context, capacity)
4747

48+
/*
49+
* Suppress for Channel.CHANNEL_DEFAULT_CAPACITY.
50+
* It's too counter-intuitive to be public and moving it to Flow companion
51+
* will also create undesired effect.
52+
*/
53+
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
4854
private val requestSize: Long
4955
get() = when (capacity) {
5056
Channel.CONFLATED -> Long.MAX_VALUE // request all and conflate incoming
5157
Channel.RENDEZVOUS -> 1L // need to request at least one anyway
5258
Channel.UNLIMITED -> Long.MAX_VALUE // reactive streams way to say "give all" must be Long.MAX_VALUE
59+
Channel.BUFFERED -> Channel.CHANNEL_DEFAULT_CAPACITY.toLong()
5360
else -> capacity.toLong().also { check(it >= 1) }
5461
}
5562

Diff for: reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt

+38-2
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,43 @@ class PublisherAsFlowTest : TestBase() {
5656
expect(6)
5757
}
5858

59+
publisher.asFlow().buffer(1).collect {
60+
expect(it)
61+
}
62+
63+
finish(8)
64+
}
65+
66+
@Test
67+
fun testBufferSizeDefault() = runTest {
68+
val publisher = publish(currentDispatcher()) {
69+
repeat(64) {
70+
send(it + 1)
71+
expect(it + 1)
72+
}
73+
assertFalse { offer(-1) }
74+
}
75+
5976
publisher.asFlow().collect {
77+
expect(64 + it)
78+
}
79+
80+
finish(129)
81+
}
82+
83+
@Test
84+
fun testDefaultCapacityIsProperlyOverwritten() = runTest {
85+
val publisher = publish(currentDispatcher()) {
86+
expect(1)
87+
send(3)
88+
expect(2)
89+
send(5)
90+
expect(4)
91+
send(7)
92+
expect(6)
93+
}
94+
95+
publisher.asFlow().flowOn(wrapperDispatcher()).buffer(1).collect {
6096
expect(it)
6197
}
6298

@@ -126,7 +162,7 @@ class PublisherAsFlowTest : TestBase() {
126162
else -> expectUnreached()
127163
}
128164
}
129-
}.asFlow()
165+
}.asFlow().buffer(1)
130166
assertFailsWith<TestException> {
131167
coroutineScope {
132168
expect(2)
@@ -145,4 +181,4 @@ class PublisherAsFlowTest : TestBase() {
145181
}
146182
finish(6)
147183
}
148-
}
184+
}

0 commit comments

Comments
 (0)