diff --git a/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt b/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt index d4c131ef3e..ecb02d870b 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt @@ -6,24 +6,34 @@ package kotlinx.coroutines.channels * * - [SUSPEND] — the upstream that is [sending][SendChannel.send] or * is [emitting][kotlinx.coroutines.flow.FlowCollector.emit] a value is **suspended** while the buffer is full. - * - [DROP_OLDEST] — drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend. - * - [DROP_LATEST] — drop **the latest** value that is being added to the buffer right now on buffer overflow - * (so that buffer contents stay the same), do not suspend. + * - [DROP_OLDEST] — **the oldest** value in the buffer is dropped on overflow, and the new value is added, + * all without suspending. + * - [DROP_LATEST] — the buffer remains unchanged on overflow, and the value that we were going to add + * gets discarded, all without suspending. */ public enum class BufferOverflow { /** * Suspend on buffer overflow. + * + * Use this to create backpressure, forcing the producers to slow down creation of new values in response to + * consumers not being able to process the incoming values in time. + * [SUSPEND] is a good choice when all elements must eventually be processed. */ SUSPEND, /** * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend. + * + * Use this in scenarios when only the last few values are important and skipping the processing of severely + * outdated ones is desirable. */ DROP_OLDEST, /** - * Drop **the latest** value that is being added to the buffer right now on buffer overflow - * (so that buffer contents stay the same), do not suspend. + * Leave the buffer unchanged on overflow, dropping the value that we were going to add, do not suspend. + * + * This option can be used in rare advanced scenarios where all elements that are expected to enter the buffer are + * equal, so it is not important which of them get thrown away. */ DROP_LATEST } diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index f8ea04c96a..15534b08fe 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -49,10 +49,43 @@ public fun ReceiveChannel.onReceiveOrNull(): SelectClause1 { } /** - * Makes sure that the given [block] consumes all elements from the given channel - * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block. + * Executes the [block] and then [cancels][ReceiveChannel.cancel] the channel. * - * The operation is _terminal_. + * It is guaranteed that, after invoking this operation, the channel will be [cancelled][ReceiveChannel.cancel], so + * the operation is _terminal_. + * If the [block] finishes with an exception, that exception will be used for cancelling the channel and rethrown. + * + * This function is useful for building more complex terminal operators while ensuring that the producers stop sending + * new elements to the channel. + * + * Example: + * ``` + * suspend fun ReceiveChannel.consumeFirst(): E = + * consume { return receive() } + * // Launch a coroutine that constantly sends new values + * val channel = produce(Dispatchers.Default) { + * var i = 0 + * while (true) { + * // Will fail with a `CancellationException` + * // after `consumeFirst` finishes. + * send(i++) + * } + * } + * // Grab the first value and discard everything else + * val firstElement = channel.consumeFirst() + * check(firstElement == 0) + * // *Note*: some elements could be lost in the channel! + * ``` + * + * In this example, the channel will get closed, and the producer coroutine will finish its work after the first + * element is obtained. + * If `consumeFirst` was implemented as `for (e in this) { return e }` instead, the producer coroutine would be active + * until it was cancelled some other way. + * + * [consume] does not guarantee that new elements will not enter the channel after [block] finishes executing, so + * some channel elements may be lost. + * Use the `onUndeliveredElement` parameter of a manually created [Channel] to define what should happen with these + * elements during [ReceiveChannel.cancel]. */ public inline fun ReceiveChannel.consume(block: ReceiveChannel.() -> R): R { contract { @@ -70,12 +103,58 @@ public inline fun ReceiveChannel.consume(block: ReceiveChannel.() - } /** - * Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel] - * the channel after the execution of the block. - * If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead. + * Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel] the channel afterward. + * + * This function stops processing elements when either the channel is [closed][SendChannel.close], + * the coroutine in which the collection is performed gets cancelled and there are no readily available elements in the + * channel's buffer, + * [action] fails with an exception, + * or an early return from [action] happens. + * If the [action] finishes with an exception, that exception will be used for cancelling the channel and rethrown. + * If the channel is [closed][SendChannel.close] with a cause, this cause will be rethrown from [consumeEach]. + * + * When the channel does not need to be closed after iterating over its elements, + * a regular `for` loop (`for (element in channel)`) should be used instead. * * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. + * This function [consumes][ReceiveChannel.consume] the elements of the original [ReceiveChannel]. + * + * This function is useful in cases when this channel is only expected to have a single consumer that decides when + * the producer may stop. + * Example: + * + * ``` + * val channel = Channel(1) + * // Launch several procedures that create values + * repeat(5) { + * launch(Dispatchers.Default) { + * while (true) { + * channel.send(Random.nextInt(40, 50)) + * } + * } + * } + * // Launch the exclusive consumer + * val result = run { + * channel.consumeEach { + * if (it == 42) { + * println("Found the answer") + * return@run it // forcibly stop collection + * } + * } + * // *Note*: some elements could be lost in the channel! + * } + * check(result == 42) + * ``` + * + * In this example, several coroutines put elements into a single channel, and a single consumer processes the elements. + * Once it finds the elements it's looking for, it stops [consumeEach] by making an early return. + * + * **Pitfall**: even though the name says "each", some elements could be left unprocessed if they are added after + * this function decided to close the channel. + * In this case, the elements will simply be lost. + * If the elements of the channel are resources that must be closed (like file handles, sockets, etc.), + * an `onUndeliveredElement` must be passed to the [Channel] on construction. + * It will be called for each element left in the channel at the point of cancellation. */ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit): Unit = consume { @@ -83,10 +162,31 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) } /** - * Returns a [List] containing all elements. + * Returns a [List] containing all the elements sent to this channel, preserving their order. + * + * This function will attempt to receive elements and put them into the list until the channel is + * [closed][SendChannel.close]. + * Calling [toList] on channels that are not eventually closed is always incorrect: + * - It will suspend indefinitely if the channel is not closed, but no new elements arrive. + * - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory + * until exhausting it. + * + * If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause. * * The operation is _terminal_. * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. + * + * Example: + * ``` + * val values = listOf(1, 5, 2, 9, 3, 3, 1) + * // start a new coroutine that creates a channel, + * // sends elements to it, and closes it + * // once the coroutine's body finishes + * val channel = produce { + * values.forEach { send(it) } + * } + * check(channel.toList() == values) + * ``` */ public suspend fun ReceiveChannel.toList(): List = buildList { consumeEach { diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt index 3dd0bb4ff9..7a955597f1 100644 --- a/kotlinx-coroutines-core/common/src/channels/Produce.kt +++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt @@ -19,18 +19,21 @@ public interface ProducerScope : CoroutineScope, SendChannel { } /** - * Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel] - * and invokes the given [block] before resuming the coroutine. + * Suspends the current coroutine until the channel is either + * [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]. * - * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this - * suspending function is waiting, this function immediately resumes with [CancellationException]. + * The given [block] will be executed unconditionally before this function returns. + * `awaitClose { cleanup() }` is a convenient shorthand for the often useful form + * `try { awaitClose() } finally { cleanup() }`. + * + * This function can only be invoked directly inside the same coroutine that is its receiver. + * Specifying the receiver of [awaitClose] explicitly is most probably a mistake. + * + * This suspending function is cancellable: if the [Job] of the current coroutine is [cancelled][CoroutineScope.cancel] + * while this suspending function is waiting, this function immediately resumes with [CancellationException]. * There is a **prompt cancellation guarantee**: even if this function is ready to return, but was cancelled * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details. * - * Note that when the producer channel is cancelled, this function resumes with a cancellation exception. - * Therefore, in case of cancellation, no code after the call to this function will be executed. - * That's why this function takes a lambda parameter. - * * Example of usage: * ``` * val callbackEventsStream = produce { @@ -38,6 +41,21 @@ public interface ProducerScope : CoroutineScope, SendChannel { * awaitClose { disposable.dispose() } * } * ``` + * + * Internally, [awaitClose] is implemented using [SendChannel.invokeOnClose]. + * Currently, every channel can have at most one [SendChannel.invokeOnClose] handler. + * This means that calling [awaitClose] several times in a row or combining it with other [SendChannel.invokeOnClose] + * invocations is prohibited. + * An [IllegalStateException] will be thrown if this rule is broken. + * + * **Pitfall**: when used in [produce], if the channel is [cancelled][ReceiveChannel.cancel], [awaitClose] can either + * return normally or throw a [CancellationException] due to a race condition. + * The reason is that, for [produce], cancelling the channel and cancelling the coroutine of the [ProducerScope] is + * done simultaneously. + * + * @throws IllegalStateException if invoked from outside the [ProducerScope] (by leaking `this` outside the producer + * coroutine). + * @throws IllegalStateException if this channel already has a [SendChannel.invokeOnClose] handler registered. */ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) { check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" } @@ -58,35 +76,169 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) { * object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine. * * The scope of the coroutine contains the [ProducerScope] interface, which implements - * both [CoroutineScope] and [SendChannel], so that the coroutine can invoke - * [send][SendChannel.send] directly. The channel is [closed][SendChannel.close] - * when the coroutine completes. - * The running coroutine is cancelled when its receive channel is [cancelled][ReceiveChannel.cancel]. + * both [CoroutineScope] and [SendChannel], so that the coroutine can invoke [send][SendChannel.send] directly. + * + * The kind of the resulting channel depends on the specified [capacity] parameter. + * See the [Channel] interface documentation for details. + * By default, an unbuffered channel is created. + * + * ### Behavior on termination + * + * The channel is [closed][SendChannel.close] when the coroutine completes. + * + * ``` + * val values = listOf(1, 2, 3, 4) + * val channel = produce { + * for (value in values) { + * send(value) + * } + * } + * check(channel.toList() == values) + * ``` + * + * The running coroutine is cancelled when the channel is [cancelled][ReceiveChannel.cancel]. + * + * ``` + * val channel = produce { + * send(1) + * send(2) + * try { + * send(3) // will throw CancellationException + * } catch (e: CancellationException) { + * println("The channel was cancelled!) + * throw e // always rethrow CancellationException + * } + * } + * check(channel.receive() == 1) + * check(channel.receive() == 2) + * channel.cancel() + * ``` + * + * If this coroutine finishes with an exception, it will close the channel with that exception as the cause and + * the resulting channel will become _failed_, so after receiving all the existing elements, all further attempts + * to receive from it will throw the exception with which the coroutine finished. + * + * ``` + * val produceJob = Job() + * // create and populate a channel with a buffer + * val channel = produce(produceJob, capacity = Channel.UNLIMITED) { + * repeat(5) { send(it) } + * throw TestException() + * } + * produceJob.join() // wait for `produce` to fail + * check(produceJob.isCancelled == true) + * // prints 0, 1, 2, 3, 4, then throws `TestException` + * for (value in channel) { println(value) } + * ``` + * + * When the coroutine is cancelled via structured concurrency and not the `cancel` function, + * the channel does not automatically close until the coroutine completes, + * so it is possible that some elements will be sent even after the coroutine is cancelled: + * + * ``` + * val parentScope = CoroutineScope(Dispatchers.Default) + * val channel = parentScope.produce(capacity = Channel.UNLIMITED) { + * repeat(5) { + * send(it) + * } + * parentScope.cancel() + * // suspending after this point would fail, but sending succeeds + * send(-1) + * } + * for (c in channel) { + * println(c) // 0, 1, 2, 3, 4, -1 + * } // throws a `CancellationException` exception after reaching -1 + * ``` + * + * Note that cancelling `produce` via structured concurrency closes the channel with a cause, + * making it a _failed_ channel. + * + * The behavior around coroutine cancellation and error handling is experimental and may change in a future release. + * + * ### Coroutine context * * The coroutine context is inherited from this [CoroutineScope]. Additional context elements can be specified with the [context] argument. * If the context does not have any dispatcher or other [ContinuationInterceptor], then [Dispatchers.Default] is used. * The parent job is inherited from the [CoroutineScope] as well, but it can also be overridden * with a corresponding [context] element. * - * Any uncaught exception in this coroutine will close the channel with this exception as the cause and - * the resulting channel will become _failed_, so that any attempt to receive from it thereafter will throw an exception. + * See [newCoroutineContext] for a description of debugging facilities available for newly created coroutines. * - * The kind of the resulting channel depends on the specified [capacity] parameter. - * See the [Channel] interface documentation for details. + * ### Undelivered elements * - * See [newCoroutineContext] for a description of debugging facilities available for newly created coroutines. + * Some values that [produce] creates may be lost: + * + * ``` + * val channel = produce(Dispatchers.Default, capacity = 5) { + * repeat(100) { + * send(it) + * println("Sent $it") + * } + * } + * channel.cancel() // no elements can be received after this! + * ``` + * + * There is no way to recover these lost elements. + * If this is unsuitable, please create a [Channel] manually and pass the `onUndeliveredElement` callback to the + * constructor: [Channel(onUndeliveredElement = ...)][Channel]. + * + * ### Usage example + * + * ``` + * /* Generate random integers until we find the square root of 9801. + * To calculate whether the given number is that square root, + * use several coroutines that separately process these integers. + * Alternatively, we may randomly give up during value generation. + * `produce` is used to generate the integers and put them into a + * channel, from which the square-computing coroutines take them. */ + * val parentScope = CoroutineScope(SupervisorJob()) + * val channel = parentScope.produce( + * Dispatchers.IO, + * capacity = 16 // buffer of size 16 + * ) { + * // this code will run on Dispatchers.IO + * while (true) { + * val request = run { + * // simulate waiting for the next request + * delay(5.milliseconds) + * val randomInt = Random.nextInt(-1, 100) + * if (randomInt == -1) { + * // external termination request received + * println("Producer: no longer accepting requests") + * return@produce + * } + * println("Producer: sending a request ($randomInt)") + * randomInt + * } + * send(request) + * } + * } + * // Launch consumers + * repeat(4) { + * launch(Dispatchers.Default) { + * for (request in channel) { + * // simulate processing a request + * delay(25.milliseconds) + * println("Consumer $it: received a request ($request)") + * if (request * request == 9801) { + * println("Consumer $it found the square root of 9801!") + * /* the work is done, the producer may finish. + * the internal termination request will cancel + * the producer on the next suspension point. */ + * channel.cancel() + * } + * } + * } + * } + * ``` * * **Note: This is an experimental api.** Behaviour of producers that work as children in a parent scope with respect * to cancellation and error handling may change in the future. - * - * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. - * @param capacity capacity of the channel's buffer (no buffer by default). - * @param block the coroutine code. */ @ExperimentalCoroutinesApi public fun CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, - capacity: Int = 0, + capacity: Int = Channel.RENDEZVOUS, @BuilderInference block: suspend ProducerScope.() -> Unit ): ReceiveChannel = produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block) diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt index d4d6887daa..235609c804 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt @@ -95,6 +95,15 @@ class ChannelsTest: TestBase() { } + @Test + fun testToListOnFailedChannel() = runTest { + val channel = Channel() + channel.close(TestException()) + assertFailsWith { + channel.toList() + } + } + private fun Iterable.asReceiveChannel(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel = GlobalScope.produce(context) { for (element in this@asReceiveChannel) diff --git a/kotlinx-coroutines-core/common/test/channels/ConsumeTest.kt b/kotlinx-coroutines-core/common/test/channels/ConsumeTest.kt index f5df234059..57d9b5b721 100644 --- a/kotlinx-coroutines-core/common/test/channels/ConsumeTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ConsumeTest.kt @@ -91,6 +91,45 @@ class ConsumeTest: TestBase() { assertTrue(channel.isClosedForReceive) } + /** Checks that [ReceiveChannel.consumeEach] reacts to cancellation, but processes the elements that are + * readily available in the buffer. */ + @Test + fun testConsumeEachExitsOnCancellation() = runTest { + val undeliveredElements = mutableListOf() + val channel = Channel(2, onUndeliveredElement = { + undeliveredElements.add(it) + }) + launch { + // These two elements will be sent and put into the buffer: + channel.send(0) + channel.send(1) + // This element will not fit into the buffer, so `send` suspends: + channel.send(2) + // At this point, the consumer's `launch` is cancelled. + yield() // Allow the cancellation handler of the consumer to run. + // Try to send a new element, which will fail at this point: + channel.send(3) + fail("unreached") + } + launch { + channel.consumeEach { + cancel() + assertTrue(it in 0..2) + } + }.join() + assertTrue(channel.isClosedForReceive) + assertEquals(listOf(3), undeliveredElements) + } + + @Test + fun testConsumeEachThrowingOnChannelClosing() = runTest { + val channel = Channel() + channel.close(TestException()) + assertFailsWith { + channel.consumeEach { fail("unreached") } + } + } + /** Check that [BroadcastChannel.consume] does not suffer from KT-58685 */ @Suppress("DEPRECATION", "DEPRECATION_ERROR") @Test diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt index 1b68bacbad..654982ea8f 100644 --- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt @@ -113,6 +113,36 @@ class ProduceTest : TestBase() { finish(6) } + @Test + fun testAwaitCloseOnlyAllowedOnce() = runTest { + expect(1) + val c = produce { + try { + awaitClose() + } catch (e: CancellationException) { + assertFailsWith { + awaitClose() + } + finish(2) + throw e + } + } + yield() // let the `produce` procedure run + c.cancel() + } + + @Test + fun testInvokeOnCloseWithAwaitClose() = runTest { + expect(1) + produce { + invokeOnClose { } + assertFailsWith { + awaitClose() + } + finish(2) + } + } + @Test fun testAwaitConsumerCancellation() = runTest { val parent = Job() @@ -178,6 +208,64 @@ class ProduceTest : TestBase() { finish(3) } + @Test + fun testUncaughtExceptionsInProduce() = runTest( + unhandled = listOf({ it is TestException }) + ) { + val c = produce { + launch(SupervisorJob()) { + throw TestException() + }.join() + send(3) + } + assertEquals(3, c.receive()) + } + + @Test + fun testCancellingProduceCoroutineButNotChannel() = runTest { + val c = produce(Job(), capacity = Channel.UNLIMITED) { + launch { throw TestException() } + try { + yield() + } finally { + repeat(10) { trySend(it) } + } + } + repeat(10) { assertEquals(it, c.receive()) } + } + + @Test + fun testReceivingValuesAfterFailingTheCoroutine() = runTest { + val produceJob = Job() + val c = produce(produceJob, capacity = Channel.UNLIMITED) { + repeat(5) { send(it) } + throw TestException() + } + produceJob.join() + assertTrue(produceJob.isCancelled) + repeat(5) { assertEquals(it, c.receive()) } + assertFailsWith { c.receive() } + } + + @Test + fun testSilentKillerInProduce() = runTest { + val parentScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + val channel = parentScope.produce(capacity = Channel.UNLIMITED) { + repeat(5) { + send(it) + } + parentScope.cancel() + // suspending after this point would fail, but sending succeeds + send(-1) + } + launch { + for (c in channel) { + println(c) // 0, 1, 2, 3, 4, -1 + } // throws an exception after reaching -1 + fail("unreached") + } + } + private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply { val source = Channel() expect(1)