Skip to content

Commit 5f7e77a

Browse files
committed
Multithreaded support in select expressions (WIP)
Fixes #1764
1 parent 08a314c commit 5f7e77a

File tree

13 files changed

+248
-68
lines changed

13 files changed

+248
-68
lines changed

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,6 @@ public abstract class AbstractCoroutine<in T>(
138138
*/
139139
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
140140
initParentJob()
141-
startCoroutine(start, this, receiver, block)
141+
startCoroutine(start, receiver, this, block)
142142
}
143143
}

kotlinx-coroutines-core/common/src/Builders.common.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,13 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke(
174174

175175
internal fun <T, R> startCoroutineImpl(
176176
start: CoroutineStart,
177-
coroutine: AbstractCoroutine<T>,
178177
receiver: R,
178+
completion: Continuation<T>,
179179
block: suspend R.() -> T
180180
) = when (start) {
181-
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, coroutine)
182-
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, coroutine)
183-
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, coroutine)
181+
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
182+
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
183+
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
184184
CoroutineStart.LAZY -> Unit // will start lazily
185185
}
186186

@@ -189,8 +189,8 @@ internal fun <T, R> startCoroutineImpl(
189189
// todo: impl a separate startCoroutineCancellable as a fast-path for startCoroutine(DEFAULT, ...)
190190
internal expect fun <T, R> startCoroutine(
191191
start: CoroutineStart,
192-
coroutine: AbstractCoroutine<T>,
193192
receiver: R,
193+
completion: Continuation<T>,
194194
block: suspend R.() -> T
195195
)
196196

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+58-28
Original file line numberDiff line numberDiff line change
@@ -448,16 +448,18 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
448448
override val pollResult: Any?,
449449
@JvmField val channel: AbstractSendChannel<E>,
450450
@JvmField val select: SelectInstance<R>,
451-
@JvmField val block: suspend (SendChannel<E>) -> R
451+
block: suspend (SendChannel<E>) -> R
452452
) : Send(), DisposableHandle {
453+
@JvmField val block: suspend (SendChannel<E>) -> R = block.asShareable()
454+
453455
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? =
454-
select.trySelectOther(otherOp) as Symbol? // must return symbol
456+
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol? // must return symbol
455457

456-
override fun completeResumeSend() {
457-
block.startCoroutine(receiver = channel, completion = select.completion)
458-
}
458+
override fun completeResumeSend() =
459+
startCoroutine(CoroutineStart.ATOMIC, channel, select.completion, block)
459460

460461
override fun dispose() { // invoked on select completion
462+
block.shareableDispose(useIt = false)
461463
remove()
462464
}
463465

@@ -871,41 +873,53 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
871873
}
872874

873875
private class ReceiveElement<in E>(
874-
@JvmField val cont: CancellableContinuation<Any?>,
876+
cont: CancellableContinuation<Any?>,
875877
@JvmField val receiveMode: Int
876878
) : Receive<E>() {
879+
private val _cont = atomic<CancellableContinuation<Any?>?>(cont)
880+
private fun useCont() = _cont.getAndSet(null)
881+
877882
fun resumeValue(value: E): Any? = when (receiveMode) {
878883
RECEIVE_RESULT -> ValueOrClosed.value(value)
879884
else -> value
880885
}
881886

882887
@Suppress("IMPLICIT_CAST_TO_ANY")
883888
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
884-
val token = cont.tryResume(resumeValue(value), otherOp?.desc) ?: return null
889+
val token = _cont.value?.tryResume(resumeValue(value), otherOp?.desc) ?: run {
890+
_cont.value = null
891+
return null
892+
}
885893
assert { token === RESUME_TOKEN } // the only other possible result
886894
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
887895
otherOp?.finishPrepare()
888896
return RESUME_TOKEN
889897
}
890898

891-
override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)
899+
override fun completeResumeReceive(value: E) { useCont()?.completeResume(RESUME_TOKEN) }
892900

893901
override fun resumeReceiveClosed(closed: Closed<*>) {
894902
when {
895-
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
896-
receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
897-
else -> cont.resumeWithException(closed.receiveException)
903+
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> useCont()?.resume(null)
904+
receiveMode == RECEIVE_RESULT -> useCont()?.resume(closed.toResult<Any>())
905+
else -> useCont()?.resumeWithException(closed.receiveException)
898906
}
899907
}
900908
override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
901909
}
902910

