diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index 9224ae8fce..fb6846ef4e 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -241,7 +241,7 @@ internal open class BufferedChannel( /** * Abstract send implementation. */ - protected inline fun sendImpl( + private inline fun sendImpl( /* The element to be sent. */ element: E, /* The waiter to be stored in case of suspension, @@ -350,6 +350,29 @@ internal open class BufferedChannel( } } + // Note: this function is temporarily moved from ConflatedBufferedChannel to BufferedChannel class, because of this issue: KT-65554. + // For now, an inline function, which invokes atomic operations, may only be called within a parent class. + protected fun trySendDropOldest(element: E): ChannelResult = + sendImpl( // <-- this is an inline function + element = element, + // Put the element into the logical buffer even + // if this channel is already full, the `onSuspend` + // callback below extract the first (oldest) element. + waiter = BUFFERED, + // Finish successfully when a rendezvous has happened + // or the element has been buffered. + onRendezvousOrBuffered = { return success(Unit) }, + // In case the algorithm decided to suspend, the element + // was added to the buffer. However, as the buffer is now + // overflowed, the first (oldest) element has to be extracted. + onSuspend = { segm, i -> + dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(segm.id * SEGMENT_SIZE + i) + return success(Unit) + }, + // If the channel is closed, return the corresponding result. + onClosed = { return closed(sendException) } + ) + private inline fun sendImplOnNoWaiter( /* The working cell is specified by the segment and the index in it. */ diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt index a2d3a10322..5c7f151022 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt @@ -72,27 +72,6 @@ internal open class ConflatedBufferedChannel( return success(Unit) } - private fun trySendDropOldest(element: E): ChannelResult = - sendImpl( // <-- this is an inline function - element = element, - // Put the element into the logical buffer even - // if this channel is already full, the `onSuspend` - // callback below extract the first (oldest) element. - waiter = BUFFERED, - // Finish successfully when a rendezvous has happened - // or the element has been buffered. - onRendezvousOrBuffered = { return success(Unit) }, - // In case the algorithm decided to suspend, the element - // was added to the buffer. However, as the buffer is now - // overflowed, the first (oldest) element has to be extracted. - onSuspend = { segm, i -> - dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(segm.id * SEGMENT_SIZE + i) - return success(Unit) - }, - // If the channel is closed, return the corresponding result. - onClosed = { return closed(sendException) } - ) - @Suppress("UNCHECKED_CAST") override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) { // The plain `send(..)` operation never suspends. Thus, either this