Skip to content

Commit c47a800

Browse files
committed
Fix SOE with select onSend/onReceive clauses on the same channel
* Instead of StackOverflowError we throw IllegalStateException and leave the channel in the original state. Fixes #1411
1 parent a3763e8 commit c47a800

File tree

3 files changed

+86
-19
lines changed

3 files changed

+86
-19
lines changed

kotlinx-coroutines-core/common/src/selects/Select.kt

+43-16
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ internal class SelectBuilderImpl<in R>(
325325
while (true) { // lock-free loop on state
326326
val state = this.state
327327
if (state === this) {
328-
if (addLastIf(node, { this.state === this }))
328+
if (addLastIf(node) { this.state === this })
329329
return
330330
} else { // already selected
331331
handle.dispose()
@@ -344,9 +344,21 @@ internal class SelectBuilderImpl<in R>(
344344
// it is just like start(), but support idempotent start
345345
override fun trySelect(idempotent: Any?): Boolean {
346346
assert { idempotent !is OpDescriptor } // "cannot use OpDescriptor as idempotent marker"
347-
while (true) { // lock-free loop on state
348-
val state = this.state
347+
_state.loop { state -> // lock-free loop on state
349348
when {
349+
state is AtomicSelectOp<*> && state.impl === this -> {
350+
/*
351+
* We cannot do state.perform(this) here and "help" it since it is the same select and
352+
* we'll get StackOverflowError. See https://github.com/Kotlin/kotlinx.coroutines/issues/1411
353+
* "Properly" supporting this case is complicated in the current select architecture, since we
354+
* have to "pull up" body of two clauses that need to be activated and link them
355+
* together so that they are sequentially executed, followed by the continuation of
356+
* the code after the select expression. This requires massive implementation rewrite
357+
* (instead of immediately resuming clause bodies, they have to be stored somewhere first).
358+
*/
359+
error("Cannot use different select clauses on the same object")
360+
}
361+
state is OpDescriptor -> state.perform(this)
350362
state === this -> {
351363
if (_state.compareAndSet(this, idempotent)) {
352364
doAfterSelect()
@@ -361,10 +373,14 @@ internal class SelectBuilderImpl<in R>(
361373
}
362374
}
363375

364-
override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
365-
override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
376+
override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
377+
AtomicSelectOp(this, desc, true).perform(null)
378+
379+
override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? =
380+
AtomicSelectOp(this, desc, false).perform(null)
366381

367-
private inner class AtomicSelectOp(
382+
private class AtomicSelectOp<R>(
383+
@JvmField val impl: SelectBuilderImpl<R>,
368384
@JvmField val desc: AtomicDesc,
369385
@JvmField val select: Boolean
370386
) : AtomicOp<Any?>() {
@@ -375,34 +391,45 @@ internal class SelectBuilderImpl<in R>(
375391
// we are originator (affected reference is not null if helping)
376392
prepareIfNotSelected()?.let { return it }
377393
}
378-
return desc.prepare(this)
394+
try {
395+
return desc.prepare(this)
396+
} catch (e: Throwable) {
397+
// undo prepareIfNotSelected on crash (for example if IllegalStateException is thrown)
398+
if (affected == null) undoPrepare()
399+
throw e
400+
}
379401
}
380402

381403
override fun complete(affected: Any?, failure: Any?) {
382404
completeSelect(failure)
383405
desc.complete(this, failure)
384406
}
385407

386-
fun prepareIfNotSelected(): Any? {
387-
_state.loop { state ->
408+
private fun prepareIfNotSelected(): Any? {
409+
impl._state.loop { state ->
388410
when {
389-
state === this@AtomicSelectOp -> return null // already in progress
390-
state is OpDescriptor -> state.perform(this@SelectBuilderImpl) // help
391-
state === this@SelectBuilderImpl -> {
392-
if (_state.compareAndSet(this@SelectBuilderImpl, this@AtomicSelectOp))
411+
state === this -> return null // already in progress
412+
state is OpDescriptor -> state.perform(impl) // help
413+
state === impl -> {
414+
if (impl._state.compareAndSet(impl, this))
393415
return null // success
394416
}
395417
else -> return ALREADY_SELECTED
396418
}
397419
}
398420
}
399421

422+
// reverts the change done by prepareIfNotSelected
423+
private fun undoPrepare() {
424+
impl._state.compareAndSet(this, impl)
425+
}
426+
400427
private fun completeSelect(failure: Any?) {
401428
val selectSuccess = select && failure == null
402-
val update = if (selectSuccess) null else this@SelectBuilderImpl
403-
if (_state.compareAndSet(this@AtomicSelectOp, update)) {
429+
val update = if (selectSuccess) null else impl
430+
if (impl._state.compareAndSet(this, update)) {
404431
if (selectSuccess)
405-
doAfterSelect()
432+
impl.doAfterSelect()
406433
}
407434
}
408435
}

kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt

+34
Original file line numberDiff line numberDiff line change
@@ -409,4 +409,38 @@ class SelectRendezvousChannelTest : TestBase() {
409409
if (!trySelect(null)) return
410410
block.startCoroutineUnintercepted(this)
411411
}
412+
413+
@Test
414+
fun testSelectSendAndReceive() = runTest {
415+
val c = Channel<Int>()
416+
assertFailsWith<IllegalStateException> {
417+
select<Unit> {
418+
c.onSend(1) { expectUnreached() }
419+
c.onReceive { expectUnreached() }
420+
}
421+
}
422+
checkNotBroken(c)
423+
}
424+
425+
@Test
426+
fun testSelectReceiveAndSend() = runTest {
427+
val c = Channel<Int>()
428+
assertFailsWith<IllegalStateException> {
429+
select<Unit> {
430+
c.onReceive { expectUnreached() }
431+
c.onSend(1) { expectUnreached() }
432+
}
433+
}
434+
checkNotBroken(c)
435+
}
436+
437+
// makes sure the channel is not broken
438+
private suspend fun checkNotBroken(c: Channel<Int>) {
439+
coroutineScope {
440+
launch {
441+
c.send(42)
442+
}
443+
assertEquals(42, c.receive())
444+
}
445+
}
412446
}

kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt

+9-3
Original file line numberDiff line numberDiff line change
@@ -466,9 +466,15 @@ public actual open class LockFreeLinkedListNode {
466466
val prepareOp = PrepareOp(next as Node, op as AtomicOp<Node>, this)
467467
if (affected._next.compareAndSet(next, prepareOp)) {
468468
// prepared -- complete preparations
469-
val prepFail = prepareOp.perform(affected)
470-
if (prepFail === REMOVE_PREPARED) continue // retry
471-
return prepFail
469+
try {
470+
val prepFail = prepareOp.perform(affected)
471+
if (prepFail === REMOVE_PREPARED) continue // retry
472+
return prepFail
473+
} catch (e: Throwable) {
474+
// Crashed during preparation (for example IllegalStateExpception) -- undo & rethrow
475+
affected._next.compareAndSet(prepareOp, next)
476+
throw e
477+
}
472478
}
473479
}
474480
}

0 commit comments

Comments
 (0)