Skip to content

Decouple asFlow from batchSize and move it to buffer instead, promote… #1279

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -909,11 +909,31 @@ public final class kotlinx/coroutines/flow/MigrationKt {
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
}

public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow {
public final field capacity I
public final field context Lkotlin/coroutines/CoroutineContext;
public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
public fun additionalToStringProps ()Ljava/lang/String;
public final fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
public fun toString ()Ljava/lang/String;
public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
}

public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
public fun <init> (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/coroutines/flow/internal/ConcurrentFlowCollector {
public fun <init> (Lkotlinx/coroutines/channels/SendChannel;)V
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher {
public synthetic fun <init> (II)V
public synthetic fun <init> (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,5 @@ public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun from$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}

11 changes: 8 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ import kotlin.jvm.*
internal fun <T> Flow<T>.asChannelFlow(): ChannelFlow<T> =
this as? ChannelFlow ?: ChannelFlowOperatorImpl(this)

// Operators that use channels extend this ChannelFlow and are always fused with each other
internal abstract class ChannelFlow<T>(
/**
* Operators that use channels extend this ChannelFlow and are always fused with each other.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public abstract class ChannelFlow<T>(
// upstream context
@JvmField val context: CoroutineContext,
// buffer capacity between upstream and downstream context
Expand Down Expand Up @@ -62,7 +67,7 @@ internal abstract class ChannelFlow<T>(
fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> =
scope.broadcast(context, produceCapacity, start, block = collectToFun)

fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.flowProduce(context, produceCapacity, block = collectToFun)

override suspend fun collect(collector: FlowCollector<T>) =
Expand Down
10 changes: 8 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.flow.internal

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.ArrayChannel
import kotlinx.coroutines.flow.*
Expand All @@ -17,8 +18,13 @@ internal fun <T> FlowCollector<T>.asConcurrentFlowCollector(): ConcurrentFlowCol
// Two basic implementations are here: SendingCollector and ConcurrentFlowCollector
internal interface ConcurrentFlowCollector<T> : FlowCollector<T>

// Concurrent collector because it sends to a channel
internal class SendingCollector<T>(
/**
* Collection that sends to channel. It is marked as [ConcurrentFlowCollector] because it can be used concurrently.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public class SendingCollector<T>(
private val channel: SendChannel<T>
) : ConcurrentFlowCollector<T> {
override suspend fun emit(value: T) = channel.send(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ private class FlowCoroutine<T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {

public override fun childCancelled(cause: Throwable): Boolean {
if (cause is ChildCancelledException) return true
return cancelImpl(cause)
Expand All @@ -81,7 +80,6 @@ private class FlowProduceCoroutine<T>(
parentContext: CoroutineContext,
channel: Channel<T>
) : ProducerCoroutine<T>(parentContext, channel) {

public override fun childCancelled(cause: Throwable): Boolean {
if (cause is ChildCancelledException) return true
return cancelImpl(cause)
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-reactive/src/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) =
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit) =
openSubscription().consumeEach(action)

@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T>(
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
internal class SubscriptionChannel<T>(
private val request: Int
) : LinkedListChannel<T>(), Subscriber<T> {
init {
Expand Down
128 changes: 93 additions & 35 deletions reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,66 +7,124 @@ package kotlinx.coroutines.reactive.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import kotlin.coroutines.*

/**
* Transforms the given reactive [Publisher] into [Flow].
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
* and [Subscription.request] size.
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
* More precisely, to it specifies the value of the subscription's [request][Subscription.request].
* `1` is used by default.
*
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
* are discarded.
*/
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
PublisherAsFlow(this, 1)

@FlowPreview
@JvmOverloads // For nice Java API
@JvmName("from")
public fun <T : Any> Publisher<T>.asFlow(batchSize: Int = 1): Flow<T> =
PublisherAsFlow(this, batchSize)
@Deprecated(
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("asFlow().buffer(batchSize)", imports = ["kotlinx.coroutines.flow.*"])
)
public fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T> = asFlow().buffer(batchSize)

private class PublisherAsFlow<T : Any>(private val publisher: Publisher<T>, private val batchSize: Int) : Flow<T> {
private class PublisherAsFlow<T : Any>(
private val publisher: Publisher<T>,
capacity: Int
) : ChannelFlow<T>(EmptyCoroutineContext, capacity) {
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
PublisherAsFlow(publisher, capacity)

override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
// use another channel for conflation (cannot do openSubscription)
if (capacity < 0) return super.produceImpl(scope)
// Open subscription channel directly
val channel = publisher.openSubscription(capacity)
val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
channel.cancel(cause?.let {
it as? CancellationException ?: CancellationException("Job was cancelled", it)
})
}
if (handle != null && handle !== NonDisposableHandle) {
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
(channel as SubscriptionChannel<*>).invokeOnClose {
handle.dispose()
}
}
return channel
}

private val requestSize: Long
get() = when (capacity) {
Channel.CONFLATED -> Long.MAX_VALUE // request all and conflate incoming
Channel.RENDEZVOUS -> 1L // need to request at least one anyway
Channel.UNLIMITED -> Long.MAX_VALUE // reactive streams way to say "give all" must be Long.MAX_VALUE
else -> capacity.toLong().also { check(it >= 1) }
}

override suspend fun collect(collector: FlowCollector<T>) {
val channel = Channel<T>(batchSize)
val subscriber = ReactiveSubscriber(channel, batchSize)
val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
publisher.subscribe(subscriber)
try {
var consumed = 0
for (i in channel) {
collector.emit(i)
if (++consumed == batchSize) {
consumed = 0
subscriber.subscription.request(batchSize.toLong())
var consumed = 0L
while (true) {
val value = subscriber.takeNextOrNull() ?: break
collector.emit(value)
if (++consumed == requestSize) {
consumed = 0L
subscriber.makeRequest()
}
}
} finally {
subscriber.subscription.cancel()
subscriber.cancel()
}
}

@Suppress("SubscriberImplementation")
private class ReactiveSubscriber<T : Any>(
private val channel: Channel<T>,
private val batchSize: Int
) : Subscriber<T> {
// The second channel here is used only for broadcast
override suspend fun collectTo(scope: ProducerScope<T>) =
collect(SendingCollector(scope.channel))
}

lateinit var subscription: Subscription
@Suppress("SubscriberImplementation")
private class ReactiveSubscriber<T : Any>(
capacity: Int,
private val requestSize: Long
) : Subscriber<T> {
private lateinit var subscription: Subscription
private val channel = Channel<T>(capacity)

override fun onComplete() {
channel.close()
}
suspend fun takeNextOrNull(): T? = channel.receiveOrNull()

override fun onSubscribe(s: Subscription) {
subscription = s
s.request(batchSize.toLong())
}
override fun onNext(value: T) {
// Controlled by requestSize
require(channel.offer(value)) { "Element $value was not added to channel because it was full, $channel" }
}

override fun onNext(t: T) {
// Controlled by batch-size
require(channel.offer(t)) { "Element $t was not added to channel because it was full, $channel" }
}
override fun onComplete() {
channel.close()
}

override fun onError(t: Throwable?) {
channel.close(t)
}
override fun onError(t: Throwable?) {
channel.close(t)
}

override fun onSubscribe(s: Subscription) {
subscription = s
makeRequest()
}

fun makeRequest() {
subscription.request(requestSize)
}

fun cancel() {
subscription.cancel()
}
}
106 changes: 105 additions & 1 deletion reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
package kotlinx.coroutines.reactive.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import kotlin.test.*

class PublisherAsFlowTest : TestBase() {

@Test
fun testCancellation() = runTest {
var onNext = 0
Expand Down Expand Up @@ -42,4 +42,108 @@ class PublisherAsFlowTest : TestBase() {
assertEquals(1, onError)
assertEquals(1, onCancelled)
}

@Test
fun testBufferSize1() = runTest {
val publisher = publish {
expect(1)
send(3)

expect(2)
send(5)

expect(4)
send(7)
expect(6)
}

publisher.asFlow().collect {
expect(it)
}

finish(8)
}

@Test
fun testBufferSize10() = runTest {
val publisher = publish {
expect(1)
send(5)

expect(2)
send(6)

expect(3)
send(7)
expect(4)
}

publisher.asFlow().buffer(10).collect {
expect(it)
}

finish(8)
}

@Test
fun testConflated() = runTest {
val publisher = publish {
for (i in 1..5) send(i)
}
val list = publisher.asFlow().conflate().toList()
assertEquals(listOf(1, 5), list)
}

@Test
fun testProduce() = runTest {
val flow = publish { repeat(10) { send(it) } }.asFlow()
check((0..9).toList(), flow.produceIn(this))
check((0..9).toList(), flow.buffer(2).produceIn(this))
check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
check(listOf(0, 9), flow.conflate().produceIn(this))
}

private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
val result = ArrayList<Int>(10)
channel.consumeEach { result.add(it) }
assertEquals(expected, result)
}

@Test
fun testProduceCancellation() = runTest {
expect(1)
// publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled
val flow = publish {
expect(3)
repeat(10) { value ->
when (value) {
in 0..6 -> send(value)
7 -> try {
send(value)
} catch (e: CancellationException) {
finish(6)
throw e
}
else -> expectUnreached()
}
}
}.asFlow()
assertFailsWith<TestException> {
coroutineScope {
expect(2)
val channel = flow.produceIn(this)
channel.consumeEach { value ->
when (value) {
in 0..4 -> {}
5 -> {
expect(4)
throw TestException()
}
else -> expectUnreached()
}
}
}
}
expect(5)
}
}
Loading