From 0f2119035eee43bae36450f2ec62cf73f7579133 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 11 Dec 2020 12:20:22 +0300 Subject: [PATCH 1/2] Fix LinkedListChannel.onUndeliveredElement call on channel cancel Fixes #2435 --- .../common/src/channels/AbstractChannel.kt | 16 +++++++++++++++- .../common/src/channels/ArrayChannel.kt | 2 +- .../common/src/channels/ConflatedChannel.kt | 2 +- .../common/src/channels/LinkedListChannel.kt | 14 ++++++++++++++ .../channels/ChannelUndeliveredElementTest.kt | 14 ++++++++++++++ 5 files changed, 45 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 87bd43714d..1ef5528b74 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -477,7 +477,14 @@ internal abstract class AbstractSendChannel( override val pollResult: Any? get() = element override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeSend() {} - override fun resumeSendClosed(closed: Closed<*>) {} + + /** + * This method should be never called, see special logic in [LinkedListChannel.onCancelIdempotentList]. + */ + override fun resumeSendClosed(closed: Closed<*>) { + assert { false } + } + override fun toString(): String = "SendBuffered@$hexAddress($element)" } } @@ -669,6 +676,13 @@ internal abstract class AbstractChannel( // Add to the list only **after** successful removal list += previous as Send } + onCancelIdempotentList(list, closed) + } + + /** + * This method is overriden by [LinkedListChannel] to handle cancellation of [SendBuffered] elements from the list. + */ + protected open fun onCancelIdempotentList(list: InlineList, closed: Closed<*>) { list.forEachReversed { it.resumeSendClosed(closed) } } diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt index 80cb8aa011..ef8ba399b8 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt @@ -298,7 +298,7 @@ internal open class ArrayChannel( } // then clean all queued senders super.onCancelIdempotent(wasClosed) - undeliveredElementException?.let { throw it } // throw cancel exception at the end if there was one + undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one } // ------ debug ------ diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt index 75e421c6e7..3a502bd09f 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt @@ -120,7 +120,7 @@ internal open class ConflatedChannel(onUndeliveredElement: OnUndeliveredEleme undeliveredElementException = updateValueLocked(EMPTY) } super.onCancelIdempotent(wasClosed) - undeliveredElementException?.let { throw it } // throw exception at the end if there was one + undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one } private fun updateValueLocked(element: Any?): UndeliveredElementException? { diff --git a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt index 2f46421344..6d42a44b17 100644 --- a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt @@ -58,5 +58,19 @@ internal open class LinkedListChannel(onUndeliveredElement: OnUndeliveredElem } } } + + override fun onCancelIdempotentList(list: InlineList, closed: Closed<*>) { + var undeliveredElementException: UndeliveredElementException? = null + list.forEachReversed { + when (it) { + is SendBuffered<*> -> { + @Suppress("UNCHECKED_CAST") + undeliveredElementException = onUndeliveredElement?.callUndeliveredElementCatchingException(it.element as E, undeliveredElementException) + } + else -> it.resumeSendClosed(closed) + } + } + undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one + } } diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt index 0391e00033..601c2381d7 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt @@ -50,6 +50,20 @@ class ChannelUndeliveredElementTest : TestBase() { assertTrue(resA.isCancelled) // now cancelled in buffer } + @Test + fun testUnlimitedChannelCancelled() = runTest { + val channel = Channel(Channel.UNLIMITED) { it.cancel() } + val resA = Resource("A") + val resB = Resource("B") + channel.send(resA) // goes to buffer + channel.send(resB) // goes to buffer + assertFalse(resA.isCancelled) // it is in buffer, not cancelled + assertFalse(resB.isCancelled) // it is in buffer, not cancelled + channel.cancel() // now cancel the channel + assertTrue(resA.isCancelled) // now cancelled in buffer + assertTrue(resB.isCancelled) // now cancelled in buffer + } + @Test fun testConflatedResourceCancelled() = runTest { val channel = Channel(Channel.CONFLATED) { it.cancel() } From 294ec357d6e4e5f7b5fd1057dc1f7d662e11165a Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Mon, 18 Jan 2021 23:02:27 +0300 Subject: [PATCH 2/2] Update kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt Co-authored-by: Vsevolod Tolstopyatov --- kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 1ef5528b74..bb7feef71f 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -680,7 +680,7 @@ internal abstract class AbstractChannel( } /** - * This method is overriden by [LinkedListChannel] to handle cancellation of [SendBuffered] elements from the list. + * This method is overridden by [LinkedListChannel] to handle cancellation of [SendBuffered] elements from the list. */ protected open fun onCancelIdempotentList(list: InlineList, closed: Closed<*>) { list.forEachReversed { it.resumeSendClosed(closed) }