Skip to content

Commit 4e9a346

Browse files
elizarovh0tk3y
authored andcommitted
Multithreaded support in select expression
Fixes #1764
1 parent 34c15b0 commit 4e9a346

File tree

15 files changed

+269
-75
lines changed

15 files changed

+269
-75
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+8-4
Original file line numberDiff line numberDiff line change
@@ -1073,8 +1073,8 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
10731073
public fun resumeSelectWithException (Ljava/lang/Throwable;)V
10741074
public fun resumeWith (Ljava/lang/Object;)V
10751075
public fun toString ()Ljava/lang/String;
1076-
public fun trySelect ()Z
1077-
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
1076+
public fun trySelect (Lkotlin/jvm/functions/Function0;)Z
1077+
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
10781078
}
10791079

10801080
public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
@@ -1095,8 +1095,12 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
10951095
public abstract fun isSelected ()Z
10961096
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10971097
public abstract fun resumeSelectWithException (Ljava/lang/Throwable;)V
1098-
public abstract fun trySelect ()Z
1099-
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
1098+
public abstract fun trySelect (Lkotlin/jvm/functions/Function0;)Z
1099+
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
1100+
}
1101+
1102+
public final class kotlinx/coroutines/selects/SelectInstance$DefaultImpls {
1103+
public static synthetic fun trySelect$default (Lkotlinx/coroutines/selects/SelectInstance;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Z
11001104
}
11011105

11021106
public final class kotlinx/coroutines/selects/SelectKt {

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,6 @@ public abstract class AbstractCoroutine<in T>(
138138
*/
139139
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
140140
initParentJob()
141-
startCoroutine(start, this, receiver, block)
141+
startCoroutine(start, receiver, this, block)
142142
}
143143
}

kotlinx-coroutines-core/common/src/Builders.common.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,13 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke(
174174

175175
internal fun <T, R> startCoroutineImpl(
176176
start: CoroutineStart,
177-
coroutine: AbstractCoroutine<T>,
178177
receiver: R,
178+
completion: Continuation<T>,
179179
block: suspend R.() -> T
180180
) = when (start) {
181-
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, coroutine)
182-
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, coroutine)
183-
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, coroutine)
181+
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
182+
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
183+
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
184184
CoroutineStart.LAZY -> Unit // will start lazily
185185
}
186186

@@ -189,8 +189,8 @@ internal fun <T, R> startCoroutineImpl(
189189
// todo: impl a separate startCoroutineCancellable as a fast-path for startCoroutine(DEFAULT, ...)
190190
internal expect fun <T, R> startCoroutine(
191191
start: CoroutineStart,
192-
coroutine: AbstractCoroutine<T>,
193192
receiver: R,
193+
completion: Continuation<T>,
194194
block: suspend R.() -> T
195195
)
196196

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+67-30
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
382382
select.disposeOnSelect(node)
383383
return
384384
}
385-
enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
385+
enqueueResult is Closed<*> -> {
386+
node.dispose()
387+
throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
388+
}
386389
enqueueResult === ENQUEUE_FAILED -> {} // try to offer
387390
enqueueResult is Receive<*> -> {} // try to offer
388391
else -> error("enqueueSend returned $enqueueResult ")
@@ -448,16 +451,18 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
448451
override val pollResult: Any?,
449452
@JvmField val channel: AbstractSendChannel<E>,
450453
@JvmField val select: SelectInstance<R>,
451-
@JvmField val block: suspend (SendChannel<E>) -> R
454+
block: suspend (SendChannel<E>) -> R
452455
) : Send(), DisposableHandle {
456+
@JvmField val block: suspend (SendChannel<E>) -> R = block.asShareable()
457+
453458
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? =
454-
select.trySelectOther(otherOp) as Symbol? // must return symbol
459+
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol? // must return symbol
455460

456-
override fun completeResumeSend() {
457-
block.startCoroutine(receiver = channel, completion = select.completion)
458-
}
461+
override fun completeResumeSend() =
462+
startCoroutine(CoroutineStart.ATOMIC, channel, select.completion, block)
459463

460464
override fun dispose() { // invoked on select completion
465+
block.shareableDispose(useIt = false)
461466
remove()
462467
}
463468

@@ -773,7 +778,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
773778
): Boolean {
774779
val node = ReceiveSelect(this, select, block, receiveMode)
775780
val result = enqueueReceive(node)
776-
if (result) select.disposeOnSelect(node)
781+
if (result) {
782+
select.disposeOnSelect(node)
783+
} else {
784+
node.dispose()
785+
}
777786
return result
778787
}
779788