903911
private class ReceiveHasNext<E>(
904912
@JvmField val iterator: Itr<E>,
905-
@JvmField val cont: CancellableContinuation<Boolean>
913+
cont: CancellableContinuation<Boolean>
906914
) : Receive<E>() {
915+
private val _cont = atomic<CancellableContinuation<Boolean>?>(cont)
916+
private fun useCont() = _cont.getAndSet(null)
917+
907918
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
908-
val token = cont.tryResume(true, otherOp?.desc) ?: return null
919+
val token = _cont.value?.tryResume(true, otherOp?.desc) ?: run{
920+
_cont.value = null
921+
return null
922+
}
909923
assert { token === RESUME_TOKEN } // the only other possible result
910924
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
911925
otherOp?.finishPrepare()
@@ -918,51 +932,61 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
918932
but completeResumeReceive is called once so we set iterator result here.
919933
*/
920934
iterator.result = value
921-
cont.completeResume(RESUME_TOKEN)
935+
useCont()?.completeResume(RESUME_TOKEN)
922936
}
923937

924938
override fun resumeReceiveClosed(closed: Closed<*>) {
925939
val token = if (closed.closeCause == null) {
926-
cont.tryResume(false)
940+
_cont.value?.tryResume(false)
927941
} else {
928-
cont.tryResumeWithException(recoverStackTrace(closed.receiveException, cont))
942+
_cont.value?.let { cont ->
943+
cont.tryResumeWithException(recoverStackTrace(closed.receiveException, cont))
944+
}
929945
}
930946
if (token != null) {
931947
iterator.result = closed
932-
cont.completeResume(token)
948+
_cont.value?.completeResume(token)
933949
}
950+
_cont.value = null
934951
}
935952
override fun toString(): String = "ReceiveHasNext@$hexAddress"
936953
}
937954

938955
private class ReceiveSelect<R, E>(
939956
@JvmField val channel: AbstractChannel<E>,
940957
@JvmField val select: SelectInstance<R>,
941-
@JvmField val block: suspend (Any?) -> R,
958+
block: suspend (Any?) -> R,
942959
@JvmField val receiveMode: Int
943960
) : Receive<E>(), DisposableHandle {
961+
@JvmField val block: suspend (Any?) -> R = block.asShareable() // captured variables in this block need screening
962+
944963
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? =
945-
select.trySelectOther(otherOp) as Symbol?
964+
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol?
946965

947966
@Suppress("UNCHECKED_CAST")
948967
override fun completeResumeReceive(value: E) {
949-
block.startCoroutine(if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion)
968+
startCoroutine(CoroutineStart.ATOMIC, if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion, block)
950969
}
951970

952971
override fun resumeReceiveClosed(closed: Closed<*>) {
953-
if (!select.trySelect()) return
972+
if (!select.trySelect(onSelect = block::shareableWillBeUsed)) return
954973
when (receiveMode) {
955-
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
956-
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
974+
RECEIVE_THROWS_ON_CLOSE -> {
975+
block.shareableDispose(useIt = true)
976+
select.resumeSelectWithException(closed.receiveException)
977+
}
978+
RECEIVE_RESULT -> startCoroutine(CoroutineStart.ATOMIC, ValueOrClosed.closed<R>(closed.closeCause), select.completion, block)
957979
RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
958-
block.startCoroutine(null, select.completion)
980+
startCoroutine(CoroutineStart.ATOMIC, null, select.completion, block)
959981
} else {
982+
block.shareableDispose(useIt = true)
960983
select.resumeSelectWithException(closed.receiveException)
961984
}
962985
}
963986
}
964987

965988
override fun dispose() { // invoked on select completion
989+
block.shareableDispose(useIt = false)
966990
if (remove())
967991
channel.onReceiveDequeued() // notify cancellation of receive
968992
}
@@ -1031,17 +1055,23 @@ internal interface ReceiveOrClosed<in E> {
10311055
@Suppress("UNCHECKED_CAST")
10321056
internal class SendElement(
10331057
override val pollResult: Any?,
1034-
@JvmField val cont: CancellableContinuation<Unit>
1058+
cont: CancellableContinuation<Unit>
10351059
) : Send() {
1060+
private val _cont = atomic<CancellableContinuation<Unit>?>(cont)
1061+
private fun useCont() = _cont.getAndSet(null)
1062+
10361063
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
1037-
val token = cont.tryResume(Unit, otherOp?.desc) ?: return null
1064+
val token = _cont.value?.tryResume(Unit, otherOp?.desc) ?: run {
1065+
_cont.value = null
1066+
return null
1067+
}
10381068
assert { token === RESUME_TOKEN } // the only other possible result
10391069
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
10401070
otherOp?.finishPrepare() // finish preparations
10411071
return RESUME_TOKEN
10421072
}
1043-
override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
1044-
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
1073+
override fun completeResumeSend() { useCont()?.completeResume(RESUME_TOKEN) }
1074+
override fun resumeSendClosed(closed: Closed<*>) { useCont()?.resumeWithException(closed.sendException) }
10451075
override fun toString(): String = "SendElement@$hexAddress($pollResult)"
10461076
}
10471077

