Skip to content

Commit 3d731cf

Browse files
committed
Fix SOE with select (WIP)
* onSend/onReceive clauses on the same channel: Instead of StackOverflowError we throw IllegalStateException and leave the channel in the original state. * Fix SOE in select with "opposite channels" stress-test. The fix is based on the sequential numbering of atomic select operation. Deadlock is detected and the operation with the lower sequential number is aborted and restarted (with a larger number). Fixes #504 Fixes #1411
1 parent 82aaaff commit 3d731cf

24 files changed

+493
-127
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+9-2
Original file line numberDiff line numberDiff line change
@@ -1048,7 +1048,9 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
10481048
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10491049
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
10501050
public fun resumeWith (Ljava/lang/Object;)V
1051-
public fun trySelect (Ljava/lang/Object;)Z
1051+
public fun toString ()Ljava/lang/String;
1052+
public fun trySelect ()Z
1053+
public fun trySelectIdempotent (Ljava/lang/Object;)Ljava/lang/Object;
10521054
}
10531055

10541056
public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
@@ -1069,13 +1071,18 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
10691071
public abstract fun isSelected ()Z
10701072
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10711073
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
1072-
public abstract fun trySelect (Ljava/lang/Object;)Z
1074+
public abstract fun trySelect ()Z
1075+
public abstract fun trySelectIdempotent (Ljava/lang/Object;)Ljava/lang/Object;
10731076
}
10741077

