Skip to content

Draft: Add onUndeliveredElement callback to ArrayChannels #3327

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ internal open class ArrayChannel<E>(
*/
private val capacity: Int,
private val onBufferOverflow: BufferOverflow,
onUndeliveredElement: OnUndeliveredElement<E>?
onUndeliveredElement: OnUndeliveredElement<E>?,
private val onDroppedElement: ((E) -> Unit)? = null
) : AbstractChannel<E>(onUndeliveredElement) {
init {
// This check is actually used by the Channel(...) constructor function which checks only for known
Expand Down Expand Up @@ -63,7 +64,8 @@ internal open class ArrayChannel<E>(
// check for receivers that were waiting on empty queue
if (size == 0) {
loop@ while (true) {
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
receive = takeFirstReceiveOrPeekClosed()
?: break@loop // break when no receivers queued
if (receive is Closed) {
this.size.value = size // restore size
return receive!!
Expand Down Expand Up @@ -153,6 +155,11 @@ internal open class ArrayChannel<E>(
} else {
// buffer is full
assert { onBufferOverflow == BufferOverflow.DROP_OLDEST } // the only way we can get here
val dropped = buffer[head % buffer.size]
if (dropped != null) {
@Suppress("UNCHECKED_CAST")
onDroppedElement?.let { it(dropped as E) }
}
buffer[head % buffer.size] = null // drop oldest element
buffer[(head + currentSize) % buffer.size] = element // actually queue element
head = (head + 1) % buffer.size
Expand Down Expand Up @@ -180,7 +187,8 @@ internal open class ArrayChannel<E>(
var result: Any? = null
lock.withLock {
val size = this.size.value
if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
if (size == 0) return closedForSend
?: POLL_FAILED // when nothing can be read from buffer
// size > 0: not empty -- retrieve element
result = buffer[head]
buffer[head] = null
Expand Down Expand Up @@ -282,13 +290,18 @@ internal open class ArrayChannel<E>(
override fun onCancelIdempotent(wasClosed: Boolean) {
// clear buffer first, but do not wait for it in helpers
val onUndeliveredElement = onUndeliveredElement
var undeliveredElementException: UndeliveredElementException? = null // first cancel exception, others suppressed
var undeliveredElementException: UndeliveredElementException? =
null // first cancel exception, others suppressed
lock.withLock {
repeat(size.value) {
val value = buffer[head]
if (onUndeliveredElement != null && value !== EMPTY) {
@Suppress("UNCHECKED_CAST")
undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(value as E, undeliveredElementException)
undeliveredElementException =
onUndeliveredElement.callUndeliveredElementCatchingException(
value as E,
undeliveredElementException
)
}
buffer[head] = EMPTY
head = (head + 1) % buffer.size
Expand Down
9 changes: 5 additions & 4 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -768,14 +768,15 @@ public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
onUndeliveredElement: ((E) -> Unit)? = null,
onDroppedElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
RENDEZVOUS -> {
if (onBufferOverflow == BufferOverflow.SUSPEND)
RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
else
ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
ArrayChannel(1, onBufferOverflow, onUndeliveredElement, onDroppedElement) // support buffer overflow with buffered channel
}
CONFLATED -> {
require(onBufferOverflow == BufferOverflow.SUSPEND) {
Expand All @@ -786,13 +787,13 @@ public fun <E> Channel(
UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
onBufferOverflow, onUndeliveredElement
onBufferOverflow, onUndeliveredElement, onDroppedElement
)
else -> {
if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
ConflatedChannel(onUndeliveredElement) // conflated implementation is more efficient but appears to work in the same way
else
ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement, onDroppedElement)
}
}

Expand Down