Skip to content

Commit 09a1e03

Browse files
committed
Update the documentation for the Channel interface
1 parent 9fd5201 commit 09a1e03

File tree

8 files changed

+929
-231
lines changed

8 files changed

+929
-231
lines changed

kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ package kotlinx.coroutines.channels
1313
*/
1414
public enum class BufferOverflow {
1515
/**
16-
* Suspend on buffer overflow.
16+
* Suspend until free space appears in the buffer.
1717
*
1818
* Use this to create backpressure, forcing the producers to slow down creation of new values in response to
1919
* consumers not being able to process the incoming values in time.

kotlinx-coroutines-core/common/src/channels/Channel.kt

Lines changed: 856 additions & 226 deletions
Large diffs are not rendered by default.

kotlinx-coroutines-core/common/src/channels/Produce.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
8181
* The kind of the resulting channel depends on the specified [capacity] parameter.
8282
* See the [Channel] interface documentation for details.
8383
* By default, an unbuffered channel is created.
84+
* If an invalid [capacity] value is specified, an [IllegalArgumentException] is thrown.
8485
*
8586
* ### Behavior on termination
8687
*

kotlinx-coroutines-core/common/test/channels/BufferedChannelTest.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,20 @@ import kotlinx.coroutines.*
55
import kotlin.test.*
66

77
class BufferedChannelTest : TestBase() {
8+
9+
/** Tests that a buffered channel does not consume enough memory to fail with an OOM. */
10+
@Test
11+
fun testMemoryConsumption() = runTest {
12+
val largeChannel = Channel<Int>(Int.MAX_VALUE / 2)
13+
repeat(10_000) {
14+
largeChannel.send(it)
15+
}
16+
repeat(10_000) {
17+
val element = largeChannel.receive()
18+
assertEquals(it, element)
19+
}
20+
}
21+
822
@Test
923
fun testIteratorHasNextIsIdempotent() = runTest {
1024
val q = Channel<Int>()

kotlinx-coroutines-core/common/test/channels/ProduceTest.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,13 @@ class ProduceTest : TestBase() {
266266
}
267267
}
268268

269+
@Test
270+
fun testProduceWithInvalidCapacity() = runTest {
271+
assertFailsWith<IllegalArgumentException> {
272+
produce<Int>(capacity = -3) { }
273+
}
274+
}
275+
269276
private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
270277
val source = Channel<Int>()
271278
expect(1)

kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,4 +273,24 @@ class RendezvousChannelTest : TestBase() {
273273
channel.cancel(TestCancellationException())
274274
channel.receiveCatching().getOrThrow()
275275
}
276+
277+
/** Tests that [BufferOverflow.DROP_OLDEST] takes precedence over [Channel.RENDEZVOUS]. */
278+
@Test
279+
fun testDropOldest() = runTest {
280+
val channel = Channel<Int>(Channel.RENDEZVOUS, onBufferOverflow = BufferOverflow.DROP_OLDEST)
281+
channel.send(1)
282+
channel.send(2)
283+
channel.send(3)
284+
assertEquals(3, channel.receive())
285+
}
286+
287+
/** Tests that [BufferOverflow.DROP_LATEST] takes precedence over [Channel.RENDEZVOUS]. */
288+
@Test
289+
fun testDropLatest() = runTest {
290+
val channel = Channel<Int>(Channel.RENDEZVOUS, onBufferOverflow = BufferOverflow.DROP_LATEST)
291+
channel.send(1)
292+
channel.send(2)
293+
channel.send(3)
294+
assertEquals(1, channel.receive())
295+
}
276296
}

kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -419,10 +419,10 @@ class SelectRendezvousChannelTest : TestBase() {
419419
fun testSelectSendWhenClosed() = runTest {
420420
expect(1)
421421
val c = Channel<Int>(Channel.RENDEZVOUS)
422-
val sender = launch(start = CoroutineStart.UNDISPATCHED) {
422+
launch(start = CoroutineStart.UNDISPATCHED) {
423423
expect(2)
424424
c.send(1) // enqueue sender
425-
expectUnreached()
425+
finish(4)
426426
}
427427
c.close() // then close
428428
assertFailsWith<ClosedSendChannelException> {
@@ -434,8 +434,7 @@ class SelectRendezvousChannelTest : TestBase() {
434434
}
435435
}
436436
}
437-
sender.cancel()
438-
finish(4)
437+
assertEquals(1, c.receive())
439438
}
440439

441440
// only for debugging
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package kotlinx.coroutines.channels
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.testing.*
5+
import kotlin.test.*
6+
7+
class CancelledChannelLeakTest : TestBase() {
8+
/**
9+
* Tests that cancellation removes the elements from the channel's buffer.
10+
*/
11+
@Test
12+
fun testBufferedChannelLeak() = runTest {
13+
for (capacity in listOf(Channel.CONFLATED, Channel.RENDEZVOUS, 1, 2, 5, 10)) {
14+
val channel = Channel<X>(capacity)
15+
val value = X()
16+
launch(start = CoroutineStart.UNDISPATCHED) {
17+
channel.send(value)
18+
}
19+
FieldWalker.assertReachableCount(1, channel) { it === value }
20+
channel.cancel()
21+
// the element must be removed so that there is no memory leak
22+
FieldWalker.assertReachableCount(0, channel) { it === value }
23+
}
24+
}
25+
26+
class X
27+
}

0 commit comments

Comments
 (0)