Skip to content

Commit 3dbe82b

Browse files
committed
Fix StackOverflowException with select expressions
* onSend/onReceive clauses on the same channel: Instead of StackOverflowError we throw IllegalStateException and leave the channel in the original state. * Fix SOE in select with "opposite channels" stress-test. The fix is based on the sequential numbering of atomic select operation. Deadlock is detected and the operation with the lower sequential number is aborted and restarted (with a larger number). Fixes #504 Fixes #1411
1 parent 8e2a84d commit 3dbe82b

23 files changed

+747
-324
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+5-2
Original file line numberDiff line numberDiff line change
@@ -1050,7 +1050,9 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
10501050
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10511051
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
10521052
public fun resumeWith (Ljava/lang/Object;)V
1053-
public fun trySelect (Ljava/lang/Object;)Z
1053+
public fun toString ()Ljava/lang/String;
1054+
public fun trySelect ()Z
1055+
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
10541056
}
10551057

10561058
public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
@@ -1071,7 +1073,8 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
10711073
public abstract fun isSelected ()Z
10721074
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10731075
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
1074-
public abstract fun trySelect (Ljava/lang/Object;)Z
1076+
public abstract fun trySelect ()Z
1077+
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
10751078
}
10761079

10771080
public final class kotlinx/coroutines/selects/SelectKt {

kotlinx-coroutines-core/common/src/JobSupport.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
553553
if (select.isSelected) return
554554
if (state !is Incomplete) {
555555
// already complete -- select result
556-
if (select.trySelect(null)) {
556+
if (select.trySelect()) {
557557
block.startCoroutineUnintercepted(select.completion)
558558
}
559559
return
@@ -1181,7 +1181,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
11811181
if (select.isSelected) return
11821182
if (state !is Incomplete) {
11831183
// already complete -- select result
1184-
if (select.trySelect(null)) {
1184+
if (select.trySelect()) {
11851185
if (state is CompletedExceptionally) {
11861186
select.resumeSelectCancellableWithException(state.cause)
11871187
}
@@ -1362,7 +1362,7 @@ private class SelectJoinOnCompletion<R>(
13621362
private val block: suspend () -> R
13631363
) : JobNode<JobSupport>(job) {
13641364
override fun invoke(cause: Throwable?) {
1365-
if (select.trySelect(null))
1365+
if (select.trySelect())
13661366
block.startCoroutineCancellable(select.completion)
13671367
}
13681368
override fun toString(): String = "SelectJoinOnCompletion[$select]"
@@ -1374,7 +1374,7 @@ private class SelectAwaitOnCompletion<T, R>(
13741374
private val block: suspend (T) -> R
13751375
) : JobNode<JobSupport>(job) {
13761376
override fun invoke(cause: Throwable?) {
1377-
if (select.trySelect(null))
1377+
if (select.trySelect())
13781378
job.selectAwaitCompletion(select, block)
13791379
}
13801380
override fun toString(): String = "SelectAwaitOnCompletion[$select]"

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+56-33
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
4646
protected open fun offerInternal(element: E): Any {
4747
while (true) {
4848
val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
49-
val token = receive.tryResumeReceive(element, idempotent = null)
49+
val token = receive.tryResumeReceive(element, null)
5050
if (token != null) {
5151
receive.completeResumeReceive(token)
5252
return receive.offerResult
@@ -56,7 +56,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
5656

5757
/**
5858
* Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
59-
* Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
59+
* Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
6060
* @suppress **This is unstable API and it is subject to change.**
6161
*/
6262
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
@@ -362,10 +362,13 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
362362
else -> null
363363
}
364364

365-
override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
366-
val token = node.tryResumeReceive(element, idempotent = this) ?: return false
365+
@Suppress("UNCHECKED_CAST")
366+
override fun onPrepare(prepareOp: PrepareOp): Any? {
367+
val affected = prepareOp.affected as ReceiveOrClosed<E> // see "failure" impl
368+
val token = affected.tryResumeReceive(element, prepareOp) ?: return REMOVE_PREPARED
369+
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
367370
resumeToken = token
368-
return true
371+
return null
369372
}
370373
}
371374

@@ -398,6 +401,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
398401
when {
399402
offerResult === ALREADY_SELECTED -> return
400403
offerResult === OFFER_FAILED -> {} // retry
404+
offerResult === RETRY_ATOMIC -> {} // retry
401405
offerResult === OFFER_SUCCESS -> {
402406
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
403407
return
@@ -447,8 +451,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
447451
@JvmField val select: SelectInstance<R>,
448452
@JvmField val block: suspend (SendChannel<E>) -> R
449453
) : Send(), DisposableHandle {
450-
override fun tryResumeSend(idempotent: Any?): Any? =
451-
if (select.trySelect(idempotent)) SELECT_STARTED else null
454+
override fun tryResumeSend(otherOp: PrepareOp?): Any? =
455+
select.trySelectOther(otherOp)
452456

453457
override fun completeResumeSend(token: Any) {
454458
assert { token === SELECT_STARTED }
@@ -460,7 +464,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
460464
}
461465

462466
override fun resumeSendClosed(closed: Closed<*>) {
463-
if (select.trySelect(null))
467+
if (select.trySelect())
464468
select.resumeSelectCancellableWithException(closed.sendException)
465469
}
466470

@@ -471,7 +475,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
471475
@JvmField val element: E
472476
) : Send() {
473477
override val pollResult: Any? get() = element
474-
override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
478+
override fun tryResumeSend(otherOp: PrepareOp?): Any? = SEND_RESUMED.also { otherOp?.finishPrepare() }
475479
override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } }
476480
override fun resumeSendClosed(closed: Closed<*>) {}
477481
}
@@ -505,7 +509,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
505509
protected open fun pollInternal(): Any? {
506510
while (true) {
507511
val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
508-
val token = send.tryResumeSend(idempotent = null)
512+
val token = send.tryResumeSend(null)
509513
if (token != null) {
510514
send.completeResumeSend(token)
511515
return send.pollResult
@@ -515,7 +519,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
515519

516520
/**
517521
* Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
518-
* Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
522+
* Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
519523
* @suppress **This is unstable API and it is subject to change.**
520524
*/
521525
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
@@ -679,11 +683,13 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
679683
}
680684

681685
@Suppress("UNCHECKED_CAST")
682-
override fun validatePrepared(node: Send): Boolean {
683-
val token = node.tryResumeSend(idempotent = this) ?: return false
686+
override fun onPrepare(prepareOp: PrepareOp): Any? {
687+
val affected = prepareOp.affected as Send // see "failure" impl
688+
val token = affected.tryResumeSend(prepareOp) ?: return REMOVE_PREPARED
689+
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
684690
resumeToken = token
685-
pollResult = node.pollResult as E
686-
return true
691+
pollResult = affected.pollResult as E
692+
return null
687693
}
688694
}
689695

@@ -705,6 +711,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
705711
when {
706712
pollResult === ALREADY_SELECTED -> return
707713
pollResult === POLL_FAILED -> {} // retry
714+
pollResult === RETRY_ATOMIC -> {} // retry
708715
pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException)
709716
else -> {
710717
block.startCoroutineUnintercepted(pollResult as E, select.completion)
@@ -733,9 +740,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
733740
when {
734741
pollResult === ALREADY_SELECTED -> return
735742
pollResult === POLL_FAILED -> {} // retry
743+
pollResult === RETRY_ATOMIC -> {} // retry
736744
pollResult is Closed<*> -> {
737745
if (pollResult.closeCause == null) {
738-
if (select.trySelect(null))
746+
if (select.trySelect())
739747
block.startCoroutineUnintercepted(null, select.completion)
740748
return
741749
} else {
@@ -770,6 +778,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
770778
when {
771779
pollResult === ALREADY_SELECTED -> return
772780
pollResult === POLL_FAILED -> {} // retry
781+
pollResult === RETRY_ATOMIC -> {} // retry
773782
pollResult is Closed<*> -> {
774783
block.startCoroutineUnintercepted(ValueOrClosed.closed(pollResult.closeCause), select.completion)
775784
}
@@ -894,7 +903,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
894903
}
895904

896905
@Suppress("IMPLICIT_CAST_TO_ANY")
897-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(resumeValue(value), idempotent)
906+
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? {
907+
otherOp?.finishPrepare()
908+
return cont.tryResume(resumeValue(value), otherOp?.desc)
909+
}
910+
898911
override fun completeResumeReceive(token: Any) = cont.completeResume(token)
899912
override fun resumeReceiveClosed(closed: Closed<*>) {
900913
when {
@@ -910,15 +923,16 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
910923
@JvmField val iterator: Itr<E>,
911924
@JvmField val cont: CancellableContinuation<Boolean>
912925
) : Receive<E>() {
913-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
914-
val token = cont.tryResume(true, idempotent)
926+
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? {
927+
otherOp?.finishPrepare()
928+
val token = cont.tryResume(true, otherOp?.desc)
915929
if (token != null) {
916930
/*
917-
When idempotent != null this invocation can be stale and we cannot directly update iterator.result
931+
When otherOp != null this invocation can be stale and we cannot directly update iterator.result
918932
Instead, we save both token & result into a temporary IdempotentTokenValue object and
919933
set iterator result only in completeResumeReceive that is going to be invoked just once
920934
*/
921-
if (idempotent != null) return IdempotentTokenValue(token, value)
935+
if (otherOp != null) return IdempotentTokenValue(token, value)
922936
iterator.result = value
923937
}
924938
return token
@@ -952,8 +966,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
952966
@JvmField val block: suspend (Any?) -> R,
953967
@JvmField val receiveMode: Int
954968
) : Receive<E>(), DisposableHandle {
955-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
956-
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
969+
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? {
970+
val result = select.trySelectOther(otherOp)
971+
return if (result === SELECT_STARTED) value ?: NULL_VALUE else result
972+
}
957973

958974
@Suppress("UNCHECKED_CAST")
959975
override fun completeResumeReceive(token: Any) {
@@ -962,7 +978,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
962978
}
963979

964980
override fun resumeReceiveClosed(closed: Closed<*>) {
965-
if (!select.trySelect(null)) return
981+
if (!select.trySelect()) return
966982
when (receiveMode) {
967983
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
968984
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
@@ -1009,10 +1025,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
10091025
@SharedImmutable
10101026
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
10111027

1012-
@JvmField
1013-
@SharedImmutable
1014-
internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
1015-
10161028
@JvmField
10171029
@SharedImmutable
10181030
internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE")
@@ -1036,7 +1048,11 @@ internal typealias Handler = (Throwable?) -> Unit
10361048
*/
10371049
internal abstract class Send : LockFreeLinkedListNode() {
10381050
abstract val pollResult: Any? // E | Closed
1039-
abstract fun tryResumeSend(idempotent: Any?): Any?
1051+
// Returns: null - failure,
1052+
// RETRY_ATOMIC for retry (only when otherOp != null),
1053+
// otherwise token for completeResumeSend
1054+
// Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1055+
abstract fun tryResumeSend(otherOp: PrepareOp?): Any?
10401056
abstract fun completeResumeSend(token: Any)
10411057
abstract fun resumeSendClosed(closed: Closed<*>)
10421058
}
@@ -1046,7 +1062,11 @@ internal abstract class Send : LockFreeLinkedListNode() {
10461062
*/
10471063
internal interface ReceiveOrClosed<in E> {
10481064
val offerResult: Any // OFFER_SUCCESS | Closed
1049-
fun tryResumeReceive(value: E, idempotent: Any?): Any?
1065+
// Returns: null - failure,
1066+
// RETRY_ATOMIC for retry (only when otherOp != null),
1067+
// otherwise token for completeResumeReceive
1068+
// Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1069+
fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any?
10501070
fun completeResumeReceive(token: Any)
10511071
}
10521072

@@ -1058,7 +1078,10 @@ internal class SendElement(
10581078
override val pollResult: Any?,
10591079
@JvmField val cont: CancellableContinuation<Unit>
10601080
) : Send() {
1061-
override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
1081+
override fun tryResumeSend(otherOp: PrepareOp?): Any? {
1082+
otherOp?.finishPrepare()
1083+
return cont.tryResume(Unit, otherOp?.desc)
1084+
}
10621085
override fun completeResumeSend(token: Any) = cont.completeResume(token)
10631086
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
10641087
override fun toString(): String = "SendElement($pollResult)"
@@ -1075,9 +1098,9 @@ internal class Closed<in E>(
10751098

10761099
override val offerResult get() = this
10771100
override val pollResult get() = this
1078-
override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
1101+
override fun tryResumeSend(otherOp: PrepareOp?): Any? = CLOSE_RESUMED.also { otherOp?.finishPrepare() }
10791102
override fun completeResumeSend(token: Any) { assert { token === CLOSE_RESUMED } }
1080-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = CLOSE_RESUMED
1103+
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? = CLOSE_RESUMED.also { otherOp?.finishPrepare() }
10811104
override fun completeResumeReceive(token: Any) { assert { token === CLOSE_RESUMED } }
10821105
override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked"
10831106
override fun toString(): String = "Closed[$closeCause]"

kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.channels
@@ -105,7 +105,7 @@ internal class ArrayBroadcastChannel<E>(
105105
val size = this.size
106106
if (size >= capacity) return OFFER_FAILED
107107
// let's try to select sending this element to buffer
108-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
108+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
109109
return ALREADY_SELECTED
110110
}
111111
val tail = this.tail
@@ -163,7 +163,7 @@ internal class ArrayBroadcastChannel<E>(
163163
while (true) {
164164
send = takeFirstSendOrPeekClosed() ?: break // when when no sender
165165
if (send is Closed<*>) break // break when closed for send
166-
token = send!!.tryResumeSend(idempotent = null)
166+
token = send!!.tryResumeSend(null)
167167
if (token != null) {
168168
// put sent element to the buffer
169169
buffer[(tail % capacity).toInt()] = (send as Send).pollResult
@@ -242,7 +242,7 @@ internal class ArrayBroadcastChannel<E>(
242242
// find a receiver for an element
243243
receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
244244
if (receive is Closed<*>) break // noting more to do if this sub already closed
245-
token = receive.tryResumeReceive(result as E, idempotent = null)
245+
token = receive.tryResumeReceive(result as E, null)
246246
if (token == null) continue // bail out here to next iteration (see for next receiver)
247247
val subHead = this.subHead
248248
this.subHead = subHead + 1 // retrieved element for this subscriber
@@ -296,7 +296,7 @@ internal class ArrayBroadcastChannel<E>(
296296
result === POLL_FAILED -> { /* just bail out of lock */ }
297297
else -> {
298298
// let's try to select receiving this element from buffer
299-
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
299+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
300300
result = ALREADY_SELECTED
301301
} else {
302302
// update subHead after retrieiving element from buffer

0 commit comments

Comments
 (0)