Skip to content

Decouple asFlow from batchSize and move it to buffer instead, promote… #1279

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,21 @@ public final class kotlinx/coroutines/flow/MigrationKt {
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
}

public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow {
public final field capacity I
public final field context Lkotlin/coroutines/CoroutineContext;
public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
public fun additionalToStringProps ()Ljava/lang/String;
public final fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
public final fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
public fun toString ()Ljava/lang/String;
public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
}

public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
public fun <init> (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,5 @@ public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun from$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ internal fun <T> Flow<T>.asChannelFlow(): ChannelFlow<T> =
this as? ChannelFlow ?: ChannelFlowOperatorImpl(this)

// Operators that use channels extend this ChannelFlow and are always fused with each other
internal abstract class ChannelFlow<T>(
@InternalCoroutinesApi
public abstract class ChannelFlow<T>(
// upstream context
@JvmField val context: CoroutineContext,
// buffer capacity between upstream and downstream context
Expand Down
105 changes: 75 additions & 30 deletions reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,66 +7,111 @@ package kotlinx.coroutines.reactive.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.internal.*
import org.reactivestreams.*
import kotlin.coroutines.*

/**
* Transforms the given reactive [Publisher] into [Flow].
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
* and [Subscription.request] size.
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
* More precisely, to it specifies the value of the subscription's [request][Subscription.request].
* `1` is used by default.
*
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
* are discarded.
*/
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
PublisherAsFlow(this, 1)

@FlowPreview
@JvmOverloads // For nice Java API
@JvmName("from")
public fun <T : Any> Publisher<T>.asFlow(batchSize: Int = 1): Flow<T> =
PublisherAsFlow(this, batchSize)
@Deprecated(
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("asFlow().buffer(batchSize)", imports = ["kotlinx.coroutines.flow.*"])
)
public fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T> = asFlow().buffer(batchSize)

private class PublisherAsFlow<T : Any>(private val publisher: Publisher<T>, private val batchSize: Int) : Flow<T> {

private class PublisherAsFlow<T : Any>(
private val publisher: Publisher<T>, capacity: Int
) : ChannelFlow<T>(
EmptyCoroutineContext,
capacity
) {

override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> {
return PublisherAsFlow(publisher, capacity)
}

/*
* It's possible to get rid of the second channel here, but it requires intrusive changes in ChannelFlow.
* Do it after API stabilization (including produceIn/broadcastIn).
*/
override suspend fun collectTo(scope: ProducerScope<T>) = collect { scope.send(it) }

override suspend fun collect(collector: FlowCollector<T>) {
val channel = Channel<T>(batchSize)
val subscriber = ReactiveSubscriber(channel, batchSize)
val channel = Channel<T>(capacity)
val subscriber = ReactiveSubscriber(channel, capacity)
publisher.subscribe(subscriber)
try {
var consumed = 0
for (i in channel) {
collector.emit(i)
if (++consumed == batchSize) {
if (++consumed == capacity) {
consumed = 0
subscriber.subscription.request(capacity.toLong())
}
}
} finally {
subscriber.subscription.cancel()
}
}

private suspend inline fun collect(emit: (element: T) -> Unit) {
val channel = Channel<T>(capacity)
val subscriber = ReactiveSubscriber(channel, capacity)
publisher.subscribe(subscriber)
try {
var consumed = 0
for (i in channel) {
emit(i)
if (++consumed == capacity) {
consumed = 0
subscriber.subscription.request(batchSize.toLong())
subscriber.subscription.request(capacity.toLong())
}
}
} finally {
subscriber.subscription.cancel()
}
}
}

@Suppress("SubscriberImplementation")
private class ReactiveSubscriber<T : Any>(
private val channel: Channel<T>,
private val batchSize: Int
) : Subscriber<T> {
@Suppress("SubscriberImplementation")
private class ReactiveSubscriber<T : Any>(
private val channel: Channel<T>,
private val requestSize: Int
) : Subscriber<T> {

lateinit var subscription: Subscription
lateinit var subscription: Subscription

override fun onComplete() {
channel.close()
}
override fun onComplete() {
channel.close()
}

override fun onSubscribe(s: Subscription) {
subscription = s
s.request(batchSize.toLong())
}
override fun onSubscribe(s: Subscription) {
subscription = s
s.request(requestSize.toLong())
}

override fun onNext(t: T) {
// Controlled by batch-size
require(channel.offer(t)) { "Element $t was not added to channel because it was full, $channel" }
}
override fun onNext(t: T) {
// Controlled by requestSize
require(channel.offer(t)) { "Element $t was not added to channel because it was full, $channel" }
}

override fun onError(t: Throwable?) {
channel.close(t)
}
override fun onError(t: Throwable?) {
channel.close(t)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.reactive.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import kotlin.test.*
Expand Down Expand Up @@ -42,4 +43,60 @@ class PublisherAsFlowTest : TestBase() {
assertEquals(1, onError)
assertEquals(1, onCancelled)
}

@Test
fun testBufferSize1() = runTest {
val publisher = publish {
expect(1)
send(3)

expect(2)
send(5)

expect(4)
send(7)
expect(6)
}

publisher.asFlow().collect {
expect(it)
}

finish(8)
}

@Test
fun testBufferSize10() = runTest {
val publisher = publish {
expect(1)
send(5)

expect(2)
send(6)

expect(3)
send(7)
expect(4)
}

publisher.asFlow().buffer(10).collect {
expect(it)
}

finish(8)
}

@Test
fun testProduce() = runTest {
val flow = publish { repeat(10) { send(it) } }.asFlow()
check(flow.produceIn(this))
check(flow.buffer(2).produceIn(this))
check(flow.buffer(Channel.UNLIMITED).produceIn(this))
}

private suspend fun check(channel: ReceiveChannel<Int>) {
val result = ArrayList<Int>(10)
channel.consumeEach { result.add(it) }
assertEquals((0..9).toList(), result)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.reactive.flow

import kotlinx.coroutines.flow.*
import org.junit.*
import org.reactivestreams.*
import org.reactivestreams.example.unicast.*
import org.reactivestreams.tck.*

class RangePublisherBufferedTest : PublisherVerification<Int>(TestEnvironment(50, 50)) {

override fun createPublisher(elements: Long): Publisher<Int> {
return RangePublisher(1, elements.toInt()).asFlow().buffer(2).asPublisher()
}

override fun createFailedPublisher(): Publisher<Int>? {
return null
}

@Ignore
override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
}

@Ignore
override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
}
}