@@ -871,41 +880,53 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
871880
}
872881

873882
private class ReceiveElement<in E>(
874-
@JvmField val cont: CancellableContinuation<Any?>,
883+
cont: CancellableContinuation<Any?>,
875884
@JvmField val receiveMode: Int
876885
) : Receive<E>() {
886+
private val _cont = atomic<CancellableContinuation<Any?>?>(cont)
887+
private fun useCont() = _cont.getAndSet(null)
888+
877889
fun resumeValue(value: E): Any? = when (receiveMode) {
878890
RECEIVE_RESULT -> ValueOrClosed.value(value)
879891
else -> value
880892
}
881893

882894
@Suppress("IMPLICIT_CAST_TO_ANY")
883895
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
884-
val token = cont.tryResume(resumeValue(value), otherOp?.desc) ?: return null
896+
val token = _cont.value?.tryResume(resumeValue(value), otherOp?.desc) ?: run {
897+
_cont.value = null
898+
return null
899+
}
885900
assert { token === RESUME_TOKEN } // the only other possible result
886901
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
887902
otherOp?.finishPrepare()
888903
return RESUME_TOKEN
889904
}
890905

891-
override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)
906+
override fun completeResumeReceive(value: E) { useCont()?.completeResume(RESUME_TOKEN) }
892907

893908
override fun resumeReceiveClosed(closed: Closed<*>) {
894909
when {
895-
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
896-
receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
897-
else -> cont.resumeWithException(closed.receiveException)
910+
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> useCont()?.resume(null)
911+
receiveMode == RECEIVE_RESULT -> useCont()?.resume(closed.toResult<Any>())
912+
else -> useCont()?.resumeWithException(closed.receiveException)
898913
}
899914
}
900915
override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
901916
}
902917

903918
private class ReceiveHasNext<E>(
904919
@JvmField val iterator: Itr<E>,
905-
@JvmField val cont: CancellableContinuation<Boolean>
920+
cont: CancellableContinuation<Boolean>
906921
) : Receive<E>() {
922+
private val _cont = atomic<CancellableContinuation<Boolean>?>(cont)
923+
private fun useCont() = _cont.getAndSet(null)
924+
907925
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
908-
val token = cont.tryResume(true, otherOp?.desc) ?: return null
926+
val token = _cont.value?.tryResume(true, otherOp?.desc) ?: run{
927+
_cont.value = null
928+
return null
929+
}
909930
assert { token === RESUME_TOKEN } // the only other possible result
910931
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
911932
otherOp?.finishPrepare()
@@ -918,51 +939,61 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
918939
but completeResumeReceive is called once so we set iterator result here.
919940
*/
920941
iterator.result = value
921-
cont.completeResume(RESUME_TOKEN)
942+
useCont()?.completeResume(RESUME_TOKEN)
922943
}
923944

924945
override fun resumeReceiveClosed(closed: Closed<*>) {
925946
val token = if (closed.closeCause == null) {
926-
cont.tryResume(false)
947+
_cont.value?.tryResume(false)
927948
} else {
928-
cont.tryResumeWithException(recoverStackTrace(closed.receiveException, cont))
949+
_cont.value?.let { cont ->
950+
cont.tryResumeWithException(recoverStackTrace(closed.receiveException, cont))
951+
}
929952
}
930953
if (token != null) {
931954
iterator.result = closed
932-
cont.completeResume(token)
955+
_cont.value?.completeResume(token)
933956
}
957+
_cont.value = null
934958
}
935959
override fun toString(): String = "ReceiveHasNext@$hexAddress"
936960
}
937961

