Skip to content

Commit 13147c4

Browse files
authored
Update the documentation for the Channel interface (#4241)
1 parent fa3076f commit 13147c4

File tree

9 files changed

+960
-242
lines changed

9 files changed

+960
-242
lines changed

Diff for: kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ package kotlinx.coroutines.channels
1313
*/
1414
public enum class BufferOverflow {
1515
/**
16-
* Suspend on buffer overflow.
16+
* Suspend until free space appears in the buffer.
1717
*
1818
* Use this to create backpressure, forcing the producers to slow down creation of new values in response to
1919
* consumers not being able to process the incoming values in time.

Diff for: kotlinx-coroutines-core/common/src/channels/Channel.kt

+881-230
Large diffs are not rendered by default.

Diff for: kotlinx-coroutines-core/common/src/channels/Produce.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
8181
* The kind of the resulting channel depends on the specified [capacity] parameter.
8282
* See the [Channel] interface documentation for details.
8383
* By default, an unbuffered channel is created.
84+
* If an invalid [capacity] value is specified, an [IllegalArgumentException] is thrown.
8485
*
8586
* ### Behavior on termination
8687
*
@@ -114,9 +115,9 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
114115
* channel.cancel()
115116
* ```
116117
*
117-
* If this coroutine finishes with an exception, it will close the channel with that exception as the cause and
118-
* the resulting channel will become _failed_, so after receiving all the existing elements, all further attempts
119-
* to receive from it will throw the exception with which the coroutine finished.
118+
* If this coroutine finishes with an exception, it will close the channel with that exception as the cause,
119+
* so after receiving all the existing elements,
120+
* all further attempts to receive from it will throw the exception with which the coroutine finished.
120121
*
121122
* ```
122123
* val produceJob = Job()
@@ -150,8 +151,7 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
150151
* } // throws a `CancellationException` exception after reaching -1
151152
* ```
152153
*
153-
* Note that cancelling `produce` via structured concurrency closes the channel with a cause,
154-
* making it a _failed_ channel.
154+
* Note that cancelling `produce` via structured concurrency closes the channel with a cause.
155155
*
156156
* The behavior around coroutine cancellation and error handling is experimental and may change in a future release.
157157
*

Diff for: kotlinx-coroutines-core/common/test/channels/BufferedChannelTest.kt

+14
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,20 @@ import kotlinx.coroutines.*
55
import kotlin.test.*
66

77
class BufferedChannelTest : TestBase() {
8+
9+
/** Tests that a buffered channel does not consume enough memory to fail with an OOM. */
10+
@Test
11+
fun testMemoryConsumption() = runTest {
12+
val largeChannel = Channel<Int>(Int.MAX_VALUE / 2)
13+
repeat(10_000) {
14+
largeChannel.send(it)
15+
}
16+
repeat(10_000) {
17+
val element = largeChannel.receive()
18+
assertEquals(it, element)
19+
}
20+
}
21+
822
@Test
923
fun testIteratorHasNextIsIdempotent() = runTest {
1024
val q = Channel<Int>()

Diff for: kotlinx-coroutines-core/common/test/channels/ProduceTest.kt

+7
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,13 @@ class ProduceTest : TestBase() {
266266
}
267267
}
268268

269+
@Test
270+
fun testProduceWithInvalidCapacity() = runTest {
271+
assertFailsWith<IllegalArgumentException> {
272+
produce<Int>(capacity = -3) { }
273+
}
274+
}
275+
269276
private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
270277
val source = Channel<Int>()
271278
expect(1)

Diff for: kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt

+20
Original file line numberDiff line numberDiff line change
@@ -273,4 +273,24 @@ class RendezvousChannelTest : TestBase() {
273273
channel.cancel(TestCancellationException())
274274
channel.receiveCatching().getOrThrow()
275275
}
276+
277+
/** Tests that [BufferOverflow.DROP_OLDEST] takes precedence over [Channel.RENDEZVOUS]. */
278+
@Test
279+
fun testDropOldest() = runTest {
280+
val channel = Channel<Int>(Channel.RENDEZVOUS, onBufferOverflow = BufferOverflow.DROP_OLDEST)
281+
channel.send(1)
282+
channel.send(2)
283+
channel.send(3)
284+
assertEquals(3, channel.receive())
285+
}
286+
287+
/** Tests that [BufferOverflow.DROP_LATEST] takes precedence over [Channel.RENDEZVOUS]. */
288+
@Test
289+
fun testDropLatest() = runTest {
290+
val channel = Channel<Int>(Channel.RENDEZVOUS, onBufferOverflow = BufferOverflow.DROP_LATEST)
291+
channel.send(1)
292+
channel.send(2)
293+
channel.send(3)
294+
assertEquals(1, channel.receive())
295+
}
276296
}

Diff for: kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -419,10 +419,10 @@ class SelectRendezvousChannelTest : TestBase() {
419419
fun testSelectSendWhenClosed() = runTest {
420420
expect(1)
421421
val c = Channel<Int>(Channel.RENDEZVOUS)
422-
val sender = launch(start = CoroutineStart.UNDISPATCHED) {
422+
launch(start = CoroutineStart.UNDISPATCHED) {
423423
expect(2)
424424
c.send(1) // enqueue sender
425-
expectUnreached()
425+
finish(4)
426426
}
427427
c.close() // then close
428428
assertFailsWith<ClosedSendChannelException> {
@@ -434,8 +434,7 @@ class SelectRendezvousChannelTest : TestBase() {
434434
}
435435
}
436436
}
437-
sender.cancel()
438-
finish(4)
437+
assertEquals(1, c.receive())
439438
}
440439

441440
// only for debugging

Diff for: kotlinx-coroutines-core/jvm/src/channels/Actor.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ public interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
4545
* it will be started implicitly on the first message
4646
* [sent][SendChannel.send] to this actors's mailbox channel.
4747
*
48-
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
49-
* the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception.
48+
* Uncaught exceptions in this coroutine close the channel with this exception as a cause,
49+
* so that any attempt to send to such a channel throws exception.
5050
*
5151
* The kind of the resulting channel depends on the specified [capacity] parameter.
5252
* See [Channel] interface documentation for details.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package kotlinx.coroutines.channels
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.testing.*
5+
import kotlin.test.*
6+
7+
class CancelledChannelLeakTest : TestBase() {
8+
/**
9+
* Tests that cancellation removes the elements from the channel's buffer.
10+
*/
11+
@Test
12+
fun testBufferedChannelLeak() = runTest {
13+
for (capacity in listOf(Channel.CONFLATED, Channel.RENDEZVOUS, 1, 2, 5, 10)) {
14+
val channel = Channel<X>(capacity)
15+
val value = X()
16+
launch(start = CoroutineStart.UNDISPATCHED) {
17+
channel.send(value)
18+
}
19+
FieldWalker.assertReachableCount(1, channel) { it === value }
20+
channel.cancel()
21+
// the element must be removed so that there is no memory leak
22+
FieldWalker.assertReachableCount(0, channel) { it === value }
23+
}
24+
}
25+
26+
class X
27+
}

0 commit comments

Comments
 (0)