Skip to content

Commit 675a628

Browse files
committed
Multithreaded support in select expression
Fixes #1764
1 parent 1241e23 commit 675a628

File tree

18 files changed

+329
-123
lines changed

18 files changed

+329
-123
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+8-4
Original file line numberDiff line numberDiff line change
@@ -1073,8 +1073,8 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
10731073
public fun resumeSelectWithException (Ljava/lang/Throwable;)V
10741074
public fun resumeWith (Ljava/lang/Object;)V
10751075
public fun toString ()Ljava/lang/String;
1076-
public fun trySelect ()Z
1077-
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
1076+
public fun trySelect (Lkotlin/jvm/functions/Function0;)Z
1077+
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
10781078
}
10791079

10801080
public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
@@ -1095,8 +1095,12 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
10951095
public abstract fun isSelected ()Z
10961096
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10971097
public abstract fun resumeSelectWithException (Ljava/lang/Throwable;)V
1098-
public abstract fun trySelect ()Z
1099-
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
1098+
public abstract fun trySelect (Lkotlin/jvm/functions/Function0;)Z
1099+
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
1100+
}
1101+
1102+
public final class kotlinx/coroutines/selects/SelectInstance$DefaultImpls {
1103+
public static synthetic fun trySelect$default (Lkotlinx/coroutines/selects/SelectInstance;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Z
11001104
}
11011105

11021106
public final class kotlinx/coroutines/selects/SelectKt {

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,6 @@ public abstract class AbstractCoroutine<in T>(
138138
*/
139139
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
140140
initParentJob()
141-
startCoroutine(start, this, receiver, block)
141+
startCoroutine(start, receiver, this, block)
142142
}
143143
}

kotlinx-coroutines-core/common/src/Builders.common.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,13 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke(
174174

175175
internal fun <T, R> startCoroutineImpl(
176176
start: CoroutineStart,
177-
coroutine: AbstractCoroutine<T>,
178177
receiver: R,
178+
completion: Continuation<T>,
179179
block: suspend R.() -> T
180180
) = when (start) {
181-
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, coroutine)
182-
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, coroutine)
183-
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, coroutine)
181+
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
182+
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
183+
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
184184
CoroutineStart.LAZY -> Unit // will start lazily
185185
}
186186

