Skip to content

Commit bce71ef

Browse files
committed
Fix SOE with select (WIP, DO NOT MERGE)
* onSend/onReceive clauses on the same channel: Instead of StackOverflowError we throw IllegalStateException and leave the channel in the original state. * Fix SOE in select with "opposite channels" stress-test. The fix is based on the sequential numbering of atomic select operation. Deadlock is detected and the operation with the lower sequential number is aborted and restarted (with a larger number). * TODO:NOTE: SelectDeadlockLFStressTest does not pass and is disabled. !!! DO NOT MERGE !!! Fixes #504 Fixes #1411
1 parent 007d8d7 commit bce71ef

23 files changed

+497
-127
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+9-2
Original file line numberDiff line numberDiff line change
@@ -1049,7 +1049,9 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
10491049
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10501050
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
10511051
public fun resumeWith (Ljava/lang/Object;)V
1052-
public fun trySelect (Ljava/lang/Object;)Z
1052+
public fun toString ()Ljava/lang/String;
1053+
public fun trySelect ()Z
1054+
public fun trySelectIdempotent (Ljava/lang/Object;)Ljava/lang/Object;
10531055
}
10541056

10551057
public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
@@ -1070,13 +1072,18 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
10701072
public abstract fun isSelected ()Z
10711073
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10721074
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
1073-
public abstract fun trySelect (Ljava/lang/Object;)Z
1075+
public abstract fun trySelect ()Z
1076+
public abstract fun trySelectIdempotent (Ljava/lang/Object;)Ljava/lang/Object;
10741077
}
10751078

