Skip to content

Fix StackOverflowError with select expressions #1524

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,9 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
public fun trySelect (Ljava/lang/Object;)Z
public fun toString ()Ljava/lang/String;
public fun trySelect ()Z
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
Expand All @@ -1071,7 +1073,8 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
public abstract fun isSelected ()Z
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
public abstract fun trySelect (Ljava/lang/Object;)Z
public abstract fun trySelect ()Z
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/selects/SelectKt {
Expand Down
8 changes: 4 additions & 4 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
if (select.isSelected) return
if (state !is Incomplete) {
// already complete -- select result
if (select.trySelect(null)) {
if (select.trySelect()) {
block.startCoroutineUnintercepted(select.completion)
}
return
Expand Down Expand Up @@ -1181,7 +1181,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
if (select.isSelected) return
if (state !is Incomplete) {
// already complete -- select result
if (select.trySelect(null)) {
if (select.trySelect()) {
if (state is CompletedExceptionally) {
select.resumeSelectCancellableWithException(state.cause)
}
Expand Down Expand Up @@ -1362,7 +1362,7 @@ private class SelectJoinOnCompletion<R>(
private val block: suspend () -> R
) : JobNode<JobSupport>(job) {
override fun invoke(cause: Throwable?) {
if (select.trySelect(null))
if (select.trySelect())
block.startCoroutineCancellable(select.completion)
}
override fun toString(): String = "SelectJoinOnCompletion[$select]"
Expand All @@ -1374,7 +1374,7 @@ private class SelectAwaitOnCompletion<T, R>(
private val block: suspend (T) -> R
) : JobNode<JobSupport>(job) {
override fun invoke(cause: Throwable?) {
if (select.trySelect(null))
if (select.trySelect())
job.selectAwaitCompletion(select, block)
}
override fun toString(): String = "SelectAwaitOnCompletion[$select]"
Expand Down
89 changes: 56 additions & 33 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
protected open fun offerInternal(element: E): Any {
while (true) {
val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
val token = receive.tryResumeReceive(element, idempotent = null)
val token = receive.tryResumeReceive(element, null)
if (token != null) {
receive.completeResumeReceive(token)
return receive.offerResult
Expand All @@ -56,7 +56,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {

/**
* Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
* Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
* Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
* @suppress **This is unstable API and it is subject to change.**
*/
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
Expand Down Expand Up @@ -362,10 +362,13 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
else -> null
}

override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
val token = node.tryResumeReceive(element, idempotent = this) ?: return false
@Suppress("UNCHECKED_CAST")
override fun onPrepare(prepareOp: PrepareOp): Any? {
val affected = prepareOp.affected as ReceiveOrClosed<E> // see "failure" impl
val token = affected.tryResumeReceive(element, prepareOp) ?: return REMOVE_PREPARED
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
resumeToken = token
return true
return null
}
}

Expand Down Expand Up @@ -398,6 +401,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
when {
offerResult === ALREADY_SELECTED -> return
offerResult === OFFER_FAILED -> {} // retry
offerResult === RETRY_ATOMIC -> {} // retry
offerResult === OFFER_SUCCESS -> {
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
return
Expand Down Expand Up @@ -447,8 +451,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (SendChannel<E>) -> R
) : Send(), DisposableHandle {
override fun tryResumeSend(idempotent: Any?): Any? =
if (select.trySelect(idempotent)) SELECT_STARTED else null
override fun tryResumeSend(otherOp: PrepareOp?): Any? =
select.trySelectOther(otherOp)

override fun completeResumeSend(token: Any) {
assert { token === SELECT_STARTED }
Expand All @@ -460,7 +464,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
}

override fun resumeSendClosed(closed: Closed<*>) {
if (select.trySelect(null))
if (select.trySelect())
select.resumeSelectCancellableWithException(closed.sendException)
}

Expand All @@ -471,7 +475,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
@JvmField val element: E
) : Send() {
override val pollResult: Any? get() = element
override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
override fun tryResumeSend(otherOp: PrepareOp?): Any? = SEND_RESUMED.also { otherOp?.finishPrepare() }
override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } }
override fun resumeSendClosed(closed: Closed<*>) {}
}
Expand Down Expand Up @@ -505,7 +509,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
protected open fun pollInternal(): Any? {
while (true) {
val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
val token = send.tryResumeSend(idempotent = null)
val token = send.tryResumeSend(null)
if (token != null) {
send.completeResumeSend(token)
return send.pollResult
Expand All @@ -515,7 +519,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

/**
* Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
* Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
* Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
* @suppress **This is unstable API and it is subject to change.**
*/
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
Expand Down Expand Up @@ -679,11 +683,13 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

@Suppress("UNCHECKED_CAST")
override fun validatePrepared(node: Send): Boolean {
val token = node.tryResumeSend(idempotent = this) ?: return false
override fun onPrepare(prepareOp: PrepareOp): Any? {
val affected = prepareOp.affected as Send // see "failure" impl
val token = affected.tryResumeSend(prepareOp) ?: return REMOVE_PREPARED
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
resumeToken = token
pollResult = node.pollResult as E
return true
pollResult = affected.pollResult as E
return null
}
}

Expand All @@ -705,6 +711,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
when {
pollResult === ALREADY_SELECTED -> return
pollResult === POLL_FAILED -> {} // retry
pollResult === RETRY_ATOMIC -> {} // retry
pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException)
else -> {
block.startCoroutineUnintercepted(pollResult as E, select.completion)
Expand Down Expand Up @@ -733,9 +740,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
when {
pollResult === ALREADY_SELECTED -> return
pollResult === POLL_FAILED -> {} // retry
pollResult === RETRY_ATOMIC -> {} // retry
pollResult is Closed<*> -> {
if (pollResult.closeCause == null) {
if (select.trySelect(null))
if (select.trySelect())
block.startCoroutineUnintercepted(null, select.completion)
return
} else {
Expand Down Expand Up @@ -770,6 +778,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
when {
pollResult === ALREADY_SELECTED -> return
pollResult === POLL_FAILED -> {} // retry
pollResult === RETRY_ATOMIC -> {} // retry
pollResult is Closed<*> -> {
block.startCoroutineUnintercepted(ValueOrClosed.closed(pollResult.closeCause), select.completion)
}
Expand Down Expand Up @@ -894,7 +903,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

@Suppress("IMPLICIT_CAST_TO_ANY")
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(resumeValue(value), idempotent)
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? {
otherOp?.finishPrepare()
return cont.tryResume(resumeValue(value), otherOp?.desc)
}

override fun completeResumeReceive(token: Any) = cont.completeResume(token)
override fun resumeReceiveClosed(closed: Closed<*>) {
when {
Expand All @@ -910,15 +923,16 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@JvmField val iterator: Itr<E>,
@JvmField val cont: CancellableContinuation<Boolean>
) : Receive<E>() {
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
val token = cont.tryResume(true, idempotent)
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? {
otherOp?.finishPrepare()
val token = cont.tryResume(true, otherOp?.desc)
if (token != null) {
/*
When idempotent != null this invocation can be stale and we cannot directly update iterator.result
When otherOp != null this invocation can be stale and we cannot directly update iterator.result
Instead, we save both token & result into a temporary IdempotentTokenValue object and
set iterator result only in completeResumeReceive that is going to be invoked just once
*/
if (idempotent != null) return IdempotentTokenValue(token, value)
if (otherOp != null) return IdempotentTokenValue(token, value)
iterator.result = value
}
return token
Expand Down Expand Up @@ -952,8 +966,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@JvmField val block: suspend (Any?) -> R,
@JvmField val receiveMode: Int
) : Receive<E>(), DisposableHandle {
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? {
val result = select.trySelectOther(otherOp)
return if (result === SELECT_STARTED) value ?: NULL_VALUE else result
}

@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(token: Any) {
Expand All @@ -962,7 +978,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

override fun resumeReceiveClosed(closed: Closed<*>) {
if (!select.trySelect(null)) return
if (!select.trySelect()) return
when (receiveMode) {
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
Expand Down Expand Up @@ -1009,10 +1025,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
@SharedImmutable
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")

@JvmField
@SharedImmutable
internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")

@JvmField
@SharedImmutable
internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE")
Expand All @@ -1036,7 +1048,11 @@ internal typealias Handler = (Throwable?) -> Unit
*/
internal abstract class Send : LockFreeLinkedListNode() {
abstract val pollResult: Any? // E | Closed
abstract fun tryResumeSend(idempotent: Any?): Any?
// Returns: null - failure,
// RETRY_ATOMIC for retry (only when otherOp != null),
// otherwise token for completeResumeSend
// Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
abstract fun tryResumeSend(otherOp: PrepareOp?): Any?
abstract fun completeResumeSend(token: Any)
abstract fun resumeSendClosed(closed: Closed<*>)
}
Expand All @@ -1046,7 +1062,11 @@ internal abstract class Send : LockFreeLinkedListNode() {
*/
internal interface ReceiveOrClosed<in E> {
val offerResult: Any // OFFER_SUCCESS | Closed
fun tryResumeReceive(value: E, idempotent: Any?): Any?
// Returns: null - failure,
// RETRY_ATOMIC for retry (only when otherOp != null),
// otherwise token for completeResumeReceive
// Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any?
fun completeResumeReceive(token: Any)
}

Expand All @@ -1058,7 +1078,10 @@ internal class SendElement(
override val pollResult: Any?,
@JvmField val cont: CancellableContinuation<Unit>
) : Send() {
override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
override fun tryResumeSend(otherOp: PrepareOp?): Any? {
otherOp?.finishPrepare()
return cont.tryResume(Unit, otherOp?.desc)
}
override fun completeResumeSend(token: Any) = cont.completeResume(token)
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
override fun toString(): String = "SendElement($pollResult)"
Expand All @@ -1075,9 +1098,9 @@ internal class Closed<in E>(

override val offerResult get() = this
override val pollResult get() = this
override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
override fun tryResumeSend(otherOp: PrepareOp?): Any? = CLOSE_RESUMED.also { otherOp?.finishPrepare() }
override fun completeResumeSend(token: Any) { assert { token === CLOSE_RESUMED } }
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = CLOSE_RESUMED
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? = CLOSE_RESUMED.also { otherOp?.finishPrepare() }
override fun completeResumeReceive(token: Any) { assert { token === CLOSE_RESUMED } }
override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked"
override fun toString(): String = "Closed[$closeCause]"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels
Expand Down Expand Up @@ -105,7 +105,7 @@ internal class ArrayBroadcastChannel<E>(
val size = this.size
if (size >= capacity) return OFFER_FAILED
// let's try to select sending this element to buffer
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
return ALREADY_SELECTED
}
val tail = this.tail
Expand Down Expand Up @@ -163,7 +163,7 @@ internal class ArrayBroadcastChannel<E>(
while (true) {
send = takeFirstSendOrPeekClosed() ?: break // when when no sender
if (send is Closed<*>) break // break when closed for send
token = send!!.tryResumeSend(idempotent = null)
token = send!!.tryResumeSend(null)
if (token != null) {
// put sent element to the buffer
buffer[(tail % capacity).toInt()] = (send as Send).pollResult
Expand Down Expand Up @@ -242,7 +242,7 @@ internal class ArrayBroadcastChannel<E>(
// find a receiver for an element
receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
if (receive is Closed<*>) break // noting more to do if this sub already closed
token = receive.tryResumeReceive(result as E, idempotent = null)
token = receive.tryResumeReceive(result as E, null)
if (token == null) continue // bail out here to next iteration (see for next receiver)
val subHead = this.subHead
this.subHead = subHead + 1 // retrieved element for this subscriber
Expand Down Expand Up @@ -296,7 +296,7 @@ internal class ArrayBroadcastChannel<E>(
result === POLL_FAILED -> { /* just bail out of lock */ }
else -> {
// let's try to select receiving this element from buffer
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
result = ALREADY_SELECTED
} else {
// update subHead after retrieiving element from buffer
Expand Down
Loading