Skip to content

Commit 353510a

Browse files
authored
Make ReceiveChannel.cancel linearizability-friendly
1 parent fcc004c commit 353510a

File tree

4 files changed

+44
-29
lines changed

4 files changed

+44
-29
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+23-9
Original file line numberDiff line numberDiff line change
@@ -624,22 +624,36 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
624624
}
625625

626626
// It needs to be internal to support deprecated cancel(Throwable?) API
627-
internal open fun cancelInternal(cause: Throwable?): Boolean =
627+
internal fun cancelInternal(cause: Throwable?): Boolean =
628628
close(cause).also {
629-
cleanupSendQueueOnCancel()
629+
onCancelIdempotent(it)
630630
}
631631

632-
// Note: this function is invoked when channel is already closed
633-
protected open fun cleanupSendQueueOnCancel() {
632+
/**
633+
* Method that is invoked right after [close] in [cancel] sequence.
634+
* [wasClosed] is directly mapped to the value returned by [close].
635+
*/
636+
protected open fun onCancelIdempotent(wasClosed: Boolean) {
637+
/*
638+
* See the comment to helpClose, all these machinery (reversed order of iteration, postponed resume)
639+
* has the same rationale.
640+
*/
634641
val closed = closedForSend ?: error("Cannot happen")
642+
var list = InlineList<Send>()
635643
while (true) {
636-
val send = takeFirstSendOrPeekClosed() ?: error("Cannot happen")
637-
if (send is Closed<*>) {
638-
assert { send === closed }
639-
return // cleaned
644+
val previous = closed.prevNode
645+
if (previous is LockFreeLinkedListHead) {
646+
break
647+
}
648+
assert { previous is Send }
649+
if (!previous.remove()) {
650+
previous.helpRemove() // make sure remove is complete before continuing
651+
continue
640652
}
641-
send.resumeSendClosed(closed)
653+
// Add to the list only **after** successful removal
654+
list += previous as Send
642655
}
656+
list.forEachReversed { it.resumeSendClosed(closed) }
643657
}
644658

645659
public final override fun iterator(): ChannelIterator<E> = Itr(this)

kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt

+6-9
Original file line numberDiff line numberDiff line change
@@ -208,15 +208,12 @@ internal class ArrayBroadcastChannel<E>(
208208
override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
209209
override val isBufferFull: Boolean get() = error("Should not be used")
210210

211-
override fun cancelInternal(cause: Throwable?): Boolean =
212-
close(cause).also { closed ->
213-
if (closed) broadcastChannel.updateHead(removeSub = this)
214-
clearBuffer()
215-
}
216-
217-
private fun clearBuffer() {
218-
subLock.withLock {
219-
subHead = broadcastChannel.tail
211+
override fun onCancelIdempotent(wasClosed: Boolean) {
212+
if (wasClosed) {
213+
broadcastChannel.updateHead(removeSub = this)
214+
subLock.withLock {
215+
subHead = broadcastChannel.tail
216+
}
220217
}
221218
}
222219

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+10-8
Original file line numberDiff line numberDiff line change
@@ -241,17 +241,19 @@ internal open class ArrayChannel<E>(
241241
}
242242

243243
// Note: this function is invoked when channel is already closed
244-
override fun cleanupSendQueueOnCancel() {
245-
// clear buffer first
246-
lock.withLock {
247-
repeat(size) {
248-
buffer[head] = 0
249-
head = (head + 1) % buffer.size
244+
override fun onCancelIdempotent(wasClosed: Boolean) {
245+
// clear buffer first, but do not wait for it in helpers
246+
if (wasClosed) {
247+
lock.withLock {
248+
repeat(size) {
249+
buffer[head] = 0
250+
head = (head + 1) % buffer.size
251+
}
252+
size = 0
250253
}
251-
size = 0
252254
}
253255
// then clean all queued senders
254-
super.cleanupSendQueueOnCancel()
256+
super.onCancelIdempotent(wasClosed)
255257
}
256258

257259
// ------ debug ------

kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -284,10 +284,12 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
284284
private class Subscriber<E>(
285285
private val broadcastChannel: ConflatedBroadcastChannel<E>
286286
) : ConflatedChannel<E>(), ReceiveChannel<E> {
287-
override fun cancelInternal(cause: Throwable?): Boolean =
288-
close(cause).also { closed ->
289-
if (closed) broadcastChannel.closeSubscriber(this)
287+
288+
override fun onCancelIdempotent(wasClosed: Boolean) {
289+
if (wasClosed) {
290+
broadcastChannel.closeSubscriber(this)
290291
}
292+
}
291293

292294
public override fun offerInternal(element: E): Any = super.offerInternal(element)
293295
}

0 commit comments

Comments
 (0)