diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 87bd43714d..bb7feef71f 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -477,7 +477,14 @@ internal abstract class AbstractSendChannel( override val pollResult: Any? get() = element override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeSend() {} - override fun resumeSendClosed(closed: Closed<*>) {} + + /** + * This method should be never called, see special logic in [LinkedListChannel.onCancelIdempotentList]. + */ + override fun resumeSendClosed(closed: Closed<*>) { + assert { false } + } + override fun toString(): String = "SendBuffered@$hexAddress($element)" } } @@ -669,6 +676,13 @@ internal abstract class AbstractChannel( // Add to the list only **after** successful removal list += previous as Send } + onCancelIdempotentList(list, closed) + } + + /** + * This method is overridden by [LinkedListChannel] to handle cancellation of [SendBuffered] elements from the list. + */ + protected open fun onCancelIdempotentList(list: InlineList, closed: Closed<*>) { list.forEachReversed { it.resumeSendClosed(closed) } } diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt index 80cb8aa011..ef8ba399b8 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt @@ -298,7 +298,7 @@ internal open class ArrayChannel( } // then clean all queued senders super.onCancelIdempotent(wasClosed) - undeliveredElementException?.let { throw it } // throw cancel exception at the end if there was one + undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one } // ------ debug ------ diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt index 75e421c6e7..3a502bd09f 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt @@ -120,7 +120,7 @@ internal open class ConflatedChannel(onUndeliveredElement: OnUndeliveredEleme undeliveredElementException = updateValueLocked(EMPTY) } super.onCancelIdempotent(wasClosed) - undeliveredElementException?.let { throw it } // throw exception at the end if there was one + undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one } private fun updateValueLocked(element: Any?): UndeliveredElementException? { diff --git a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt index 2f46421344..6d42a44b17 100644 --- a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt @@ -58,5 +58,19 @@ internal open class LinkedListChannel(onUndeliveredElement: OnUndeliveredElem } } } + + override fun onCancelIdempotentList(list: InlineList, closed: Closed<*>) { + var undeliveredElementException: UndeliveredElementException? = null + list.forEachReversed { + when (it) { + is SendBuffered<*> -> { + @Suppress("UNCHECKED_CAST") + undeliveredElementException = onUndeliveredElement?.callUndeliveredElementCatchingException(it.element as E, undeliveredElementException) + } + else -> it.resumeSendClosed(closed) + } + } + undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one + } } diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt index 0391e00033..601c2381d7 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt @@ -50,6 +50,20 @@ class ChannelUndeliveredElementTest : TestBase() { assertTrue(resA.isCancelled) // now cancelled in buffer } + @Test + fun testUnlimitedChannelCancelled() = runTest { + val channel = Channel(Channel.UNLIMITED) { it.cancel() } + val resA = Resource("A") + val resB = Resource("B") + channel.send(resA) // goes to buffer + channel.send(resB) // goes to buffer + assertFalse(resA.isCancelled) // it is in buffer, not cancelled + assertFalse(resB.isCancelled) // it is in buffer, not cancelled + channel.cancel() // now cancel the channel + assertTrue(resA.isCancelled) // now cancelled in buffer + assertTrue(resB.isCancelled) // now cancelled in buffer + } + @Test fun testConflatedResourceCancelled() = runTest { val channel = Channel(Channel.CONFLATED) { it.cancel() }