Skip to content

Commit 1f77783

Browse files
committed
Fix non-linearizability in ArrayChannel while moving an element from the waiting queue to the buffer
1 parent 16d2606 commit 1f77783

File tree

2 files changed

+52
-37
lines changed

2 files changed

+52
-37
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+17-14
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
3030

3131
/**
3232
* Returns `true` if this channel's buffer is full.
33+
* This operation should be atomic if it is invoked by [enqueueSend].
3334
* @suppress **This is unstable API and it is subject to change.**
3435
*/
3536
protected abstract val isBufferFull: Boolean
@@ -140,8 +141,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
140141
// ------ SendChannel ------
141142

142143
public final override val isClosedForSend: Boolean get() = closedForSend != null
143-
public final override val isFull: Boolean get() = full
144-
private val full: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull // TODO rename to `isFull`
144+
public override val isFull: Boolean get() = isFullImpl
145+
protected val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull
145146

146147
public final override suspend fun send(element: E) {
147148
// fast path -- try offer non-blocking
@@ -182,7 +183,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
182183

183184
private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
184185
loop@ while (true) {
185-
if (full) {
186+
if (isFullImpl) {
186187
val send = SendElement(element, cont)
187188
val enqueueResult = enqueueSend(send)
188189
when {
@@ -227,7 +228,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
227228
* * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
228229
* * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
229230
*/
230-
private fun enqueueSend(send: Send): Any? {
231+
protected open fun enqueueSend(send: Send): Any? {
231232
if (isBufferAlwaysFull) {
232233
queue.addLastIfPrev(send) { prev ->
233234
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
@@ -382,7 +383,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
382383
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
383384
while (true) {
384385
if (select.isSelected) return
385-
if (full) {
386+
if (isFullImpl) {
386387
val node = SendSelect(element, this, select, block)
387388
val enqueueResult = enqueueSend(node)
388389
when {
@@ -495,6 +496,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
495496

496497
/**
497498
* Returns `true` if this channel's buffer is empty.
499+
* This operation should be atomic if it is invoked by [enqueueReceive].
498500
* @suppress **This is unstable API and it is subject to change.**
499501
*/
500502
protected abstract val isBufferEmpty: Boolean
@@ -542,8 +544,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
542544

543545
// ------ ReceiveChannel ------
544546

545-
public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
546-
public final override val isEmpty: Boolean get() = queue.nextNode !is Send && isBufferEmpty
547+
public override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
548+
public override val isEmpty: Boolean get() = isEmptyImpl
549+
protected val isEmptyImpl: Boolean get() = queue.nextNode !is Send && isBufferEmpty
547550

548551
public final override suspend fun receive(): E {
549552
// fast path -- try poll non-blocking
@@ -580,12 +583,12 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
580583
}
581584
}
582585

583-
private fun enqueueReceive(receive: Receive<E>): Boolean {
584-
val result = if (isBufferAlwaysEmpty)
585-
queue.addLastIfPrev(receive) { it !is Send } else
586-
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
586+
protected open fun enqueueReceiveInternal(receive: Receive<E>): Boolean = if (isBufferAlwaysEmpty)
587+
queue.addLastIfPrev(receive) { it !is Send } else
588+
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
589+
590+
private fun enqueueReceive(receive: Receive<E>) = enqueueReceiveInternal(receive).also { result ->
587591
if (result) onReceiveEnqueued()
588-
return result
589592
}
590593

591594
public final override suspend fun receiveOrNull(): E? {
@@ -718,7 +721,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
718721
private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
719722
while (true) {
720723
if (select.isSelected) return
721-
if (isEmpty) {
724+
if (isEmptyImpl) {
722725
if (enqueueReceiveSelect(select, block, receiveMode)) return
723726
} else {
724727
val pollResult = pollSelectInternal(select)
@@ -1058,7 +1061,7 @@ internal class Closed<in E>(
10581061
override fun toString(): String = "Closed@$hexAddress[$closeCause]"
10591062
}
10601063

1061-
private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
1064+
internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
10621065
override val offerResult get() = OFFER_SUCCESS
10631066
abstract fun resumeReceiveClosed(closed: Closed<*>)
10641067
}

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+35-23
Original file line numberDiff line numberDiff line change
@@ -36,34 +36,38 @@ internal open class ArrayChannel<E>(
3636
*/
3737
private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))
3838
private var head: Int = 0
39-
private var size = 0 // Invariant: size <= capacity
39+
private val size = atomic(0) // Invariant: size <= capacity
4040

4141
protected final override val isBufferAlwaysEmpty: Boolean get() = false
42-
protected final override val isBufferEmpty: Boolean get() = lock.withLock { size == 0 }
42+
protected final override val isBufferEmpty: Boolean get() = size.value == 0
4343
protected final override val isBufferAlwaysFull: Boolean get() = false
44-
protected final override val isBufferFull: Boolean get() = lock.withLock { size == capacity }
44+
protected final override val isBufferFull: Boolean get() = size.value == capacity
45+
46+
override val isFull: Boolean get() = lock.withLock { isFullImpl }
47+
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
48+
override val isClosedForReceive: Boolean get() = lock.withLock { super.isClosedForReceive }
4549

4650
// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
4751
protected override fun offerInternal(element: E): Any {
4852
var receive: ReceiveOrClosed<E>? = null
4953
lock.withLock {
50-
val size = this.size
54+
val size = this.size.value
5155
closedForSend?.let { return it }
5256
if (size < capacity) {
5357
// tentatively put element to buffer
54-
this.size = size + 1 // update size before checking queue (!!!)
58+
this.size.value = size + 1 // update size before checking queue (!!!)
5559
// check for receivers that were waiting on empty queue
5660
if (size == 0) {
5761
loop@ while (true) {
5862
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
5963
if (receive is Closed) {
60-
this.size = size // restore size
64+
this.size.value = size // restore size
6165
return receive!!
6266
}
6367
val token = receive!!.tryResumeReceive(element, null)
6468
if (token != null) {
6569
assert { token === RESUME_TOKEN }
66-
this.size = size // restore size
70+
this.size.value = size // restore size
6771
return@withLock
6872
}
6973
}
@@ -84,26 +88,26 @@ internal open class ArrayChannel<E>(
8488
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
8589
var receive: ReceiveOrClosed<E>? = null
8690
lock.withLock {
87-
val size = this.size
91+
val size = this.size.value
8892
closedForSend?.let { return it }
8993
if (size < capacity) {
9094
// tentatively put element to buffer
91-
this.size = size + 1 // update size before checking queue (!!!)
95+
this.size.value = size + 1 // update size before checking queue (!!!)
9296
// check for receivers that were waiting on empty queue
9397
if (size == 0) {
9498
loop@ while (true) {
9599
val offerOp = describeTryOffer(element)
96100
val failure = select.performAtomicTrySelect(offerOp)
97101
when {
98102
failure == null -> { // offered successfully
99-
this.size = size // restore size
103+
this.size.value = size // restore size
100104
receive = offerOp.result
101105
return@withLock
102106
}
103107
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
104108
failure === RETRY_ATOMIC -> {} // retry
105109
failure === ALREADY_SELECTED || failure is Closed<*> -> {
106-
this.size = size // restore size
110+
this.size.value = size // restore size
107111
return failure
108112
}
109113
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
@@ -112,7 +116,7 @@ internal open class ArrayChannel<E>(
112116
}
113117
// let's try to select sending this element to buffer
114118
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
115-
this.size = size // restore size
119+
this.size.value = size // restore size
116120
return ALREADY_SELECTED
117121
}
118122
ensureCapacity(size)
@@ -127,6 +131,10 @@ internal open class ArrayChannel<E>(
127131
return receive!!.offerResult
128132
}
129133

134+
override fun enqueueSend(send: Send): Any? = lock.withLock {
135+
super.enqueueSend(send)
136+
}
137+
130138
// Guarded by lock
131139
private fun ensureCapacity(currentSize: Int) {
132140
if (currentSize >= buffer.size) {
@@ -146,12 +154,12 @@ internal open class ArrayChannel<E>(
146154
var resumed = false
147155
var result: Any? = null
148156
lock.withLock {
149-
val size = this.size
157+
val size = this.size.value
150158
if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
151159
// size > 0: not empty -- retrieve element
152160
result = buffer[head]
153161
buffer[head] = null
154-
this.size = size - 1 // update size before checking queue (!!!)
162+
this.size.value = size - 1 // update size before checking queue (!!!)
155163
// check for senders that were waiting on full queue
156164
var replacement: Any? = POLL_FAILED
157165
if (size == capacity) {
@@ -167,7 +175,7 @@ internal open class ArrayChannel<E>(
167175
}
168176
}
169177
if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
170-
this.size = size // restore size
178+
this.size.value = size // restore size
171179
buffer[(head + size) % buffer.size] = replacement
172180
}
173181
head = (head + 1) % buffer.size
@@ -184,12 +192,12 @@ internal open class ArrayChannel<E>(
184192
var success = false
185193
var result: Any? = null
186194
lock.withLock {
187-
val size = this.size
195+
val size = this.size.value
188196
if (size == 0) return closedForSend ?: POLL_FAILED
189197
// size > 0: not empty -- retrieve element
190198
result = buffer[head]
191199
buffer[head] = null
192-
this.size = size - 1 // update size before checking queue (!!!)
200+
this.size.value = size - 1 // update size before checking queue (!!!)
193201
// check for senders that were waiting on full queue
194202
var replacement: Any? = POLL_FAILED
195203
if (size == capacity) {
@@ -206,7 +214,7 @@ internal open class ArrayChannel<E>(
206214
failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
207215
failure === RETRY_ATOMIC -> {} // retry
208216
failure === ALREADY_SELECTED -> {
209-
this.size = size // restore size
217+
this.size.value = size // restore size
210218
buffer[head] = result // restore head
211219
return failure
212220
}
@@ -221,12 +229,12 @@ internal open class ArrayChannel<E>(
221229
}
222230
}
223231
if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
224-
this.size = size // restore size
232+
this.size.value = size // restore size
225233
buffer[(head + size) % buffer.size] = replacement
226234
} else {
227235
// failed to poll or is already closed --> let's try to select receiving this element from buffer
228236
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
229-
this.size = size // restore size
237+
this.size.value = size // restore size
230238
buffer[head] = result // restore head
231239
return ALREADY_SELECTED
232240
}
@@ -239,16 +247,20 @@ internal open class ArrayChannel<E>(
239247
return result
240248
}
241249

250+
override fun enqueueReceiveInternal(receive: Receive<E>): Boolean = lock.withLock {
251+
super.enqueueReceiveInternal(receive)
252+
}
253+
242254
// Note: this function is invoked when channel is already closed
243255
override fun onCancelIdempotent(wasClosed: Boolean) {
244256
// clear buffer first, but do not wait for it in helpers
245257
if (wasClosed) {
246258
lock.withLock {
247-
repeat(size) {
259+
repeat(size.value) {
248260
buffer[head] = 0
249261
head = (head + 1) % buffer.size
250262
}
251-
size = 0
263+
size.value = 0
252264
}
253265
}
254266
// then clean all queued senders
@@ -258,5 +270,5 @@ internal open class ArrayChannel<E>(
258270
// ------ debug ------
259271

260272
override val bufferDebugString: String
261-
get() = "(buffer:capacity=$capacity,size=$size)"
273+
get() = "(buffer:capacity=$capacity,size=${size.value})"
262274
}

0 commit comments

Comments
 (0)