Skip to content

Extend the KDoc for some channel APIs #4148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
8cafba2
Reword the BufferOverflow KDoc for consistency in the entry list
globsterg Jun 2, 2024
b40cf97
Describe the situations in which BufferOverflow options are useful
globsterg Jun 2, 2024
776adad
Expand the documentation for channel consumption functions
globsterg Jun 4, 2024
1b70f42
Specify the behavior of Channel.consumeEach on scope cancellation
globsterg Jun 4, 2024
2f4490e
Extend the documentation for `ProducerScope.awaitClose`
globsterg Jun 4, 2024
34b239f
Reword a misleading statement in the `produce` documentation
globsterg Jun 5, 2024
60b1f10
Document `awaitClose` and `invokeOnClose` interactions
globsterg Jun 5, 2024
2375d0f
Document how consuming operators handle failed channels
globsterg Jun 5, 2024
d09b4f7
Document cancelling the coroutine but not the channel of `produce`
globsterg Jun 6, 2024
56b512d
Don't use the magic constant 0 in default parameters of `produce`
globsterg Jun 6, 2024
4e4afd8
Fix an incorrect statement in `produce` docs
globsterg Jun 6, 2024
53b5184
Add samples to the `produce` documentation and restructure it
globsterg Jun 6, 2024
08dc948
Apply suggestions from code review
globsterg Jul 9, 2024
e570d99
Wrap a branch in braces
dkhalanskyjb Jul 16, 2024
1459060
Clarify the contract of ReceiveChannel.consume
dkhalanskyjb Jul 17, 2024
1c3e51e
Clarify the contract of consumeEach
dkhalanskyjb Jul 17, 2024
3b3c94a
Fix the Channel.toList documentation
dkhalanskyjb Jul 17, 2024
dbe9b2a
Clarify that produce() may lose elements
dkhalanskyjb Jul 17, 2024
3bc19f7
Remove a code snippet
dkhalanskyjb Jul 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,34 @@ package kotlinx.coroutines.channels
*
* - [SUSPEND] — the upstream that is [sending][SendChannel.send] or
* is [emitting][kotlinx.coroutines.flow.FlowCollector.emit] a value is **suspended** while the buffer is full.
* - [DROP_OLDEST] — drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
* - [DROP_LATEST] — drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
* - [DROP_OLDEST] — **the oldest** value in the buffer is dropped on overflow, and the new value is added,
* all without suspending.
* - [DROP_LATEST] — the buffer remains unchanged on overflow, and the value that we were going to add
* gets discarded, all without suspending.
*/
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*
* Use this to create backpressure, forcing the producers to slow down creation of new values in response to
* consumers not being able to process the incoming values in time.
* [SUSPEND] is a good choice when all elements must eventually be processed.
*/
SUSPEND,

/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*
* Use this in scenarios when only the last few values are important and skipping the processing of severely
* outdated ones is desirable.
*/
DROP_OLDEST,

/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
* Leave the buffer unchanged on overflow, dropping the value that we were going to add, do not suspend.
*
* This option can be used in rare advanced scenarios where all elements that are expected to enter the buffer are
* equal, so it is not important which of them get thrown away.
*/
DROP_LATEST
}
108 changes: 101 additions & 7 deletions kotlinx-coroutines-core/common/src/channels/Channels.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,44 @@ public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
}