kotlinx-coroutines-core/common/src/internal/Sharing.common.kt

+7
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,15 @@ internal expect fun <T> Continuation<T>.asLocalOrNull() : Continuation<T>?
1717
internal expect fun <T> Continuation<T>.asLocalOrNullIfNotUsed() : Continuation<T>?
1818
internal expect fun <T> Continuation<T>.useLocal() : Continuation<T>
1919
internal expect fun <T> Continuation<T>.shareableInterceptedResumeCancellableWith(result: Result<T>)
20+
internal expect fun <T> Continuation<T>.shareableInterceptedResumeWith(result: Result<T>)
21+
internal expect fun <T> Continuation<T>.shareableDispose()
2022
internal expect fun disposeContinuation(cont: () -> Continuation<*>)
2123
internal expect fun <T> CancellableContinuationImpl<T>.shareableResume(delegate: Continuation<T>, useMode: Int)
24+
25+
internal expect fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R
26+
internal expect fun <T, R> (suspend (T) -> R).shareableDispose(useIt: Boolean)
27+
internal expect fun <T, R> (suspend (T) -> R).shareableWillBeUsed()
28+
2229
internal expect fun isReuseSupportedInPlatform(): Boolean
2330
internal expect fun <T> ArrayList<T>.addOrUpdate(element: T, update: (ArrayList<T>) -> Unit)
2431
internal expect fun <T> ArrayList<T>.addOrUpdate(index: Int, element: T, update: (ArrayList<T>) -> Unit)

kotlinx-coroutines-core/common/src/selects/Select.kt

+39-21
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public interface SelectInstance<in R> {
116116
/**
117117
* Tries to select this instance. Returns `true` on success.
118118
*/
119-
public fun trySelect(): Boolean
119+
public fun trySelect(onSelect: () -> Unit = {}): Boolean
120120

121121
/**
122122
* Tries to select this instance. Returns:
@@ -130,7 +130,7 @@ public interface SelectInstance<in R> {
130130
* member is public, but [Symbol] is internal. When [SelectInstance] becomes a `sealed interface`
131131
* (see KT-222860) we can declare this method as internal.
132132
*/
133-
public fun trySelectOther(otherOp: PrepareOp?): Any?
133+
public fun trySelectOther(otherOp: PrepareOp?, onSelect: () -> Unit): Any?
134134

135135
/**
136136
* Performs action atomically with [trySelect].
@@ -233,10 +233,12 @@ private val selectOpSequenceNumber = SeqNumber()
233233

234234
@PublishedApi
235235
internal class SelectBuilderImpl<in R>(
236-
private val uCont: Continuation<R> // unintercepted delegate continuation
236+
uCont: Continuation<R>
237237
) : LockFreeLinkedListHead(), SelectBuilder<R>,
238238
SelectInstance<R>, Continuation<R>, CoroutineStackFrame
239239
{
240+
private val uCont: Continuation<R> = uCont.asShareable() // unintercepted delegate continuation, shareable
241+
240242
override val callerFrame: CoroutineStackFrame?
241243
get() = uCont as? CoroutineStackFrame
242244

@@ -280,7 +282,10 @@ internal class SelectBuilderImpl<in R>(
280282
when {
281283
result === UNDECIDED -> {
282284
val update = value()
283-
if (_result.compareAndSet(UNDECIDED, update)) return
285+
if (_result.compareAndSet(UNDECIDED, update)) {
286+
uCont.shareableDispose() // will return result without calling continuation
287+
return
288+
}
284289
}
285290
result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED, RESUMED)) {
286291
block()
@@ -305,7 +310,7 @@ internal class SelectBuilderImpl<in R>(
305310
// Resumes in dispatched way so that it can be called from an arbitrary context
306311
override fun resumeSelectWithException(exception: Throwable) {
307312
doResume({ CompletedExceptionally(recoverStackTrace(exception, uCont)) }) {
308-
uCont.intercepted().resumeWith(Result.failure(exception))
313+
uCont.shareableInterceptedResumeWith(Result.failure(exception))
309314
}
310315
}
311316

@@ -346,16 +351,19 @@ internal class SelectBuilderImpl<in R>(
346351
internal fun handleBuilderException(e: Throwable) {
347352
if (trySelect()) {
348353
resumeWithException(e)
349-
} else if (e !is CancellationException) {
350-
/*
351-
* Cannot handle this exception -- builder was already resumed with a different exception,
352-
* so treat it as "unhandled exception". But only if it is not the completion reason
353-
* and it's not the cancellation. Otherwise, in the face of structured concurrency
354-
* the same exception will be reported to the global exception handler.
355-
*/
356-
val result = getResult()
357-
if (result !is CompletedExceptionally || unwrap(result.cause) !== unwrap(e)) {
358-
handleCoroutineException(context, e)
354+
} else {
355+
disposeLockFreeLinkedList { this }
356+
if (e !is CancellationException) {
357+
/*
358+
* Cannot handle this exception -- builder was already resumed with a different exception,
359+
* so treat it as "unhandled exception". But only if it is not the completion reason
360+
* and it's not the cancellation. Otherwise, in the face of structured concurrency
361+
* the same exception will be reported to the global exception handler.
362+
*/
363+
val result = getResult()
364+
if (result !is CompletedExceptionally || unwrap(result.cause) !== unwrap(e)) {
365+
handleCoroutineException(context, e)
366+
}
359367
}
360368
}
361369
}
@@ -381,14 +389,16 @@ internal class SelectBuilderImpl<in R>(
381389
}
382390

