Skip to content

Commit bd0034f

Browse files
committed
Decouple asFlow from batchSize and move it to buffer instead, promote it to experimental
1 parent 2ace4c6 commit bd0034f

File tree

6 files changed

+179
-32
lines changed

6 files changed

+179
-32
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+15
Original file line numberDiff line numberDiff line change
@@ -909,6 +909,21 @@ public final class kotlinx/coroutines/flow/MigrationKt {
909909
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
910910
}
911911

912+
public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow {
913+
public final field capacity I
914+
public final field context Lkotlin/coroutines/CoroutineContext;
915+
public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
916+
public fun additionalToStringProps ()Ljava/lang/String;
917+
public final fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
918+
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
919+
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
920+
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
921+
public final fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
922+
public fun toString ()Ljava/lang/String;
923+
public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
924+
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
925+
}
926+
912927
public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
913928
public fun <init> (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V
914929
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt

-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,5 @@ public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
3131
public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
3232
public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
3333
public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
34-
public static synthetic fun from$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
3534
}
3635

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ internal fun <T> Flow<T>.asChannelFlow(): ChannelFlow<T> =
1616
this as? ChannelFlow ?: ChannelFlowOperatorImpl(this)
1717

1818
// Operators that use channels extend this ChannelFlow and are always fused with each other
19-
internal abstract class ChannelFlow<T>(
19+
@InternalCoroutinesApi
20+
public abstract class ChannelFlow<T>(
2021
// upstream context
2122
@JvmField val context: CoroutineContext,
2223
// buffer capacity between upstream and downstream context

reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt

+75-30
Original file line numberDiff line numberDiff line change
@@ -7,66 +7,111 @@ package kotlinx.coroutines.reactive.flow
77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
99
import kotlinx.coroutines.flow.*
10+
import kotlinx.coroutines.flow.internal.*
1011
import org.reactivestreams.*
12+
import kotlin.coroutines.*
1113

1214
/**
1315
* Transforms the given reactive [Publisher] into [Flow].
14-
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
15-
* and [Subscription.request] size.
16+
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
17+
* More precisely, to it specifies the value of the subscription's [request][Subscription.request].
18+
* `1` is used by default.
1619
*
1720
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
1821
* are discarded.
1922
*/
23+
@JvmName("from")
24+
@ExperimentalCoroutinesApi
25+
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
26+
PublisherAsFlow(this, 1)
27+
2028
@FlowPreview
21-
@JvmOverloads // For nice Java API
2229
@JvmName("from")
23-
public fun <T : Any> Publisher<T>.asFlow(batchSize: Int = 1): Flow<T> =
24-
PublisherAsFlow(this, batchSize)
30+
@Deprecated(
31+
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
32+
level = DeprecationLevel.ERROR,
33+
replaceWith = ReplaceWith("asFlow().buffer(batchSize)", imports = ["kotlinx.coroutines.flow.*"])
34+
)
35+
public fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T> = asFlow().buffer(batchSize)
2536

26-
private class PublisherAsFlow<T : Any>(private val publisher: Publisher<T>, private val batchSize: Int) : Flow<T> {
37+
38+
private class PublisherAsFlow<T : Any>(
39+
private val publisher: Publisher<T>, capacity: Int
40+
) : ChannelFlow<T>(
41+
EmptyCoroutineContext,
42+
capacity
43+
) {
44+
45+
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> {
46+
return PublisherAsFlow(publisher, capacity)
47+
}
48+
49+
/*
50+
* It's possible to get rid of the second channel here, but it requires intrusive changes in ChannelFlow.
51+
* Do it after API stabilization (including produceIn/broadcastIn).
52+
*/
53+
override suspend fun collectTo(scope: ProducerScope<T>) = collect { scope.send(it) }
2754

2855
override suspend fun collect(collector: FlowCollector<T>) {
29-
val channel = Channel<T>(batchSize)
30-
val subscriber = ReactiveSubscriber(channel, batchSize)
56+
val channel = Channel<T>(capacity)
57+
val subscriber = ReactiveSubscriber(channel, capacity)
3158
publisher.subscribe(subscriber)
3259
try {
3360
var consumed = 0
3461
for (i in channel) {
3562
collector.emit(i)
36-
if (++consumed == batchSize) {
63+
if (++consumed == capacity) {
64+
consumed = 0
65+
subscriber.subscription.request(capacity.toLong())
66+
}
67+
}
68+
} finally {
69+
subscriber.subscription.cancel()
70+
}
71+
}
72+
73+
private suspend inline fun collect(emit: (element: T) -> Unit) {
74+
val channel = Channel<T>(capacity)
75+
val subscriber = ReactiveSubscriber(channel, capacity)
76+
publisher.subscribe(subscriber)
77+
try {
78+
var consumed = 0
79+
for (i in channel) {
80+
emit(i)
81+
if (++consumed == capacity) {
3782
consumed = 0
38-
subscriber.subscription.request(batchSize.toLong())
83+
subscriber.subscription.request(capacity.toLong())
3984
}
4085
}
4186
} finally {
4287
subscriber.subscription.cancel()
4388
}
4489
}
90+
}
4591

46-
@Suppress("SubscriberImplementation")
47-
private class ReactiveSubscriber<T : Any>(
48-
private val channel: Channel<T>,
49-
private val batchSize: Int
50-
) : Subscriber<T> {
92+
@Suppress("SubscriberImplementation")
93+
private class ReactiveSubscriber<T : Any>(
94+
private val channel: Channel<T>,
95+
private val requestSize: Int
96+
) : Subscriber<T> {
5197

52-
lateinit var subscription: Subscription
98+
lateinit var subscription: Subscription
5399

54-
override fun onComplete() {
55-
channel.close()
56-
}
100+
override fun onComplete() {
101+
channel.close()
102+
}
57103

58-
override fun onSubscribe(s: Subscription) {
59-
subscription = s
60-
s.request(batchSize.toLong())
61-
}
104+
override fun onSubscribe(s: Subscription) {
105+
subscription = s
106+
s.request(requestSize.toLong())
107+
}
62108

63-
override fun onNext(t: T) {
64-
// Controlled by batch-size
65-
require(channel.offer(t)) { "Element $t was not added to channel because it was full, $channel" }
66-
}
109+
override fun onNext(t: T) {
110+
// Controlled by requestSize
111+
require(channel.offer(t)) { "Element $t was not added to channel because it was full, $channel" }
112+
}
67113

68-
override fun onError(t: Throwable?) {
69-
channel.close(t)
70-
}
114+
override fun onError(t: Throwable?) {
115+
channel.close(t)
71116
}
72117
}

reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt

+57
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.reactive.flow
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
89
import kotlinx.coroutines.flow.*
910
import kotlinx.coroutines.reactive.*
1011
import kotlin.test.*
@@ -42,4 +43,60 @@ class PublisherAsFlowTest : TestBase() {
4243
assertEquals(1, onError)
4344
assertEquals(1, onCancelled)
4445
}
46+
47+
@Test
48+
fun testBufferSize1() = runTest {
49+
val publisher = publish {
50+
expect(1)
51+
send(3)
52+
53+
expect(2)
54+
send(5)
55+
56+
expect(4)
57+
send(7)
58+
expect(6)
59+
}
60+
61+
publisher.asFlow().collect {
62+
expect(it)
63+
}
64+
65+
finish(8)
66+
}
67+
68+
@Test
69+
fun testBufferSize10() = runTest {
70+
val publisher = publish {
71+
expect(1)
72+
send(5)
73+
74+
expect(2)
75+
send(6)
76+
77+
expect(3)
78+
send(7)
79+
expect(4)
80+
}
81+
82+
publisher.asFlow().buffer(10).collect {
83+
expect(it)
84+
}
85+
86+
finish(8)
87+
}
88+
89+
@Test
90+
fun testProduce() = runTest {
91+
val flow = publish { repeat(10) { send(it) } }.asFlow()
92+
check(flow.produceIn(this))
93+
check(flow.buffer(2).produceIn(this))
94+
check(flow.buffer(Channel.UNLIMITED).produceIn(this))
95+
}
96+
97+
private suspend fun check(channel: ReceiveChannel<Int>) {
98+
val result = ArrayList<Int>(10)
99+
channel.consumeEach { result.add(it) }
100+
assertEquals((0..9).toList(), result)
101+
}
45102
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.reactive.flow
6+
7+
import kotlinx.coroutines.flow.*
8+
import org.junit.*
9+
import org.reactivestreams.*
10+
import org.reactivestreams.example.unicast.*
11+
import org.reactivestreams.tck.*
12+
13+
class RangePublisherBufferedTest : PublisherVerification<Int>(TestEnvironment(50, 50)) {
14+
15+
override fun createPublisher(elements: Long): Publisher<Int> {
16+
return RangePublisher(1, elements.toInt()).asFlow().buffer(2).asPublisher()
17+
}
18+
19+
override fun createFailedPublisher(): Publisher<Int>? {
20+
return null
21+
}
22+
23+
@Ignore
24+
override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
25+
}
26+
27+
@Ignore
28+
override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
29+
}
30+
}

0 commit comments

Comments
 (0)