Skip to content

Fix channel BufferOverflow behavior #3331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ internal open class ArrayChannel<E>(
// buffer is full
return when (onBufferOverflow) {
BufferOverflow.SUSPEND -> OFFER_FAILED
BufferOverflow.DROP_LATEST -> OFFER_SUCCESS
BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement
BufferOverflow.DROP_LATEST,
BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest/latest in enqueueElement
}
}

Expand All @@ -152,10 +152,16 @@ internal open class ArrayChannel<E>(
buffer[(head + currentSize) % buffer.size] = element // actually queue element
} else {
// buffer is full
assert { onBufferOverflow == BufferOverflow.DROP_OLDEST } // the only way we can get here
buffer[head % buffer.size] = null // drop oldest element
buffer[(head + currentSize) % buffer.size] = element // actually queue element
head = (head + 1) % buffer.size
if (onBufferOverflow == BufferOverflow.DROP_OLDEST) {
@Suppress("UNCHECKED_CAST")
onUndeliveredElement?.callUndeliveredElementCatchingException(buffer[head % buffer.size] as E)
buffer[head % buffer.size] = null // drop oldest element
buffer[(head + currentSize) % buffer.size] = element // actually queue element
head = (head + 1) % buffer.size
} else {
assert { onBufferOverflow == BufferOverflow.DROP_LATEST } // the only way we can get here
onUndeliveredElement?.callUndeliveredElementCatchingException(element)
}
}
}

Expand Down
36 changes: 36 additions & 0 deletions kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,42 @@ class ArrayChannelTest : TestBase() {
channel.receive()
}

@Test
fun testBufferOverflowOldest() = runTest {
val capacity = 3
val dropped = mutableListOf<Int>()
val expectedDropped = listOf(1, 2, 3)
val expectedValues = listOf(4, 5, 6)

val channel = Channel<Int>(capacity, BufferOverflow.DROP_OLDEST) { dropped.add(it) }
(1..6).forEach { value ->
channel.send(value)
}

assertContentEquals(expectedDropped, dropped)
expectedValues.forEach { expected ->
assertEquals(expected, channel.receive())
}
}

@Test
fun testBufferOverflowLatest() = runTest {
val capacity = 3
val dropped = mutableListOf<Int>()
val expectedDropped = listOf(4, 5, 6)
val expectedValues = listOf(1, 2, 3)

val channel = Channel<Int>(capacity, BufferOverflow.DROP_LATEST) { dropped.add(it) }
(1..6).forEach { value ->
channel.send(value)
}

assertContentEquals(expectedDropped, dropped)
expectedValues.forEach { expected ->
assertEquals(expected, channel.receive())
}
}

@Test
fun testBufferSize() = runTest {
val capacity = 42
Expand Down