Skip to content

Commit ae36c5b

Browse files
committed
Fix SOE with select (WIP, DO NOT MERGE)
* onSend/onReceive clauses on the same channel: Instead of StackOverflowError we throw IllegalStateException and leave the channel in the original state. * Fix SOE in select with "opposite channels" stress-test. The fix is based on the sequential numbering of atomic select operation. Deadlock is detected and the operation with the lower sequential number is aborted and restarted (with a larger number). * TODO:NOTE: SelectDeadlockLFStressTest does not pass and is disabled. !!! DO NOT MERGE !!! Fixes #504 Fixes #1411
1 parent 007d8d7 commit ae36c5b

23 files changed

+608
-175
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+9-2
Original file line numberDiff line numberDiff line change
@@ -1049,7 +1049,9 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
10491049
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10501050
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
10511051
public fun resumeWith (Ljava/lang/Object;)V
1052-
public fun trySelect (Ljava/lang/Object;)Z
1052+
public fun toString ()Ljava/lang/String;
1053+
public fun trySelect ()Z
1054+
public fun trySelectIdempotent (Lkotlinx/coroutines/internal/AtomicDesc;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
10531055
}
10541056

10551057
public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
@@ -1070,13 +1072,18 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
10701072
public abstract fun isSelected ()Z
10711073
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10721074
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
1073-
public abstract fun trySelect (Ljava/lang/Object;)Z
1075+
public abstract fun trySelect ()Z
1076+
public abstract fun trySelectIdempotent (Lkotlinx/coroutines/internal/AtomicDesc;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
10741077
}
10751078