/**
* Makes sure that the given [block] consumes all elements from the given channel
* by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
* Executes the [block] and then [cancels][ReceiveChannel.cancel] the channel,
* ensuring that all the elements that were sent are processed by either [block] or [ReceiveChannel.cancel].
*
* The operation is _terminal_.
* It is guaranteed that, after invoking this operation, the channel will be [cancelled][ReceiveChannel.cancel], so
* the operation is _terminal_.
* If the [block] finishes with an exception, that exception will be used for cancelling the channel.
*
* This function is useful for building more complex terminal operators while ensuring that no elements will be lost.
* Example:
*
* ```
* suspend fun <E> ReceiveChannel<E>.consumeFirst(): E =
* consume { return receive() }
* fun Int.cleanup() { println("cleaning up $this") }
* val channel = Channel<Int>(10, onUndeliveredElement = Int::cleanup)
* // Launch a procedure that creates values
* launch(Dispatchers.Default) {
* repeat(10) {
* val sendResult = channel.trySend(it)
* if (sendResult.isFailure) {
* print("in the producer: ")
* it.cleanup()
* }
* yield()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yield is redundant and rather confusing here, let's avoid that?

* }
* }
* // Grab the first value and discard everything else
* launch(Dispatchers.Default) {
* val firstElement = channel.consumeFirst()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe invoke it in-place instead of a single coroutine?

* println("received $firstElement")
* }
* ```
*
* In this example, all ten values created by the producer coroutine will be processed: one by `consumeFirst`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading this, it seems like the example is mixing two concepts: one is about consuming, and the other about undelivered clean up (which we don't consider as "processed", it has a different meaning).

Maybe something more straightforward?
Like the following (outline, haven't tried to compile it):

val producer = produce {
    var currentElement = 0
    try {
        while(true) {
            send(currentElement++)
        }
    } finally {
        println("Cancelled after: $currentElement")
    }
}

consumeFirst { ... }

It's deterministic, more trivial and straightforward. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good point. The point of consume seems to be just notifying the producer that there are no more consumers. I noticed that consume is a reliable way not to miss any elements and became too fixated on that, but this really is a just side effect.

* and the other ones by `Int.cleanup`, invoked either by [ReceiveChannel.cancel] inside [consume] or by the
* producer itself when it observes failure.
* In any case, exactly nine elements will go through a cleanup in this example.
* If `consumeFirst` is implemented as `for (e in this) { return e }` instead, the cleanup does not happen.
*/
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
contract {
Expand All @@ -70,23 +104,83 @@ public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -
}

/**
* Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel]
* the channel after the execution of the block.
* If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead.
* Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel] the channel afterward.
*
* This function stops processing elements when the channel is [closed][SendChannel.close],
* the coroutine in which the collection is performed gets cancelled and there are no readily available elements in the
* channel's buffer,
* or an early return from [action] happens.
* If the [action] finishes with an exception, that exception will be used for cancelling the channel and rethrown.
* If the channel is [closed][SendChannel.close] with a cause, this cause will be rethrown from [consumeEach].
*
* When the channel does not need to be closed after iterating over its elements,
* a regular `for` loop (`for (element in channel)`) should be used instead.
*
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*
* This function is useful in cases when this channel is only expected to have a single consumer that decides when
* the producer may stop and ensures that the elements that were sent do get processed.
* Example:
*
* ```
* fun Int.cleanup() { println("cleaning up $this") }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto about onUndeliveredElement and cleanup, it doesn't really seem relevant to these functions

* val channel = Channel<Int>(1, onUndeliveredElement = Int::cleanup)
* // Launch several procedures that create values
* repeat(5) {
* launch(Dispatchers.Default) {
* while (true) {
* val x = Random.nextInt(40, 50)
* println("Generating $x")
* channel.send(x)
* }
* }
* }
* // Launch the exclusive consumer
* launch(Dispatchers.Default) {
* channel.consumeEach {
* if (it == 42) {
* println("Found the answer")
* return@launch
* } else it.cleanup()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please wrap it in braces

* }
* }
* ```
*
* In this example, all ten values created by the producer coroutines will be processed:
* while the single consumer is active, it will receive all the elements, but once it exits,
* the values that can no longer be delivered will be passed to the `Int.cleanup` handler.
*/
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
consume {
for (e in this) action(e)
}

