Skip to content

Commit a729d8b

Browse files
committed
Multithreaded support in select expression
Fixes #1764
1 parent 632ad6c commit a729d8b

File tree

15 files changed

+252
-67
lines changed

15 files changed

+252
-67
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+8-4
Original file line numberDiff line numberDiff line change
@@ -1074,8 +1074,8 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
10741074
public fun resumeSelectWithException (Ljava/lang/Throwable;)V
10751075
public fun resumeWith (Ljava/lang/Object;)V
10761076
public fun toString ()Ljava/lang/String;
1077-
public fun trySelect ()Z
1078-
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
1077+
public fun trySelect (Lkotlin/jvm/functions/Function0;)Z
1078+
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
10791079
}
10801080

10811081
public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
@@ -1096,8 +1096,12 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
10961096
public abstract fun isSelected ()Z
10971097
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10981098
public abstract fun resumeSelectWithException (Ljava/lang/Throwable;)V
1099-
public abstract fun trySelect ()Z
1100-
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
1099+
public abstract fun trySelect (Lkotlin/jvm/functions/Function0;)Z
1100+
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
1101+
}
1102+
1103+
public final class kotlinx/coroutines/selects/SelectInstance$DefaultImpls {
1104+
public static synthetic fun trySelect$default (Lkotlinx/coroutines/selects/SelectInstance;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Z
11011105
}
11021106

11031107
public final class kotlinx/coroutines/selects/SelectKt {

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,6 @@ public abstract class AbstractCoroutine<in T>(
138138
*/
139139
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
140140
initParentJob()
141-
startCoroutine(start, this, receiver, block)
141+
startCoroutine(start, receiver, this, block)
142142
}
143143
}

kotlinx-coroutines-core/common/src/Builders.common.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,13 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke(
174174

175175
internal fun <T, R> startCoroutineImpl(
176176
start: CoroutineStart,
177-
coroutine: AbstractCoroutine<T>,
178177
receiver: R,
178+
completion: Continuation<T>,
179179
block: suspend R.() -> T
180180
) = when (start) {
181-
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, coroutine)
182-
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, coroutine)
183-
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, coroutine)
181+
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
182+
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
183+
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
184184
CoroutineStart.LAZY -> Unit // will start lazily
185185
}
186186

@@ -189,8 +189,8 @@ internal fun <T, R> startCoroutineImpl(
189189
// todo: impl a separate startCoroutineCancellable as a fast-path for startCoroutine(DEFAULT, ...)
190190
internal expect fun <T, R> startCoroutine(
191191
start: CoroutineStart,
192-
coroutine: AbstractCoroutine<T>,
193192
receiver: R,
193+
completion: Continuation<T>,
194194
block: suspend R.() -> T
195195
)
196196

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+50-22
Original file line numberDiff line numberDiff line change
@@ -382,11 +382,15 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
382382
select.disposeOnSelect(node)
383383
return
384384
}
385-
enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
385+
enqueueResult is Closed<*> -> {
386+
node.block.shareableDispose(useIt = true)
387+
throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
388+
}
386389
enqueueResult === ENQUEUE_FAILED -> {} // try to offer
387390
enqueueResult is Receive<*> -> {} // try to offer
388391
else -> error("enqueueSend returned $enqueueResult ")
389392
}
393+
node.block.shareableDispose(useIt = true)
390394
}
391395
// hm... receiver is waiting or buffer is not full. try to offer
392396
val offerResult = offerSelectInternal(element, select)
@@ -448,16 +452,19 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
448452
override val pollResult: Any?,
449453
@JvmField val channel: AbstractSendChannel<E>,
450454
@JvmField val select: SelectInstance<R>,
451-
@JvmField val block: suspend (SendChannel<E>) -> R
455+
block: suspend (SendChannel<E>) -> R
452456
) : Send(), DisposableHandle {
457+
@JvmField val block: suspend (SendChannel<E>) -> R = block.asShareable()
458+
453459
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? =
454-
select.trySelectOther(otherOp) as Symbol? // must return symbol
460+
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol? // must return symbol
455461

456462
override fun completeResumeSend() {
457-
block.startCoroutine(receiver = channel, completion = select.completion)
463+
startCoroutine(CoroutineStart.ATOMIC, channel, select.completion, block)
458464
}
459465

460466
override fun dispose() { // invoked on select completion
467+
block.shareableDispose(useIt = false)
461468
remove()
462469
}
463470

@@ -773,7 +780,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
773780
): Boolean {
774781
val node = ReceiveSelect(this, select, block, receiveMode)
775782
val result = enqueueReceive(node)
776-
if (result) select.disposeOnSelect(node)
783+
if (result) {
784+
select.disposeOnSelect(node)
785+
} else {
786+
node.block.shareableDispose(useIt = true)
787+
}
777788
return result
778789
}
779790

