From 09a1e030c04fc3c5386e1c1beba822bc94ad929d Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 1 Oct 2024 12:21:04 +0200 Subject: [PATCH 01/13] Update the documentation for the Channel interface --- .../common/src/channels/BufferOverflow.kt | 2 +- .../common/src/channels/Channel.kt | 1082 +++++++++++++---- .../common/src/channels/Produce.kt | 1 + .../test/channels/BufferedChannelTest.kt | 14 + .../common/test/channels/ProduceTest.kt | 7 + .../test/channels/RendezvousChannelTest.kt | 20 + .../selects/SelectRendezvousChannelTest.kt | 7 +- .../test/channels/CancelledChannelLeakTest.kt | 27 + 8 files changed, 929 insertions(+), 231 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/channels/CancelledChannelLeakTest.kt diff --git a/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt b/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt index ecb02d870b..652f8d7c1e 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt @@ -13,7 +13,7 @@ package kotlinx.coroutines.channels */ public enum class BufferOverflow { /** - * Suspend on buffer overflow. + * Suspend until free space appears in the buffer. * * Use this to create backpressure, forcing the producers to slow down creation of new values in response to * consumers not being able to process the incoming values in time. diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 830d786259..e2533a0d9f 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -15,15 +15,60 @@ import kotlin.internal.* import kotlin.jvm.* /** - * Sender's interface to [Channel]. + * Sender's interface to a [Channel]. + * + * Combined, [SendChannel] and [ReceiveChannel] define the complete [Channel] interface. + * + * It is not expected that this interface will be implemented directly. + * Instead, the existing [Channel] implementations can be used or delegated to. */ public interface SendChannel { /** * Returns `true` if this channel was closed by an invocation of [close] or its receiving side was [cancelled][ReceiveChannel.cancel]. * This means that calling [send] will result in an exception. * - * Note that if this property returns `false`, it does not guarantee that consecutive call to [send] will succeed, as the - * channel can be concurrently closed right after the check. For such scenarios, it is recommended to use [trySend] instead. + * Note that if this property returns `false`, it does not guarantee that a subsequent call to [send] will succeed, + * as the channel can be concurrently closed right after the check. + * For such scenarios, [trySend] is the more robust solution: it attempts to send the element and returns + * a result that says whether the channel was closed, and if not, whether sending a value was successful. + * + * ``` + * // DANGER! THIS CHECK IS NOT RELIABLE! + * if (!channel.isClosedForSend) { + * channel.send(element) // can still fail! + * } else { + * println("Can not send: the channel is closed") + * } + * // DO THIS INSTEAD: + * channel.trySend(element).onClosed { + * println("Can not send: the channel is closed") + * } + * ``` + * + * The primary intended usage of this property is skipping some portions of code that should not be executed if the + * channel is already known to be closed. + * For example: + * + * ``` + * if (channel.isClosedForSend) { + * // fast path + * return + * } else { + * // slow path: actually computing the value + * val nextElement = run { + * // some heavy computation + * } + * channel.send(nextElement) // can fail anyway, + * // but at least we tried to avoid the computation + * } + * ``` + * + * However, in many cases, even that can be achieved more idiomatically by cancelling the coroutine producing the + * elements to send. + * See [produce] for a way to launch a coroutine that produces elements and cancels itself when the channel is + * closed. + * + * [isClosedForSend] can also be used for assertions and diagnostics to verify the expected state of the channel. * * @see SendChannel.trySend * @see SendChannel.close @@ -33,13 +78,33 @@ public interface SendChannel { public val isClosedForSend: Boolean /** - * Sends the specified [element] to this channel, suspending the caller while the buffer of this channel is full - * or if it does not exist, or throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details). + * Sends the specified [element] to this channel. * - * [Closing][close] a channel _after_ this function has suspended does not cause this suspended [send] invocation - * to abort, because closing a channel is conceptually like sending a special "close token" over this channel. - * All elements sent over the channel are delivered in first-in first-out order. The sent element - * will be delivered to receivers before the close token. + * This function suspends if it does not manage to pass the element to the channel's buffer + * (or directly the receiving side if there's no buffer), + * and it can be cancelled with or without having successfully passed the element. + * See the "Suspending and cancellation" section below for details. + * If the channel is [closed][close], an exception is thrown (see below). + * + * ``` + * val channel = Channel() + * launch { + * check(channel.receive() == 5) + * } + * channel.send(5) // suspends until 5 is received + * ``` + * + * ## Suspending and cancellation + * + * If the [BufferOverflow] strategy of this channel is [BufferOverflow.SUSPEND], + * this function may suspend. + * The exact scenarios differ depending on the channel's capacity: + * - If the channel is [rendezvous][RENDEZVOUS], + * the sender will be suspended until the receiver calls [ReceiveChannel.receive]. + * - If the channel is [unlimited][UNLIMITED] or [conflated][CONFLATED], + * the sender will never be suspended even with the [BufferOverflow.SUSPEND] strategy. + * - If the channel is buffered (either [BUFFERED] or uses a non-default buffer capacity), + * the sender will be suspended until the buffer has free space. * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. @@ -47,74 +112,182 @@ public interface SendChannel { * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details. * * Because of the prompt cancellation guarantee, an exception does not always mean a failure to deliver the element. - * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. + * See the "Undelivered elements" section in the [Channel] documentation + * for details on handling undelivered elements. * * Note that this function does not check for cancellation when it is not suspended. - * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. + * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed: + * + * ``` + * // because of UNLIMITED, sending to this channel never suspends + * val channel = Channel(Channel.UNLIMITED) + * val job = launch { + * while (isActive) { + * channel.send(42) + * } + * // the loop exits when the job is cancelled + * } + * ``` + * + * This isn't needed if other cancellable functions are called inside the loop, like [delay]. + * + * ## Sending to a closed channel + * + * If a channel was [closed][close] before [send] was called and no cause was specified, + * an [ClosedSendChannelException] will be thrown from [send]. + * If a channel was [closed][close] with a cause before [send] was called, + * then [send] will rethrow the exact object that was passed to [close]. + * + * In both cases, it is guaranteed that the element was not delivered to the consumer, + * and the `onUndeliveredElement` callback will be called. + * See the "Undelivered elements" section in the [Channel] documentation + * for details on handling undelivered elements. + * + * [Closing][close] a channel _after_ this function suspends does not cause this suspended [send] invocation + * to abort: although subsequent invocations of [send] fail, the existing ones will continue to completion, + * unless the sending coroutine is cancelled. + * + * ## Related * * This function can be used in [select] invocations with the [onSend] clause. - * Use [trySend] to try sending to this channel without waiting. + * Use [trySend] to try sending to this channel without waiting and throwing. */ public suspend fun send(element: E) /** - * Clause for the [select] expression of the [send] suspending function that selects when the element that is specified - * as the parameter is sent to the channel. When the clause is selected, the reference to this channel - * is passed into the corresponding block. + * Clause for the [select] expression of the [send] suspending function that selects when the element that is + * specified as the parameter is sent to the channel. + * When the clause is selected, the reference to this channel is passed into the corresponding block. * - * The [select] invocation fails with an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details). + * The [select] invocation fails with an exception if the channel [is closed for `send`][isClosedForSend] before + * the [select] suspends (see the "Sending to a closed channel" section of [send]). + * + * Example: + * ``` + * val sendChannels = List(4) { index -> + * Channel(onUndeliveredElement = { + * println("Undelivered element $it for $index") + * }).also { channel -> + * // launch a consumer for this channel + * launch { + * withTimeout(1.seconds) { + * println("Consumer $index receives: ${channel.receive()}") + * } + * } + * } + * } + * val element = 42 + * select { + * for (channel in sendChannels) { + * channel.onSend(element) { + * println("Sent to channel $it") + * } + * } + * } + * ``` + * Here, we start a [select] expression that waits for exactly one of the four [onSend] invocations + * to successfully send the element to the receiver, + * and the other three will instead invoke the `onUndeliveredElement` callback. + * See the "Undelivered elements" section in the [Channel] documentation + * for details on handling undelivered elements. + * + * Like [send], [onSend] obeys the rules of prompt cancellation: + * [select] may finish with a [CancellationException] even if the element was successfully sent. */ public val onSend: SelectClause2> /** - * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions, - * and returns the successful result. Otherwise, returns failed or closed result. - * This is synchronous variant of [send], which backs off in situations when `send` suspends or throws. + * Attempts to add the specified [element] to this channel without waiting. + * + * [trySend] never suspends and never throws exceptions. + * Instead, it returns a [ChannelResult] that encapsulates the result of the operation. + * This makes it different from [send], which can suspend and throw exceptions. * - * When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and - * it does not call `onUndeliveredElement` that was installed for this channel. - * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. + * If this channel is currently full and cannot receive new elements at the time or is [closed][close], + * this function returns a result that indicates [a failure][ChannelResult.isFailure]. + * In this case, it is guaranteed that the element was not delivered to the consumer and the + * `onUndeliveredElement` callback, if one is provided during the [Channel]'s construction, does *not* get called. + * + * [trySend] can be used as a non-`suspend` alternative to [send] in cases where it's known beforehand + * that the channel's buffer can not overflow. + * ``` + * class Coordinates(val x: Int, val y: Int) + * // A channel for a single subscriber that stores the latest mouse position update. + * // If more than one subscriber is expected, consider using a `StateFlow` instead. + * val mousePositionUpdates = Channel(Channel.CONFLATED) + * // Notifies the subscriber about the new mouse position. + * // If the subscriber is slow, the intermediate updates are dropped. + * fun moveMouse(coordinates: Coordinates) { + * val result = mousePositionUpdates.trySend(coordinates) + * if (result.isClosed) { + * error("Mouse position is no longer being processed") + * } + * } + * ``` */ public fun trySend(element: E): ChannelResult /** - * Closes this channel. - * This is an idempotent operation — subsequent invocations of this function have no effect and return `false`. - * Conceptually, it sends a special "close token" over this channel. - * - * Immediately after invocation of this function, - * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive] - * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements - * are received. - * - * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] - * and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive]. - * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or - * receive on a failed channel throw the specified [cause] exception. + * Closes this channel so that subsequent attempts to [send] to it fail. + * + * Returns `true` if the channel was not closed previously and the call to this function closed it. + * If the channel was already closed, this function does nothing and returns `false`. + * + * The existing elements in the channel remain there, and likewise, + * the calls to [send] an [onSend] that have suspended before [close] was called will not be affected. + * Only the subsequent calls to [send], [trySend], or [onSend] will fail. + * [isClosedForSend] will start returning `true` immediately after this function is called. + * + * Once all the existing elements are received, the channel will be considered closed for `receive` as well. + * This means that [receive][ReceiveChannel.receive] will also start throwing exceptions. + * At that point, [isClosedForReceive][ReceiveChannel.isClosedForReceive] will start returning `true`. + * + * If the [cause] is non-null, it will be thrown from all the subsequent attempts to [send] to this channel, + * as well as from all the attempts to [receive][ReceiveChannel.receive] from the channel after no elements remain. + * + * If the [cause] is null, the channel is considered to have completed normally. + * All subsequent calls to [send] will throw a [ClosedSendChannelException], + * whereas calling [receive][ReceiveChannel.receive] will throw a [ClosedReceiveChannelException] + * after there are no more elements. + * + * ``` + * val channel = Channel() + * channel.send(1) + * channel.close() + * try { + * channel.send(2) + * error("The channel is closed, so this line is never reached") + * } catch (e: ClosedSendChannelException) { + * // expected + * } + * ``` */ public fun close(cause: Throwable? = null): Boolean /** - * Registers a [handler] which is synchronously invoked once the channel is [closed][close] + * Registers a [handler] that is synchronously invoked once the channel is [closed][close] * or the receiving side of this channel is [cancelled][ReceiveChannel.cancel]. * Only one handler can be attached to a channel during its lifetime. * The `handler` is invoked when [isClosedForSend] starts to return `true`. * If the channel is closed already, the handler is invoked immediately. * * The meaning of `cause` that is passed to the handler: - * - `null` if the channel was closed normally without the corresponding argument. - * - Instance of [CancellationException] if the channel was cancelled normally without the corresponding argument. + * - `null` if the channel was [closed][close] normally with `cause = null`. + * - Instance of [CancellationException] if the channel was [cancelled][ReceiveChannel.cancel] normally + * without the corresponding argument. * - The cause of `close` or `cancel` otherwise. * * ### Execution context and exception safety * - * The [handler] is executed as part of the closing or cancelling operation, and only after the channel reaches its final state. - * This means that if the handler throws an exception or hangs, the channel will still be successfully closed or cancelled. + * The [handler] is executed as part of the closing or cancelling operation, + * and only after the channel reaches its final state. + * This means that if the handler throws an exception or hangs, + * the channel will still be successfully closed or cancelled. * Unhandled exceptions from [handler] are propagated to the closing or cancelling operation's caller. * * Example of usage: * ``` - * val events = Channel(UNLIMITED) + * val events = Channel(Channel.UNLIMITED) * callbackBasedApi.registerCallback { event -> * events.trySend(event) * .onClosed { /* channel is already closed, but the callback hasn't stopped yet */ } @@ -132,7 +305,7 @@ public interface SendChannel { * This restriction could be lifted in the future. * * @throws UnsupportedOperationException if the underlying channel does not support [invokeOnClose]. - * Implementation note: currently, [invokeOnClose] is unsupported only by Rx-like integrations + * Implementation note: currently, [invokeOnClose] is unsupported only by Rx-like integrations. * * @throws IllegalStateException if another handler was already registered */ @@ -178,19 +351,39 @@ public interface SendChannel { } /** - * Receiver's interface to [Channel]. + * Receiver's interface to a [Channel]. + * + * Combined, [SendChannel] and [ReceiveChannel] define the complete [Channel] interface. */ public interface ReceiveChannel { /** - * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel] - * side and all previously sent items were already received, or if the receiving side was [cancelled][ReceiveChannel.cancel]. + * Returns `true` if either the sending side of this channel was [closed][SendChannel.close] + * and all previously sent items were already received (which also happens for [cancelled][cancel] channels). * - * This means that calling [receive] will result in a [ClosedReceiveChannelException] or a corresponding cancellation cause. - * If the channel was closed because of an exception, it is considered closed, too, but is called a _failed_ channel. - * All suspending attempts to receive an element from a failed channel throw the original [close][SendChannel.close] cause exception. + * Note that if this property returns `false`, + * it does not guarantee that a subsequent call to [receive] will succeed, + * as the channel can be concurrently cancelled or closed right after the check. + * For such scenarios, [receiveCatching] is the more robust solution: + * if the channel is closed, instead of throwing an exception, [receiveCatching] returns a result that allows + * querying it. * - * Note that if this property returns `false`, it does not guarantee that consecutive call to [receive] will succeed, as the - * channel can be concurrently closed right after the check. For such scenarios, it is recommended to use [receiveCatching] instead. + * ``` + * // DANGER! THIS CHECK IS NOT RELIABLE! + * if (!channel.isClosedForReceive) { + * channel.receive() // can still fail! + * } else { + * println("Can not receive: the channel is closed") + * null + * } + * // DO THIS INSTEAD: + * channel.receiveCatching().onClosed { + * println("Can not receive: the channel is closed") + * }.getOrNull() + * ``` + * + * The primary intended usage of this property is for assertions and diagnostics to verify the expected state of + * the channel. + * Using it in production code is discouraged. * * @see ReceiveChannel.receiveCatching * @see ReceiveChannel.cancel @@ -200,107 +393,302 @@ public interface ReceiveChannel { public val isClosedForReceive: Boolean /** - * Returns `true` if the channel is empty (contains no elements), which means that an attempt to [receive] will suspend. - * This function returns `false` if the channel [is closed for `receive`][isClosedForReceive]. + * Returns `true` if the channel contains no elements and isn't [closed for `receive`][isClosedForReceive]. + * + * If [isEmpty] returns `true`, it means that calling [receive] at exactly the same moment would suspend. + * However, calling [receive] immediately after checking [isEmpty] may or may not suspend, as new elements + * could have been added or removed or the channel could have been closed for `receive` between the two invocations. + * Consider using [tryReceive] in cases when suspensions are undesirable: + * + * ``` + * // DANGER! THIS CHECK IS NOT RELIABLE! + * while (!channel.isEmpty) { + * // can still suspend in other `receive` happens in parallel! + * val element = channel.receive() + * println(element) + * } + * // DO THIS INSTEAD: + * while (true) { + * val element = channel.tryReceive().getOrNull() ?: break + * println(element) + * } + * ``` */ @ExperimentalCoroutinesApi public val isEmpty: Boolean /** - * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty, - * or throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][isClosedForReceive]. - * If the channel was closed because of an exception, it is called a _failed_ channel and this function - * will throw the original [close][SendChannel.close] cause exception. - * - * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled while this - * function is suspended, this function immediately resumes with a [CancellationException]. - * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was - * suspended, it will not resume successfully. The `receive` call can retrieve the element from the channel, - * but then throw [CancellationException], thus failing to deliver the element. - * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. + * Retrieves an element, removing it from the channel. + * + * This function suspends if the channel is empty, waiting until an element is available. + * If the channel is [closed for `receive`][isClosedForReceive], an exception is thrown (see below). + * ``` + * val channel = Channel() + * launch { + * val element = channel.receive() // suspends until 5 is available + * check(element == 5) + * } + * channel.send(5) + * ``` + * + * ## Suspending and cancellation * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. * There is a **prompt cancellation guarantee**: even if [receive] managed to retrieve the element from the channel, - * but was cancelled while suspended, [CancellationException] will be thrown. - * See [suspendCancellableCoroutine] for low-level details. + * but was cancelled while suspended, [CancellationException] will be thrown, and, if + * the channel has an `onUndeliveredElement` callback installed, the retrieved element will be passed to it. + * See the "Undelivered elements" section in the [Channel] documentation + * for details on handling undelivered elements. + * See [suspendCancellableCoroutine] for the low-level details of prompt cancellation. * - * Because of the prompt cancellation guarantee, some values retrieved from the channel can become lost. - * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. + * Note that this function does not check for cancellation when it manages to immediately receive an element without + * suspending. + * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed: * - * Note that this function does not check for cancellation when it is not suspended. - * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. + * ``` + * val channel = Channel() + * launch { // a very fast producer + * while (true) { + * channel.send(42) + * } + * } + * val consumer = launch { // a slow consumer + * while (isActive) { + * val element = channel.receive() + * // some slow computation involving `element` + * } + * } + * delay(100.milliseconds) + * consumer.cancelAndJoin() + * ``` + * + * ## Receiving from a closed channel + * + * - Attempting to [receive] from a [closed][SendChannel.close] channel while there are still some elements + * will successfully retrieve an element from the channel. + * - When a channel is [closed][SendChannel.close] and there are no elements remaining, + * the channel becomes [closed for `receive`][isClosedForReceive]. + * After that, [receive] will rethrow the exact exception that was passed to [SendChannel.close], + * or [ClosedReceiveChannelException] if none was given. + * + * ## Related * * This function can be used in [select] invocations with the [onReceive] clause. * Use [tryReceive] to try receiving from this channel without waiting. + * Use [receiveCatching] to receive from this channel without throwing. */ public suspend fun receive(): E /** * Clause for the [select] expression of the [receive] suspending function that selects with the element * received from the channel. - * The [select] invocation fails with an exception if the channel - * [is closed for `receive`][isClosedForReceive] (see [close][SendChannel.close] for details). + * + * The [select] invocation fails with an exception if the channel [is closed for `receive`][isClosedForReceive] + * at any point, even if other [select] clauses could still work. + * + * Example: + * ``` + * class ScreenSize(val width: Int, val height: Int) + * class MouseClick(val x: Int, val y: Int) + * val screenResizes = Channel(Channel.CONFLATED) + * val mouseClicks = Channel(Channel.CONFLATED) + * + * launch(Dispatchers.Main) { + * while (true) { + * select { + * screenResizes.onReceive { newSize -> + * // update the UI to the new screen size + * } + * mouseClicks.onReceive { click -> + * // react to a mouse click + * } + * } + * } + * } + * ``` + * + * Like [receive], [onReceive] obeys the rules of prompt cancellation: + * [select] may finish with a [CancellationException] even if an element was successfully retrieved, + * in which case the `onUndeliveredElement` callback will be called. */ public val onReceive: SelectClause1 /** - * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty. - * This method returns [ChannelResult] with the value of an element successfully retrieved from the channel - * or the close cause if the channel was closed. Closed cause may be `null` if the channel was closed normally. - * The result cannot be [failed][ChannelResult.isFailure] without being [closed][ChannelResult.isClosed]. + * Retrieves an element, removing it from the channel. + * + * A difference from [receive] is that this function encapsulates a failure in its return value instead of throwing + * an exception. + * However, it will still throw [CancellationException] if the coroutine calling [receiveCatching] is cancelled. + * + * It is guaranteed that the only way this function can return a [failed][ChannelResult.isFailure] result is when + * the channel is [closed for `receive`][isClosedForReceive], so [ChannelResult.isClosed] is also true. + * + * This function suspends if the channel is empty, waiting until an element is available. + * If the channel is [closed for `receive`][isClosedForReceive], an exception is thrown (see below). + * ``` + * val channel = Channel() + * launch { + * while (true) { + * val result = channel.receiveCatching() // suspends + * when (val element = result.getOrNull()) { + * null -> break // the channel is closed + * else -> check(element == 5) + * } + * } + * } + * channel.send(5) + * ``` + * + * ## Suspending and cancellation * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. * There is a **prompt cancellation guarantee**: even if [receiveCatching] managed to retrieve the element from the - * channel, but was cancelled while suspended, [CancellationException] will be thrown. - * See [suspendCancellableCoroutine] for low-level details. + * channel, but was cancelled while suspended, [CancellationException] will be thrown, and, if + * the channel has an `onUndeliveredElement` callback installed, the retrieved element will be passed to it. + * See the "Undelivered elements" section in the [Channel] documentation + * for details on handling undelivered elements. + * See [suspendCancellableCoroutine] for the low-level details of prompt cancellation. * - * Because of the prompt cancellation guarantee, some values retrieved from the channel can become lost. - * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. + * Note that this function does not check for cancellation when it manages to immediately receive an element without + * suspending. + * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed: * - * Note that this function does not check for cancellation when it is not suspended. - * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. + * ``` + * val channel = Channel() + * launch { // a very fast producer + * while (true) { + * channel.send(42) + * } + * } + * val consumer = launch { // a slow consumer + * while (isActive) { + * val element = channel.receiveCatching().getOrNull() ?: break + * // some slow computation involving `element` + * } + * } + * delay(100.milliseconds) + * consumer.cancelAndJoin() + * ``` + * + * ## Receiving from a closed channel + * + * - Attempting to [receiveCatching] from a [closed][SendChannel.close] channel while there are still some elements + * will successfully retrieve an element from the channel. + * - When a channel is [closed][SendChannel.close] and there are no elements remaining, + * the channel becomes [closed for `receive`][isClosedForReceive]. + * After that, [receiveCatching] will return a result with [ChannelResult.isClosed] set. + * [ChannelResult.exceptionOrNull] will be the exact exception that was passed to [SendChannel.close], + * or `null` if none was given. + * + * ## Related * * This function can be used in [select] invocations with the [onReceiveCatching] clause. * Use [tryReceive] to try receiving from this channel without waiting. + * Use [receive] to receive from this channel and throw exceptions on error. */ public suspend fun receiveCatching(): ChannelResult /** - * Clause for the [select] expression of the [onReceiveCatching] suspending function that selects with the [ChannelResult] with a value - * that is received from the channel or with a close cause if the channel - * [is closed for `receive`][isClosedForReceive]. + * Clause for the [select] expression of the [receiveCatching] suspending function that selects + * with a [ChannelResult] when an element is retrieved or the channel gets closed. + * + * Like [receiveCatching], [onReceiveCatching] obeys the rules of prompt cancellation: + * [select] may finish with a [CancellationException] even if an element was successfully retrieved, + * in which case the `onUndeliveredElement` callback will be called. */ + // TODO: think of an example of when this could be useful public val onReceiveCatching: SelectClause1> /** - * Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success] - * result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed] - * result if the channel is closed. + * Attempts to retrieve an element, removing it from the channel. + * + * - When the channel is non-empty, a [successful][ChannelResult.isSuccess] result is returned, + * and [ChannelResult.getOrNull] returns the retrieved element. + * - When the channel is empty, a [failed][ChannelResult.isFailure] result is returned. + * - When the channel is already [closed for `receive`][isClosedForReceive], + * returns the ["channel is closed"][ChannelResult.isClosed] result. + * If the channel was [closed][SendChannel.close] with a cause (for example, [cancelled][cancel]), + * [ChannelResult.exceptionOrNull] contains the cause. + * + * This function is useful when implementing on-demand allocation of resources to be stored in the channel: + * + * ``` + * val resourcePool = Channel(maxResources) + * + * suspend fun withResource(block: (Resource) -> Unit) { + * val result = resourcePool.tryReceive() + * val resource = result.getOrNull() + * ?: tryCreateNewResource() // try to create a new resource + * ?: resourcePool.receive() // could not create: actually wait for the resource + * try { + * block(resource) + * } finally { + * resourcePool.trySend(resource) + * } + * } + * ``` */ public fun tryReceive(): ChannelResult /** * Returns a new iterator to receive elements from this channel using a `for` loop. * Iteration completes normally when the channel [is closed for `receive`][isClosedForReceive] without a cause and - * throws the original [close][SendChannel.close] cause exception if the channel has _failed_. + * throws the exception passed to [close][SendChannel.close] if there was one. + * + * Instances of [ChannelIterator] are not thread-safe and shall not be used from concurrent coroutines. + * + * Example: + * + * ``` + * val channel = produce { + * repeat(1000) { + * send(it) + * } + * } + * for (v in channel) { + * println(v) + * } + * ``` + * + * Note that is an early return happens from the `for` loop, the channel does not get cancelled. + * To forbid sending new elements after the iteration is completed, use [consumeEach] or + * call [close] manually. */ public operator fun iterator(): ChannelIterator /** - * Cancels reception of remaining elements from this channel with an optional [cause]. - * This function closes the channel and removes all buffered sent elements from it. + * [Closes][SendChannel.close] the channel for new elements and removes all existing ones. * - * A cause can be used to specify an error message or to provide other details on + * A [cause] can be used to specify an error message or to provide other details on * the cancellation reason for debugging purposes. * If the cause is not specified, then an instance of [CancellationException] with a * default message is created to [close][SendChannel.close] the channel. * - * Immediately after invocation of this function [isClosedForReceive] and - * [isClosedForSend][SendChannel.isClosedForSend] - * on the side of [SendChannel] start returning `true`. Any attempt to send to or receive from this channel - * will lead to a [CancellationException]. + * If the channel was already [closed][SendChannel.close], + * [cancel] only has the effect of removing all elements from the channel. + * + * Immediately after the invocation of this function, + * [isClosedForReceive] and, on the [SendChannel] side, [isClosedForSend][SendChannel.isClosedForSend] + * start returning `true`. + * Any attempt to send to or receive from this channel will lead to a [CancellationException]. + * + * If the channel has an `onUndeliveredElement` callback installed, this function will invoke it for each of the + * elements still in the channel, since these elements will be inaccessible otherwise. + * If the callback is not installed, these elements will simply be removed from the channel for garbage collection. + * + * ``` + * val channel = Channel() + * channel.send(1) + * channel.send(2) + * channel.cancel() + * channel.trySend(3) // returns ChannelResult.isClosed + * for (element in channel) { println(element) } // prints nothing + * ``` + * + * [consume] and [consumeEach] are convenient shorthands for cancelling the channel after the single consumer + * has finished processing. */ public fun cancel(cause: CancellationException? = null) @@ -396,72 +784,152 @@ public interface ReceiveChannel { } /** - * A discriminated union of channel operation result. - * It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with - * an optional cause. + * A discriminated union representing a channel operation result. + * It encapsulates the knowledge of whether the operation succeeded, failed with an option to retry, + * or failed because the channel was closed. + * + * If the operation was [successful][isSuccess], [T] is the result of the operation: + * for example, for [ReceiveChannel.receiveCatching] and [ReceiveChannel.tryReceive], + * it is the element received from the channel, and for [Channel.trySend], it is [Unit], + * as the channel does not receive anything in return for sending a channel. + * This value can be retrieved with [getOrNull] or [getOrThrow]. * - * The successful result represents a successful operation with a value of type [T], for example, - * the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend]. + * If the operation [failed][isFailure], it does not necessarily mean that the channel itself is closed. + * For example, [ReceiveChannel.receiveCatching] and [ReceiveChannel.tryReceive] can fail because the channel is empty, + * and [Channel.trySend] can fail because the channel is full. * - * The failed result represents a failed operation attempt to a channel, but it doesn't necessarily indicate that the channel is failed. - * E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state. + * If the operation [failed][isFailure] because the channel was closed for that operation, [isClosed] returns `true`. + * The opposite is also true: if [isClosed] returns `true`, then the channel is closed for that operation + * ([ReceiveChannel.isClosedForReceive] or [SendChannel.isClosedForSend]). + * In this case, retrying the operation is meaningless: once closed, the channel will remain closed. + * The [exceptionOrNull] function returns the reason the channel was closed, if any was given. * - * The closed result represents an operation attempt to a closed channel and also implies that the operation has failed. - * It is guaranteed that if the result is _closed_, then the target channel is either [closed for send][Channel.isClosedForSend] - * or is [closed for receive][Channel.isClosedForReceive] depending on whether the failed operation was sending or receiving. + * Manually obtaining a [ChannelResult] instance is not supported. + * See the documentation for [ChannelResult]-returning functions for usage examples. */ @JvmInline public value class ChannelResult @PublishedApi internal constructor(@PublishedApi internal val holder: Any?) { /** - * Returns `true` if this instance represents a successful - * operation outcome. + * Whether the operation succeeded. + * + * If this returns `true`, the operation was successful. + * In this case, [getOrNull] and [getOrThrow] can be used to retrieve the value. + * + * If this returns `false`, the operation failed. + * [isClosed] can be used to determine whether the operation failed because the channel was closed + * (and therefore retrying the operation is meaningless). * - * In this case [isFailure] and [isClosed] return `false`. + * ``` + * val result = channel.tryReceive() + * if (result.isSuccess) { + * println("Successfully received the value ${result.getOrThrow()}") + * } else { + * println("Failed to receive the value.") + * if (result.isClosed) { + * println("The channel is closed.") + * if (result.exceptionOrNull() != null) { + * println("The reason: ${result.exceptionOrNull()}") + * } + * } + * } + * ``` + * + * [isFailure] is a shorthand for `!isSuccess`. + * [getOrNull] can simplify [isSuccess] followed by [getOrThrow] into just one check if [T] is known + * to be non-nullable. */ public val isSuccess: Boolean get() = holder !is Failed /** - * Returns `true` if this instance represents unsuccessful operation. - * - * In this case [isSuccess] returns false, but it does not imply - * that the channel is failed or closed. + * Whether the operation failed. * - * Example of a failed operation without an exception and channel being closed - * is [Channel.trySend] attempt to a channel that is full. + * A shorthand for `!isSuccess`. See [isSuccess] for more details. */ public val isFailure: Boolean get() = holder is Failed /** - * Returns `true` if this instance represents unsuccessful operation - * to a closed or cancelled channel. + * Whether the operation failed because the channel was closed. + * + * If this returns `true`, the channel was closed for the operation that returned this result. + * In this case, retrying the operation is meaningless: once closed, the channel will remain closed. + * [isSuccess] will return `false`. + * [exceptionOrNull] can be used to determine the reason the channel was closed. * - * In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply - * that [exceptionOrNull] returns non-null value. + * If this returns `false`, subsequent attempts to perform the same operation may succeed. * - * It can happen if the channel was [closed][Channel.close] normally without an exception. + * ``` + * val result = channel.trySend(42) + * if (result.isClosed) { + * println("The channel is closed.") + * if (result.exceptionOrNull() != null) { + * println("The reason: ${result.exceptionOrNull()}") + * } + * } */ public val isClosed: Boolean get() = holder is Closed /** - * Returns the encapsulated value if this instance represents success or `null` if it represents failed result. + * Returns the encapsulated [T] if the operation succeeded, or `null` if it failed. + * + * For non-nullable [T], the following code can be used to handle the result: + * ``` + * val result = channel.tryReceive() + * val value = result.getOrNull() + * if (value == null) { + * if (result.isClosed) { + * println("The channel is closed.") + * if (result.exceptionOrNull() != null) { + * println("The reason: ${result.exceptionOrNull()}") + * } + * } + * return + * } + * println("Successfully received the value $value") + * ``` + * + * If [T] is nullable, [getOrThrow] together with [isSuccess] is a more reliable way to handle the result. */ @Suppress("UNCHECKED_CAST") public fun getOrNull(): T? = if (holder !is Failed) holder as T else null /** - * Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed. + * Returns the encapsulated [T] if the operation succeeded, or throws the encapsulated exception if it failed. + * + * Example: + * ``` + * val result = channel.tryReceive() + * if (result.isSuccess) { + * println("Successfully received the value ${result.getOrThrow()}") + * } + * ``` + * + * @throws IllegalStateException if the operation failed because the channel is closed without a cause. */ public fun getOrThrow(): T { @Suppress("UNCHECKED_CAST") if (holder !is Failed) return holder as T if (holder is Closed && holder.cause != null) throw holder.cause - error("Trying to call 'getOrThrow' on a failed channel result: $holder") + error("Trying to call 'getOrThrow' on a channel closed without a cause") } /** - * Returns the encapsulated exception if this instance represents failure or `null` if it is success - * or unsuccessful operation to closed channel. + * Returns the exception with which the channel was closed, or `null` if the channel was not closed or was closed + * without a cause. + * + * [exceptionOrNull] can only return a non-`null` value if [isClosed] is `true`, + * but even if [isClosed] is `true`, + * [exceptionOrNull] can still return `null` if the channel was closed without a cause. + * + * ``` + * val result = channel.tryReceive() + * if (result.isClosed) { + * // Now we know not to retry the operation later. + * // Check if the channel was closed with a cause and rethrow the exception: + * result.exceptionOrNull()?.let { throw it } + * // Otherwise, the channel was closed without a cause. + * } + * ``` */ public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause @@ -503,9 +971,13 @@ public value class ChannelResult } /** - * Returns the encapsulated value if this instance represents [success][ChannelResult.isSuccess] or the - * result of [onFailure] function for the encapsulated [Throwable] exception if it is failed or closed - * result. + * Returns the encapsulated value if the operation [succeeded][ChannelResult.isSuccess], or the + * result of [onFailure] function for [ChannelResult.exceptionOrNull] otherwise. + * + * A shorthand for `if (isSuccess) getOrNull() else onFailure(exceptionOrNull())`. + * + * @see ChannelResult.getOrNull + * @see ChannelResult.exceptionOrNull */ @OptIn(ExperimentalContracts::class) public inline fun ChannelResult.getOrElse(onFailure: (exception: Throwable?) -> T): T { @@ -517,8 +989,10 @@ public inline fun ChannelResult.getOrElse(onFailure: (exception: Throwabl } /** - * Performs the given [action] on the encapsulated value if this instance represents [success][ChannelResult.isSuccess]. + * Performs the given [action] on the encapsulated value if the operation [succeeded][ChannelResult.isSuccess]. * Returns the original `ChannelResult` unchanged. + * + * A shorthand for `this.also { if (isSuccess) action(getOrThrow()) }`. */ @OptIn(ExperimentalContracts::class) public inline fun ChannelResult.onSuccess(action: (value: T) -> Unit): ChannelResult { @@ -531,10 +1005,12 @@ public inline fun ChannelResult.onSuccess(action: (value: T) -> Unit): Ch } /** - * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure]. + * Performs the given [action] if the operation [failed][ChannelResult.isFailure]. * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter. * * Returns the original `ChannelResult` unchanged. + * + * A shorthand for `this.also { if (isFailure) action(exceptionOrNull()) }`. */ @OptIn(ExperimentalContracts::class) public inline fun ChannelResult.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult { @@ -546,13 +1022,16 @@ public inline fun ChannelResult.onFailure(action: (exception: Throwable?) } /** - * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure] - * due to channel being [closed][Channel.close]. + * Performs the given [action] if the operation failed because the channel was [closed][ChannelResult.isClosed] for + * that operation. * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter. + * * It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend] * or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation. * * Returns the original `ChannelResult` unchanged. + * + * A shorthand for `this.also { if (isClosed) action(exceptionOrNull()) }`. */ @OptIn(ExperimentalContracts::class) public inline fun ChannelResult.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult { @@ -564,17 +1043,20 @@ public inline fun ChannelResult.onClosed(action: (exception: Throwable?) } /** - * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used - * from concurrent coroutines. + * Iterator for a [ReceiveChannel]. + * Instances of this interface are *not thread-safe* and shall not be used from concurrent coroutines. */ public interface ChannelIterator { /** - * Returns `true` if the channel has more elements, suspending the caller while this channel is empty, - * or returns `false` if the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause. - * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_. + * Prepare an element for retrieval by the invocation of [next]. * - * This function retrieves and removes an element from this channel for the subsequent invocation - * of [next]. + * - If the element that was retrieved by an earlier [hasNext] call was not yet consumed by [next], returns `true`. + * - If the channel has an element available, returns `true` and removes it from the channel. + * This element will be returned by the subsequent invocation of [next]. + * - If the channel is [closed for receiving][ReceiveChannel.isClosedForReceive] without a cause, returns `false`. + * - If the channel is closed with a cause, throws the original [close][SendChannel.close] cause exception. + * - If the channel is not closed but does not contain an element, + * suspends until either an element is sent to the channel or the channel gets closed. * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. @@ -583,9 +1065,11 @@ public interface ChannelIterator { * See [suspendCancellableCoroutine] for low-level details. * * Because of the prompt cancellation guarantee, some values retrieved from the channel can become lost. - * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. + * See the "Undelivered elements" section in the [Channel] documentation + * for details on handling undelivered elements. * - * Note that this function does not check for cancellation when it is not suspended. + * Note that this function does not check for cancellation when it is not suspended, that is, + * if the next element is immediately available. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. */ public suspend operator fun hasNext(): Boolean @@ -604,66 +1088,72 @@ public interface ChannelIterator { } /** - * Retrieves the element removed from the channel by a preceding call to [hasNext], or + * Retrieves the element removed from the channel by the preceding call to [hasNext], or * throws an [IllegalStateException] if [hasNext] was not invoked. - * This method should only be used in pair with [hasNext]: + * + * This method can only be used together with [hasNext]: * ``` * while (iterator.hasNext()) { * val element = iterator.next() - * // ... handle element ... + * // ... handle the element ... + * } + * ``` + * + * A more idiomatic way to iterate over a channel is to use a `for` loop: + * ``` + * for (element in channel) { + * // ... handle the element ... * } * ``` * - * This method throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause. - * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_. + * This method never throws if [hasNext] returned `true`. + * If [hasNext] threw the cause with which the channel was closed, this method will rethrow the same exception. + * If [hasNext] returned `false` because the channel was closed without a cause, this method throws + * a [ClosedReceiveChannelException]. */ public operator fun next(): E } /** * Channel is a non-blocking primitive for communication between a sender (via [SendChannel]) and a receiver (via [ReceiveChannel]). - * Conceptually, a channel is similar to Java's [BlockingQueue][java.util.concurrent.BlockingQueue], + * Conceptually, a channel is similar to `java.util.concurrent.BlockingQueue`, * but it has suspending operations instead of blocking ones and can be [closed][SendChannel.close]. * - * ### Creating channels + * ### Channel capacity * - * The `Channel(capacity)` factory function is used to create channels of different kinds depending on - * the value of the `capacity` integer: + * Most ways to create a [Channel] (in particular, the `Channel()` factory function) allow specifying a capacity, + * which determines how elements are buffered in the channel. + * There are several predefined constants for the capacity that have special behavior: * - * - When `capacity` is 0 — it creates a _rendezvous_ channel. - * This channel does not have any buffer at all. An element is transferred from the sender - * to the receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends - * until another coroutine invokes [receive], and [receive] suspends until another coroutine invokes [send]. + * - [Channel.RENDEZVOUS] (or 0) creates a _rendezvous_ channel, which does not have a buffer at all. + * Instead, the sender and the receiver must rendezvous (meet): + * [SendChannel.send] suspends until another coroutine invokes [ReceiveChannel.receive], and vice versa. + * - [Channel.CONFLATED] creates a buffer for a single element and automatically changes the + * [buffer overflow strategy][BufferOverflow] to [BufferOverflow.DROP_OLDEST]. + * - [Channel.UNLIMITED] creates a channel with an unlimited buffer, which never suspends the sender. + * - [Channel.BUFFERED] creates a channel with a buffer whose size depends on + * the [buffer overflow strategy][BufferOverflow]. * - * - When `capacity` is [Channel.UNLIMITED] — it creates a channel with effectively unlimited buffer. - * This channel has a linked-list buffer of unlimited capacity (limited only by available memory). - * [Sending][send] to this channel never suspends, and [trySend] always succeeds. + * See each constant's documentation for more details. * - * - When `capacity` is [Channel.CONFLATED] — it creates a _conflated_ channel - * This channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations, - * so that the receiver always gets the last element sent. - * Back-to-back sent elements are conflated — only the last sent element is received, - * while previously sent elements **are lost**. - * [Sending][send] to this channel never suspends, and [trySend] always succeeds. + * If the capacity is positive but less than [Channel.UNLIMITED], the channel has a buffer with the specified capacity. + * It is safe to construct a channel with a large buffer, as memory is only allocated gradually as elements are added. * - * - When `capacity` is positive but less than [UNLIMITED] — it creates an array-based channel with the specified capacity. - * This channel has an array buffer of a fixed `capacity`. - * [Sending][send] suspends only when the buffer is full, and [receiving][receive] suspends only when the buffer is empty. + * Constructing a channel with a negative capacity not equal to a predefined constant is not allowed + * and throws an [IllegalArgumentException]. * - * Buffered channels can be configured with an additional [`onBufferOverflow`][BufferOverflow] parameter. It controls the behaviour - * of the channel's [send][Channel.send] function on buffer overflow: + * ### Buffer overflow * - * - [SUSPEND][BufferOverflow.SUSPEND] — the default, suspend `send` on buffer overflow until there is - * free space in the buffer. - * - [DROP_OLDEST][BufferOverflow.DROP_OLDEST] — do not suspend the `send`, add the latest value to the buffer, - * drop the oldest one from the buffer. - * A channel with `capacity = 1` and `onBufferOverflow = DROP_OLDEST` is a _conflated_ channel. - * - [DROP_LATEST][BufferOverflow.DROP_LATEST] — do not suspend the `send`, drop the value that is being sent, - * keep the buffer contents intact. + * Some ways to create a [Channel] also expose a [BufferOverflow] parameter (by convention, `onBufferOverflow`), + * which does not affect the receiver but determines the behavior of the sender when the buffer is full. + * The options include [suspending][BufferOverflow.SUSPEND] until there is space in the buffer, + * [dropping the oldest element][BufferOverflow.DROP_OLDEST] to make room for the new one, or + * [dropping the element to be sent][BufferOverflow.DROP_LATEST]. See the [BufferOverflow] documentation. * - * A non-default `onBufferOverflow` implicitly creates a channel with at least one buffered element and - * is ignored for a channel with unlimited buffer. It cannot be specified for `capacity = CONFLATED`, which - * is a shortcut by itself. + * By convention, the default value for [BufferOverflow] whenever it can not be configured is [BufferOverflow.SUSPEND]. + * + * See the [Channel.RENDEZVOUS], [Channel.CONFLATED], and [Channel.UNLIMITED] documentation for a description of how + * they interact with the [BufferOverflow] parameter. * * ### Prompt cancellation guarantee * @@ -673,41 +1163,46 @@ public interface ChannelIterator { * With a single-threaded [dispatcher][CoroutineDispatcher] like [Dispatchers.Main], this gives a * guarantee that the coroutine promptly reacts to the cancellation of its [Job] and does not resume its execution. * - * > **Prompt cancellation guarantee** for channel operations was added since `kotlinx.coroutines` version `1.4.0` - * > and had replaced a channel-specific atomic-cancellation that was not consistent with other suspending functions. - * > The low-level mechanics of prompt cancellation are explained in [suspendCancellableCoroutine] function. + * > **Prompt cancellation guarantee** for channel operations was added in `kotlinx.coroutines` version `1.4.0` + * > and has replaced the channel-specific atomic cancellation that was not consistent with other suspending functions. + * > The low-level mechanics of prompt cancellation are explained in the [suspendCancellableCoroutine] documentation. * * ### Undelivered elements * * As a result of the prompt cancellation guarantee, when a closeable resource - * (like open file or a handle to another native resource) is transferred via a channel from one coroutine to another, - * it can fail to be delivered and will be lost if the receiving operation is cancelled in transit. + * (like an open file or a handle to another native resource) is transferred via a channel, + * it can be successfully extracted from the channel, + * but still be lost if the receiving operation is cancelled in parallel. * - * A `Channel()` constructor function has an `onUndeliveredElement` optional parameter. - * When `onUndeliveredElement` parameter is set, the corresponding function is called once for each element + * The `Channel()` constructor function has the optional parameter `onUndeliveredElement`. + * When that parameter is set, the corresponding function is called once for each element * that was sent to the channel with the call to the [send][SendChannel.send] function but failed to be delivered, * which can happen in the following cases: * - * - When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually - * send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]. - * - When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext] - * operation throws an exception when it had retrieved the element from the - * channel but was cancelled before the code following the receive call resumed. - * - The channel was [cancelled][ReceiveChannel.cancel], in which case `onUndeliveredElement` is called on every + * - When the sending operations like [send][SendChannel.send] or [onSend][SendChannel.onSend] + * throw an exception because it was cancelled + * before it had a chance to actually send the element + * or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]. + * - When the receiving operations like [receive][ReceiveChannel.receive], + * [onReceive][ReceiveChannel.onReceive], or [hasNext][ChannelIterator.hasNext] + * throw an exception after retrieving the element from the channel + * because of being cancelled before the code following them had a chance to resume. + * - When the channel was [cancelled][ReceiveChannel.cancel], in which case `onUndeliveredElement` is called on every * remaining element in the channel's buffer. * - * Note, that `onUndeliveredElement` function is called synchronously in an arbitrary context. It should be fast, non-blocking, - * and should not throw exceptions. Any exception thrown by `onUndeliveredElement` is wrapped into an internal runtime - * exception which is either rethrown from the caller method or handed off to the exception handler in the current context + * Note that `onUndeliveredElement` is called synchronously in an arbitrary context. + * It should be fast, non-blocking, and should not throw exceptions. + * Any exception thrown by `onUndeliveredElement` is wrapped into an internal runtime exception + * which is either rethrown from the caller method or handed off to the exception handler in the current context * (see [CoroutineExceptionHandler]) when one is available. * * A typical usage for `onUndeliveredElement` is to close a resource that is being transferred via the channel. The - * following code pattern guarantees that opened resources are closed even if producer, consumer, and/or channel - * are cancelled. Resources are never lost. + * following code pattern guarantees that opened resources are closed even if the producer, the consumer, + * and/or the channel are cancelled. Resources are never lost. * * ``` - * // Create the channel with onUndeliveredElement block that closes a resource - * val channel = Channel(capacity) { resource -> resource.close() } + * // Create a channel with an onUndeliveredElement block that closes a resource + * val channel = Channel(onUndeliveredElement = { resource -> resource.close() }) * * // Producer code * val resourceToSend = openResource() @@ -722,8 +1217,8 @@ public interface ChannelIterator { * } * ``` * - * > Note, that if you do any kind of work in between `openResource()` and `channel.send(...)`, then you should - * > ensure that resource gets closed in case this additional code fails. + * > Note that if any work happens between `openResource()` and `channel.send(...)`, + * > it is your responsibility to ensure that resource gets closed in case this additional code fails. */ public interface Channel : SendChannel, ReceiveChannel { /** @@ -731,26 +1226,112 @@ public interface Channel : SendChannel, ReceiveChannel { */ public companion object Factory { /** - * Requests a channel with an unlimited capacity buffer in the `Channel(...)` factory function. + * An unlimited buffer capacity. + * + * `Channel(UNLIMITED)` creates a channel with an unlimited buffer, which never suspends the sender. + * The total amount of elements that can be sent to the channel is limited only by the available memory. + * + * If [BufferOverflow] is specified for the channel, it is completely ignored, + * as the channel never suspends the sender. + * + * ``` + * val channel = Channel(Channel.UNLIMITED) + * repeat(1000) { + * channel.trySend(it) + * } + * repeat(1000) { + * check(channel.tryReceive().getOrNull() == it) + * } + * ``` */ public const val UNLIMITED: Int = Int.MAX_VALUE /** - * Requests a rendezvous channel in the `Channel(...)` factory function — a channel that does not have a buffer. + * The zero buffer capacity. + * + * For the default [BufferOverflow] value of [BufferOverflow.SUSPEND], + * `Channel(RENDEZVOUS)` creates a channel without a buffer. + * An element is transferred from the sender to the receiver only when [send] and [receive] invocations meet + * in time (that is, they _rendezvous_), + * so [send] suspends until another coroutine invokes [receive], + * and [receive] suspends until another coroutine invokes [send]. + * + * ``` + * val channel = Channel(Channel.RENDEZVOUS) + * check(channel.trySend(5).isFailure) // sending fails: no receiver is waiting + * launch(start = CoroutineStart.UNDISPATCHED) { + * val element = channel.receive() // suspends + * check(element == 3) + * } + * check(channel.trySend(3).isSuccess) // sending succeeds: receiver is waiting + * ``` + * + * If a different [BufferOverflow] is specified, + * `Channel(RENDEZVOUS)` creates a channel with a buffer of size 1: + * + * ``` + * val channel = Channel(0, onBufferOverflow = BufferOverflow.DROP_OLDEST) + * // None of the calls suspend, since the buffer overflow strategy is not SUSPEND + * channel.send(1) + * channel.send(2) + * channel.send(3) + * check(channel.receive() == 3) + * ``` */ public const val RENDEZVOUS: Int = 0 /** - * Requests a conflated channel in the `Channel(...)` factory function. This is a shortcut to creating - * a channel with [`onBufferOverflow = DROP_OLDEST`][BufferOverflow.DROP_OLDEST]. + * A single-element buffer with conflating behavior. + * + * Specifying [CONFLATED] as the capacity in the `Channel(...)` factory function is equivalent to + * creating a channel with a buffer of size 1 and a [BufferOverflow] strategy of [BufferOverflow.DROP_OLDEST]: + * `Channel(1, onBufferOverflow = BufferOverflow.DROP_OLDEST)`. + * Such a channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations + * so that the receiver always gets the last element sent, **losing** the previously sent elements. + * [Sending][send] to this channel never suspends, and [trySend] always succeeds. + * + * ``` + * val channel = Channel(Channel.CONFLATED) + * channel.send(1) + * channel.send(2) + * channel.send(3) + * check(channel.receive() == 3) + * ``` + * + * Specifying a [BufferOverflow] other than [BufferOverflow.SUSPEND] is not allowed with [CONFLATED], and + * an [IllegalArgumentException] is thrown if such a combination is used. + * For creating a conflated channel that instead keeps the existing element in the channel and throws out + * the new one, use `Channel(1, onBufferOverflow = BufferOverflow.DROP_LATEST)`. */ public const val CONFLATED: Int = -1 /** - * Requests a buffered channel with the default buffer capacity in the `Channel(...)` factory function. - * The default capacity for a channel that [suspends][BufferOverflow.SUSPEND] on overflow - * is 64 and can be overridden by setting [DEFAULT_BUFFER_PROPERTY_NAME] on JVM. - * For non-suspending channels, a buffer of capacity 1 is used. + * A channel capacity marker that is substituted by the default buffer capacity. + * + * When passed as a parameter to the `Channel(...)` factory function, the default buffer capacity is used. + * For [BufferOverflow.SUSPEND] (the default buffer overflow strategy), the default capacity is 64, + * but on the JVM it can be overridden by setting the [DEFAULT_BUFFER_PROPERTY_NAME] system property. + * + * ``` + * val channel = Channel(Channel.BUFFERED) + * repeat(100) { + * channel.trySend(it) + * } + * channel.close() + * // The check can fail if the default buffer capacity is changed + * check(channel.toList() == (0 until 64).toList()) + * ``` + * + * If a different [BufferOverflow] is specified, `Channel(BUFFERED)` creates a channel with a buffer of size 1: + * + * ``` + * val channel = Channel(Channel.BUFFERED, onBufferOverflow = BufferOverflow.DROP_OLDEST) + * channel.send(1) + * channel.send(2) + * channel.send(3) + * channel.close() + * check(channel.toList() == listOf(3)) + * ``` */ public const val BUFFERED: Int = -2 @@ -758,8 +1339,9 @@ public interface Channel : SendChannel, ReceiveChannel { internal const val OPTIONAL_CHANNEL = -3 /** - * Name of the property that defines the default channel capacity when - * [BUFFERED] is used as parameter in `Channel(...)` factory function. + * Name of the JVM system property for the default channel capacity (64 by default). + * + * See [BUFFERED] for details on how this property is used. */ public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer" @@ -770,16 +1352,55 @@ public interface Channel : SendChannel, ReceiveChannel { } /** - * Creates a channel with the specified buffer capacity (or without a buffer by default). - * See [Channel] interface documentation for details. + * Creates a channel. See the [Channel] interface documentation for details. + * + * This function is the most flexible way to create a channel. + * It allows specifying the channel's capacity, buffer overflow strategy, and an optional function to call + * to handle undelivered elements. + * + * ``` + * val allocatedResources = HashSet() + * // An autocloseable resource that must be closed when it is no longer needed + * class Resource(val id: Int): AutoCloseable { + * init { + * allocatedResources.add(id) + * } + * override fun close() { + * allocatedResources.remove(id) + * } + * } + * // A channel with a 15-element buffer that drops the oldest element on buffer overflow + * // and closes the elements that were not delivered to the consumer + * val channel = Channel( + * capacity = 15, + * onBufferOverflow = BufferOverflow.DROP_OLDEST, + * onUndeliveredElement = { element -> element.close() } + * ) + * // A sender's view of the channel + * val sendChannel: SendChannel = channel + * repeat(100) { + * sendChannel.send(Resource(it)) + * } + * sendChannel.close() + * // A receiver's view of the channel + * val receiveChannel: ReceiveChannel = channel + * val receivedResources = receiveChannel.toList() + * // Check that the last 15 sent resources were received + * check(receivedResources.map { it.id } == (85 until 100).toList()) + * // Close the resources that were successfully received + * receivedResources.forEach { it.close() } + * // The dropped resources were closed by the channel itself + * check(allocatedResources.isEmpty()) + * ``` + * + * For a full explanation of every parameter and their interaction, see the [Channel] interface documentation. * * @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory]. - * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to - * a [suspending][BufferOverflow.SUSPEND] attempt to [send][Channel.send] a value, - * supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`, - * implicitly creates a channel with at least one buffered element). - * @param onUndeliveredElement an optional function that is called when element was sent but was not delivered to the consumer. - * See "Undelivered elements" section in [Channel] documentation. + * See the "Channel capacity" section in the [Channel] documentation. + * @param onBufferOverflow configures an action on buffer overflow. + * See the "Buffer overflow" section in the [Channel] documentation. + * @param onUndeliveredElement a function that is called when element was sent but was not delivered to the consumer. + * See the "Undelivered elements" section in the [Channel] documentation. * @throws IllegalArgumentException when [capacity] < -2 */ public fun Channel( @@ -815,20 +1436,29 @@ public fun Channel( public fun Channel(capacity: Int = RENDEZVOUS): Channel = Channel(capacity) /** - * Indicates an attempt to [send][SendChannel.send] to a [isClosedForSend][SendChannel.isClosedForSend] channel - * that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause - * exception on send attempts. + * Indicates an attempt to [send][SendChannel.send] to a [closed-for-sending][SendChannel.isClosedForSend] channel + * that was [closed][SendChannel.close] without a cause. * - * This exception is a subclass of [IllegalStateException], because, conceptually, it is the sender's responsibility - * to close the channel and not try to send anything thereafter. Attempts to - * send to a closed channel indicate a logical error in the sender's code. + * If a cause was provided, that cause is thrown from [send][SendChannel.send] instead of this exception. + * In particular, if the channel was closed because it was [cancelled][ReceiveChannel.cancel], + * this exception will never be thrown: either the `cause` of the cancellation is thrown, + * or a new [CancellationException] gets constructed to be thrown from [SendChannel.send]. + * + * This exception is a subclass of [IllegalStateException], because the sender should not attempt to send to a closed + * channel after it itself has [closed][SendChannel.close] it. + * Usually, this exception can be avoided altogether by restructuring the code. */ public class ClosedSendChannelException(message: String?) : IllegalStateException(message) /** - * Indicates an attempt to [receive][ReceiveChannel.receive] from a [isClosedForReceive][ReceiveChannel.isClosedForReceive] - * channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause - * exception on receive attempts. + * Indicates an attempt to [receive][ReceiveChannel.receive] from a + * [closed-for-receiving][ReceiveChannel.isClosedForReceive] channel + * that was [closed][SendChannel.close] without a cause. + * + * If a clause was provided, that clause is thrown from [receive][ReceiveChannel.receive] instead of this exception. + * In particular, if the channel was closed because it was [cancelled][ReceiveChannel.cancel], + * this exception will never be thrown: either the `cause` of the cancellation is thrown, + * or a new [CancellationException] gets constructed to be thrown from [ReceiveChannel.receive]. * * This exception is a subclass of [NoSuchElementException] to be consistent with plain collections. */ diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt index 7a955597f1..995db24025 100644 --- a/kotlinx-coroutines-core/common/src/channels/Produce.kt +++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt @@ -81,6 +81,7 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) { * The kind of the resulting channel depends on the specified [capacity] parameter. * See the [Channel] interface documentation for details. * By default, an unbuffered channel is created. + * If an invalid [capacity] value is specified, an [IllegalArgumentException] is thrown. * * ### Behavior on termination * diff --git a/kotlinx-coroutines-core/common/test/channels/BufferedChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/BufferedChannelTest.kt index 39b9f9d755..d314f8b1f4 100644 --- a/kotlinx-coroutines-core/common/test/channels/BufferedChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/BufferedChannelTest.kt @@ -5,6 +5,20 @@ import kotlinx.coroutines.* import kotlin.test.* class BufferedChannelTest : TestBase() { + + /** Tests that a buffered channel does not consume enough memory to fail with an OOM. */ + @Test + fun testMemoryConsumption() = runTest { + val largeChannel = Channel(Int.MAX_VALUE / 2) + repeat(10_000) { + largeChannel.send(it) + } + repeat(10_000) { + val element = largeChannel.receive() + assertEquals(it, element) + } + } + @Test fun testIteratorHasNextIsIdempotent() = runTest { val q = Channel() diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt index 654982ea8f..c8e9667fb2 100644 --- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt @@ -266,6 +266,13 @@ class ProduceTest : TestBase() { } } + @Test + fun testProduceWithInvalidCapacity() = runTest { + assertFailsWith { + produce(capacity = -3) { } + } + } + private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply { val source = Channel() expect(1) diff --git a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt index 8fd41b8376..00aa1925d2 100644 --- a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt @@ -273,4 +273,24 @@ class RendezvousChannelTest : TestBase() { channel.cancel(TestCancellationException()) channel.receiveCatching().getOrThrow() } + + /** Tests that [BufferOverflow.DROP_OLDEST] takes precedence over [Channel.RENDEZVOUS]. */ + @Test + fun testDropOldest() = runTest { + val channel = Channel(Channel.RENDEZVOUS, onBufferOverflow = BufferOverflow.DROP_OLDEST) + channel.send(1) + channel.send(2) + channel.send(3) + assertEquals(3, channel.receive()) + } + + /** Tests that [BufferOverflow.DROP_LATEST] takes precedence over [Channel.RENDEZVOUS]. */ + @Test + fun testDropLatest() = runTest { + val channel = Channel(Channel.RENDEZVOUS, onBufferOverflow = BufferOverflow.DROP_LATEST) + channel.send(1) + channel.send(2) + channel.send(3) + assertEquals(1, channel.receive()) + } } diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt index 36faaf4e52..ad9ec556a8 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt @@ -419,10 +419,10 @@ class SelectRendezvousChannelTest : TestBase() { fun testSelectSendWhenClosed() = runTest { expect(1) val c = Channel(Channel.RENDEZVOUS) - val sender = launch(start = CoroutineStart.UNDISPATCHED) { + launch(start = CoroutineStart.UNDISPATCHED) { expect(2) c.send(1) // enqueue sender - expectUnreached() + finish(4) } c.close() // then close assertFailsWith { @@ -434,8 +434,7 @@ class SelectRendezvousChannelTest : TestBase() { } } } - sender.cancel() - finish(4) + assertEquals(1, c.receive()) } // only for debugging diff --git a/kotlinx-coroutines-core/jvm/test/channels/CancelledChannelLeakTest.kt b/kotlinx-coroutines-core/jvm/test/channels/CancelledChannelLeakTest.kt new file mode 100644 index 0000000000..06108a6fe6 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/channels/CancelledChannelLeakTest.kt @@ -0,0 +1,27 @@ +package kotlinx.coroutines.channels + +import kotlinx.coroutines.* +import kotlinx.coroutines.testing.* +import kotlin.test.* + +class CancelledChannelLeakTest : TestBase() { + /** + * Tests that cancellation removes the elements from the channel's buffer. + */ + @Test + fun testBufferedChannelLeak() = runTest { + for (capacity in listOf(Channel.CONFLATED, Channel.RENDEZVOUS, 1, 2, 5, 10)) { + val channel = Channel(capacity) + val value = X() + launch(start = CoroutineStart.UNDISPATCHED) { + channel.send(value) + } + FieldWalker.assertReachableCount(1, channel) { it === value } + channel.cancel() + // the element must be removed so that there is no memory leak + FieldWalker.assertReachableCount(0, channel) { it === value } + } + } + + class X +} \ No newline at end of file From 739d2678f510b35dcd26eddcfa4d682722b2600a Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 2 Oct 2024 09:23:08 +0200 Subject: [PATCH 02/13] Update kotlinx-coroutines-core/common/src/channels/Channel.kt Co-authored-by: Luca Kellermann --- kotlinx-coroutines-core/common/src/channels/Channel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index e2533a0d9f..50d39c5cf7 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -1174,7 +1174,7 @@ public interface ChannelIterator { * it can be successfully extracted from the channel, * but still be lost if the receiving operation is cancelled in parallel. * - * The `Channel()` constructor function has the optional parameter `onUndeliveredElement`. + * The `Channel()` factory function has the optional parameter `onUndeliveredElement`. * When that parameter is set, the corresponding function is called once for each element * that was sent to the channel with the call to the [send][SendChannel.send] function but failed to be delivered, * which can happen in the following cases: From 6d625cb8e88dfd0c2050f5297a6f2acd83433f09 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 2 Oct 2024 09:23:33 +0200 Subject: [PATCH 03/13] Update kotlinx-coroutines-core/common/src/channels/Channel.kt Co-authored-by: Luca Kellermann --- kotlinx-coroutines-core/common/src/channels/Channel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 50d39c5cf7..a65fa0c2d7 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -652,7 +652,7 @@ public interface ReceiveChannel { * } * ``` * - * Note that is an early return happens from the `for` loop, the channel does not get cancelled. + * Note that if an early return happens from the `for` loop, the channel does not get cancelled. * To forbid sending new elements after the iteration is completed, use [consumeEach] or * call [close] manually. */ From 053d1726919f7d777d2a3e4eb13c7c43bc8d901c Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 2 Oct 2024 09:26:57 +0200 Subject: [PATCH 04/13] Update kotlinx-coroutines-core/common/src/channels/Channel.kt Co-authored-by: Luca Kellermann --- kotlinx-coroutines-core/common/src/channels/Channel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index a65fa0c2d7..59ba5d17bc 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -475,7 +475,7 @@ public interface ReceiveChannel { * ## Related * * This function can be used in [select] invocations with the [onReceive] clause. - * Use [tryReceive] to try receiving from this channel without waiting. + * Use [tryReceive] to try receiving from this channel without waiting and throwing. * Use [receiveCatching] to receive from this channel without throwing. */ public suspend fun receive(): E From aad3890bfcbf7612b625e49366f8f87a13766aad Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 2 Oct 2024 09:27:35 +0200 Subject: [PATCH 05/13] Update kotlinx-coroutines-core/common/src/channels/Channel.kt Co-authored-by: Luca Kellermann --- kotlinx-coroutines-core/common/src/channels/Channel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 59ba5d17bc..1b92115af1 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -403,7 +403,7 @@ public interface ReceiveChannel { * ``` * // DANGER! THIS CHECK IS NOT RELIABLE! * while (!channel.isEmpty) { - * // can still suspend in other `receive` happens in parallel! + * // can still suspend if other `receive` happens in parallel! * val element = channel.receive() * println(element) * } From 52688afd4bac1e2ff155af09c211b75538f616ca Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 2 Oct 2024 09:28:50 +0200 Subject: [PATCH 06/13] Update kotlinx-coroutines-core/common/src/channels/Channel.kt Co-authored-by: Luca Kellermann --- kotlinx-coroutines-core/common/src/channels/Channel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 1b92115af1..711296d2be 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -1319,7 +1319,7 @@ public interface Channel : SendChannel, ReceiveChannel { * } * channel.close() * // The check can fail if the default buffer capacity is changed - * check(channel.toList() == (0 until 64).toList()) + * check(channel.toList() == (0..<64).toList()) * ``` * * If a different [BufferOverflow] is specified, `Channel(BUFFERED)` creates a channel with a buffer of size 1: From e5a9ce39395761cb143266315adcdb1dc53b804f Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 2 Oct 2024 09:29:21 +0200 Subject: [PATCH 07/13] Update kotlinx-coroutines-core/common/src/channels/Channel.kt Co-authored-by: Luca Kellermann --- kotlinx-coroutines-core/common/src/channels/Channel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 711296d2be..84ac0bf70a 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -1070,7 +1070,7 @@ public interface ChannelIterator { * * Note that this function does not check for cancellation when it is not suspended, that is, * if the next element is immediately available. - * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. + * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. */ public suspend operator fun hasNext(): Boolean From 1cfe7ed2d9e453815ebd8514b8eaade37d29ceab Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 2 Oct 2024 09:31:05 +0200 Subject: [PATCH 08/13] Update kotlinx-coroutines-core/common/src/channels/Channel.kt Co-authored-by: Luca Kellermann --- kotlinx-coroutines-core/common/src/channels/Channel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 84ac0bf70a..22803fae4c 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -654,7 +654,7 @@ public interface ReceiveChannel { * * Note that if an early return happens from the `for` loop, the channel does not get cancelled. * To forbid sending new elements after the iteration is completed, use [consumeEach] or - * call [close] manually. + * call [cancel] manually. */ public operator fun iterator(): ChannelIterator From 48c61bc599531bb74cc58711e016ff6acbc2e4d9 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 2 Oct 2024 09:33:02 +0200 Subject: [PATCH 09/13] Update kotlinx-coroutines-core/common/src/channels/Channel.kt Co-authored-by: Luca Kellermann --- kotlinx-coroutines-core/common/src/channels/Channel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 22803fae4c..378754842c 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -585,7 +585,7 @@ public interface ReceiveChannel { * ## Related * * This function can be used in [select] invocations with the [onReceiveCatching] clause. - * Use [tryReceive] to try receiving from this channel without waiting. + * Use [tryReceive] to try receiving from this channel without waiting and throwing. * Use [receive] to receive from this channel and throw exceptions on error. */ public suspend fun receiveCatching(): ChannelResult From 3b5003929282178f00687743d0b93f18d87d5b50 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 2 Oct 2024 10:01:53 +0200 Subject: [PATCH 10/13] Fixes after a review --- .../common/src/channels/Channel.kt | 28 ++++++++++++------- .../common/src/channels/Produce.kt | 9 +++--- .../jvm/src/channels/Actor.kt | 4 +-- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 378754842c..f6a25646cf 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -136,7 +136,7 @@ public interface SendChannel { * If a channel was [closed][close] before [send] was called and no cause was specified, * an [ClosedSendChannelException] will be thrown from [send]. * If a channel was [closed][close] with a cause before [send] was called, - * then [send] will rethrow the exact object that was passed to [close]. + * then [send] will rethrow the same (in the `===` sense) exception that was passed to [close]. * * In both cases, it is guaranteed that the element was not delivered to the consumer, * and the `onUndeliveredElement` callback will be called. @@ -357,7 +357,7 @@ public interface SendChannel { */ public interface ReceiveChannel { /** - * Returns `true` if either the sending side of this channel was [closed][SendChannel.close] + * Returns `true` if the sending side of this channel was [closed][SendChannel.close] * and all previously sent items were already received (which also happens for [cancelled][cancel] channels). * * Note that if this property returns `false`, @@ -469,7 +469,8 @@ public interface ReceiveChannel { * will successfully retrieve an element from the channel. * - When a channel is [closed][SendChannel.close] and there are no elements remaining, * the channel becomes [closed for `receive`][isClosedForReceive]. - * After that, [receive] will rethrow the exact exception that was passed to [SendChannel.close], + * After that, + * [receive] will rethrow the same (in the `===` sense) exception that was passed to [SendChannel.close], * or [ClosedReceiveChannelException] if none was given. * * ## Related @@ -524,8 +525,8 @@ public interface ReceiveChannel { * It is guaranteed that the only way this function can return a [failed][ChannelResult.isFailure] result is when * the channel is [closed for `receive`][isClosedForReceive], so [ChannelResult.isClosed] is also true. * - * This function suspends if the channel is empty, waiting until an element is available. - * If the channel is [closed for `receive`][isClosedForReceive], an exception is thrown (see below). + * This function suspends if the channel is empty, waiting until an element is available or the channel becomes + * closed. * ``` * val channel = Channel() * launch { @@ -579,7 +580,8 @@ public interface ReceiveChannel { * - When a channel is [closed][SendChannel.close] and there are no elements remaining, * the channel becomes [closed for `receive`][isClosedForReceive]. * After that, [receiveCatching] will return a result with [ChannelResult.isClosed] set. - * [ChannelResult.exceptionOrNull] will be the exact exception that was passed to [SendChannel.close], + * [ChannelResult.exceptionOrNull] will be the exact (in the `===` sense) exception + * that was passed to [SendChannel.close], * or `null` if none was given. * * ## Related @@ -904,13 +906,16 @@ public value class ChannelResult * } * ``` * - * @throws IllegalStateException if the operation failed because the channel is closed without a cause. + * @throws IllegalStateException if the operation failed, but the channel was not claused with a cause. */ public fun getOrThrow(): T { @Suppress("UNCHECKED_CAST") if (holder !is Failed) return holder as T - if (holder is Closed && holder.cause != null) throw holder.cause - error("Trying to call 'getOrThrow' on a channel closed without a cause") + if (holder is Closed) { + check(holder.cause != null) { "Trying to call 'getOrThrow' on a channel closed without a cause" } + throw holder.cause + } + error("Trying to call 'getOrThrow' on a failed result of a non-closed channel") } /** @@ -1179,6 +1184,8 @@ public interface ChannelIterator { * that was sent to the channel with the call to the [send][SendChannel.send] function but failed to be delivered, * which can happen in the following cases: * + * - When an element is dropped due to the limited buffer capacity. + * This can happen when the overflow strategy is [BufferOverflow.DROP_LATEST] and [BufferOverflow.DROP_OLDEST]. * - When the sending operations like [send][SendChannel.send] or [onSend][SendChannel.onSend] * throw an exception because it was cancelled * before it had a chance to actually send the element @@ -1287,7 +1294,8 @@ public interface Channel : SendChannel, ReceiveChannel { * creating a channel with a buffer of size 1 and a [BufferOverflow] strategy of [BufferOverflow.DROP_OLDEST]: * `Channel(1, onBufferOverflow = BufferOverflow.DROP_OLDEST)`. * Such a channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations - * so that the receiver always gets the last element sent, **losing** the previously sent elements. + * so that the receiver always gets the last element sent, **losing** the previously sent elements: + * see the "Undelivered elements" section in the [Channel] documentation. * [Sending][send] to this channel never suspends, and [trySend] always succeeds. * * ``` diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt index 995db24025..e746c37d13 100644 --- a/kotlinx-coroutines-core/common/src/channels/Produce.kt +++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt @@ -115,9 +115,9 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) { * channel.cancel() * ``` * - * If this coroutine finishes with an exception, it will close the channel with that exception as the cause and - * the resulting channel will become _failed_, so after receiving all the existing elements, all further attempts - * to receive from it will throw the exception with which the coroutine finished. + * If this coroutine finishes with an exception, it will close the channel with that exception as the cause, + * so after receiving all the existing elements, + * all further attempts to receive from it will throw the exception with which the coroutine finished. * * ``` * val produceJob = Job() @@ -151,8 +151,7 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) { * } // throws a `CancellationException` exception after reaching -1 * ``` * - * Note that cancelling `produce` via structured concurrency closes the channel with a cause, - * making it a _failed_ channel. + * Note that cancelling `produce` via structured concurrency closes the channel with a cause. * * The behavior around coroutine cancellation and error handling is experimental and may change in a future release. * diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt index 62eb19e747..ef74a08f35 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt @@ -45,8 +45,8 @@ public interface ActorScope : CoroutineScope, ReceiveChannel { * it will be started implicitly on the first message * [sent][SendChannel.send] to this actors's mailbox channel. * - * Uncaught exceptions in this coroutine close the channel with this exception as a cause and - * the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception. + * Uncaught exceptions in this coroutine close the channel with this exception as a cause, + * so that any attempt to send to such a channel throws exception. * * The kind of the resulting channel depends on the specified [capacity] parameter. * See [Channel] interface documentation for details. From dfe52b7440aa49d37d9cbdb08f5eedb91201fd87 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 2 Oct 2024 14:37:58 +0200 Subject: [PATCH 11/13] Update kotlinx-coroutines-core/common/src/channels/Channel.kt Co-authored-by: Luca Kellermann --- kotlinx-coroutines-core/common/src/channels/Channel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index f6a25646cf..d0d9d602a6 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -906,7 +906,7 @@ public value class ChannelResult * } * ``` * - * @throws IllegalStateException if the operation failed, but the channel was not claused with a cause. + * @throws IllegalStateException if the operation failed, but the channel was not closed with a cause. */ public fun getOrThrow(): T { @Suppress("UNCHECKED_CAST") From 5de0d4c2dfa0c91c1f757b8bfcc017711926c3aa Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 2 Oct 2024 14:38:10 +0200 Subject: [PATCH 12/13] Update kotlinx-coroutines-core/common/src/channels/Channel.kt Co-authored-by: Luca Kellermann --- kotlinx-coroutines-core/common/src/channels/Channel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index d0d9d602a6..72c4404546 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -1185,7 +1185,7 @@ public interface ChannelIterator { * which can happen in the following cases: * * - When an element is dropped due to the limited buffer capacity. - * This can happen when the overflow strategy is [BufferOverflow.DROP_LATEST] and [BufferOverflow.DROP_OLDEST]. + * This can happen when the overflow strategy is [BufferOverflow.DROP_LATEST] or [BufferOverflow.DROP_OLDEST]. * - When the sending operations like [send][SendChannel.send] or [onSend][SendChannel.onSend] * throw an exception because it was cancelled * before it had a chance to actually send the element From c10a0f369c02e97df0deb1456770543949b3cf25 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 25 Oct 2024 12:14:43 +0200 Subject: [PATCH 13/13] Address review --- .../common/src/channels/Channel.kt | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 72c4404546..b4b80fb789 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -604,7 +604,7 @@ public interface ReceiveChannel { public val onReceiveCatching: SelectClause1> /** - * Attempts to retrieve an element, removing it from the channel. + * Attempts to retrieve an element without waiting, removing it from the channel. * * - When the channel is non-empty, a [successful][ChannelResult.isSuccess] result is returned, * and [ChannelResult.getOrNull] returns the retrieved element. @@ -675,6 +675,8 @@ public interface ReceiveChannel { * [isClosedForReceive] and, on the [SendChannel] side, [isClosedForSend][SendChannel.isClosedForSend] * start returning `true`. * Any attempt to send to or receive from this channel will lead to a [CancellationException]. + * This also applies to the existing senders and receivers that are suspended at the time of the call: + * they will be resumed with a [CancellationException] immediately after [cancel] is called. * * If the channel has an `onUndeliveredElement` callback installed, this function will invoke it for each of the * elements still in the channel, since these elements will be inaccessible otherwise. @@ -856,7 +858,8 @@ public value class ChannelResult * If this returns `true`, the channel was closed for the operation that returned this result. * In this case, retrying the operation is meaningless: once closed, the channel will remain closed. * [isSuccess] will return `false`. - * [exceptionOrNull] can be used to determine the reason the channel was closed. + * [exceptionOrNull] can be used to determine the reason the channel was [closed][SendChannel.close] + * if one was given. * * If this returns `false`, subsequent attempts to perform the same operation may succeed. * @@ -1319,6 +1322,8 @@ public interface Channel : SendChannel, ReceiveChannel { * When passed as a parameter to the `Channel(...)` factory function, the default buffer capacity is used. * For [BufferOverflow.SUSPEND] (the default buffer overflow strategy), the default capacity is 64, * but on the JVM it can be overridden by setting the [DEFAULT_BUFFER_PROPERTY_NAME] system property. + * The overridden value is used for all channels created with a default buffer capacity, + * including those created in third-party libraries. * * ``` * val channel = Channel(Channel.BUFFERED) @@ -1350,7 +1355,15 @@ public interface Channel : SendChannel, ReceiveChannel { * Name of the JVM system property for the default channel capacity (64 by default). * * See [BUFFERED] for details on how this property is used. + * + * Setting this property affects the default channel capacity for channel constructors, + * channel-backed coroutines and flow operators that imply channel usage, + * including ones defined in 3rd-party libraries. + * + * Usage of this property is highly discouraged and is intended to be used as a last-ditch effort + * as an immediate measure for hot fixes and duct-taping. */ + @DelicateCoroutinesApi public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer" internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME, @@ -1453,7 +1466,7 @@ public fun Channel(capacity: Int = RENDEZVOUS): Channel = Channel(capacit * or a new [CancellationException] gets constructed to be thrown from [SendChannel.send]. * * This exception is a subclass of [IllegalStateException], because the sender should not attempt to send to a closed - * channel after it itself has [closed][SendChannel.close] it. + * channel after it itself has [closed][SendChannel.close] it, and indicates an error on the part of the programmer. * Usually, this exception can be avoided altogether by restructuring the code. */ public class ClosedSendChannelException(message: String?) : IllegalStateException(message)