diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 7cfc3500a2..00e9fd704d 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -185,18 +185,17 @@ public fun LongRange.asFlow(): Flow = flow { /** * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel] * that is provided to the builder's [block] of code. It allows elements to be - * produced by the code that is running in a different context, - * e.g. from a callback-based API. + * produced by the code that is running in a different context, e.g. from a callback-based API. * * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator - * on the resulting flow. + * on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder + * should be used instead. * * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory. * The provided channel can later be used by any external service to communicate with flow and its buffer determines * backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used). * * Example of usage: - * * ``` * fun flowFrom(api: CallbackBasedApi): Flow = flowViaChannel { channel -> * val callback = object : Callback { // implementation of some callback interface @@ -206,6 +205,7 @@ public fun LongRange.asFlow(): Flow = flow { * override fun onApiError(cause: Throwable) { * channel.cancel("API Error", CancellationException(cause)) * } + * override fun onCompleted() = channel.close() * } * api.register(callback) * channel.invokeOnClose { @@ -217,7 +217,7 @@ public fun LongRange.asFlow(): Flow = flow { @FlowPreview public fun flowViaChannel( bufferSize: Int = 16, - @BuilderInference block: suspend (SendChannel) -> Unit + @BuilderInference block: CoroutineScope.(channel: SendChannel) -> Unit ): Flow { return flow { coroutineScope { diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt index e909735f2a..364cd84119 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt @@ -12,8 +12,20 @@ class FlowViaChannelTest : TestBase() { @Test fun testRegular() = runTest { val flow = flowViaChannel { - it.send(1) - it.send(2) + assertTrue(it.offer(1)) + assertTrue(it.offer(2)) + assertTrue(it.offer(3)) + it.close() + } + assertEquals(listOf(1, 2, 3), flow.toList()) + } + + @Test + fun testBuffer() = runTest { + val flow = flowViaChannel(bufferSize = 1) { + assertTrue(it.offer(1)) + assertTrue(it.offer(2)) + assertFalse(it.offer(3)) it.close() } assertEquals(listOf(1, 2), flow.toList()) @@ -22,10 +34,51 @@ class FlowViaChannelTest : TestBase() { @Test fun testConflated() = runTest { val flow = flowViaChannel(bufferSize = Channel.CONFLATED) { - it.send(1) - it.send(2) + assertTrue(it.offer(1)) + assertTrue(it.offer(2)) it.close() } assertEquals(listOf(1), flow.toList()) } + + @Test + fun testFailureCancelsChannel() = runTest { + val flow = flowViaChannel { + it.offer(1) + it.invokeOnClose { + expect(2) + } + }.onEach { throw TestException() } + + expect(1) + assertFailsWith(flow) + finish(3) + } + + @Test + fun testFailureInSourceCancelsConsumer() = runTest { + val flow = flowViaChannel { + expect(2) + throw TestException() + }.onEach { expectUnreached() } + + expect(1) + assertFailsWith(flow) + finish(3) + } + + @Test + fun testScopedCancellation() = runTest { + val flow = flowViaChannel { + expect(2) + launch(start = CoroutineStart.ATOMIC) { + hang { expect(3) } + } + throw TestException() + }.onEach { expectUnreached() } + + expect(1) + assertFailsWith(flow) + finish(4) + } }