/**
* Returns a [List] containing all elements.
* Returns a [List] containing all the elements sent to this channel, preserving their order.
*
* This function will attempt to receive elements and put them into the list until the channel is
* [closed][SendChannel.close].
* Calling [toList] without closing the channel is always incorrect:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make it a bit less ambigous? "Calling toList on channels that are not closed" or something like that

* - It will suspend indefinitely if the channel is not closed, but no new elements arrive.
* - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory
* until exhausting it.
*
* If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause.
*
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*
* Example:
* ```
* val values = listOf(1, 5, 2, 9, 3, 3, 1)
* val channel = Channel<Int>()
* GlobalScope.launch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use produce for that?
Esp. here, where finally block is required otherwise

Copy link
Contributor Author

@globsterg globsterg Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found produce too high-level and thought that using it could detract from the point of the sample. Is it wrong? Is produce common knowledge and part of any coroutine user's toolkit?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

produce is more high-level, but it's also the idiomatic way to create a producer coroutine, so it's worth mentioning. It's true that it may overwhelm the reader, but we can probably work around this by adding extensive comments.

* values.forEach { channel.send(it) }
* channel.close()
* }
* check(channel.toList() == values)
* ```
*/
public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
consumeEach {
Expand Down
178 changes: 156 additions & 22 deletions kotlinx-coroutines-core/common/src/channels/Produce.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,43 @@ public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
}

