Skip to content

Commit bc8160b

Browse files
authored
Extend the KDoc for some channel APIs (#4148)
* Reword the BufferOverflow KDoc for consistency in the entry list Before, the description of `SUSPEND` was phrased in terms of what will happen, while the rest of the entries were described in an imperative form, that is, as commands as to what should happen. Now, all entries are clarified using a descriptive form. * Describe the situations in which BufferOverflow options are useful * Expand the documentation for channel consumption functions Added explanations of what exactly happens on each code path, how these operators ensure that all elements get processed eventually, and provided some usage examples. * Specify the behavior of Channel.consumeEach on scope cancellation * Extend the documentation for `ProducerScope.awaitClose` Filed #4149 * Reword a misleading statement in the `produce` documentation Currently, the documentation states that uncaught exceptions will lead to the channel being closed. "Uncaught exceptions" is a special thing in kotlinx.coroutines: <https://kotlinlang.org/docs/exception-handling.html#coroutineexceptionhandler> These are not just exceptions that are not wrapped in a try-catch, these are exceptions that can not be propagated to a root coroutine via structured concurrency. Fixed the wording and added a test that shows that uncaught coroutine exceptions are not handled in any special manner. * Document `awaitClose` and `invokeOnClose` interactions Turns out, only a single invocation of either `awaitClose` or `invokeOnClose` is allowed in the lifetime of a channel. Document that. * Document how consuming operators handle failed channels * Document cancelling the coroutine but not the channel of `produce` * Don't use the magic constant 0 in default parameters of `produce` Instead, use `Channel.RENDEZVOUS` so that a meaningful constant is shown in Dokka's output. * Fix an incorrect statement in `produce` docs Currently, the docs claim that attempting to receive from a failed channel fails. However, the documentation for `Channel` itself correctly states that before `receive` fails, the elements that were already sent will be processed first. Corrected this and added a test demonstrating the behavior. * Add samples to the `produce` documentation and restructure it
1 parent ab279a7 commit bc8160b

File tree

6 files changed

+433
-35
lines changed

6 files changed

+433
-35
lines changed

kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt

+15-5
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,34 @@ package kotlinx.coroutines.channels
66
*
77
* - [SUSPEND] &mdash; the upstream that is [sending][SendChannel.send] or
88
* is [emitting][kotlinx.coroutines.flow.FlowCollector.emit] a value is **suspended** while the buffer is full.
9-
* - [DROP_OLDEST] &mdash; drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
10-
* - [DROP_LATEST] &mdash; drop **the latest** value that is being added to the buffer right now on buffer overflow
11-
* (so that buffer contents stay the same), do not suspend.
9+
* - [DROP_OLDEST] &mdash; **the oldest** value in the buffer is dropped on overflow, and the new value is added,
10+
* all without suspending.
11+
* - [DROP_LATEST] &mdash; the buffer remains unchanged on overflow, and the value that we were going to add
12+
* gets discarded, all without suspending.
1213
*/
1314
public enum class BufferOverflow {
1415
/**
1516
* Suspend on buffer overflow.
17+
*
18+
* Use this to create backpressure, forcing the producers to slow down creation of new values in response to
19+
* consumers not being able to process the incoming values in time.
20+
* [SUSPEND] is a good choice when all elements must eventually be processed.
1621
*/
1722
SUSPEND,
1823

1924
/**
2025
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
26+
*
27+
* Use this in scenarios when only the last few values are important and skipping the processing of severely
28+
* outdated ones is desirable.
2129
*/
2230
DROP_OLDEST,
2331

2432
/**
25-
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
26-
* (so that buffer contents stay the same), do not suspend.
33+
* Leave the buffer unchanged on overflow, dropping the value that we were going to add, do not suspend.
34+
*
35+
* This option can be used in rare advanced scenarios where all elements that are expected to enter the buffer are
36+
* equal, so it is not important which of them get thrown away.
2737
*/
2838
DROP_LATEST
2939
}

kotlinx-coroutines-core/common/src/channels/Channels.common.kt

+108-8
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,43 @@ public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
4949
}
5050

