From b3e70a17f7e55b4e7acfd4f396e65da24b7f219a Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 21 May 2019 15:33:32 +0300 Subject: [PATCH 1/3] New flow builder: channelFlow (and its alias callbackFlow) and supplementary ProducerScope.await method Rationale: * Can be used in different context without breaking context preservation * Can be used to build concurrent operators such as merge * Can be used to integrate with callbacks * Is less error-prone than flowViaChannel because requires explicit await() call Partially fixes #1210 --- .../kotlinx-coroutines-core.txt | 7 + .../common/src/channels/Produce.kt | 26 +++ .../common/src/flow/Builders.kt | 121 ++++++++++---- .../common/test/channels/ProduceTest.kt | 54 ++++++ .../flow/channels/ChannelBuildersFlowTest.kt | 150 +++++++++++++++++ .../test/flow/channels/ChannelFlowTest.kt | 155 +++++------------- .../test/flow/channels/FlowCallbackTest.kt | 48 ++++++ .../test/flow/channels/FlowViaChannelTest.kt | 84 ---------- ...FromChannelTest.kt => CallbackFlowTest.kt} | 24 ++- 9 files changed, 440 insertions(+), 229 deletions(-) create mode 100644 kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt create mode 100644 kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt delete mode 100644 kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt rename kotlinx-coroutines-core/jvm/test/flow/{FlowFromChannelTest.kt => CallbackFlowTest.kt} (81%) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index f986b647d9..4090054c6a 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -718,6 +718,8 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli } public final class kotlinx/coroutines/channels/ProduceKt { + public static final fun await (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun await$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel; public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel; public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; @@ -796,6 +798,11 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel; public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel; + public static final fun callbackFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static synthetic fun callbackFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; + public static final fun channelFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static synthetic fun channelFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; + public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow; public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt index 9d1df1d430..c1b8a0e592 100644 --- a/kotlinx-coroutines-core/common/src/channels/Produce.kt +++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt @@ -25,6 +25,32 @@ public interface ProducerScope : CoroutineScope, SendChannel { val channel: SendChannel } +/** + * Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel] + * and invokes the given [block] before resuming the coroutine. + * + * Example of usage: + * ``` + * val callbackEventsStream = produce { + * val disposable = registerChannelInCallback(channel) + * await { disposable.dispose() } + * } + * ``` + */ +@ExperimentalCoroutinesApi +public suspend fun ProducerScope.await(block: () -> Unit = {}) { + check(kotlin.coroutines.coroutineContext[Job] === this) { "await() can be invoke only from the producer context" } + suspendCancellableCoroutine { cont -> + invokeOnClose { + try { + block() + } finally { + cont.resume(Unit) + } + } + } +} + /** * Launches new coroutine to produce a stream of values by sending them to a channel * and returns a reference to the coroutine as a [ReceiveChannel]. This resulting diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 06a5c00e2f..55ee1c6bda 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -200,38 +200,14 @@ public fun LongRange.asFlow(): Flow = flow { } /** - * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel] - * that is provided to the builder's [block] of code. It allows elements to be - * produced by the code that is running in a different context, e.g. from a callback-based API. - * - * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator - * on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder - * should be used instead. - * - * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory. - * The provided channel can later be used by any external service to communicate with flow and its buffer determines - * backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used). - * - * Example of usage: - * ``` - * fun flowFrom(api: CallbackBasedApi): Flow = flowViaChannel { channel -> - * val callback = object : Callback { // implementation of some callback interface - * override fun onNextValue(value: T) { - * channel.offer(value) // Note: offer drops value when buffer is full - * } - * override fun onApiError(cause: Throwable) { - * channel.cancel("API Error", CancellationException(cause)) - * } - * override fun onCompleted() = channel.close() - * } - * api.register(callback) - * channel.invokeOnClose { - * api.unregister(callback) - * } - * } - * ``` + * @suppress */ @FlowPreview +@Deprecated( + message = "Use channelFlow instead", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("channelFlow(bufferSize, block)") +) public fun flowViaChannel( bufferSize: Int = 16, @BuilderInference block: CoroutineScope.(channel: SendChannel) -> Unit @@ -249,3 +225,88 @@ public fun flowViaChannel( } } } + +/** + * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel] + * that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be + * produced by the code that is running in a different context or running concurrently. + * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator + * on the resulting flow. + * + * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used concurrently from different contexts. + * The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [await] can be used. + * For more detailed example please refer to [callbackFlow] documentation. + * + * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory. + * The provided channel can later be used by any external service to communicate with the flow and its buffer determines + * backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used). + * + * Examples of usage: + * ``` + * fun Flow.merge(other: Flow): Flow = channelFlow { + * launch { + * collect { value -> send(value) } + * } + * other.collect { value -> send(value) } + * } + * + * fun contextualFlow(): Flow = channelFlow { + * launch(Dispatchers.IO) { + * send(computeIoValue()) + * } + * + * launch(Dispatchers.Default) { + * send(computeCpuValue()) + * } + * } + * ``` + */ +@FlowPreview +public fun channelFlow(bufferSize: Int = 16, @BuilderInference block: suspend ProducerScope.() -> Unit): Flow = + flow { + coroutineScope { + val channel = produce(capacity = bufferSize, block = block) + channel.consumeEach { value -> + emit(value) + } + } + } + +/** + * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel] + * that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be + * produced by the code that is running in a different context or running concurrently. + * + * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator + * on the resulting flow. + * + * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used from any context, + * e.g. from the callback-based API. The flow completes as soon as its scope completes, thus if you are using channel from the + * callback-based API, to artificially prolong scope lifetime and avoid memory-leaks related to unregistered resources, + * [await] extension should be used. [await] argument will be invoked when either flow consumer cancels flow collection + * or when callback-based API invokes [SendChannel.close] manually. + * + * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory. + * The provided channel can later be used by any external service to communicate with the flow and its buffer determines + * backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used). + * + * Example of usage: + * ``` + * fun flowFrom(api: CallbackBasedApi): Flow = callbackFlow { + * val callback = object : Callback { // implementation of some callback interface + * override fun onNextValue(value: T) { + * offer(value) // Note: offer drops value when buffer is full + * } + * override fun onApiError(cause: Throwable) { + * cancel("API Error", CancellationException(cause)) + * } + * override fun onCompleted() = channel.close() + * } + * api.register(callback) + * // Suspend until either onCompleted or external cancellation are invoked + * await { api.unregister(callback) } + * } + * ``` + */ +public inline fun callbackFlow(bufferSize: Int = 16, @BuilderInference crossinline block: suspend ProducerScope.() -> Unit): Flow = + channelFlow(bufferSize) { block() } diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt index 5137dd740d..0001ca831d 100644 --- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt @@ -94,6 +94,60 @@ class ProduceTest : TestBase() { cancelOnCompletion(coroutineContext) } + @Test + fun testAwaitConsumerCancellation() = runTest { + val parent = Job() + val channel = produce(parent) { + expect(2) + await { expect(4) } + } + expect(1) + yield() + expect(3) + channel.cancel() + parent.complete() + parent.join() + finish(5) + } + + @Test + fun testAwaitProducerCancellation() = runTest { + val parent = Job() + produce(parent) { + expect(2) + launch { + expect(3) + this@produce.cancel() + } + await { expect(4) } + } + expect(1) + parent.complete() + parent.join() + finish(5) + } + + @Test + fun testAwaitParentCancellation() = runTest { + val parent = Job() + produce(parent) { + expect(2) + await { expect(4) } + } + expect(1) + yield() + expect(3) + parent.cancelAndJoin() + finish(5) + } + + @Test + fun testAwaitIllegalState() = runTest { + val channel = produce { } + @Suppress("RemoveExplicitTypeArguments") // KT-31525 + assertFailsWith { (channel as ProducerScope<*>).await() } + } + private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply { val source = Channel() expect(1) diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt new file mode 100644 index 0000000000..3c74b0fb41 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt @@ -0,0 +1,150 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class ChannelBuildersFlowTest : TestBase() { + @Test + fun testBroadcastChannelAsFlow() = runTest { + val channel = broadcast { + repeat(10) { + send(it + 1) + } + } + + val sum = channel.asFlow().sum() + assertEquals(55, sum) + } + + @Test + fun testExceptionInBroadcast() = runTest { + expect(1) + val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well + repeat(10) { + send(it + 1) + } + throw TestException() + } + assertEquals(15, channel.asFlow().take(5).sum()) + + // Workaround for JS bug + try { + channel.asFlow().collect { /* Do nothing */ } + expectUnreached() + } catch (e: TestException) { + finish(2) + } + } + + @Test + fun testBroadcastChannelAsFlowLimits() = runTest { + val channel = BroadcastChannel(1) + val flow = channel.asFlow().map { it * it }.drop(1).take(2) + + var expected = 0 + launch { + assertTrue(channel.offer(1)) // Handed to the coroutine + assertTrue(channel.offer(2)) // Buffered + assertFalse(channel.offer(3)) // Failed to offer + channel.send(3) + yield() + assertEquals(1, expected) + assertTrue(channel.offer(4)) // Handed to the coroutine + assertTrue(channel.offer(5)) // Buffered + assertFalse(channel.offer(6)) // Failed to offer + channel.send(6) + assertEquals(2, expected) + } + + val sum = flow.sum() + assertEquals(13, sum) + ++expected + val sum2 = flow.sum() + assertEquals(61, sum2) + ++expected + } + + @Test + fun flowAsBroadcast() = runTest { + val flow = flow { + repeat(10) { + emit(it) + } + } + + val channel = flow.broadcastIn(this) + assertEquals((0..9).toList(), channel.openSubscription().toList()) + } + + @Test + fun flowAsBroadcastMultipleSubscription() = runTest { + val flow = flow { + repeat(10) { + emit(it) + } + } + + val broadcast = flow.broadcastIn(this) + val channel = broadcast.openSubscription() + val channel2 = broadcast.openSubscription() + + assertEquals(0, channel.receive()) + assertEquals(0, channel2.receive()) + yield() + assertEquals(1, channel.receive()) + assertEquals(1, channel2.receive()) + + channel.cancel() + channel2.cancel() + yield() + ensureActive() + } + + @Test + fun flowAsBroadcastException() = runTest { + val flow = flow { + repeat(10) { + emit(it) + } + + throw TestException() + } + + val channel = flow.broadcastIn(this + NonCancellable) + assertFailsWith { channel.openSubscription().toList() } + assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel + } + + // Semantics of these tests puzzle me, we should figure out the way to prohibit such chains + @Test + fun testFlowAsBroadcastAsFlow() = runTest { + val flow = flow { + emit(1) + emit(2) + emit(3) + }.broadcastIn(this).asFlow() + + assertEquals(6, flow.sum()) + assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold + } + + @Test + fun testBroadcastAsFlowAsBroadcast() = runTest { + val channel = broadcast { + send(1) + }.asFlow().broadcastIn(this) + + channel.openSubscription().consumeEach { + assertEquals(1, it) + } + + channel.openSubscription().consumeEach { + fail() + } + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt index c2051e7d33..5d0292ef1e 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt @@ -10,141 +10,72 @@ import kotlin.test.* class ChannelFlowTest : TestBase() { @Test - fun testBroadcastChannelAsFlow() = runTest { - val channel = broadcast { - repeat(10) { - send(it + 1) - } + fun testRegular() = runTest { + val flow = channelFlow { + assertTrue(offer(1)) + assertTrue(offer(2)) + assertTrue(offer(3)) } - - val sum = channel.asFlow().sum() - assertEquals(55, sum) + assertEquals(listOf(1, 2, 3), flow.toList()) } @Test - fun testExceptionInBroadcast() = runTest { - expect(1) - val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well - repeat(10) { - send(it + 1) - } - throw TestException() - } - assertEquals(15, channel.asFlow().take(5).sum()) - - // Workaround for JS bug - try { - channel.asFlow().collect { /* Do nothing */ } - expectUnreached() - } catch (e: TestException) { - finish(2) + fun testBuffer() = runTest { + val flow = channelFlow(bufferSize = 1) { + assertTrue(offer(1)) + assertTrue(offer(2)) + assertFalse(offer(3)) } + assertEquals(listOf(1, 2), flow.toList()) } @Test - fun testBroadcastChannelAsFlowLimits() = runTest { - val channel = BroadcastChannel(1) - val flow = channel.asFlow().map { it * it }.drop(1).take(2) - - var expected = 0 - launch { - assertTrue(channel.offer(1)) // Handed to the coroutine - assertTrue(channel.offer(2)) // Buffered - assertFalse(channel.offer(3)) // Failed to offer - channel.send(3) - yield() - assertEquals(1, expected) - assertTrue(channel.offer(4)) // Handed to the coroutine - assertTrue(channel.offer(5)) // Buffered - assertFalse(channel.offer(6)) // Failed to offer - channel.send(6) - assertEquals(2, expected) + fun testConflated() = runTest { + val flow = channelFlow(bufferSize = Channel.CONFLATED) { + assertTrue(offer(1)) + assertTrue(offer(2)) } - - val sum = flow.sum() - assertEquals(13, sum) - ++expected - val sum2 = flow.sum() - assertEquals(61, sum2) - ++expected + assertEquals(listOf(1), flow.toList()) } @Test - fun flowAsBroadcast() = runTest { - val flow = flow { - repeat(10) { - emit(it) + fun testFailureCancelsChannel() = runTest { + val flow = channelFlow { + offer(1) + invokeOnClose { + expect(2) } - } + }.onEach { throw TestException() } - val channel = flow.broadcastIn(this) - assertEquals((0..9).toList(), channel.openSubscription().toList()) + expect(1) + assertFailsWith(flow) + finish(3) } @Test - fun flowAsBroadcastMultipleSubscription() = runTest { - val flow = flow { - repeat(10) { - emit(it) - } - } - - val broadcast = flow.broadcastIn(this) - val channel = broadcast.openSubscription() - val channel2 = broadcast.openSubscription() - - assertEquals(0, channel.receive()) - assertEquals(0, channel2.receive()) - yield() - assertEquals(1, channel.receive()) - assertEquals(1, channel2.receive()) + fun testFailureInSourceCancelsConsumer() = runTest { + val flow = channelFlow { + expect(2) + throw TestException() + }.onEach { expectUnreached() } - channel.cancel() - channel2.cancel() - yield() - ensureActive() + expect(1) + assertFailsWith(flow) + finish(3) } @Test - fun flowAsBroadcastException() = runTest { - val flow = flow { - repeat(10) { - emit(it) + fun testScopedCancellation() = runTest { + val flow = channelFlow { + expect(2) + launch(start = CoroutineStart.ATOMIC) { + hang { expect(3) } } - throw TestException() - } - - val channel = flow.broadcastIn(this + NonCancellable) - assertFailsWith { channel.openSubscription().toList() } - assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel - } - - // Semantics of these tests puzzle me, we should figure out the way to prohibit such chains - @Test - fun testFlowAsBroadcastAsFlow() = runTest { - val flow = flow { - emit(1) - emit(2) - emit(3) - }.broadcastIn(this).asFlow() + }.onEach { expectUnreached() } - assertEquals(6, flow.sum()) - assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold - } - - @Test - fun testBroadcastAsFlowAsBroadcast() = runTest { - val channel = broadcast { - send(1) - }.asFlow().broadcastIn(this) - - channel.openSubscription().consumeEach { - assertEquals(1, it) - } - - channel.openSubscription().consumeEach { - fail() - } + expect(1) + assertFailsWith(flow) + finish(4) } } diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt new file mode 100644 index 0000000000..388c48b61f --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt @@ -0,0 +1,48 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class FlowCallbackTest : TestBase() { + @Test + fun testClosedPrematurely() = runTest(unhandled = listOf({ e -> e is ClosedSendChannelException })) { + val outerScope = this + val flow = channelFlow { + // ~ callback-based API + outerScope.launch(Job()) { + expect(2) + send(1) + expectUnreached() + } + expect(1) + } + assertEquals(emptyList(), flow.toList()) + finish(3) + } + + @Test + fun testNotClosedPrematurely() = runTest { + val outerScope = this + val flow = channelFlow { + // ~ callback-based API + outerScope.launch(Job()) { + expect(2) + send(1) + close() + } + expect(1) + await() + } + + assertEquals(listOf(1), flow.toList()) + finish(3) + } +} + diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt deleted file mode 100644 index 364cd84119..0000000000 --- a/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.flow - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import kotlin.test.* - -class FlowViaChannelTest : TestBase() { - @Test - fun testRegular() = runTest { - val flow = flowViaChannel { - assertTrue(it.offer(1)) - assertTrue(it.offer(2)) - assertTrue(it.offer(3)) - it.close() - } - assertEquals(listOf(1, 2, 3), flow.toList()) - } - - @Test - fun testBuffer() = runTest { - val flow = flowViaChannel(bufferSize = 1) { - assertTrue(it.offer(1)) - assertTrue(it.offer(2)) - assertFalse(it.offer(3)) - it.close() - } - assertEquals(listOf(1, 2), flow.toList()) - } - - @Test - fun testConflated() = runTest { - val flow = flowViaChannel(bufferSize = Channel.CONFLATED) { - assertTrue(it.offer(1)) - assertTrue(it.offer(2)) - it.close() - } - assertEquals(listOf(1), flow.toList()) - } - - @Test - fun testFailureCancelsChannel() = runTest { - val flow = flowViaChannel { - it.offer(1) - it.invokeOnClose { - expect(2) - } - }.onEach { throw TestException() } - - expect(1) - assertFailsWith(flow) - finish(3) - } - - @Test - fun testFailureInSourceCancelsConsumer() = runTest { - val flow = flowViaChannel { - expect(2) - throw TestException() - }.onEach { expectUnreached() } - - expect(1) - assertFailsWith(flow) - finish(3) - } - - @Test - fun testScopedCancellation() = runTest { - val flow = flowViaChannel { - expect(2) - launch(start = CoroutineStart.ATOMIC) { - hang { expect(3) } - } - throw TestException() - }.onEach { expectUnreached() } - - expect(1) - assertFailsWith(flow) - finish(4) - } -} diff --git a/kotlinx-coroutines-core/jvm/test/flow/FlowFromChannelTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt similarity index 81% rename from kotlinx-coroutines-core/jvm/test/flow/FlowFromChannelTest.kt rename to kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt index 9d7799c9d8..1571ff8844 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/FlowFromChannelTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt @@ -10,7 +10,7 @@ import org.junit.Test import kotlin.concurrent.* import kotlin.test.* -class FlowFromChannelTest : TestBase() { +class CallbackFlowTest : TestBase() { private class CallbackApi(val block: (SendChannel) -> Unit) { var started = false @@ -39,7 +39,7 @@ class FlowFromChannelTest : TestBase() { runCatching { it.offer(++i) } } - val flow = flowViaChannel { channel -> + val flow = channelFlow(16) { api.start(channel) channel.invokeOnClose { api.stop() @@ -83,7 +83,7 @@ class FlowFromChannelTest : TestBase() { } } - val flow = flowViaChannel { channel -> + val flow = channelFlow { api.start(channel) channel.invokeOnClose { api.stop() @@ -106,4 +106,22 @@ class FlowFromChannelTest : TestBase() { assertTrue(api.started) assertTrue(api.stopped) } + + + @Test + fun testMergeExample() = runTest { + // Too slow on JS + withContext(Dispatchers.Default) { + val f1 = (1..10_000).asFlow() + val f2 = (10_001..20_000).asFlow() + assertEquals((1..20_000).toSet(), f1.merge(f2).toSet()) + } + } + + private fun Flow.merge(other: Flow): Flow = channelFlow { + launch { + collect { send(it) } + } + other.collect { send(it) } + } } From c34c1d5c9e298415ae65341a19a6fba11c5fe0fc Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 27 May 2019 13:19:43 +0300 Subject: [PATCH 2/3] Improve documentation, rename await to awaitClose, improve awaitClose implementation --- .../kotlinx-coroutines-core.txt | 4 ++-- .../common/src/channels/Produce.kt | 20 +++++++++++-------- .../common/src/flow/Builders.kt | 10 ++++++---- .../common/test/channels/ProduceTest.kt | 8 ++++---- .../test/flow/channels/FlowCallbackTest.kt | 2 +- 5 files changed, 25 insertions(+), 19 deletions(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 4090054c6a..81c4757b2d 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -718,8 +718,8 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli } public final class kotlinx/coroutines/channels/ProduceKt { - public static final fun await (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun await$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun awaitClose (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun awaitClose$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel; public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel; public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt index c1b8a0e592..8d34265f92 100644 --- a/kotlinx-coroutines-core/common/src/channels/Produce.kt +++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt @@ -29,25 +29,29 @@ public interface ProducerScope : CoroutineScope, SendChannel { * Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel] * and invokes the given [block] before resuming the coroutine. * + * Note that when producer channel is cancelled this function resumes with cancellation exception, + * so putting the code after calling this function would not lead to its execution in case of cancellation. + * That is why this code takes a lambda parameter. + * * Example of usage: * ``` * val callbackEventsStream = produce { * val disposable = registerChannelInCallback(channel) - * await { disposable.dispose() } + * awaitClose { disposable.dispose() } * } * ``` */ @ExperimentalCoroutinesApi -public suspend fun ProducerScope.await(block: () -> Unit = {}) { - check(kotlin.coroutines.coroutineContext[Job] === this) { "await() can be invoke only from the producer context" } - suspendCancellableCoroutine { cont -> - invokeOnClose { - try { - block() - } finally { +public suspend fun ProducerScope.awaitClose(block: () -> Unit = {}) { + check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can be invoke only from the producer context" } + try { + suspendCancellableCoroutine { cont -> + invokeOnClose { cont.resume(Unit) } } + } finally { + block() } } diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 55ee1c6bda..733bf63e6a 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -234,7 +234,7 @@ public fun flowViaChannel( * on the resulting flow. * * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used concurrently from different contexts. - * The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [await] can be used. + * The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [awaitClose] can be used. * For more detailed example please refer to [callbackFlow] documentation. * * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory. @@ -283,7 +283,7 @@ public fun channelFlow(bufferSize: Int = 16, @BuilderInference block: suspen * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used from any context, * e.g. from the callback-based API. The flow completes as soon as its scope completes, thus if you are using channel from the * callback-based API, to artificially prolong scope lifetime and avoid memory-leaks related to unregistered resources, - * [await] extension should be used. [await] argument will be invoked when either flow consumer cancels flow collection + * [awaitClose] extension should be used. [awaitClose] argument will be invoked when either flow consumer cancels flow collection * or when callback-based API invokes [SendChannel.close] manually. * * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory. @@ -295,10 +295,12 @@ public fun channelFlow(bufferSize: Int = 16, @BuilderInference block: suspen * fun flowFrom(api: CallbackBasedApi): Flow = callbackFlow { * val callback = object : Callback { // implementation of some callback interface * override fun onNextValue(value: T) { - * offer(value) // Note: offer drops value when buffer is full + * // Note: offer drops value when buffer is full + * // Channel.UNLIMITED can be used to avoid overfill + * offer(value) * } * override fun onApiError(cause: Throwable) { - * cancel("API Error", CancellationException(cause)) + * cancel(CancellationException("API Error", cause)) * } * override fun onCompleted() = channel.close() * } diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt index 0001ca831d..cbaf7085c4 100644 --- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt @@ -99,7 +99,7 @@ class ProduceTest : TestBase() { val parent = Job() val channel = produce(parent) { expect(2) - await { expect(4) } + awaitClose { expect(4) } } expect(1) yield() @@ -119,7 +119,7 @@ class ProduceTest : TestBase() { expect(3) this@produce.cancel() } - await { expect(4) } + awaitClose { expect(4) } } expect(1) parent.complete() @@ -132,7 +132,7 @@ class ProduceTest : TestBase() { val parent = Job() produce(parent) { expect(2) - await { expect(4) } + awaitClose { expect(4) } } expect(1) yield() @@ -145,7 +145,7 @@ class ProduceTest : TestBase() { fun testAwaitIllegalState() = runTest { val channel = produce { } @Suppress("RemoveExplicitTypeArguments") // KT-31525 - assertFailsWith { (channel as ProducerScope<*>).await() } + assertFailsWith { (channel as ProducerScope<*>).awaitClose() } } private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply { diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt index 388c48b61f..d992d06e48 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt @@ -38,7 +38,7 @@ class FlowCallbackTest : TestBase() { close() } expect(1) - await() + awaitClose() } assertEquals(listOf(1), flow.toList()) From 83b31965fd3a1cc717fc38a4e9f081b8ee9bd37c Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 28 May 2019 00:36:56 +0300 Subject: [PATCH 3/3] Fix race in tests --- .../reference-public-api/kotlinx-coroutines-core.txt | 1 - kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 81c4757b2d..dc3180b996 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -802,7 +802,6 @@ public final class kotlinx/coroutines/flow/FlowKt { public static synthetic fun callbackFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun channelFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun channelFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; - public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow; public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt index 1571ff8844..0a66ae66a2 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt @@ -41,7 +41,7 @@ class CallbackFlowTest : TestBase() { val flow = channelFlow(16) { api.start(channel) - channel.invokeOnClose { + awaitClose { api.stop() } } @@ -85,7 +85,7 @@ class CallbackFlowTest : TestBase() { val flow = channelFlow { api.start(channel) - channel.invokeOnClose { + awaitClose { api.stop() } }