/**
* Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
* and invokes the given [block] before resuming the coroutine.
* Suspends the current coroutine until the channel is either
* [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
*
* This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
* suspending function is waiting, this function immediately resumes with [CancellationException].
* The given [block] will be executed unconditionally before this function returns.
* `awaitClose { cleanup() }` is a convenient shorthand for the often useful form
* `try { awaitClose() } finally { cleanup() }`.
*
* This function can only be invoked directly inside the same coroutine that is its receiver.
* Specifying the receiver of [awaitClose] explicitly is most probably a mistake.
*
* This suspending function is cancellable: if the [Job] of the current coroutine is [cancelled][CoroutineScope.cancel]
* while this suspending function is waiting, this function immediately resumes with [CancellationException].
* There is a **prompt cancellation guarantee**: even if this function is ready to return, but was cancelled
* while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
*
* Note that when the producer channel is cancelled, this function resumes with a cancellation exception.
* Therefore, in case of cancellation, no code after the call to this function will be executed.
* That's why this function takes a lambda parameter.
*
* Example of usage:
* ```
* val callbackEventsStream = produce {
* val disposable = registerChannelInCallback(channel)
* awaitClose { disposable.dispose() }
* }
* ```
*
* Internally, [awaitClose] is implemented using [SendChannel.invokeOnClose].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

* Currently, every channel can have at most one [SendChannel.invokeOnClose] handler.
* This means that calling [awaitClose] several times in a row or combining it with other [SendChannel.invokeOnClose]
* invocations is prohibited.
* An [IllegalStateException] will be thrown if this rule is broken.
*
* **Pitfall**: when used in [produce], if the channel is [cancelled][ReceiveChannel.cancel], [awaitClose] can either
* return normally or throw a [CancellationException] due to a race condition.
* The reason is that, for [produce], cancelling the channel and cancelling the coroutine of the [ProducerScope] is
* done simultaneously.
*
* @throws IllegalStateException if invoked from outside the [ProducerScope] (by leaking `this` outside the producer
* coroutine).
* @throws IllegalStateException if this channel already has a [SendChannel.invokeOnClose] handler registered.
*/
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" }
Expand All @@ -58,35 +76,151 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
* object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
*
* The scope of the coroutine contains the [ProducerScope] interface, which implements
* both [CoroutineScope] and [SendChannel], so that the coroutine can invoke
* [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
* when the coroutine completes.
* The running coroutine is cancelled when its receive channel is [cancelled][ReceiveChannel.cancel].
* both [CoroutineScope] and [SendChannel], so that the coroutine can invoke [send][SendChannel.send] directly.
*
* The kind of the resulting channel depends on the specified [capacity] parameter.
* See the [Channel] interface documentation for details.
* By default, an unbuffered channel is created.
*
* ### Behavior on termination
*
* The channel is [closed][SendChannel.close] when the coroutine completes.
*
* ```
* val values = listOf(1, 2, 3, 4)
* val channel = produce<Int> {
* for (value in values) {
* send(value)
* }
* }
* check(channel.toList() == values)
* ```
*
* The running coroutine is cancelled when the channel is [cancelled][ReceiveChannel.cancel].
*
* ```
* val channel = produce<Int> {
* send(1)
* send(2)
* try {
* send(3) // will throw CancellationException
* } catch (e: CancellationException) {
* println("The channel was cancelled!)
* throw e // always rethrow CancellationException
* }
* }
* check(channel.receive() == 1)
* check(channel.receive() == 2)
* channel.cancel()
* ```
*
* If this coroutine finishes with an exception, it will close the channel with that exception as the cause and
* the resulting channel will become _failed_, so after receiving all the existing elements, all further attempts
* to receive from it will throw the exception with which the coroutine finished.
*
* ```
* val produceJob = Job()
* // create and populate a channel with a buffer
* val channel = produce<Int>(produceJob, capacity = Channel.UNLIMITED) {
* repeat(5) { send(it) }
* throw TestException()
* }
* produceJob.join() // wait for `produce` to fail
* check(produceJob.isCancelled == true)
* // prints 0, 1, 2, 3, 4, then throws `TestException`
* for (value in channel) { println(value) }
* ```
*
* When the coroutine is cancelled via structured concurrency and not the `cancel` function,
* the channel does not automatically close until the coroutine completes,
* so it is possible that some elements will be sent even after the coroutine is cancelled:
*
* ```
* val parentScope = CoroutineScope(Dispatchers.Default)
* val channel = parentScope.produce<Int>(capacity = Channel.UNLIMITED) {
* repeat(5) {
* send(it)
* }
* parentScope.cancel()
* // suspending after this point would fail, but sending succeeds
* send(-1)
* }
* for (c in channel) {
* println(c) // 0, 1, 2, 3, 4, -1
* } // throws a `CancellationException` exception after reaching -1
* ```
*
* Note that cancelling `produce` via structured concurrency closes the channel with a cause,
* making it a _failed_ channel.
*
* The behavior around coroutine cancellation and error handling is experimental and may change in a future release.
*
* ### Coroutine context
*
* The coroutine context is inherited from this [CoroutineScope]. Additional context elements can be specified with the [context] argument.
* If the context does not have any dispatcher or other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* The parent job is inherited from the [CoroutineScope] as well, but it can also be overridden
* with a corresponding [context] element.
*
* Any uncaught exception in this coroutine will close the channel with this exception as the cause and
* the resulting channel will become _failed_, so that any attempt to receive from it thereafter will throw an exception.
* See [newCoroutineContext] for a description of debugging facilities available for newly created coroutines.
*
* The kind of the resulting channel depends on the specified [capacity] parameter.
* See the [Channel] interface documentation for details.
* ### Usage example
*
* See [newCoroutineContext] for a description of debugging facilities available for newly created coroutines.
* ```
* /* Generate random integers until we find the square root of 9801.
* To calculate whether the given number is that square root,
* use several coroutines that separately process these integers.
* Alternatively, we may randomly give up during value generation.
* `produce` is used to generate the integers and put them into a
* channel, from which the square-computing coroutines take them. */
* val parentScope = CoroutineScope(SupervisorJob())
* val channel = parentScope.produce<Int>(
* Dispatchers.IO,
* capacity = 16 // buffer of size 16
* ) {
* // this code will run on Dispatchers.IO
* while (true) {
* val request = run {
* // simulate waiting for the next request
* delay(5.milliseconds)
* val randomInt = Random.nextInt(-1, 100)
* if (randomInt == -1) {
* // external termination request received
* println("Producer: no longer accepting requests")
* return@produce
* }
* println("Producer: sending a request ($randomInt)")
* randomInt
* }
* send(request)
* }
* }
* // Launch consumers
* repeat(4) {
* launch(Dispatchers.Default) {
* for (request in channel) {
* // simulate processing a request
* delay(25.milliseconds)
* println("Consumer $it: received a request ($request)")
* if (request * request == 9801) {
* println("Consumer $it found the square root of 9801!")
* /* the work is done, the producer may finish.
* the internal termination request will cancel
* the producer on the next suspension point. */
* channel.cancel()
* }
* }
* }
* }
* ```
*
* **Note: This is an experimental api.** Behaviour of producers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param capacity capacity of the channel's buffer (no buffer by default).
* @param block the coroutine code.
*/
@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
capacity: Int = Channel.RENDEZVOUS,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> =
produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)
Expand Down
Loading