10761079
public final class kotlinx/coroutines/selects/SelectKt {
10771080
public static final fun select (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10781081
}
10791082

1083+
public synthetic class kotlinx/coroutines/selects/SelectKtSelectOpSequenceNumberRefVolatile {
1084+
public fun <init> (J)V
1085+
}
1086+
10801087
public final class kotlinx/coroutines/selects/SelectUnbiasedKt {
10811088
public static final fun selectUnbiased (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10821089
}

kotlinx-coroutines-core/common/src/JobSupport.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
553553
if (select.isSelected) return
554554
if (state !is Incomplete) {
555555
// already complete -- select result
556-
if (select.trySelect(null)) {
556+
if (select.trySelect()) {
557557
block.startCoroutineUnintercepted(select.completion)
558558
}
559559
return
@@ -1181,7 +1181,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
11811181
if (select.isSelected) return
11821182
if (state !is Incomplete) {
11831183
// already complete -- select result
1184-
if (select.trySelect(null)) {
1184+
if (select.trySelect()) {
11851185
if (state is CompletedExceptionally) {
11861186
select.resumeSelectCancellableWithException(state.cause)
11871187
}
@@ -1362,7 +1362,7 @@ private class SelectJoinOnCompletion<R>(
13621362
private val block: suspend () -> R
13631363
) : JobNode<JobSupport>(job) {
13641364
override fun invoke(cause: Throwable?) {
1365-
if (select.trySelect(null))
1365+
if (select.trySelect())
13661366
block.startCoroutineCancellable(select.completion)
13671367
}
13681368
override fun toString(): String = "SelectJoinOnCompletion[$select]"
@@ -1374,7 +1374,7 @@ private class SelectAwaitOnCompletion<T, R>(
13741374
private val block: suspend (T) -> R
13751375
) : JobNode<JobSupport>(job) {
13761376
override fun invoke(cause: Throwable?) {
1377-
if (select.trySelect(null))
1377+
if (select.trySelect())
13781378
job.selectAwaitCompletion(select, block)
13791379
}
13801380
override fun toString(): String = "SelectAwaitOnCompletion[$select]"

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+24-18
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
5656

5757
/**
5858
* Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
59-
* Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
59+
* Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
6060
* @suppress **This is unstable API and it is subject to change.**
6161
*/
6262
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
@@ -352,10 +352,11 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
352352
else -> null
353353
}
354354

355-
override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
356-
val token = node.tryResumeReceive(element, idempotent = this) ?: return false
355+
override fun validatePrepared(node: ReceiveOrClosed<E>): Any? {
356+
val token = node.tryResumeReceive(element, idempotent = this) ?: return REMOVE_PREPARED
357+
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
357358
resumeToken = token
358-
return true
359+
return null
359360
}
360361
}
361362

@@ -388,6 +389,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
388389
when {
389390
offerResult === ALREADY_SELECTED -> return
390391
offerResult === OFFER_FAILED -> {} // retry
392+
offerResult === RETRY_ATOMIC -> {} // retry
391393
offerResult === OFFER_SUCCESS -> {
392394
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
393395
return
@@ -438,7 +440,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
438440
@JvmField val block: suspend (SendChannel<E>) -> R
439441
) : Send(), DisposableHandle {
440442
override fun tryResumeSend(idempotent: Any?): Any? =
441-
if (select.trySelect(idempotent)) SELECT_STARTED else null
443+
select.trySelectIdempotent(idempotent)
442444

443445
override fun completeResumeSend(token: Any) {
444446
assert { token === SELECT_STARTED }
@@ -450,7 +452,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
450452
}
451453

452454
override fun resumeSendClosed(closed: Closed<*>) {
453-
if (select.trySelect(null))
455+
if (select.trySelect())
454456
select.resumeSelectCancellableWithException(closed.sendException)
455457
}
456458

@@ -505,7 +507,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
505507

506508
/**
507509
* Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
508-
* Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
510+
* Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
509511
* @suppress **This is unstable API and it is subject to change.**
510512
*/
511513
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
@@ -655,11 +657,12 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
655657
}
656658

657659
@Suppress("UNCHECKED_CAST")
658-
override fun validatePrepared(node: Send): Boolean {
659-
val token = node.tryResumeSend(idempotent = this) ?: return false
660+
override fun validatePrepared(node: Send): Any? {
661+
val token = node.tryResumeSend(idempotent = this) ?: return REMOVE_PREPARED
662+
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
660663
resumeToken = token
661664
pollResult = node.pollResult as E
662-
return true
665+
return null
663666
}
664667
}
665668

@@ -681,6 +684,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
681684
when {
682685
pollResult === ALREADY_SELECTED -> return
683686
pollResult === POLL_FAILED -> {} // retry
687+
pollResult === RETRY_ATOMIC -> {}
684688
pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException)
685689
else -> {
686690
block.startCoroutineUnintercepted(pollResult as E, select.completion)
@@ -709,9 +713,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
709713
when {
710714
pollResult === ALREADY_SELECTED -> return
711715
pollResult === POLL_FAILED -> {} // retry
716+
pollResult === RETRY_ATOMIC -> {} // retry
712717
pollResult is Closed<*> -> {
713718
if (pollResult.closeCause == null) {
714-
if (select.trySelect(null))
719+
if (select.trySelect())
715720
block.startCoroutineUnintercepted(null, select.completion)
716721
return
717722
} else {
@@ -746,6 +751,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
746751
when {
747752
pollResult === ALREADY_SELECTED -> return
748753
pollResult === POLL_FAILED -> {} // retry
754+
pollResult === RETRY_ATOMIC -> {} // retry
749755
pollResult is Closed<*> -> {
750756
block.startCoroutineUnintercepted(ValueOrClosed.closed(pollResult.closeCause), select.completion)
751757
}
@@ -928,8 +934,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
928934
@JvmField val block: suspend (Any?) -> R,
929935
@JvmField val receiveMode: Int
930936
) : Receive<E>(), DisposableHandle {
931-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
932-
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
937+
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
938+
val result = select.trySelectIdempotent(idempotent)
939+
return if (result === SELECT_STARTED) value ?: NULL_VALUE else result
940+
}
933941

934942
@Suppress("UNCHECKED_CAST")
935943
override fun completeResumeReceive(token: Any) {
@@ -938,7 +946,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
938946
}
939947

940948
override fun resumeReceiveClosed(closed: Closed<*>) {
941-
if (!select.trySelect(null)) return
949+
if (!select.trySelect()) return
942950
when (receiveMode) {
943951
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
944952
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
@@ -985,10 +993,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
985993
@SharedImmutable
986994
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
987995

988-
@JvmField
989-
@SharedImmutable
990-
internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
991-
992996
@JvmField
993997
@SharedImmutable
994998
internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE")
@@ -1012,6 +1016,7 @@ internal typealias Handler = (Throwable?) -> Unit
10121016
*/
10131017
internal abstract class Send : LockFreeLinkedListNode() {
10141018
abstract val pollResult: Any? // E | Closed
1019+
// Returns: null - failure, RETRY_ATOMIC for retry (only when idempotent != null), otherwise token for completeResumeSend
10151020
abstract fun tryResumeSend(idempotent: Any?): Any?
10161021
abstract fun completeResumeSend(token: Any)
10171022
abstract fun resumeSendClosed(closed: Closed<*>)
@@ -1022,6 +1027,7 @@ internal abstract class Send : LockFreeLinkedListNode() {
10221027
*/
10231028
internal interface ReceiveOrClosed<in E> {
10241029
val offerResult: Any // OFFER_SUCCESS | Closed
1030+
// Returns: null - failure, RETRY_ATOMIC for retry (only when idempotent != null), otherwise token for completeResumeReceive
10251031
fun tryResumeReceive(value: E, idempotent: Any?): Any?
10261032
fun completeResumeReceive(token: Any)
10271033
}

kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.channels
@@ -105,7 +105,7 @@ internal class ArrayBroadcastChannel<E>(
105105
val size = this.size
106106
if (size >= capacity) return OFFER_FAILED
107107
// let's try to select sending this element to buffer
108-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
108+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
109109
return ALREADY_SELECTED
110110
}
111111
val tail = this.tail
@@ -299,7 +299,7 @@ internal class ArrayBroadcastChannel<E>(
299299
result === POLL_FAILED -> { /* just bail out of lock */ }
300300
else -> {
301301
// let's try to select receiving this element from buffer
302-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
302+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
303303
result = ALREADY_SELECTED
304304
} else {
305305
// update subHead after retrieiving element from buffer

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ internal open class ArrayChannel<E>(
105105
return@withLock
106106
}
107107
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
108+
failure === RETRY_ATOMIC -> {} // retry
108109
failure === ALREADY_SELECTED || failure is Closed<*> -> {
109110
this.size = size // restore size
110111
return failure
@@ -114,7 +115,7 @@ internal open class ArrayChannel<E>(
114115
}
115116
}
116117
// let's try to select sending this element to buffer
117-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
118+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
118119
this.size = size // restore size
119120
return ALREADY_SELECTED
120121
}
@@ -206,6 +207,7 @@ internal open class ArrayChannel<E>(
206207
break@loop
207208
}
208209
failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
210+
failure === RETRY_ATOMIC -> {} // retry
209211
failure === ALREADY_SELECTED -> {
210212
this.size = size // restore size
211213
buffer[head] = result // restore head
@@ -226,7 +228,7 @@ internal open class ArrayChannel<E>(
226228
buffer[(head + size) % buffer.size] = replacement
227229
} else {
228230
// failed to poll or is already closed --> let's try to select receiving this element from buffer
229-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
231+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
230232
this.size = size // restore size
231233
buffer[head] = result // restore head
232234
return ALREADY_SELECTED

kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
273273
}
274274

275275
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
276-
if (!select.trySelect(null)) return
276+
if (!select.trySelect()) return
277277
offerInternal(element)?.let {
278278
select.resumeSelectCancellableWithException(it.sendException)
279279
return

kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.channels
@@ -85,6 +85,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
8585
result === ALREADY_SELECTED -> return ALREADY_SELECTED
8686
result === OFFER_SUCCESS -> return OFFER_SUCCESS
8787
result === OFFER_FAILED -> {} // retry
88+
result === RETRY_ATOMIC -> {} // retry
8889
result is Closed<*> -> return result
8990
else -> error("Invalid result $result")
9091
}

kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.channels
66

7+
import kotlinx.coroutines.internal.*
78
import kotlinx.coroutines.selects.*
89

910
/**
@@ -51,6 +52,7 @@ internal open class LinkedListChannel<E> : AbstractChannel<E>() {
5152
result === ALREADY_SELECTED -> return ALREADY_SELECTED
5253
result === OFFER_SUCCESS -> return OFFER_SUCCESS
5354
result === OFFER_FAILED -> {} // retry
55+
result === RETRY_ATOMIC -> {} // retry
5456
result is Closed<*> -> return result
5557
else -> error("Invalid result $result")
5658
}

kotlinx-coroutines-core/common/src/internal/Atomic.kt

+41-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.internal
66

77
import kotlinx.atomicfu.atomic
88
import kotlinx.coroutines.*
9+
import kotlin.jvm.*
910

1011
/**
1112
* The most abstract operation that can be in process. Other threads observing an instance of this
@@ -19,6 +20,20 @@ public abstract class OpDescriptor {
1920
* object that indicates the failure reason.
2021
*/
2122
abstract fun perform(affected: Any?): Any?
23+
24+
/**
25+
* Returns reference to atomic operation that this descriptor is a part of or `null`
26+
* if not a part of any [AtomicOp].
27+
*/
28+
abstract val atomicOp: AtomicOp<*>?
29+
30+
override fun toString(): String = "$classSimpleName@$hexAddress" // debug
31+
32+
fun isEarlierThan(that: OpDescriptor): Boolean {
33+
val thisOp = atomicOp ?: return false
34+
val thatOp = that.atomicOp ?: return false
35+
return thisOp.opSequence < thatOp.opSequence
36+
}
2237
}
2338

2439
@SharedImmutable
@@ -40,6 +55,19 @@ public abstract class AtomicOp<in T> : OpDescriptor() {
4055

4156
val isDecided: Boolean get() = _consensus.value !== NO_DECISION
4257

58+
/**
59+
* Sequence number of this multi-word operation for deadlock resolution.
60+
* An operation with lower number aborts itself with (using [RETRY_ATOMIC] error symbol) if it encounters
61+
* the need to help the operation with higher sequence number and then restarts
62+
* (using higher `opSequence` to ensure progress).
63+
* Simple operations that cannot get into the deadlock always return zero here.
64+
*
65+
* See https://github.com/Kotlin/kotlinx.coroutines/issues/504
66+
*/
67+
open val opSequence: Long get() = 0L
68+
69+
override val atomicOp: AtomicOp<*> get() = this
70+
4371
fun tryDecide(decision: Any?): Boolean {
4472
assert { decision !== NO_DECISION }
4573
return _consensus.compareAndSet(NO_DECISION, decision)
@@ -59,7 +87,7 @@ public abstract class AtomicOp<in T> : OpDescriptor() {
5987
if (decision === NO_DECISION) {
6088
decision = decide(prepare(affected as T))
6189
}
62-
90+
// complete operation
6391
complete(affected as T, decision)
6492
return decision
6593
}
@@ -71,6 +99,17 @@ public abstract class AtomicOp<in T> : OpDescriptor() {
7199
* @suppress **This is unstable API and it is subject to change.**
72100
*/
73101
public abstract class AtomicDesc {
102+
lateinit var atomicOp: AtomicOp<*> // the reference to parent atomicOp, init when AtomicOp is created
74103
abstract fun prepare(op: AtomicOp<*>): Any? // returns `null` if prepared successfully
75104
abstract fun complete(op: AtomicOp<*>, failure: Any?) // decision == null if success
105+
106+
override fun toString(): String = "$classSimpleName@$hexAddress(atomicOp=$atomicOp)" // for debug
76107
}
108+
109+
/**
110+
* It is returned as an error by [AtomicOp] implementations when they detect potential deadlock
111+
* using [AtomicOp.opSequence] numbers.
112+
*/
113+
@JvmField
114+
@SharedImmutable
115+
internal val RETRY_ATOMIC: Any = Symbol("RETRY_ATOMIC")

0 commit comments

Comments
 (0)