10751078
public final class kotlinx/coroutines/selects/SelectKt {
10761079
public static final fun select (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10771080
}
10781081

1082+
public synthetic class kotlinx/coroutines/selects/SelectKtSelectOpSequenceNumberRefVolatile {
1083+
public fun <init> (J)V
1084+
}
1085+
10791086
public final class kotlinx/coroutines/selects/SelectUnbiasedKt {
10801087
public static final fun selectUnbiased (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10811088
}

gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ kotlin_version=1.3.41
55

66
# Dependencies
77
junit_version=4.12
8-
atomicfu_version=0.12.9
8+
atomicfu_version=0.12.9-SNAPSHOT
99
html_version=0.6.8
1010
lincheck_version=2.0
1111
dokka_version=0.9.16-rdev-2-mpp-hacks

kotlinx-coroutines-core/common/src/JobSupport.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
553553
if (select.isSelected) return
554554
if (state !is Incomplete) {
555555
// already complete -- select result
556-
if (select.trySelect(null)) {
556+
if (select.trySelect()) {
557557
block.startCoroutineUnintercepted(select.completion)
558558
}
559559
return
@@ -1181,7 +1181,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
11811181
if (select.isSelected) return
11821182
if (state !is Incomplete) {
11831183
// already complete -- select result
1184-
if (select.trySelect(null)) {
1184+
if (select.trySelect()) {
11851185
if (state is CompletedExceptionally) {
11861186
select.resumeSelectCancellableWithException(state.cause)
11871187
}
@@ -1362,7 +1362,7 @@ private class SelectJoinOnCompletion<R>(
13621362
private val block: suspend () -> R
13631363
) : JobNode<JobSupport>(job) {
13641364
override fun invoke(cause: Throwable?) {
1365-
if (select.trySelect(null))
1365+
if (select.trySelect())
13661366
block.startCoroutineCancellable(select.completion)
13671367
}
13681368
override fun toString(): String = "SelectJoinOnCompletion[$select]"
@@ -1374,7 +1374,7 @@ private class SelectAwaitOnCompletion<T, R>(
13741374
private val block: suspend (T) -> R
13751375
) : JobNode<JobSupport>(job) {
13761376
override fun invoke(cause: Throwable?) {
1377-
if (select.trySelect(null))
1377+
if (select.trySelect())
13781378
job.selectAwaitCompletion(select, block)
13791379
}
13801380
override fun toString(): String = "SelectAwaitOnCompletion[$select]"

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+45-24
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
5656

5757
/**
5858
* Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
59-
* Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
59+
* Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
6060
* @suppress **This is unstable API and it is subject to change.**
6161
*/
6262
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
@@ -322,7 +322,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
322322
/**
323323
* @suppress **This is unstable API and it is subject to change.**
324324
*/
325-
protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
325+
protected fun describeTryOffer(element: E): TryOfferDesc<E> =
326+
TryOfferDesc(element, queue)
326327

327328
/**
328329
* @suppress **This is unstable API and it is subject to change.**
@@ -339,10 +340,11 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
339340
else -> null
340341
}
341342

342-
override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
343-
val token = node.tryResumeReceive(element, idempotent = this) ?: return false
343+
override fun validatePrepared(node: ReceiveOrClosed<E>): Any? {
344+
val token = node.tryResumeReceive(element, idempotent = this) ?: return REMOVE_PREPARED
345+
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
344346
resumeToken = token
345-
return true
347+
return null
346348
}
347349
}
348350

@@ -358,7 +360,9 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
358360
if (select.isSelected) return
359361
if (full) {
360362
val node = SendSelect(element, this, select, block)
361-
when (val enqueueResult = enqueueSend(node)) {
363+
val enqueueResult = enqueueSend(node)
364+
// println("enqueueSend($element) -> $enqueueResult")
365+
when (enqueueResult) {
362366
null -> { // enqueued successfully
363367
select.disposeOnSelect(node)
364368
return
@@ -374,6 +378,9 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
374378
when {
375379
offerResult === ALREADY_SELECTED -> return
376380
offerResult === OFFER_FAILED -> {} // retry
381+
offerResult === RETRY_ATOMIC -> {
382+
// println("registerSelectSend($element): RETRY")
383+
} // retry
377384
offerResult === OFFER_SUCCESS -> {
378385
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
379386
return
@@ -423,20 +430,25 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
423430
@JvmField val select: SelectInstance<R>,
424431
@JvmField val block: suspend (SendChannel<E>) -> R
425432
) : Send(), DisposableHandle {
426-
override fun tryResumeSend(idempotent: Any?): Any? =
427-
if (select.trySelect(idempotent)) SELECT_STARTED else null
433+
override fun tryResumeSend(idempotent: Any?): Any? {
434+
val result = select.trySelectIdempotent(idempotent)
435+
// println("SendSelect($pollResult) tryResumeSend=$result")
436+
return result
437+
}
428438

429439
override fun completeResumeSend(token: Any) {
430440
assert { token === SELECT_STARTED }
441+
// println("SendSelect($pollResult) completeResumeSend")
431442
block.startCoroutine(receiver = channel, completion = select.completion)
432443
}
433444

434445
override fun dispose() { // invoked on select completion
446+
// println("SendSelect($pollResult) dispose")
435447
remove()
436448
}
437449

438450
override fun resumeSendClosed(closed: Closed<*>) {
439-
if (select.trySelect(null))
451+
if (select.trySelect())
440452
select.resumeSelectCancellableWithException(closed.sendException)
441453
}
442454

@@ -491,7 +503,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
491503

492504
/**
493505
* Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
494-
* Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
506+
* Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
495507
* @suppress **This is unstable API and it is subject to change.**
496508
*/
497509
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
@@ -625,13 +637,16 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
625637
/**
626638
* @suppress **This is unstable API and it is subject to change.**
627639
*/
628-
protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
640+
protected fun describeTryPoll(): TryPollDesc<E> =
641+
TryPollDesc(queue)
629642

630643
/**
631644
* @suppress **This is unstable API and it is subject to change.**
632645
*/
633-
protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
634-
@JvmField var resumeToken: Any? = null
646+
protected class TryPollDesc<E>(
647+
queue: LockFreeLinkedListHead
648+
) : RemoveFirstDesc<Send>(queue) {
649+
@JvmField var resumeToken: Any? = null // can be SELECT_RETRY
635650
@JvmField var pollResult: E? = null
636651

637652
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = when (affected) {
@@ -641,11 +656,12 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
641656
}
642657

643658
@Suppress("UNCHECKED_CAST")
644-
override fun validatePrepared(node: Send): Boolean {
645-
val token = node.tryResumeSend(idempotent = this) ?: return false
659+
override fun validatePrepared(node: Send): Any? {
660+
val token = node.tryResumeSend(idempotent = this) ?: return REMOVE_PREPARED
661+
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
646662
resumeToken = token
647663
pollResult = node.pollResult as E
648-
return true
664+
return null
649665
}
650666
}
651667

@@ -667,6 +683,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
667683
when {
668684
pollResult === ALREADY_SELECTED -> return
669685
pollResult === POLL_FAILED -> {} // retry
686+
pollResult === RETRY_ATOMIC -> {
687+
// println("registerSelectReceive(): RETRY")
688+
} // retry
670689
pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException)
671690
else -> {
672691
block.startCoroutineUnintercepted(pollResult as E, select.completion)
@@ -695,9 +714,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
695714
when {
696715
pollResult === ALREADY_SELECTED -> return
697716
pollResult === POLL_FAILED -> {} // retry
717+
pollResult === RETRY_ATOMIC -> {} // retry
698718
pollResult is Closed<*> -> {
699719
if (pollResult.closeCause == null) {
700-
if (select.trySelect(null))
720+
if (select.trySelect())
701721
block.startCoroutineUnintercepted(null, select.completion)
702722
return
703723
} else {
@@ -732,6 +752,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
732752
when {
733753
pollResult === ALREADY_SELECTED -> return
734754
pollResult === POLL_FAILED -> {} // retry
755+
pollResult === RETRY_ATOMIC -> {} // retry
735756
pollResult is Closed<*> -> {
736757
block.startCoroutineUnintercepted(ValueOrClosed.closed(pollResult.closeCause), select.completion)
737758
}
@@ -913,8 +934,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
913934
@JvmField val block: suspend (Any?) -> R,
914935
@JvmField val receiveMode: Int
915936
) : Receive<E>(), DisposableHandle {
916-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
917-
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
937+
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
938+
val result = select.trySelectIdempotent(idempotent)
939+
return if (result === SELECT_STARTED) value ?: NULL_VALUE else result
940+
}
918941

919942
@Suppress("UNCHECKED_CAST")
920943
override fun completeResumeReceive(token: Any) {
@@ -923,7 +946,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
923946
}
924947

925948
override fun resumeReceiveClosed(closed: Closed<*>) {
926-
if (!select.trySelect(null)) return
949+
if (!select.trySelect()) return
927950
when (receiveMode) {
928951
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
929952
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
@@ -970,10 +993,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
970993
@SharedImmutable
971994
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
972995

973-
@JvmField
974-
@SharedImmutable
975-
internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
976-
977996
@JvmField
978997
@SharedImmutable
979998
internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE")
@@ -997,6 +1016,7 @@ internal typealias Handler = (Throwable?) -> Unit
9971016
*/
9981017
internal abstract class Send : LockFreeLinkedListNode() {
9991018
abstract val pollResult: Any? // E | Closed
1019+
// Returns: null - failure, RETRY_ATOMIC for retry (only when idempotent != null), otherwise token for completeResumeSend
10001020
abstract fun tryResumeSend(idempotent: Any?): Any?
10011021
abstract fun completeResumeSend(token: Any)
10021022
abstract fun resumeSendClosed(closed: Closed<*>)
@@ -1007,6 +1027,7 @@ internal abstract class Send : LockFreeLinkedListNode() {
10071027
*/
10081028
internal interface ReceiveOrClosed<in E> {
10091029
val offerResult: Any // OFFER_SUCCESS | Closed
1030+
// Returns: null - failure, RETRY_ATOMIC for retry (only when idempotent != null), otherwise token for completeResumeReceive
10101031
fun tryResumeReceive(value: E, idempotent: Any?): Any?
10111032
fun completeResumeReceive(token: Any)
10121033
}

kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ internal class ArrayBroadcastChannel<E>(
105105
val size = this.size
106106
if (size >= capacity) return OFFER_FAILED
107107
// let's try to select sending this element to buffer
108-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
108+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
109109
return ALREADY_SELECTED
110110
}
111111
val tail = this.tail
@@ -299,7 +299,7 @@ internal class ArrayBroadcastChannel<E>(
299299
result === POLL_FAILED -> { /* just bail out of lock */ }
300300
else -> {
301301
// let's try to select receiving this element from buffer
302-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
302+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
303303
result = ALREADY_SELECTED
304304
} else {
305305
// update subHead after retrieiving element from buffer

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ internal open class ArrayChannel<E>(
105105
return@withLock
106106
}
107107
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
108+
failure === RETRY_ATOMIC -> {} // retry
108109
failure === ALREADY_SELECTED || failure is Closed<*> -> {
109110
this.size = size // restore size
110111
return failure
@@ -114,7 +115,7 @@ internal open class ArrayChannel<E>(
114115
}
115116
}
116117
// let's try to select sending this element to buffer
117-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
118+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
118119
this.size = size // restore size
119120
return ALREADY_SELECTED
120121
}
@@ -206,6 +207,7 @@ internal open class ArrayChannel<E>(
206207
break@loop
207208
}
208209
failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
210+
failure === RETRY_ATOMIC -> {} // retry
209211
failure === ALREADY_SELECTED -> {
210212
this.size = size // restore size
211213
buffer[head] = result // restore head
@@ -226,7 +228,7 @@ internal open class ArrayChannel<E>(
226228
buffer[(head + size) % buffer.size] = replacement
227229
} else {
228230
// failed to poll or is already closed --> let's try to select receiving this element from buffer
229-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
231+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
230232
this.size = size // restore size
231233
buffer[head] = result // restore head
232234
return ALREADY_SELECTED

kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
273273
}
274274

275275
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
276-
if (!select.trySelect(null)) return
276+
if (!select.trySelect()) return
277277
offerInternal(element)?.let {
278278
select.resumeSelectCancellableWithException(it.sendException)
279279
return

kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
8585
result === ALREADY_SELECTED -> return ALREADY_SELECTED
8686
result === OFFER_SUCCESS -> return OFFER_SUCCESS
8787
result === OFFER_FAILED -> {} // retry
88+
result === RETRY_ATOMIC -> {} // retry
8889
result is Closed<*> -> return result
8990
else -> error("Invalid result $result")
9091
}

kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.channels
66

7+
import kotlinx.coroutines.internal.*
78
import kotlinx.coroutines.selects.*
89

910
/**
@@ -52,6 +53,7 @@ internal open class LinkedListChannel<E> : AbstractChannel<E>() {
5253
result === ALREADY_SELECTED -> return ALREADY_SELECTED
5354
result === OFFER_SUCCESS -> return OFFER_SUCCESS
5455
result === OFFER_FAILED -> {} // retry
56+
result === RETRY_ATOMIC -> {} // retry
5557
result is Closed<*> -> return result
5658
else -> error("Invalid result $result")
5759
}

0 commit comments

Comments
 (0)