10761079
public final class kotlinx/coroutines/selects/SelectKt {
10771080
public static final fun select (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10781081
}
10791082

1083+
public synthetic class kotlinx/coroutines/selects/SelectKtSelectOpSequenceNumberRefVolatile {
1084+
public fun <init> (J)V
1085+
}
1086+
10801087
public final class kotlinx/coroutines/selects/SelectUnbiasedKt {
10811088
public static final fun selectUnbiased (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10821089
}

kotlinx-coroutines-core/common/src/JobSupport.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
553553
if (select.isSelected) return
554554
if (state !is Incomplete) {
555555
// already complete -- select result
556-
if (select.trySelect(null)) {
556+
if (select.trySelect()) {
557557
block.startCoroutineUnintercepted(select.completion)
558558
}
559559
return
@@ -1181,7 +1181,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
11811181
if (select.isSelected) return
11821182
if (state !is Incomplete) {
11831183
// already complete -- select result
1184-
if (select.trySelect(null)) {
1184+
if (select.trySelect()) {
11851185
if (state is CompletedExceptionally) {
11861186
select.resumeSelectCancellableWithException(state.cause)
11871187
}
@@ -1362,7 +1362,7 @@ private class SelectJoinOnCompletion<R>(
13621362
private val block: suspend () -> R
13631363
) : JobNode<JobSupport>(job) {
13641364
override fun invoke(cause: Throwable?) {
1365-
if (select.trySelect(null))
1365+
if (select.trySelect())
13661366
block.startCoroutineCancellable(select.completion)
13671367
}
13681368
override fun toString(): String = "SelectJoinOnCompletion[$select]"
@@ -1374,7 +1374,7 @@ private class SelectAwaitOnCompletion<T, R>(
13741374
private val block: suspend (T) -> R
13751375
) : JobNode<JobSupport>(job) {
13761376
override fun invoke(cause: Throwable?) {
1377-
if (select.trySelect(null))
1377+
if (select.trySelect())
13781378
job.selectAwaitCompletion(select, block)
13791379
}
13801380
override fun toString(): String = "SelectAwaitOnCompletion[$select]"

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+58-30
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
4646
protected open fun offerInternal(element: E): Any {
4747
while (true) {
4848
val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
49-
val token = receive.tryResumeReceive(element, idempotent = null)
49+
val token = receive.tryResumeReceive(element, null, null)
5050
if (token != null) {
5151
receive.completeResumeReceive(token)
5252
return receive.offerResult
@@ -56,7 +56,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
5656

5757
/**
5858
* Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
59-
* Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
59+
* Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
6060
* @suppress **This is unstable API and it is subject to change.**
6161
*/
6262
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
@@ -352,10 +352,14 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
352352
else -> null
353353
}
354354

355-
override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
356-
val token = node.tryResumeReceive(element, idempotent = this) ?: return false
355+
@Suppress("UNCHECKED_CAST")
356+
override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
357+
affected as ReceiveOrClosed<E> // type assertion, see "failure" impl
358+
val beforeStart = { finishPrepare(affected, next) }
359+
val token = affected.tryResumeReceive(element, this, beforeStart) ?: return REMOVE_PREPARED
360+
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
357361
resumeToken = token
358-
return true
362+
return null
359363
}
360364
}
361365

@@ -388,6 +392,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
388392
when {
389393
offerResult === ALREADY_SELECTED -> return
390394
offerResult === OFFER_FAILED -> {} // retry
395+
offerResult === RETRY_ATOMIC -> {} // retry
391396
offerResult === OFFER_SUCCESS -> {
392397
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
393398
return
@@ -437,8 +442,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
437442
@JvmField val select: SelectInstance<R>,
438443
@JvmField val block: suspend (SendChannel<E>) -> R
439444
) : Send(), DisposableHandle {
440-
override fun tryResumeSend(idempotent: Any?): Any? =
441-
if (select.trySelect(idempotent)) SELECT_STARTED else null
445+
override fun tryResumeSend(idempotent: AtomicDesc?, beforeStart: (() -> Unit)?): Any? =
446+
select.trySelectIdempotent(idempotent, beforeStart)
442447

443448
override fun completeResumeSend(token: Any) {
444449
assert { token === SELECT_STARTED }
@@ -450,7 +455,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
450455
}
451456

452457
override fun resumeSendClosed(closed: Closed<*>) {
453-
if (select.trySelect(null))
458+
if (select.trySelect())
454459
select.resumeSelectCancellableWithException(closed.sendException)
455460
}
456461

@@ -461,7 +466,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
461466
@JvmField val element: E
462467
) : Send() {
463468
override val pollResult: Any? get() = element
464-
override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
469+
override fun tryResumeSend(idempotent: AtomicDesc?, beforeStart: (() -> Unit)?): Any? {
470+
beforeStart?.invoke()
471+
return SEND_RESUMED
472+
}
465473
override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } }
466474
override fun resumeSendClosed(closed: Closed<*>) {}
467475
}
@@ -495,7 +503,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
495503
protected open fun pollInternal(): Any? {
496504
while (true) {
497505
val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
498-
val token = send.tryResumeSend(idempotent = null)
506+
val token = send.tryResumeSend(null, null)
499507
if (token != null) {
500508
send.completeResumeSend(token)
501509
return send.pollResult
@@ -505,7 +513,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
505513

506514
/**
507515
* Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
508-
* Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
516+
* Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
509517
* @suppress **This is unstable API and it is subject to change.**
510518
*/
511519
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
@@ -655,11 +663,14 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
655663
}
656664

657665
@Suppress("UNCHECKED_CAST")
658-
override fun validatePrepared(node: Send): Boolean {
659-
val token = node.tryResumeSend(idempotent = this) ?: return false
666+
override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
667+
affected as Send // type assertion, see "failure" impl
668+
val beforeStart = { finishPrepare(affected, next) }
669+
val token = affected.tryResumeSend(this, beforeStart) ?: return REMOVE_PREPARED
670+
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
660671
resumeToken = token
661-
pollResult = node.pollResult as E
662-
return true
672+
pollResult = affected.pollResult as E
673+
return null
663674
}
664675
}
665676

@@ -681,6 +692,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
681692
when {
682693
pollResult === ALREADY_SELECTED -> return
683694
pollResult === POLL_FAILED -> {} // retry
695+
pollResult === RETRY_ATOMIC -> {} // retry
684696
pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException)
685697
else -> {
686698
block.startCoroutineUnintercepted(pollResult as E, select.completion)
@@ -709,9 +721,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
709721
when {
710722
pollResult === ALREADY_SELECTED -> return
711723
pollResult === POLL_FAILED -> {} // retry
724+
pollResult === RETRY_ATOMIC -> {} // retry
712725
pollResult is Closed<*> -> {
713726
if (pollResult.closeCause == null) {
714-
if (select.trySelect(null))
727+
if (select.trySelect())
715728
block.startCoroutineUnintercepted(null, select.completion)
716729
return
717730
} else {
@@ -746,6 +759,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
746759
when {
747760
pollResult === ALREADY_SELECTED -> return
748761
pollResult === POLL_FAILED -> {} // retry
762+
pollResult === RETRY_ATOMIC -> {} // retry
749763
pollResult is Closed<*> -> {
750764
block.startCoroutineUnintercepted(ValueOrClosed.closed(pollResult.closeCause), select.completion)
751765
}
@@ -870,7 +884,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
870884
}
871885

872886
@Suppress("IMPLICIT_CAST_TO_ANY")
873-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(resumeValue(value), idempotent)
887+
override fun tryResumeReceive(value: E, idempotent: AtomicDesc?, beforeStart: (() -> Unit)?): Any? {
888+
beforeStart?.invoke()
889+
return cont.tryResume(resumeValue(value), idempotent)
890+
}
891+
874892
override fun completeResumeReceive(token: Any) = cont.completeResume(token)
875893
override fun resumeReceiveClosed(closed: Closed<*>) {
876894
when {
@@ -886,7 +904,8 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
886904
@JvmField val iterator: Itr<E>,
887905
@JvmField val cont: CancellableContinuation<Boolean>
888906
) : Receive<E>() {
889-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
907+
override fun tryResumeReceive(value: E, idempotent: AtomicDesc?, beforeStart: (() -> Unit)?): Any? {
908+
beforeStart?.invoke()
890909
val token = cont.tryResume(true, idempotent)
891910
if (token != null) {
892911
/*
@@ -928,8 +947,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
928947
@JvmField val block: suspend (Any?) -> R,
929948
@JvmField val receiveMode: Int
930949
) : Receive<E>(), DisposableHandle {
931-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
932-
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
950+
override fun tryResumeReceive(value: E, idempotent: AtomicDesc?, beforeStart: (() -> Unit)?): Any? {
951+
val result = select.trySelectIdempotent(idempotent, beforeStart)
952+
return if (result === SELECT_STARTED) value ?: NULL_VALUE else result
953+
}
933954

934955
@Suppress("UNCHECKED_CAST")
935956
override fun completeResumeReceive(token: Any) {
@@ -938,7 +959,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
938959
}
939960

940961
override fun resumeReceiveClosed(closed: Closed<*>) {
941-
if (!select.trySelect(null)) return
962+
if (!select.trySelect()) return
942963
when (receiveMode) {
943964
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
944965
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
@@ -985,10 +1006,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
9851006
@SharedImmutable
9861007
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
9871008

988-
@JvmField
989-
@SharedImmutable
990-
internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
991-
9921009
@JvmField
9931010
@SharedImmutable
9941011
internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE")
@@ -1012,7 +1029,8 @@ internal typealias Handler = (Throwable?) -> Unit
10121029
*/
10131030
internal abstract class Send : LockFreeLinkedListNode() {
10141031
abstract val pollResult: Any? // E | Closed
1015-
abstract fun tryResumeSend(idempotent: Any?): Any?
1032+
// Returns: null - failure, RETRY_ATOMIC for retry (only when idempotent != null), otherwise token for completeResumeSend
1033+
abstract fun tryResumeSend(idempotent: AtomicDesc?, beforeStart: (() -> Unit)?): Any?
10161034
abstract fun completeResumeSend(token: Any)
10171035
abstract fun resumeSendClosed(closed: Closed<*>)
10181036
}
@@ -1022,7 +1040,8 @@ internal abstract class Send : LockFreeLinkedListNode() {
10221040
*/
10231041
internal interface ReceiveOrClosed<in E> {
10241042
val offerResult: Any // OFFER_SUCCESS | Closed
1025-
fun tryResumeReceive(value: E, idempotent: Any?): Any?
1043+
// Returns: null - failure, RETRY_ATOMIC for retry (only when idempotent != null), otherwise token for completeResumeReceive
1044+
fun tryResumeReceive(value: E, idempotent: AtomicDesc?, beforeStart: (() -> Unit)?): Any?
10261045
fun completeResumeReceive(token: Any)
10271046
}
10281047

@@ -1034,7 +1053,10 @@ internal class SendElement(
10341053
override val pollResult: Any?,
10351054
@JvmField val cont: CancellableContinuation<Unit>
10361055
) : Send() {
1037-
override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
1056+
override fun tryResumeSend(idempotent: AtomicDesc?, beforeStart: (() -> Unit)?): Any? {
1057+
beforeStart?.invoke()
1058+
return cont.tryResume(Unit, idempotent)
1059+
}
10381060
override fun completeResumeSend(token: Any) = cont.completeResume(token)
10391061
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
10401062
override fun toString(): String = "SendElement($pollResult)"
@@ -1051,9 +1073,15 @@ internal class Closed<in E>(
10511073

10521074
override val offerResult get() = this
10531075
override val pollResult get() = this
1054-
override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
1076+
override fun tryResumeSend(idempotent: AtomicDesc?, beforeStart: (() -> Unit)?): Any? {
1077+
beforeStart?.invoke()
1078+
return CLOSE_RESUMED
1079+
}
10551080
override fun completeResumeSend(token: Any) { assert { token === CLOSE_RESUMED } }
1056-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = CLOSE_RESUMED
1081+
override fun tryResumeReceive(value: E, idempotent: AtomicDesc?, beforeStart: (() -> Unit)?): Any? {
1082+
beforeStart?.invoke()
1083+
return CLOSE_RESUMED
1084+
}
10571085
override fun completeResumeReceive(token: Any) { assert { token === CLOSE_RESUMED } }
10581086
override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked"
10591087
override fun toString(): String = "Closed[$closeCause]"

kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.channels
@@ -105,7 +105,7 @@ internal class ArrayBroadcastChannel<E>(
105105
val size = this.size
106106
if (size >= capacity) return OFFER_FAILED
107107
// let's try to select sending this element to buffer
108-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
108+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
109109
return ALREADY_SELECTED
110110
}
111111
val tail = this.tail
@@ -163,7 +163,7 @@ internal class ArrayBroadcastChannel<E>(
163163
while (true) {
164164
send = takeFirstSendOrPeekClosed() ?: break // when when no sender
165165
if (send is Closed<*>) break // break when closed for send
166-
token = send!!.tryResumeSend(idempotent = null)
166+
token = send!!.tryResumeSend(null, null)
167167
if (token != null) {
168168
// put sent element to the buffer
169169
buffer[(tail % capacity).toInt()] = (send as Send).pollResult
@@ -245,7 +245,7 @@ internal class ArrayBroadcastChannel<E>(
245245
// find a receiver for an element
246246
receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
247247
if (receive is Closed<*>) break // noting more to do if this sub already closed
248-
token = receive.tryResumeReceive(result as E, idempotent = null)
248+
token = receive.tryResumeReceive(result as E, null, null)
249249
if (token == null) continue // bail out here to next iteration (see for next receiver)
250250
val subHead = this.subHead
251251
this.subHead = subHead + 1 // retrieved element for this subscriber
@@ -299,7 +299,7 @@ internal class ArrayBroadcastChannel<E>(
299299
result === POLL_FAILED -> { /* just bail out of lock */ }
300300
else -> {
301301
// let's try to select receiving this element from buffer
302-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
302+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
303303
result = ALREADY_SELECTED
304304
} else {
305305
// update subHead after retrieiving element from buffer

0 commit comments

Comments
 (0)