@@ -189,8 +189,8 @@ internal fun <T, R> startCoroutineImpl(
189189
// todo: impl a separate startCoroutineCancellable as a fast-path for startCoroutine(DEFAULT, ...)
190190
internal expect fun <T, R> startCoroutine(
191191
start: CoroutineStart,
192-
coroutine: AbstractCoroutine<T>,
193192
receiver: R,
193+
completion: Continuation<T>,
194194
block: suspend R.() -> T
195195
)
196196

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+100-56
Large diffs are not rendered by default.

kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ internal class ArrayBroadcastChannel<E>(
143143
private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
144144
// update head in a tail rec loop
145145
var send: Send? = null
146+
var token: Any? = null
146147
state.withLock {
147148
if (addSub != null) {
148149
addSub.subHead = tail // start from last element
@@ -171,9 +172,8 @@ internal class ArrayBroadcastChannel<E>(
171172
while (true) {
172173
send = takeFirstSendOrPeekClosed() ?: break // when when no sender
173174
if (send is Closed<*>) break // break when closed for send
174-
val token = send!!.tryResumeSend(null)
175+
token = send!!.tryResumeSend(null)
175176
if (token != null) {
176-
assert { token === RESUME_TOKEN }
177177
// put sent element to the buffer
178178
state.setBufferAt((tail % capacity).toInt(), (send as Send).pollResult)
179179
this.size = size + 1
@@ -186,7 +186,7 @@ internal class ArrayBroadcastChannel<E>(
186186
return // done updating here -> return
187187
}
188188
// we only get out of the lock normally when there is a sender to resume
189-
send!!.completeResumeSend()
189+
send!!.completeResumeSend(token!!)
190190
// since we've just sent an element, we might need to resume some receivers
191191
checkSubOffers()
192192
// tailrec call to recheck
@@ -237,6 +237,7 @@ internal class ArrayBroadcastChannel<E>(
237237
if (!subLock.tryLock()) break
238238
val receive: ReceiveOrClosed<E>?
239239
var result: Any?
240+
var token: Any?
240241
try {
241242
result = peekUnderLock()
242243
when {
@@ -249,15 +250,14 @@ internal class ArrayBroadcastChannel<E>(
249250
// find a receiver for an element
250251
receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
251252
if (receive is Closed<*>) break // noting more to do if this sub already closed
252-
val token = receive.tryResumeReceive(result as E, null) ?: continue
253-
assert { token === RESUME_TOKEN }
253+
token = receive.tryResumeReceive(result as E, null) ?: continue
254254
val subHead = this.subHead
255255
this.subHead = subHead + 1 // retrieved element for this subscriber
256256
updated = true
257257
} finally {
258258
subLock.unlock()
259259
}
260-
receive!!.completeResumeReceive(result as E)
260+
receive!!.completeResumeReceive(result as E, token!!)
261261
}
262262
// do close outside of lock if needed
263263
closed?.also { close(cause = it.closeCause) }

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+14-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.channels
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.RESUME_TOKEN
89
import kotlinx.coroutines.internal.*
910
import kotlinx.coroutines.selects.*
1011
import kotlin.math.*
@@ -45,6 +46,7 @@ internal open class ArrayChannel<E>(
4546
// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
4647
protected override fun offerInternal(element: E): Any {
4748
var receive: ReceiveOrClosed<E>? = null
49+
var token: Any? = null
4850
state.withLock {
4951
val size = state.size
5052
closedForSend?.let { return it }
@@ -59,9 +61,8 @@ internal open class ArrayChannel<E>(
5961
state.size = size // restore size
6062
return receive!!
6163
}
62-
val token = receive!!.tryResumeReceive(element, null)
64+
token = receive!!.tryResumeReceive(element, null)
6365
if (token != null) {
64-
assert { token === RESUME_TOKEN }
6566
state.size = size // restore size
6667
return@withLock
6768
}
@@ -75,13 +76,14 @@ internal open class ArrayChannel<E>(
7576
return OFFER_FAILED
7677
}
7778
// breaks here if offer meets receiver
78-
receive!!.completeResumeReceive(element)
79+
receive!!.completeResumeReceive(element, token!!)
7980
return receive!!.offerResult
8081
}
8182

8283
// result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
8384
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
8485
var receive: ReceiveOrClosed<E>? = null
86+
var token: Any? = null
8587
state.withLock {
8688
val size = state.size
8789
closedForSend?.let { return it }
@@ -97,6 +99,7 @@ internal open class ArrayChannel<E>(
9799
failure == null -> { // offered successfully
98100
state.size = size // restore size
99101
receive = offerOp.result
102+
token = offerOp.takeToken()
100103
return@withLock
101104
}
102105
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
@@ -122,7 +125,7 @@ internal open class ArrayChannel<E>(
122125
return OFFER_FAILED
123126
}
124127
// breaks here if offer meets receiver
125-
receive!!.completeResumeReceive(element)
128+
receive!!.completeResumeReceive(element, token!!)
126129
return receive!!.offerResult
127130
}
128131

@@ -133,6 +136,7 @@ internal open class ArrayChannel<E>(
133136
// result is `E | POLL_FAILED | Closed`
134137
protected override fun pollInternal(): Any? {
135138
var send: Send? = null
139+
var token: Any? = null
136140
var resumed = false
137141
var result: Any? = null
138142
state.withLock {
@@ -148,9 +152,8 @@ internal open class ArrayChannel<E>(
148152
loop@ while (true) {
149153
send = takeFirstSendOrPeekClosed() ?: break
150154
disposeQueue { send as? Closed<*> }
151-
val token = send!!.tryResumeSend(null)
155+
token = send!!.tryResumeSend(null)
152156
if (token != null) {
153-
assert { token === RESUME_TOKEN }
154157
resumed = true
155158
replacement = send!!.pollResult
156159
break@loop
@@ -165,7 +168,7 @@ internal open class ArrayChannel<E>(
165168
}
166169
// complete send the we're taken replacement from
167170
if (resumed)
168-
send!!.completeResumeSend()
171+
send!!.completeResumeSend(token!!)
169172
return result
170173
}
171174

@@ -174,6 +177,7 @@ internal open class ArrayChannel<E>(
174177
var send: Send? = null
175178
var success = false
176179
var result: Any? = null
180+
var token: Any? = null
177181
state.withLock {
178182
val size = state.size
179183
if (size == 0) return closedForSend ?: POLL_FAILED
@@ -190,6 +194,7 @@ internal open class ArrayChannel<E>(
190194
when {
191195
failure == null -> { // polled successfully
192196
send = pollOp.result
197+
token = pollOp.takeToken()
193198
success = true
194199
replacement = send!!.pollResult
195200
break@loop
@@ -203,6 +208,7 @@ internal open class ArrayChannel<E>(
203208
}
204209
failure is Closed<*> -> {
205210
send = failure
211+
token = RESUME_TOKEN // :KLUDGE: We know that's the token to resume failed send`
206212
success = true
207213
replacement = failure
208214
break@loop
@@ -226,7 +232,7 @@ internal open class ArrayChannel<E>(
226232
}
227233
// complete send the we're taken replacement from
228234
if (success)
229-
send!!.completeResumeSend()
235+
send!!.completeResumeSend(token!!)
230236
return result
231237
}
232238

kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt

+7-8
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package kotlinx.coroutines.channels
66

7-
import kotlinx.coroutines.*
87
import kotlinx.coroutines.internal.*
98
import kotlinx.coroutines.selects.*
109
import kotlin.jvm.*
@@ -36,6 +35,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
3635
// result is `OFFER_SUCCESS | Closed`
3736
protected override fun offerInternal(element: E): Any {
3837
var receive: ReceiveOrClosed<E>? = null
38+
var token: Any? = null
3939
state.withLock {
4040
closedForSend?.let { return it }
4141
// if there is no element written in buffer
@@ -46,24 +46,22 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
4646
if (receive is Closed) {
4747
return receive!!
4848
}
49-
val token = receive!!.tryResumeReceive(element, null)
50-
if (token != null) {
51-
assert { token === RESUME_TOKEN }
52-
return@withLock
53-
}
49+
token = receive!!.tryResumeReceive(element, null)
50+
if (token != null) return@withLock
5451
}
5552
}
5653
state.value = element
5754
return OFFER_SUCCESS
5855
}
5956
// breaks here if offer meets receiver
60-
receive!!.completeResumeReceive(element)
57+
receive!!.completeResumeReceive(element, token!!)
6158
return receive!!.offerResult
6259
}
6360

6461
// result is `ALREADY_SELECTED | OFFER_SUCCESS | Closed`
6562
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
6663
var receive: ReceiveOrClosed<E>? = null
64+
var token: Any? = null
6765
state.withLock {
6866
closedForSend?.let { return it }
6967
if (state.value === EMPTY) {
@@ -73,6 +71,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
7371
when {
7472
failure == null -> { // offered successfully
7573
receive = offerOp.result
74+
token = offerOp.takeToken()
7675
return@withLock
7776
}
7877
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
@@ -90,7 +89,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
9089
return OFFER_SUCCESS
9190
}
9291
// breaks here if offer meets receiver
93-
receive!!.completeResumeReceive(element)
92+
receive!!.completeResumeReceive(element, token!!)
9493
return receive!!.offerResult
9594
}
9695

kotlinx-coroutines-core/common/src/internal/Sharing.common.kt

+7
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,15 @@ internal expect fun <T> Continuation<T>.asLocalOrNull() : Continuation<T>?
1717
internal expect fun <T> Continuation<T>.asLocalOrNullIfNotUsed() : Continuation<T>?
1818
internal expect fun <T> Continuation<T>.useLocal() : Continuation<T>
1919
internal expect fun <T> Continuation<T>.shareableInterceptedResumeCancellableWith(result: Result<T>)
20+
internal expect fun <T> Continuation<T>.shareableInterceptedResumeWith(result: Result<T>)
21+
internal expect fun <T> Continuation<T>.shareableDispose()
2022
internal expect fun disposeContinuation(cont: () -> Continuation<*>)
2123
internal expect fun <T> CancellableContinuationImpl<T>.shareableResume(delegate: Continuation<T>, useMode: Int)
24+
25+
internal expect fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R
26+
internal expect fun <T, R> (suspend (T) -> R).shareableDispose(useIt: Boolean)
27+
internal expect fun <T, R> (suspend (T) -> R).shareableWillBeUsed()
28+
2229
internal expect fun isReuseSupportedInPlatform(): Boolean
2330
internal expect fun <T> ArrayList<T>.addOrUpdate(element: T, update: (ArrayList<T>) -> Unit)
2431
internal expect fun <T> ArrayList<T>.addOrUpdate(index: Int, element: T, update: (ArrayList<T>) -> Unit)

0 commit comments

Comments
 (0)