From 0baea378866d87d66d61140793aecdb3b79bc48f Mon Sep 17 00:00:00 2001 From: Artem Kolin Date: Mon, 20 Jun 2022 17:30:23 +0700 Subject: [PATCH] Fix channel BufferOverflow behavior --- .../common/src/channels/ArrayChannel.kt | 18 ++++++---- .../common/test/channels/ArrayChannelTest.kt | 36 +++++++++++++++++++ 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt index 7e6c0e68c5..00ba3b2a3c 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt @@ -140,8 +140,8 @@ internal open class ArrayChannel( // buffer is full return when (onBufferOverflow) { BufferOverflow.SUSPEND -> OFFER_FAILED - BufferOverflow.DROP_LATEST -> OFFER_SUCCESS - BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement + BufferOverflow.DROP_LATEST, + BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest/latest in enqueueElement } } @@ -152,10 +152,16 @@ internal open class ArrayChannel( buffer[(head + currentSize) % buffer.size] = element // actually queue element } else { // buffer is full - assert { onBufferOverflow == BufferOverflow.DROP_OLDEST } // the only way we can get here - buffer[head % buffer.size] = null // drop oldest element - buffer[(head + currentSize) % buffer.size] = element // actually queue element - head = (head + 1) % buffer.size + if (onBufferOverflow == BufferOverflow.DROP_OLDEST) { + @Suppress("UNCHECKED_CAST") + onUndeliveredElement?.callUndeliveredElementCatchingException(buffer[head % buffer.size] as E) + buffer[head % buffer.size] = null // drop oldest element + buffer[(head + currentSize) % buffer.size] = element // actually queue element + head = (head + 1) % buffer.size + } else { + assert { onBufferOverflow == BufferOverflow.DROP_LATEST } // the only way we can get here + onUndeliveredElement?.callUndeliveredElementCatchingException(element) + } } } diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt index 632fd2928b..3975fcd61a 100644 --- a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt @@ -145,6 +145,42 @@ class ArrayChannelTest : TestBase() { channel.receive() } + @Test + fun testBufferOverflowOldest() = runTest { + val capacity = 3 + val dropped = mutableListOf() + val expectedDropped = listOf(1, 2, 3) + val expectedValues = listOf(4, 5, 6) + + val channel = Channel(capacity, BufferOverflow.DROP_OLDEST) { dropped.add(it) } + (1..6).forEach { value -> + channel.send(value) + } + + assertContentEquals(expectedDropped, dropped) + expectedValues.forEach { expected -> + assertEquals(expected, channel.receive()) + } + } + + @Test + fun testBufferOverflowLatest() = runTest { + val capacity = 3 + val dropped = mutableListOf() + val expectedDropped = listOf(4, 5, 6) + val expectedValues = listOf(1, 2, 3) + + val channel = Channel(capacity, BufferOverflow.DROP_LATEST) { dropped.add(it) } + (1..6).forEach { value -> + channel.send(value) + } + + assertContentEquals(expectedDropped, dropped) + expectedValues.forEach { expected -> + assertEquals(expected, channel.receive()) + } + } + @Test fun testBufferSize() = runTest { val capacity = 42