383391
private fun doAfterSelect() {
392+
val parentHandle = _parentHandle.getAndSet(null)
384393
parentHandle?.dispose()
385394
forEach<DisposeNode> {
386-
it.handle.dispose()
395+
it.dispose()
387396
}
397+
disposeLockFreeLinkedList { this }
388398
}
389399

390-
override fun trySelect(): Boolean {
391-
val result = trySelectOther(null)
400+
override fun trySelect(onSelect: () -> Unit): Boolean {
401+
val result = trySelectOther(null, onSelect)
392402
return when {
393403
result === RESUME_TOKEN -> true
394404
result == null -> false
@@ -481,7 +491,7 @@ internal class SelectBuilderImpl<in R>(
481491

482492
// it is just like plain trySelect, but support idempotent start
483493
// Returns RESUME_TOKEN | RETRY_ATOMIC | null (when already selected)
484-
override fun trySelectOther(otherOp: PrepareOp?): Any? {
494+
override fun trySelectOther(otherOp: PrepareOp?, onSelect: () -> Unit): Any? {
485495
_state.loop { state -> // lock-free loop on state
486496
when {
487497
// Found initial state (not selected yet) -- try to make it selected
@@ -496,6 +506,7 @@ internal class SelectBuilderImpl<in R>(
496506
val decision = pairSelectOp.perform(this)
497507
if (decision !== null) return decision
498508
}
509+
onSelect()
499510
doAfterSelect()
500511
return RESUME_TOKEN
501512
}
@@ -653,6 +664,13 @@ internal class SelectBuilderImpl<in R>(
653664
}
654665

655666
private class DisposeNode(
656-
@JvmField val handle: DisposableHandle
657-
) : LockFreeLinkedListNode()
667+
handle: DisposableHandle
668+
) : LockFreeLinkedListNode() {
669+
private val _handle = atomic<DisposableHandle?>(handle)
670+
671+
fun dispose() {
672+
val handle = _handle.getAndSet(null)
673+
handle?.dispose()
674+
}
675+
}
658676
}

kotlinx-coroutines-core/common/src/sync/Mutex.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
372372
override fun tryResumeLockWaiter(): Any? = if (select.trySelect()) SELECT_SUCCESS else null
373373
override fun completeResumeLockWaiter(token: Any) {
374374
assert { token === SELECT_SUCCESS }
375-
block.startCoroutine(receiver = mutex, completion = select.completion)
375+
startCoroutine(CoroutineStart.ATOMIC, mutex, select.completion, block)
376376
}
377377
override fun toString(): String = "LockSelect[$owner, $mutex, $select]"
378378
}

0 commit comments

Comments
 (0)