938962
private class ReceiveSelect<R, E>(
939963
@JvmField val channel: AbstractChannel<E>,
940964
@JvmField val select: SelectInstance<R>,
941-
@JvmField val block: suspend (Any?) -> R,
965+
block: suspend (Any?) -> R,
942966
@JvmField val receiveMode: Int
943967
) : Receive<E>(), DisposableHandle {
968+
@JvmField val block: suspend (Any?) -> R = block.asShareable() // captured variables in this block need screening
969+
944970
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? =
945-
select.trySelectOther(otherOp) as Symbol?
971+
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol?
946972

947973
@Suppress("UNCHECKED_CAST")
948974
override fun completeResumeReceive(value: E) {
949-
block.startCoroutine(if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion)
975+
startCoroutine(CoroutineStart.ATOMIC, if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion, block)
950976
}
951977

952978
override fun resumeReceiveClosed(closed: Closed<*>) {
953-
if (!select.trySelect()) return
979+
if (!select.trySelect(onSelect = block::shareableWillBeUsed)) return
954980
when (receiveMode) {
955-
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
956-
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
981+
RECEIVE_THROWS_ON_CLOSE -> {
982+
block.shareableDispose(useIt = true)
983+
select.resumeSelectWithException(closed.receiveException)
984+
}
985+
RECEIVE_RESULT -> startCoroutine(CoroutineStart.ATOMIC, ValueOrClosed.closed<R>(closed.closeCause), select.completion, block)
957986
RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
958-
block.startCoroutine(null, select.completion)
987+
startCoroutine(CoroutineStart.ATOMIC, null, select.completion, block)
959988
} else {
989+
block.shareableDispose(useIt = true)
960990
select.resumeSelectWithException(closed.receiveException)
961991
}
962992
}
963993
}
964994

965995
override fun dispose() { // invoked on select completion
996+
block.shareableDispose(useIt = false)
966997
if (remove())
967998
channel.onReceiveDequeued() // notify cancellation of receive
968999
}
@@ -1031,17 +1062,23 @@ internal interface ReceiveOrClosed<in E> {
10311062
@Suppress("UNCHECKED_CAST")
10321063
internal class SendElement(
10331064
override val pollResult: Any?,
1034-
@JvmField val cont: CancellableContinuation<Unit>
1065+
cont: CancellableContinuation<Unit>
10351066
) : Send() {
1067+
private val _cont = atomic<CancellableContinuation<Unit>?>(cont)
1068+
private fun useCont() = _cont.getAndSet(null)
1069+
10361070
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
1037-
val token = cont.tryResume(Unit, otherOp?.desc) ?: return null
1071+
val token = _cont.value?.tryResume(Unit, otherOp?.desc) ?: run {
1072+
_cont.value = null
1073+
return null
1074+
}
10381075
assert { token === RESUME_TOKEN } // the only other possible result
10391076
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
10401077
otherOp?.finishPrepare() // finish preparations
10411078
return RESUME_TOKEN
10421079
}
1043-
override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
1044-
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
1080+
override fun completeResumeSend() { useCont()?.completeResume(RESUME_TOKEN) }
1081+
override fun resumeSendClosed(closed: Closed<*>) { useCont()?.resumeWithException(closed.sendException) }
10451082
override fun toString(): String = "SendElement@$hexAddress($pollResult)"
10461083
}
10471084

kotlinx-coroutines-core/common/src/internal/Sharing.common.kt

+7
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,15 @@ internal expect fun <T> Continuation<T>.asLocalOrNull() : Continuation<T>?
1717
internal expect fun <T> Continuation<T>.asLocalOrNullIfNotUsed() : Continuation<T>?
1818
internal expect fun <T> Continuation<T>.useLocal() : Continuation<T>
1919
internal expect fun <T> Continuation<T>.shareableInterceptedResumeCancellableWith(result: Result<T>)
20+
internal expect fun <T> Continuation<T>.shareableInterceptedResumeWith(result: Result<T>)
21+
internal expect fun <T> Continuation<T>.shareableDispose()
2022
internal expect fun disposeContinuation(cont: () -> Continuation<*>)
2123
internal expect fun <T> CancellableContinuationImpl<T>.shareableResume(delegate: Continuation<T>, useMode: Int)
24+
25+
internal expect fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R
26+
internal expect fun <T, R> (suspend (T) -> R).shareableDispose(useIt: Boolean)
27+
internal expect fun <T, R> (suspend (T) -> R).shareableWillBeUsed()
28+
2229
internal expect fun isReuseSupportedInPlatform(): Boolean
2330
internal expect fun <T> ArrayList<T>.addOrUpdate(element: T, update: (ArrayList<T>) -> Unit)
2431
internal expect fun <T> ArrayList<T>.addOrUpdate(index: Int, element: T, update: (ArrayList<T>) -> Unit)

0 commit comments

Comments
 (0)