5151
/**
52-
* Makes sure that the given [block] consumes all elements from the given channel
53-
* by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
52+
* Executes the [block] and then [cancels][ReceiveChannel.cancel] the channel.
5453
*
55-
* The operation is _terminal_.
54+
* It is guaranteed that, after invoking this operation, the channel will be [cancelled][ReceiveChannel.cancel], so
55+
* the operation is _terminal_.
56+
* If the [block] finishes with an exception, that exception will be used for cancelling the channel and rethrown.
57+
*
58+
* This function is useful for building more complex terminal operators while ensuring that the producers stop sending
59+
* new elements to the channel.
60+
*
61+
* Example:
62+
* ```
63+
* suspend fun <E> ReceiveChannel<E>.consumeFirst(): E =
64+
* consume { return receive() }
65+
* // Launch a coroutine that constantly sends new values
66+
* val channel = produce(Dispatchers.Default) {
67+
* var i = 0
68+
* while (true) {
69+
* // Will fail with a `CancellationException`
70+
* // after `consumeFirst` finishes.
71+
* send(i++)
72+
* }
73+
* }
74+
* // Grab the first value and discard everything else
75+
* val firstElement = channel.consumeFirst()
76+
* check(firstElement == 0)
77+
* // *Note*: some elements could be lost in the channel!
78+
* ```
79+
*
80+
* In this example, the channel will get closed, and the producer coroutine will finish its work after the first
81+
* element is obtained.
82+
* If `consumeFirst` was implemented as `for (e in this) { return e }` instead, the producer coroutine would be active
83+
* until it was cancelled some other way.
84+
*
85+
* [consume] does not guarantee that new elements will not enter the channel after [block] finishes executing, so
86+
* some channel elements may be lost.
87+
* Use the `onUndeliveredElement` parameter of a manually created [Channel] to define what should happen with these
88+
* elements during [ReceiveChannel.cancel].
5689
*/
5790
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
5891
contract {
@@ -70,23 +103,90 @@ public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -
70103
}
71104

72105
/**
73-
* Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel]
74-
* the channel after the execution of the block.
75-
* If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead.
106+
* Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel] the channel afterward.
107+
*
108+
* This function stops processing elements when either the channel is [closed][SendChannel.close],
109+
* the coroutine in which the collection is performed gets cancelled and there are no readily available elements in the
110+
* channel's buffer,
111+
* [action] fails with an exception,
112+
* or an early return from [action] happens.
113+
* If the [action] finishes with an exception, that exception will be used for cancelling the channel and rethrown.
114+
* If the channel is [closed][SendChannel.close] with a cause, this cause will be rethrown from [consumeEach].
115+
*
116+
* When the channel does not need to be closed after iterating over its elements,
117+
* a regular `for` loop (`for (element in channel)`) should be used instead.
76118
*
77119
* The operation is _terminal_.
78-
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
120+
* This function [consumes][ReceiveChannel.consume] the elements of the original [ReceiveChannel].
121+
*
122+
* This function is useful in cases when this channel is only expected to have a single consumer that decides when
123+
* the producer may stop.
124+
* Example:
125+
*
126+
* ```
127+
* val channel = Channel<Int>(1)
128+
* // Launch several procedures that create values
129+
* repeat(5) {
130+
* launch(Dispatchers.Default) {
131+
* while (true) {
132+
* channel.send(Random.nextInt(40, 50))
133+
* }
134+
* }
135+
* }
136+
* // Launch the exclusive consumer
137+
* val result = run {
138+
* channel.consumeEach {
139+
* if (it == 42) {
140+
* println("Found the answer")
141+
* return@run it // forcibly stop collection
142+
* }
143+
* }
144+
* // *Note*: some elements could be lost in the channel!
145+
* }
146+
* check(result == 42)
147+
* ```
148+
*
149+
* In this example, several coroutines put elements into a single channel, and a single consumer processes the elements.
150+
* Once it finds the elements it's looking for, it stops [consumeEach] by making an early return.
151+
*
152+
* **Pitfall**: even though the name says "each", some elements could be left unprocessed if they are added after
153+
* this function decided to close the channel.
154+
* In this case, the elements will simply be lost.
155+
* If the elements of the channel are resources that must be closed (like file handles, sockets, etc.),
156+
* an `onUndeliveredElement` must be passed to the [Channel] on construction.
157+
* It will be called for each element left in the channel at the point of cancellation.
79158
*/
80159
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
81160
consume {
82161
for (e in this) action(e)
83162
}
84163

85164
/**
86-
* Returns a [List] containing all elements.
165+
* Returns a [List] containing all the elements sent to this channel, preserving their order.
166+
*
167+
* This function will attempt to receive elements and put them into the list until the channel is
168+
* [closed][SendChannel.close].
169+
* Calling [toList] on channels that are not eventually closed is always incorrect:
170+
* - It will suspend indefinitely if the channel is not closed, but no new elements arrive.
171+
* - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory
172+
* until exhausting it.
173+
*
174+
* If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause.
87175
*
88176
* The operation is _terminal_.
89177
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
178+
*
179+
* Example:
180+
* ```
181+
* val values = listOf(1, 5, 2, 9, 3, 3, 1)
182+
* // start a new coroutine that creates a channel,
183+
* // sends elements to it, and closes it
184+
* // once the coroutine's body finishes
185+
* val channel = produce {
186+
* values.forEach { send(it) }
187+
* }
188+
* check(channel.toList() == values)
189+
* ```
90190
*/
91191
public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
92192
consumeEach {

0 commit comments

Comments
 (0)