Skip to content

Extend the KDoc for some channel APIs #4148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
8cafba2
Reword the BufferOverflow KDoc for consistency in the entry list
globsterg Jun 2, 2024
b40cf97
Describe the situations in which BufferOverflow options are useful
globsterg Jun 2, 2024
776adad
Expand the documentation for channel consumption functions
globsterg Jun 4, 2024
1b70f42
Specify the behavior of Channel.consumeEach on scope cancellation
globsterg Jun 4, 2024
2f4490e
Extend the documentation for `ProducerScope.awaitClose`
globsterg Jun 4, 2024
34b239f
Reword a misleading statement in the `produce` documentation
globsterg Jun 5, 2024
60b1f10
Document `awaitClose` and `invokeOnClose` interactions
globsterg Jun 5, 2024
2375d0f
Document how consuming operators handle failed channels
globsterg Jun 5, 2024
d09b4f7
Document cancelling the coroutine but not the channel of `produce`
globsterg Jun 6, 2024
56b512d
Don't use the magic constant 0 in default parameters of `produce`
globsterg Jun 6, 2024
4e4afd8
Fix an incorrect statement in `produce` docs
globsterg Jun 6, 2024
53b5184
Add samples to the `produce` documentation and restructure it
globsterg Jun 6, 2024
08dc948
Apply suggestions from code review
globsterg Jul 9, 2024
e570d99
Wrap a branch in braces
dkhalanskyjb Jul 16, 2024
1459060
Clarify the contract of ReceiveChannel.consume
dkhalanskyjb Jul 17, 2024
1c3e51e
Clarify the contract of consumeEach
dkhalanskyjb Jul 17, 2024
3b3c94a
Fix the Channel.toList documentation
dkhalanskyjb Jul 17, 2024
dbe9b2a
Clarify that produce() may lose elements
dkhalanskyjb Jul 17, 2024
3bc19f7
Remove a code snippet
dkhalanskyjb Jul 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,34 @@ package kotlinx.coroutines.channels
*
* - [SUSPEND] — the upstream that is [sending][SendChannel.send] or
* is [emitting][kotlinx.coroutines.flow.FlowCollector.emit] a value is **suspended** while the buffer is full.
* - [DROP_OLDEST] — drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
* - [DROP_LATEST] — drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
* - [DROP_OLDEST] — **the oldest** value in the buffer is dropped on overflow, and the new value is added,
* all without suspending.
* - [DROP_LATEST] — the buffer remains unchanged on overflow, and the value that we were going to add
* gets discarded, all without suspending.
*/
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*
* Use this to create backpressure, forcing the producers to slow down creation of new values in response to
* consumers not being able to process the incoming values in time.
* [SUSPEND] is a good choice when all elements must eventually be processed.
*/
SUSPEND,

/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*
* Use this in scenarios when only the last few values are important and skipping the processing of severely
* outdated ones is desirable.
*/
DROP_OLDEST,

/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
* Leave the buffer unchanged on overflow, dropping the value that we were going to add, do not suspend.
*
* This option can be used in rare advanced scenarios where all elements that are expected to enter the buffer are
* equal, so it is not important which of them get thrown away.
*/
DROP_LATEST
}
116 changes: 108 additions & 8 deletions kotlinx-coroutines-core/common/src/channels/Channels.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,43 @@ public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
}

