Skip to content

Commit d994b66

Browse files
committed
Fix SOE with select onSend/onReceive clauses on the same channel
* Instead of StackOverflowError we throw IllegalStateException and leave the channel in the original state. Fixes #1411
1 parent a3763e8 commit d994b66

File tree

5 files changed

+120
-32
lines changed

5 files changed

+120
-32
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+12-6
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
6161
*/
6262
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
6363
// offer atomically with select
64-
val offerOp = describeTryOffer(element)
64+
val offerOp = describeTryOffer(element, select)
6565
val failure = select.performAtomicTrySelect(offerOp)
6666
if (failure != null) return failure
6767
val receive = offerOp.result
@@ -322,15 +322,17 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
322322
/**
323323
* @suppress **This is unstable API and it is subject to change.**
324324
*/
325-
protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
325+
protected fun describeTryOffer(element: E, select: SelectInstance<*>): TryOfferDesc<E> =
326+
TryOfferDesc(element, select, queue)
326327

327328
/**
328329
* @suppress **This is unstable API and it is subject to change.**
329330
*/
330331
protected class TryOfferDesc<E>(
331332
@JvmField val element: E,
333+
override val select: SelectInstance<*>,
332334
queue: LockFreeLinkedListHead
333-
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
335+
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue), SelectInstanceDesc {
334336
@JvmField var resumeToken: Any? = null
335337

336338
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
@@ -521,7 +523,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
521523
*/
522524
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
523525
// poll atomically with select
524-
val pollOp = describeTryPoll()
526+
val pollOp = describeTryPoll(select)
525527
val failure = select.performAtomicTrySelect(pollOp)
526528
if (failure != null) return failure
527529
val send = pollOp.result
@@ -650,12 +652,16 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
650652
/**
651653
* @suppress **This is unstable API and it is subject to change.**
652654
*/
653-
protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
655+
protected fun describeTryPoll(select: SelectInstance<*>): TryPollDesc<E> =
656+
TryPollDesc(select, queue)
654657

655658
/**
656659
* @suppress **This is unstable API and it is subject to change.**
657660
*/
658-
protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
661+
protected class TryPollDesc<E>(
662+
override val select: SelectInstance<*>,
663+
queue: LockFreeLinkedListHead
664+
) : RemoveFirstDesc<Send>(queue), SelectInstanceDesc {
659665
@JvmField var resumeToken: Any? = null
660666
@JvmField var pollResult: E? = null
661667

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ internal open class ArrayChannel<E>(
8888
// check for receivers that were waiting on empty queue
8989
if (size == 0) {
9090
loop@ while (true) {
91-
val offerOp = describeTryOffer(element)
91+
val offerOp = describeTryOffer(element, select)
9292
val failure = select.performAtomicTrySelect(offerOp)
9393
when {
9494
failure == null -> { // offered successfully
@@ -175,7 +175,7 @@ internal open class ArrayChannel<E>(
175175
var replacement: Any? = POLL_FAILED
176176
if (size == capacity) {
177177
loop@ while (true) {
178-
val pollOp = describeTryPoll()
178+
val pollOp = describeTryPoll(select)
179179
val failure = select.performAtomicTrySelect(pollOp)
180180
when {
181181
failure == null -> { // polled successfully

kotlinx-coroutines-core/common/src/selects/Select.kt

+63-21
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,19 @@ private val UNDECIDED: Any = Symbol("UNDECIDED")
193193
@SharedImmutable
194194
private val RESUMED: Any = Symbol("RESUMED")
195195

196+
/**
197+
* Descriptor that is a part of the atomic select operator.
198+
*/
199+
internal interface SelectInstanceDesc {
200+
val select: SelectInstance<*>
201+
}
202+
196203
@PublishedApi
197204
internal class SelectBuilderImpl<in R>(
198205
private val uCont: Continuation<R> // unintercepted delegate continuation
199206
) : LockFreeLinkedListHead(), SelectBuilder<R>,
200-
SelectInstance<R>, Continuation<R>, CoroutineStackFrame {
207+
SelectInstance<R>, Continuation<R>, CoroutineStackFrame
208+
{
201209
override val callerFrame: CoroutineStackFrame?
202210
get() = uCont as? CoroutineStackFrame
203211

@@ -325,7 +333,7 @@ internal class SelectBuilderImpl<in R>(
325333
while (true) { // lock-free loop on state
326334
val state = this.state
327335
if (state === this) {
328-
if (addLastIf(node, { this.state === this }))
336+
if (addLastIf(node) { this.state === this })
329337
return
330338
} else { // already selected
331339
handle.dispose()
@@ -344,13 +352,32 @@ internal class SelectBuilderImpl<in R>(
344352
// it is just like start(), but support idempotent start
345353
override fun trySelect(idempotent: Any?): Boolean {
346354
assert { idempotent !is OpDescriptor } // "cannot use OpDescriptor as idempotent marker"
347-
while (true) { // lock-free loop on state
348-
val state = this.state
355+
_state.loop { state -> // lock-free loop on state
349356
when {
350-
state === this -> {
351-
if (_state.compareAndSet(this, idempotent)) {
352-
doAfterSelect()
353-
return true
357+
// Found initial state (not selected yet) -- try to make it selected
358+
state === this -> if (_state.compareAndSet(this, idempotent)) {
359+
doAfterSelect()
360+
return true
361+
}
362+
// Found descriptor of another atomic operation
363+
state is OpDescriptor -> {
364+
// Did we found descriptor from the same select instance
365+
// while registering another clause on the same instance?
366+
if (state is AtomicSelectOp<*> && state.impl === this &&
367+
idempotent is SelectInstanceDesc && idempotent.select === this)
368+
{
369+
/*
370+
* We cannot do state.perform(this) here and "help" it since it is the same select and
371+
* we'll get StackOverflowError. See https://github.com/Kotlin/kotlinx.coroutines/issues/1411
372+
* "Properly" supporting this case is complicated in the current select architecture, since we
373+
* have to "pull up" body of two clauses that need to be activated and link them
374+
* together so that they are sequentially executed, followed by the continuation of
375+
* the code after the select expression. This requires massive implementation rewrite
376+
* (instead of immediately resuming clause bodies, they have to be stored somewhere first).
377+
*/
378+
error("Cannot use matching select clauses on the same object")
379+
} else {
380+
state.perform(this) // help it
354381
}
355382
}
356383
// otherwise -- already selected
@@ -361,10 +388,14 @@ internal class SelectBuilderImpl<in R>(
361388
}
362389
}
363390

364-
override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
365-
override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
391+
override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
392+
AtomicSelectOp(this, desc, true).perform(null)
393+
394+
override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? =
395+
AtomicSelectOp(this, desc, false).perform(null)
366396

367-
private inner class AtomicSelectOp(
397+
private class AtomicSelectOp<R>(
398+
@JvmField val impl: SelectBuilderImpl<R>,
368399
@JvmField val desc: AtomicDesc,
369400
@JvmField val select: Boolean
370401
) : AtomicOp<Any?>() {
@@ -375,34 +406,45 @@ internal class SelectBuilderImpl<in R>(
375406
// we are originator (affected reference is not null if helping)
376407
prepareIfNotSelected()?.let { return it }
377408
}
378-
return desc.prepare(this)
409+
try {
410+
return desc.prepare(this)
411+
} catch (e: Throwable) {
412+
// undo prepareIfNotSelected on crash (for example if IllegalStateException is thrown)
413+
if (affected == null) undoPrepare()
414+
throw e
415+
}
379416
}
380417

381418
override fun complete(affected: Any?, failure: Any?) {
382419
completeSelect(failure)
383420
desc.complete(this, failure)
384421
}
385422

386-
fun prepareIfNotSelected(): Any? {
387-
_state.loop { state ->
423+
private fun prepareIfNotSelected(): Any? {
424+
impl._state.loop { state ->
388425
when {
389-
state === this@AtomicSelectOp -> return null // already in progress
390-
state is OpDescriptor -> state.perform(this@SelectBuilderImpl) // help
391-
state === this@SelectBuilderImpl -> {
392-
if (_state.compareAndSet(this@SelectBuilderImpl, this@AtomicSelectOp))
426+
state === this -> return null // already in progress
427+
state is OpDescriptor -> state.perform(impl) // help
428+
state === impl -> {
429+
if (impl._state.compareAndSet(impl, this))
393430
return null // success
394431
}
395432
else -> return ALREADY_SELECTED
396433
}
397434
}
398435
}
399436

437+
// reverts the change done by prepareIfNotSelected
438+
private fun undoPrepare() {
439+
impl._state.compareAndSet(this, impl)
440+
}
441+
400442
private fun completeSelect(failure: Any?) {
401443
val selectSuccess = select && failure == null
402-
val update = if (selectSuccess) null else this@SelectBuilderImpl
403-
if (_state.compareAndSet(this@AtomicSelectOp, update)) {
444+
val update = if (selectSuccess) null else impl
445+
if (impl._state.compareAndSet(this, update)) {
404446
if (selectSuccess)
405-
doAfterSelect()
447+
impl.doAfterSelect()
406448
}
407449
}
408450
}

kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt

+34
Original file line numberDiff line numberDiff line change
@@ -409,4 +409,38 @@ class SelectRendezvousChannelTest : TestBase() {
409409
if (!trySelect(null)) return
410410
block.startCoroutineUnintercepted(this)
411411
}
412+
413+
@Test
414+
fun testSelectSendAndReceive() = runTest {
415+
val c = Channel<Int>()
416+
assertFailsWith<IllegalStateException> {
417+
select<Unit> {
418+
c.onSend(1) { expectUnreached() }
419+
c.onReceive { expectUnreached() }
420+
}
421+
}
422+
checkNotBroken(c)
423+
}
424+
425+
@Test
426+
fun testSelectReceiveAndSend() = runTest {
427+
val c = Channel<Int>()
428+
assertFailsWith<IllegalStateException> {
429+
select<Unit> {
430+
c.onReceive { expectUnreached() }
431+
c.onSend(1) { expectUnreached() }
432+
}
433+
}
434+
checkNotBroken(c)
435+
}
436+
437+
// makes sure the channel is not broken
438+
private suspend fun checkNotBroken(c: Channel<Int>) {
439+
coroutineScope {
440+
launch {
441+
c.send(42)
442+
}
443+
assertEquals(42, c.receive())
444+
}
445+
}
412446
}

kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt

+9-3
Original file line numberDiff line numberDiff line change
@@ -466,9 +466,15 @@ public actual open class LockFreeLinkedListNode {
466466
val prepareOp = PrepareOp(next as Node, op as AtomicOp<Node>, this)
467467
if (affected._next.compareAndSet(next, prepareOp)) {
468468
// prepared -- complete preparations
469-
val prepFail = prepareOp.perform(affected)
470-
if (prepFail === REMOVE_PREPARED) continue // retry
471-
return prepFail
469+
try {
470+
val prepFail = prepareOp.perform(affected)
471+
if (prepFail === REMOVE_PREPARED) continue // retry
472+
return prepFail
473+
} catch (e: Throwable) {
474+
// Crashed during preparation (for example IllegalStateExpception) -- undo & rethrow
475+
affected._next.compareAndSet(prepareOp, next)
476+
throw e
477+
}
472478
}
473479
}
474480
}

0 commit comments

Comments
 (0)