@@ -871,41 +882,47 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
871882
}
872883

873884
private class ReceiveElement<in E>(
874-
@JvmField val cont: CancellableContinuation<Any?>,
885+
cont: CancellableContinuation<Any?>,
875886
@JvmField val receiveMode: Int
876887
) : Receive<E>() {
888+
private val _cont = atomic<CancellableContinuation<Any?>?>(cont)
889+
877890
fun resumeValue(value: E): Any? = when (receiveMode) {
878891
RECEIVE_RESULT -> ValueOrClosed.value(value)
879892
else -> value
880893
}
881894

882895
@Suppress("IMPLICIT_CAST_TO_ANY")
883896
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
884-
val token = cont.tryResume(resumeValue(value), otherOp?.desc) ?: return null
897+
val token = _cont.value?.tryResume(resumeValue(value), otherOp?.desc) ?: return null
885898
assert { token === RESUME_TOKEN } // the only other possible result
886899
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
887900
otherOp?.finishPrepare()
888901
return RESUME_TOKEN
889902
}
890903

891-
override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)
904+
override fun completeResumeReceive(value: E) { _cont.getAndSet(null)!!.completeResume(RESUME_TOKEN) }
892905

893906
override fun resumeReceiveClosed(closed: Closed<*>) {
907+
val cont = _cont.getAndSet(null)!!
894908
when {
895909
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
896910
receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
897911
else -> cont.resumeWithException(closed.receiveException)
898912
}
899913
}
914+
900915
override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
901916
}
902917

903918
private class ReceiveHasNext<E>(
904919
@JvmField val iterator: Itr<E>,
905-
@JvmField val cont: CancellableContinuation<Boolean>
920+
cont: CancellableContinuation<Boolean>
906921
) : Receive<E>() {
922+
private val _cont = atomic<CancellableContinuation<Boolean>?>(cont)
923+
907924
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
908-
val token = cont.tryResume(true, otherOp?.desc) ?: return null
925+
val token = _cont.value?.tryResume(true, otherOp?.desc) ?: return null
909926
assert { token === RESUME_TOKEN } // the only other possible result
910927
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
911928
otherOp?.finishPrepare()
@@ -918,10 +935,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
918935
but completeResumeReceive is called once so we set iterator result here.
919936
*/
920937
iterator.result = value
921-
cont.completeResume(RESUME_TOKEN)
938+
_cont.getAndSet(null)!!.completeResume(RESUME_TOKEN)
922939
}
923940

924941
override fun resumeReceiveClosed(closed: Closed<*>) {
942+
val cont = _cont.getAndSet(null)!!
925943
val token = if (closed.closeCause == null) {
926944
cont.tryResume(false)
927945
} else {
@@ -932,37 +950,45 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
932950
cont.completeResume(token)
933951
}
934952
}
953+
935954
override fun toString(): String = "ReceiveHasNext@$hexAddress"
936955
}
937956

