Skip to content

Commit 7d0ad29

Browse files
elizarovqwwdfsad
andauthored
Fix LinkedListChannel.onUndeliveredElement call on channel cancel (#2444)
* Fix LinkedListChannel.onUndeliveredElement call on channel cancel Fixes #2435 Co-authored-by: Vsevolod Tolstopyatov <[email protected]>
1 parent 0e926e7 commit 7d0ad29

File tree

5 files changed

+45
-3
lines changed

5 files changed

+45
-3
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+15-1
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,14 @@ internal abstract class AbstractSendChannel<E>(
477477
override val pollResult: Any? get() = element
478478
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
479479
override fun completeResumeSend() {}
480-
override fun resumeSendClosed(closed: Closed<*>) {}
480+
481+
/**
482+
* This method should be never called, see special logic in [LinkedListChannel.onCancelIdempotentList].
483+
*/
484+
override fun resumeSendClosed(closed: Closed<*>) {
485+
assert { false }
486+
}
487+
481488
override fun toString(): String = "SendBuffered@$hexAddress($element)"
482489
}
483490
}
@@ -669,6 +676,13 @@ internal abstract class AbstractChannel<E>(
669676
// Add to the list only **after** successful removal
670677
list += previous as Send
671678
}
679+
onCancelIdempotentList(list, closed)
680+
}
681+
682+
/**
683+
* This method is overridden by [LinkedListChannel] to handle cancellation of [SendBuffered] elements from the list.
684+
*/
685+
protected open fun onCancelIdempotentList(list: InlineList<Send>, closed: Closed<*>) {
672686
list.forEachReversed { it.resumeSendClosed(closed) }
673687
}
674688

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ internal open class ArrayChannel<E>(
298298
}
299299
// then clean all queued senders
300300
super.onCancelIdempotent(wasClosed)
301-
undeliveredElementException?.let { throw it } // throw cancel exception at the end if there was one
301+
undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one
302302
}
303303

304304
// ------ debug ------

kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ internal open class ConflatedChannel<E>(onUndeliveredElement: OnUndeliveredEleme
120120
undeliveredElementException = updateValueLocked(EMPTY)
121121
}
122122
super.onCancelIdempotent(wasClosed)
123-
undeliveredElementException?.let { throw it } // throw exception at the end if there was one
123+
undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one
124124
}
125125

126126
private fun updateValueLocked(element: Any?): UndeliveredElementException? {

kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt

+14
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,19 @@ internal open class LinkedListChannel<E>(onUndeliveredElement: OnUndeliveredElem
5858
}
5959
}
6060
}
61+
62+
override fun onCancelIdempotentList(list: InlineList<Send>, closed: Closed<*>) {
63+
var undeliveredElementException: UndeliveredElementException? = null
64+
list.forEachReversed {
65+
when (it) {
66+
is SendBuffered<*> -> {
67+
@Suppress("UNCHECKED_CAST")
68+
undeliveredElementException = onUndeliveredElement?.callUndeliveredElementCatchingException(it.element as E, undeliveredElementException)
69+
}
70+
else -> it.resumeSendClosed(closed)
71+
}
72+
}
73+
undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one
74+
}
6175
}
6276

kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt

+14
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,20 @@ class ChannelUndeliveredElementTest : TestBase() {
5050
assertTrue(resA.isCancelled) // now cancelled in buffer
5151
}
5252

53+
@Test
54+
fun testUnlimitedChannelCancelled() = runTest {
55+
val channel = Channel<Resource>(Channel.UNLIMITED) { it.cancel() }
56+
val resA = Resource("A")
57+
val resB = Resource("B")
58+
channel.send(resA) // goes to buffer
59+
channel.send(resB) // goes to buffer
60+
assertFalse(resA.isCancelled) // it is in buffer, not cancelled
61+
assertFalse(resB.isCancelled) // it is in buffer, not cancelled
62+
channel.cancel() // now cancel the channel
63+
assertTrue(resA.isCancelled) // now cancelled in buffer
64+
assertTrue(resB.isCancelled) // now cancelled in buffer
65+
}
66+
5367
@Test
5468
fun testConflatedResourceCancelled() = runTest {
5569
val channel = Channel<Resource>(Channel.CONFLATED) { it.cancel() }

0 commit comments

Comments
 (0)