Skip to content

Commit 082dbe7

Browse files
committed
Fix SOE with select onSend/onReceive clauses on the same channel
* Instead of StackOverflowError we throw IllegalStateException and leave the channel in the original state. Fixes #1411
1 parent a3763e8 commit 082dbe7

File tree

5 files changed

+115
-27
lines changed

5 files changed

+115
-27
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+12-6
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
6161
*/
6262
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
6363
// offer atomically with select
64-
val offerOp = describeTryOffer(element)
64+
val offerOp = describeTryOffer(element, select)
6565
val failure = select.performAtomicTrySelect(offerOp)
6666
if (failure != null) return failure
6767
val receive = offerOp.result
@@ -322,15 +322,17 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
322322
/**
323323
* @suppress **This is unstable API and it is subject to change.**
324324
*/
325-
protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
325+
protected fun describeTryOffer(element: E, select: SelectInstance<*>): TryOfferDesc<E> =
326+
TryOfferDesc(element, select, queue)
326327

327328
/**
328329
* @suppress **This is unstable API and it is subject to change.**
329330
*/
330331
protected class TryOfferDesc<E>(
331332
@JvmField val element: E,
333+
override val select: SelectInstance<*>,
332334
queue: LockFreeLinkedListHead
333-
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
335+
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue), SelectInstanceDesc {
334336
@JvmField var resumeToken: Any? = null
335337

336338
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
@@ -521,7 +523,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
521523
*/
522524
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
523525
// poll atomically with select
524-
val pollOp = describeTryPoll()
526+
val pollOp = describeTryPoll(select)
525527
val failure = select.performAtomicTrySelect(pollOp)
526528
if (failure != null) return failure
527529
val send = pollOp.result
@@ -650,12 +652,16 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
650652
/**
651653
* @suppress **This is unstable API and it is subject to change.**
652654
*/
653-
protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
655+
protected fun describeTryPoll(select: SelectInstance<*>): TryPollDesc<E> =
656+
TryPollDesc(select, queue)
654657

655658
/**
656659
* @suppress **This is unstable API and it is subject to change.**
657660
*/
658-
protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
661+
protected class TryPollDesc<E>(
662+
override val select: SelectInstance<*>,
663+
queue: LockFreeLinkedListHead
664+
) : RemoveFirstDesc<Send>(queue), SelectInstanceDesc {
659665
@JvmField var resumeToken: Any? = null
660666
@JvmField var pollResult: E? = null
661667

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ internal open class ArrayChannel<E>(
8888
// check for receivers that were waiting on empty queue
8989
if (size == 0) {
9090
loop@ while (true) {
91-
val offerOp = describeTryOffer(element)
91+
val offerOp = describeTryOffer(element, select)
9292
val failure = select.performAtomicTrySelect(offerOp)
9393
when {
9494
failure == null -> { // offered successfully
@@ -175,7 +175,7 @@ internal open class ArrayChannel<E>(
175175
var replacement: Any? = POLL_FAILED
176176
if (size == capacity) {
177177
loop@ while (true) {
178-
val pollOp = describeTryPoll()
178+
val pollOp = describeTryPoll(select)
179179
val failure = select.performAtomicTrySelect(pollOp)
180180
when {
181181
failure == null -> { // polled successfully

kotlinx-coroutines-core/common/src/selects/Select.kt

+58-16
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,16 @@ public interface SelectInstance<in R> {
133133
public fun disposeOnSelect(handle: DisposableHandle)
134134
}
135135

136+
/**
137+
* Descriptor that is a part of the atomic select operator.
138+
*
139+
* @suppress **This is unstable API and it is subject to change.**
140+
*/
141+
@InternalCoroutinesApi
142+
public interface SelectInstanceDesc {
143+
val select: SelectInstance<*>
144+
}
145+
136146
/**
137147
* Waits for the result of multiple suspending functions simultaneously, which are specified using _clauses_
138148
* in the [builder] scope of this select invocation. The caller is suspended until one of the clauses
@@ -325,7 +335,7 @@ internal class SelectBuilderImpl<in R>(
325335
while (true) { // lock-free loop on state
326336
val state = this.state
327337
if (state === this) {
328-
if (addLastIf(node, { this.state === this }))
338+
if (addLastIf(node) { this.state === this })
329339
return
330340
} else { // already selected
331341
handle.dispose()
@@ -344,9 +354,26 @@ internal class SelectBuilderImpl<in R>(
344354
// it is just like start(), but support idempotent start
345355
override fun trySelect(idempotent: Any?): Boolean {
346356
assert { idempotent !is OpDescriptor } // "cannot use OpDescriptor as idempotent marker"
347-
while (true) { // lock-free loop on state
348-
val state = this.state
357+
_state.loop { state -> // lock-free loop on state
349358
when {
359+
// Found descriptor from the same select instance while registering another clause on the same instance
360+
state is AtomicSelectOp<*> && state.impl === this &&
361+
idempotent is SelectInstanceDesc && idempotent.select === this ->
362+
{
363+
/*
364+
* We cannot do state.perform(this) here and "help" it since it is the same select and
365+
* we'll get StackOverflowError. See https://github.com/Kotlin/kotlinx.coroutines/issues/1411
366+
* "Properly" supporting this case is complicated in the current select architecture, since we
367+
* have to "pull up" body of two clauses that need to be activated and link them
368+
* together so that they are sequentially executed, followed by the continuation of
369+
* the code after the select expression. This requires massive implementation rewrite
370+
* (instead of immediately resuming clause bodies, they have to be stored somewhere first).
371+
*/
372+
error("Cannot use matching select clauses on the same object")
373+
}
374+
// Found descriptor of any other atomic operation -- help it
375+
state is OpDescriptor -> state.perform(this)
376+
// Found initial state (not selected yet)
350377
state === this -> {
351378
if (_state.compareAndSet(this, idempotent)) {
352379
doAfterSelect()
@@ -361,10 +388,14 @@ internal class SelectBuilderImpl<in R>(
361388
}
362389
}
363390

364-
override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
365-
override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
391+
override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
392+
AtomicSelectOp(this, desc, true).perform(null)
393+
394+
override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? =
395+
AtomicSelectOp(this, desc, false).perform(null)
366396

367-
private inner class AtomicSelectOp(
397+
private class AtomicSelectOp<R>(
398+
@JvmField val impl: SelectBuilderImpl<R>,
368399
@JvmField val desc: AtomicDesc,
369400
@JvmField val select: Boolean
370401
) : AtomicOp<Any?>() {
@@ -375,34 +406,45 @@ internal class SelectBuilderImpl<in R>(
375406
// we are originator (affected reference is not null if helping)
376407
prepareIfNotSelected()?.let { return it }
377408
}
378-
return desc.prepare(this)
409+
try {
410+
return desc.prepare(this)
411+
} catch (e: Throwable) {
412+
// undo prepareIfNotSelected on crash (for example if IllegalStateException is thrown)
413+
if (affected == null) undoPrepare()
414+
throw e
415+
}
379416
}
380417

381418
override fun complete(affected: Any?, failure: Any?) {
382419
completeSelect(failure)
383420
desc.complete(this, failure)
384421
}
385422

386-
fun prepareIfNotSelected(): Any? {
387-
_state.loop { state ->
423+
private fun prepareIfNotSelected(): Any? {
424+
impl._state.loop { state ->
388425
when {
389-
state === this@AtomicSelectOp -> return null // already in progress
390-
state is OpDescriptor -> state.perform(this@SelectBuilderImpl) // help
391-
state === this@SelectBuilderImpl -> {
392-
if (_state.compareAndSet(this@SelectBuilderImpl, this@AtomicSelectOp))
426+
state === this -> return null // already in progress
427+
state is OpDescriptor -> state.perform(impl) // help
428+
state === impl -> {
429+
if (impl._state.compareAndSet(impl, this))
393430
return null // success
394431
}
395432
else -> return ALREADY_SELECTED
396433
}
397434
}
398435
}
399436

437+
// reverts the change done by prepareIfNotSelected
438+
private fun undoPrepare() {
439+
impl._state.compareAndSet(this, impl)
440+
}
441+
400442
private fun completeSelect(failure: Any?) {
401443
val selectSuccess = select && failure == null
402-
val update = if (selectSuccess) null else this@SelectBuilderImpl
403-
if (_state.compareAndSet(this@AtomicSelectOp, update)) {
444+
val update = if (selectSuccess) null else impl
445+
if (impl._state.compareAndSet(this, update)) {
404446
if (selectSuccess)
405-
doAfterSelect()
447+
impl.doAfterSelect()
406448
}
407449
}
408450
}

kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt

+34
Original file line numberDiff line numberDiff line change
@@ -409,4 +409,38 @@ class SelectRendezvousChannelTest : TestBase() {
409409
if (!trySelect(null)) return
410410
block.startCoroutineUnintercepted(this)
411411
}
412+
413+
@Test
414+
fun testSelectSendAndReceive() = runTest {
415+
val c = Channel<Int>()
416+
assertFailsWith<IllegalStateException> {
417+
select<Unit> {
418+
c.onSend(1) { expectUnreached() }
419+
c.onReceive { expectUnreached() }
420+
}
421+
}
422+
checkNotBroken(c)
423+
}
424+
425+
@Test
426+
fun testSelectReceiveAndSend() = runTest {
427+
val c = Channel<Int>()
428+
assertFailsWith<IllegalStateException> {
429+
select<Unit> {
430+
c.onReceive { expectUnreached() }
431+
c.onSend(1) { expectUnreached() }
432+
}
433+
}
434+
checkNotBroken(c)
435+
}
436+
437+
// makes sure the channel is not broken
438+
private suspend fun checkNotBroken(c: Channel<Int>) {
439+
coroutineScope {
440+
launch {
441+
c.send(42)
442+
}
443+
assertEquals(42, c.receive())
444+
}
445+
}
412446
}

kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt

+9-3
Original file line numberDiff line numberDiff line change
@@ -466,9 +466,15 @@ public actual open class LockFreeLinkedListNode {
466466
val prepareOp = PrepareOp(next as Node, op as AtomicOp<Node>, this)
467467
if (affected._next.compareAndSet(next, prepareOp)) {
468468
// prepared -- complete preparations
469-
val prepFail = prepareOp.perform(affected)
470-
if (prepFail === REMOVE_PREPARED) continue // retry
471-
return prepFail
469+
try {
470+
val prepFail = prepareOp.perform(affected)
471+
if (prepFail === REMOVE_PREPARED) continue // retry
472+
return prepFail
473+
} catch (e: Throwable) {
474+
// Crashed during preparation (for example IllegalStateExpception) -- undo & rethrow
475+
affected._next.compareAndSet(prepareOp, next)
476+
throw e
477+
}
472478
}
473479
}
474480
}

0 commit comments

Comments
 (0)