938957
private class ReceiveSelect<R, E>(
939958
@JvmField val channel: AbstractChannel<E>,
940959
@JvmField val select: SelectInstance<R>,
941-
@JvmField val block: suspend (Any?) -> R,
960+
block: suspend (Any?) -> R,
942961
@JvmField val receiveMode: Int
943962
) : Receive<E>(), DisposableHandle {
963+
@JvmField val block: suspend (Any?) -> R = block.asShareable() // captured variables in this block need screening
964+
944965
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? =
945-
select.trySelectOther(otherOp) as Symbol?
966+
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol?
946967

947968
@Suppress("UNCHECKED_CAST")
948969
override fun completeResumeReceive(value: E) {
949-
block.startCoroutine(if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion)
970+
startCoroutine(CoroutineStart.ATOMIC, if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion, block)
950971
}
951972

952973
override fun resumeReceiveClosed(closed: Closed<*>) {
953-
if (!select.trySelect()) return
974+
if (!select.trySelect(onSelect = block::shareableWillBeUsed)) return
954975
when (receiveMode) {
955-
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
956-
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
976+
RECEIVE_THROWS_ON_CLOSE -> {
977+
block.shareableDispose(useIt = true)
978+
select.resumeSelectWithException(closed.receiveException)
979+
}
980+
RECEIVE_RESULT -> startCoroutine(CoroutineStart.ATOMIC, ValueOrClosed.closed<R>(closed.closeCause), select.completion, block)
957981
RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
958-
block.startCoroutine(null, select.completion)
982+
startCoroutine(CoroutineStart.ATOMIC, null, select.completion, block)
959983
} else {
984+
block.shareableDispose(useIt = true)
960985
select.resumeSelectWithException(closed.receiveException)
961986
}
962987
}
963988
}
964989

965990
override fun dispose() { // invoked on select completion
991+
block.shareableDispose(useIt = false)
966992
if (remove())
967993
channel.onReceiveDequeued() // notify cancellation of receive
968994
}
@@ -1031,17 +1057,19 @@ internal interface ReceiveOrClosed<in E> {
10311057
@Suppress("UNCHECKED_CAST")
10321058
internal class SendElement(
10331059
override val pollResult: Any?,
1034-
@JvmField val cont: CancellableContinuation<Unit>
1060+
cont: CancellableContinuation<Unit>
10351061
) : Send() {
1062+
private val _cont = atomic<CancellableContinuation<Unit>?>(cont)
1063+
10361064
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
1037-
val token = cont.tryResume(Unit, otherOp?.desc) ?: return null
1065+
val token = _cont.value?.tryResume(Unit, otherOp?.desc) ?: return null
10381066
assert { token === RESUME_TOKEN } // the only other possible result
10391067
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
10401068
otherOp?.finishPrepare() // finish preparations
10411069
return RESUME_TOKEN
10421070
}
1043-
override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
1044-
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
1071+
override fun completeResumeSend() { _cont.getAndSet(null)!!.completeResume(RESUME_TOKEN) }
1072+
override fun resumeSendClosed(closed: Closed<*>) { _cont.getAndSet(null)!!.resumeWithException(closed.sendException) }
10451073
override fun toString(): String = "SendElement@$hexAddress($pollResult)"
10461074
}
10471075

kotlinx-coroutines-core/common/src/internal/Sharing.common.kt

+7
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,15 @@ internal expect fun <T> Continuation<T>.asLocalOrNull() : Continuation<T>?
1717
internal expect fun <T> Continuation<T>.asLocalOrNullIfNotUsed() : Continuation<T>?
1818
internal expect fun <T> Continuation<T>.useLocal() : Continuation<T>
1919
internal expect fun <T> Continuation<T>.shareableInterceptedResumeCancellableWith(result: Result<T>)
20+
internal expect fun <T> Continuation<T>.shareableInterceptedResumeWith(result: Result<T>)
21+
internal expect fun <T> Continuation<T>.shareableDispose()
2022
internal expect fun disposeContinuation(cont: () -> Continuation<*>)
2123
internal expect fun <T> CancellableContinuationImpl<T>.shareableResume(delegate: Continuation<T>, useMode: Int)
24+
25+
internal expect fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R
26+
internal expect fun <T, R> (suspend (T) -> R).shareableDispose(useIt: Boolean)
27+
internal expect fun <T, R> (suspend (T) -> R).shareableWillBeUsed()
28+
2229
internal expect fun isReuseSupportedInPlatform(): Boolean
2330
internal expect fun <T> ArrayList<T>.addOrUpdate(element: T, update: (ArrayList<T>) -> Unit)
2431
internal expect fun <T> ArrayList<T>.addOrUpdate(index: Int, element: T, update: (ArrayList<T>) -> Unit)

0 commit comments

Comments
 (0)