/**
* Makes sure that the given [block] consumes all elements from the given channel
* by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
* Executes the [block] and then [cancels][ReceiveChannel.cancel] the channel.
*
* The operation is _terminal_.
* It is guaranteed that, after invoking this operation, the channel will be [cancelled][ReceiveChannel.cancel], so
* the operation is _terminal_.
* If the [block] finishes with an exception, that exception will be used for cancelling the channel and rethrown.
*
* This function is useful for building more complex terminal operators while ensuring that the producers stop sending
* new elements to the channel.
*
* Example:
* ```
* suspend fun <E> ReceiveChannel<E>.consumeFirst(): E =
* consume { return receive() }
* // Launch a coroutine that constantly sends new values
* val channel = produce(Dispatchers.Default) {
* var i = 0
* while (true) {
* // Will fail with a `CancellationException`
* // after `consumeFirst` finishes.
* send(i++)
* }
* }
* // Grab the first value and discard everything else
* val firstElement = channel.consumeFirst()
* check(firstElement == 0)
* // *Note*: some elements could be lost in the channel!
* ```
*
* In this example, the channel will get closed, and the producer coroutine will finish its work after the first
* element is obtained.
* If `consumeFirst` was implemented as `for (e in this) { return e }` instead, the producer coroutine would be active
* until it was cancelled some other way.
*
* [consume] does not guarantee that new elements will not enter the channel after [block] finishes executing, so
* some channel elements may be lost.
* Use the `onUndeliveredElement` parameter of a manually created [Channel] to define what should happen with these
* elements during [ReceiveChannel.cancel].
*/
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
contract {
Expand All @@ -70,23 +103,90 @@ public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -
}

/**
* Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel]
* the channel after the execution of the block.
* If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead.
* Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel] the channel afterward.
*
* This function stops processing elements when either the channel is [closed][SendChannel.close],
* the coroutine in which the collection is performed gets cancelled and there are no readily available elements in the
* channel's buffer,
* [action] fails with an exception,
* or an early return from [action] happens.
* If the [action] finishes with an exception, that exception will be used for cancelling the channel and rethrown.
* If the channel is [closed][SendChannel.close] with a cause, this cause will be rethrown from [consumeEach].
*
* When the channel does not need to be closed after iterating over its elements,
* a regular `for` loop (`for (element in channel)`) should be used instead.
*
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
* This function [consumes][ReceiveChannel.consume] the elements of the original [ReceiveChannel].
*
* This function is useful in cases when this channel is only expected to have a single consumer that decides when
* the producer may stop.
* Example:
*
* ```
* val channel = Channel<Int>(1)
* // Launch several procedures that create values
* repeat(5) {
* launch(Dispatchers.Default) {
* while (true) {
* channel.send(Random.nextInt(40, 50))
* }
* }
* }
* // Launch the exclusive consumer
* val result = run {
* channel.consumeEach {
* if (it == 42) {
* println("Found the answer")
* return@run it // forcibly stop collection
* }
* }
* // *Note*: some elements could be lost in the channel!
* }
* check(result == 42)
* ```
*
* In this example, several coroutines put elements into a single channel, and a single consumer processes the elements.
* Once it finds the elements it's looking for, it stops [consumeEach] by making an early return.
*
* **Pitfall**: even though the name says "each", some elements could be left unprocessed if they are added after
* this function decided to close the channel.
* In this case, the elements will simply be lost.
* If the elements of the channel are resources that must be closed (like file handles, sockets, etc.),
* an `onUndeliveredElement` must be passed to the [Channel] on construction.
* It will be called for each element left in the channel at the point of cancellation.
*/
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
consume {
for (e in this) action(e)
}

/**
* Returns a [List] containing all elements.
* Returns a [List] containing all the elements sent to this channel, preserving their order.
*
* This function will attempt to receive elements and put them into the list until the channel is
* [closed][SendChannel.close].
* Calling [toList] on channels that are not eventually closed is always incorrect:
* - It will suspend indefinitely if the channel is not closed, but no new elements arrive.
* - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory
* until exhausting it.
*
* If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause.
*
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*
* Example:
* ```
* val values = listOf(1, 5, 2, 9, 3, 3, 1)
* // start a new coroutine that creates a channel,
* // sends elements to it, and closes it
* // once the coroutine's body finishes
* val channel = produce {
* values.forEach { send(it) }
* }
* check(channel.toList() == values)
* ```
*/
public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
consumeEach {
Expand Down
Loading