// Select.kt
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.selects
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
/**
 * Scope for [select] invocation.
 *
 * The `invoke` operators declared here allow a clause to be written as `clause { ... }` or
 * `clause(param) { ... }` inside the [select] builder. Each such call registers the clause together with the
 * block of code to run if that clause is the one selected.
 */
public interface SelectBuilder<in R> {
    /**
     * Registers a clause in this [select] expression without additional parameters that does not select any value.
     *
     * @param block code to execute if this clause is selected.
     */
    public operator fun SelectClause0.invoke(block: suspend () -> R)

    /**
     * Registers a clause in this [select] expression without additional parameters that selects a value of type [Q].
     *
     * @param block code to execute if this clause is selected; it receives the selected value.
     */
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)

    /**
     * Registers a clause in this [select] expression with an additional parameter of type [P] that selects
     * a value of type [Q].
     *
     * @param param the additional parameter passed to the clause on registration.
     * @param block code to execute if this clause is selected; it receives the selected value.
     */
    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)

    /**
     * Registers a clause in this [select] expression with an additional nullable parameter of type [P],
     * passing `null` for that parameter, that selects a value of type [Q].
     *
     * This is a shorthand that delegates to the two-argument `invoke` with `param = null`.
     *
     * @param block code to execute if this clause is selected; it receives the selected value.
     */
    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) = invoke(null, block)

    /**
     * Clause that selects the given [block] after a specified timeout passes.
     * If timeout is negative or zero, [block] is selected immediately.
     *
     * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
     *
     * @param timeMillis timeout time in milliseconds.
     * @param block code to execute if the timeout clause is selected.
     */
    @ExperimentalCoroutinesApi
    public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}
/**
 * Clause for [select] expression without additional parameters that does not select any value.
 */
public interface SelectClause0 {
    /**
     * Registers this clause with the specified [select] instance and [block] of code.
     *
     * @param select the select instance this clause is being registered with.
     * @param block code to execute if this clause is selected.
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R)
}
/**
 * Clause for [select] expression without additional parameters that selects value of type [Q].
 */
public interface SelectClause1<out Q> {
    /**
     * Registers this clause with the specified [select] instance and [block] of code.
     *
     * @param select the select instance this clause is being registered with.
     * @param block code to execute if this clause is selected; it receives the selected value of type [Q].
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (Q) -> R)
}
/**
 * Clause for [select] expression with additional parameter of type [P] that selects value of type [Q].
 */
public interface SelectClause2<in P, out Q> {
    /**
     * Registers this clause with the specified [select] instance and [block] of code.
     *
     * @param select the select instance this clause is being registered with.
     * @param param the additional parameter of type [P] supplied at the clause's call site.
     * @param block code to execute if this clause is selected; it receives the selected value of type [Q].
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun <R> registerSelectClause2(select: SelectInstance<R>, param: P, block: suspend (Q) -> R)
}
/**
 * Internal representation of select instance. This instance is called _selected_ when
 * the clause to execute is already picked.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@InternalCoroutinesApi
public interface SelectInstance<in R> {
    /**
     * Returns `true` when this [select] statement had already picked a clause to execute.
     */
    public val isSelected: Boolean

    /**
     * Tries to select this instance.
     *
     * @param idempotent optional marker identifying the caller's attempt; may be `null`.
     * @return `true` if this call selected the instance. In the implementation in this file it also returns
     * `true` when the instance had already been selected with the very same non-null [idempotent] marker,
     * and `false` in every other already-selected case.
     */
    public fun trySelect(idempotent: Any?): Boolean

    /**
     * Performs action atomically with [trySelect].
     *
     * @param desc descriptor of the action to perform.
     * @return the outcome of the atomic operation. NOTE(review): the exact result protocol is defined by the
     * atomic-operation machinery outside this file -- confirm there before relying on specific values.
     */
    public fun performAtomicTrySelect(desc: AtomicDesc): Any?

    /**
     * Performs action atomically when [isSelected] is `false`.
     *
     * @param desc descriptor of the action to perform.
     * @return the outcome of the atomic operation. NOTE(review): the exact result protocol is defined by the
     * atomic-operation machinery outside this file -- confirm there before relying on specific values.
     */
    public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?

    /**
     * Returns completion continuation of this select instance.
     * This select instance must be _selected_ first.
     * All resumptions through this instance happen _directly_ without going through dispatcher ([MODE_DIRECT]).
     */
    public val completion: Continuation<R>

    /**
     * Resumes this instance with the given [exception] in a cancellable way ([MODE_CANCELLABLE]).
     */
    public fun resumeSelectCancellableWithException(exception: Throwable)

    /**
     * Disposes the specified handle when this instance is selected.
     * The implementation in this file disposes [handle] immediately if the instance is already selected.
     */
    public fun disposeOnSelect(handle: DisposableHandle)
}
/**
* Waits for the result of multiple suspending functions simultaneously, which are specified using _clauses_
* in the [builder] scope of this select invocation. The caller is suspended until one of the clauses
* is either _selected_ or _fails_.
*
* At most one clause is *atomically* selected and its block is executed. The result of the selected clause
* becomes the result of the select. If any clause _fails_, then the select invocation produces the
* corresponding exception. No clause is selected in this case.
*
* This select function is _biased_ to the first clause. When multiple clauses can be selected at the same time,
* the first one of them gets priority. Use [selectUnbiased] for an unbiased (randomized) selection among
* the clauses.
* There is no `default` clause for select expression. Instead, each selectable suspending function has the
* corresponding non-suspending version that can be used with a regular `when` expression to select one
* of the alternatives or to perform the default (`else`) action if none of them can be immediately selected.
*
* | **Receiver** | **Suspending function** | **Select clause** | **Non-suspending version**
* | ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
* | [Job] | [join][Job.join] | [onJoin][Job.onJoin] | [isCompleted][Job.isCompleted]
* | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted]
* | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend] | [offer][SendChannel.offer]
* | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive] | [poll][ReceiveChannel.poll]
* | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][ReceiveChannel.onReceiveOrNull]| [poll][ReceiveChannel.poll]
* | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock] | [tryLock][Mutex.tryLock]
* | none | [delay] | [onTimeout][SelectBuilder.onTimeout] | none
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
* Atomicity of cancellation depends on the clause: [onSend][SendChannel.onSend], [onReceive][ReceiveChannel.onReceive],
* [onReceiveOrNull][ReceiveChannel.onReceiveOrNull], and [onLock][Mutex.onLock] clauses are
* *atomically cancellable*. When select throws [CancellationException] it means that those clauses had not performed
* their respective operations.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this select operation
* was already resumed on atomically cancellable clause and the continuation was posted for execution to the thread's queue.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*/
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R =
    suspendCoroutineUninterceptedOrReturn { continuation ->
        // One selector object per invocation: it is both the builder scope and the select instance.
        val selector = SelectBuilderImpl(continuation)
        try {
            // Let the caller register its clauses.
            selector.builder()
        } catch (failure: Throwable) {
            // A failure while registering clauses is routed through the selector so it is reported once.
            selector.handleBuilderException(failure)
        }
        // Either the already-available result or COROUTINE_SUSPENDED.
        selector.getResult()
    }
// Marker returned by the atomic-select preparation step (AtomicSelectOp.prepareIfNotSelected) when the
// select instance turns out to be already selected.
@SharedImmutable
internal val ALREADY_SELECTED: Any = Symbol("ALREADY_SELECTED")

// Initial value of SelectBuilderImpl's result slot: the select was neither resumed nor suspended yet.
@SharedImmutable
private val UNDECIDED: Any = Symbol("UNDECIDED")

// Value of SelectBuilderImpl's result slot after a select that had returned COROUTINE_SUSPENDED was resumed.
@SharedImmutable
private val RESUMED: Any = Symbol("RESUMED")
/**
 * Backing implementation of a single [select] invocation. One object plays three roles at once:
 * the [SelectBuilder] scope in which clauses are registered, the [SelectInstance] those clauses talk to,
 * and the [Continuation] ([completion]) that the selected clause's block completes into.
 *
 * The object is also the head of a lock-free linked list whose nodes are the [DisposeNode]s added via
 * [disposeOnSelect]. Two atomic slots drive it:
 *  - `_state` is `this` while nothing is selected, temporarily an [OpDescriptor] while an atomic operation
 *    is in progress, and the idempotent marker (or `null`) once selected;
 *  - `_result` is a hand-rolled safe-continuation slot (see the state machine diagram in the class body).
 *
 * @param uCont the unintercepted continuation of the coroutine that called [select].
 */
@PublishedApi
internal class SelectBuilderImpl<in R>(
    private val uCont: Continuation<R> // unintercepted delegate continuation
) : LockFreeLinkedListHead(), SelectBuilder<R>,
    SelectInstance<R>, Continuation<R>, CoroutineStackFrame {
    // Stack-frame support: expose the caller's frame (when the delegate provides one) and contribute
    // no stack trace element of our own.
    override val callerFrame: CoroutineStackFrame?
        get() = uCont as? CoroutineStackFrame
    override fun getStackTraceElement(): StackTraceElement? = null

    // selection state is "this" (list of nodes) initially and is replaced by idempotent marker (or null) when selected
    private val _state = atomic<Any?>(this)

    // this is basically our own SafeContinuation
    private val _result = atomic<Any?>(UNDECIDED)

    // cancellability support: handle of the completion handler registered on the parent Job (if any)
    @Volatile
    private var parentHandle: DisposableHandle? = null

    /* Result state machine

        +-----------+   getResult   +---------------------+   resume   +---------+
        | UNDECIDED | ------------> | COROUTINE_SUSPENDED | ---------> | RESUMED |
        +-----------+               +---------------------+            +---------+
              |
              | resume
              V
        +------------+  getResult
        | value/Fail | -----------+
        +------------+            |
              ^                   |
              |                   |
              +-------------------+
     */

    // The coroutine context is that of the delegate continuation.
    override val context: CoroutineContext get() = uCont.context

    // This object itself is the completion continuation for the selected clause's block.
    override val completion: Continuation<R> get() = this

    /**
     * Common resume logic for both resume modes.
     *
     * If [getResult] has not been called yet (`UNDECIDED`), the result computed by [value] is stored so that
     * [getResult] can return/throw it directly. If the select has already suspended (`COROUTINE_SUSPENDED`),
     * the slot is moved to `RESUMED` and [block] is run to actually resume the delegate continuation.
     * Any other state means a second resume attempt and fails with [IllegalStateException].
     * Each compare-and-set is retried by the enclosing loop if it loses a race.
     */
    private inline fun doResume(value: () -> Any?, block: () -> Unit) {
        assert { isSelected } // "Must be selected first"
        _result.loop { result ->
            when {
                result === UNDECIDED -> if (_result.compareAndSet(UNDECIDED, value())) return
                result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED,
                        RESUMED
                    )) {
                    block()
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }

    // Resumes in MODE_DIRECT
    // Failures are delivered via resumeWithStackTrace, successes via a plain resumeWith on the delegate.
    override fun resumeWith(result: Result<R>) {
        doResume({ result.toState() }) {
            if (result.isFailure) {
                uCont.resumeWithStackTrace(result.exceptionOrNull()!!)
            } else {
                uCont.resumeWith(result)
            }
        }
    }

    // Resumes in MODE_CANCELLABLE
    // When already suspended, the exception is delivered through the intercepted continuation.
    override fun resumeSelectCancellableWithException(exception: Throwable) {
        doResume({ CompletedExceptionally(exception) }) {
            uCont.intercepted().resumeCancellableWithException(exception)
        }
    }

    /**
     * Produces the value to return from the `suspendCoroutineUninterceptedOrReturn` block in [select].
     *
     * If no clause has been selected yet, cancellation support is installed first. Then:
     *  - if nothing has resumed this instance, the slot is switched to `COROUTINE_SUSPENDED`, which is returned;
     *  - if it was resumed with [CompletedExceptionally], its cause is thrown;
     *  - otherwise the stored value (or `COROUTINE_SUSPENDED` from an earlier call) is returned.
     *
     * @throws IllegalStateException if the slot is already in the `RESUMED` state.
     */
    @PublishedApi
    internal fun getResult(): Any? {
        if (!isSelected) initCancellability()
        var result = _result.value // atomic read
        if (result === UNDECIDED) {
            if (_result.compareAndSet(UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
            result = _result.value // reread volatile var
        }
        when {
            result === RESUMED -> throw IllegalStateException("Already resumed")
            result is CompletedExceptionally -> throw result.cause
            else -> return result // either COROUTINE_SUSPENDED or data
        }
    }

    /**
     * Registers a [SelectOnCancelling] handler on the parent [Job] of the context, if there is one,
     * and remembers its handle in [parentHandle] so that it can be disposed once a clause is selected.
     */
    private fun initCancellability() {
        val parent = context[Job] ?: return
        val newRegistration = parent.invokeOnCompletion(
            onCancelling = true, handler = SelectOnCancelling(parent).asHandler)
        parentHandle = newRegistration
        // now check our state _after_ registering
        if (isSelected) newRegistration.dispose()
    }

    /**
     * Handler installed on the parent job by [initCancellability]: when invoked it tries to select this
     * instance and, on success, resumes it with the job's cancellation exception.
     */
    private inner class SelectOnCancelling(job: Job) : JobCancellingNode<Job>(job) {
        // Note: may be invoked multiple times, but only the first trySelect succeeds anyway
        override fun invoke(cause: Throwable?) {
            if (trySelect(null))
                resumeSelectCancellableWithException(job.getCancellationException())
        }
        override fun toString(): String = "SelectOnCancelling[${this@SelectBuilderImpl}]"
    }

    // Current selection state with any in-progress operation descriptor resolved first:
    // while the slot holds an OpDescriptor, help it perform and re-read.
    private val state: Any? get() {
        _state.loop { state ->
            if (state !is OpDescriptor) return state
            state.perform(this)
        }
    }

    /**
     * Handles an exception thrown by the user's builder lambda in [select].
     *
     * If this instance can still be selected, it is selected and resumed with [e]. Otherwise a clause has
     * already been selected; non-cancellation exceptions are then passed to [handleCoroutineException]
     * unless the stored result is a [CompletedExceptionally] with the same (unwrapped) cause.
     */
    @PublishedApi
    internal fun handleBuilderException(e: Throwable) {
        if (trySelect(null)) {
            resumeWithException(e)
        } else if (e !is CancellationException) {
            /*
             * Cannot handle this exception -- builder was already resumed with a different exception,
             * so treat it as "unhandled exception". But only if it is not the completion reason
             * and it's not the cancellation. Otherwise, in the face of structured concurrency
             * the same exception will be reported to the global exception handler.
             */
            val result = getResult()
            if (result !is CompletedExceptionally || unwrap(result.cause) !== unwrap(e)) {
                handleCoroutineException(context, e)
            }
        }
    }

    // Selected as soon as the state is anything other than this object itself.
    override val isSelected: Boolean get() = state !== this

    /**
     * Arranges for [handle] to be disposed when this instance becomes selected: while not selected, a
     * [DisposeNode] is appended to this list (only if the state is still "not selected" at insertion time);
     * if already selected, [handle] is disposed right away.
     */
    override fun disposeOnSelect(handle: DisposableHandle) {
        val node = DisposeNode(handle)
        while (true) { // lock-free loop on state
            val state = this.state
            if (state === this) {
                if (addLastIf(node, { this.state === this }))
                    return
            } else { // already selected
                handle.dispose()
                return
            }
        }
    }

    // Cleanup performed once after a successful selection: drop the parent-job registration and
    // dispose every handle registered through disposeOnSelect.
    private fun doAfterSelect() {
        parentHandle?.dispose()
        forEach<DisposeNode> {
            it.handle.dispose()
        }
    }

    // it is just like start(), but support idempotent start
    // Returns true if this call performed the selection, or if the instance was already selected with the
    // same non-null idempotent marker; returns false otherwise.
    override fun trySelect(idempotent: Any?): Boolean {
        assert { idempotent !is OpDescriptor } // "cannot use OpDescriptor as idempotent marker"
        while (true) { // lock-free loop on state
            val state = this.state
            when {
                state === this -> {
                    // not selected yet: try to install the marker; on a lost race, loop and re-check
                    if (_state.compareAndSet(this, idempotent)) {
                        doAfterSelect()
                        return true
                    }
                }
                // otherwise -- already selected
                idempotent == null -> return false // already selected
                state === idempotent -> return true // was selected with this marker
                else -> return false
            }
        }
    }

    // Both atomic entry points run an AtomicSelectOp as its originator (affected == null);
    // they differ only in whether a successful operation also selects this instance.
    override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
    override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)

    /**
     * Atomic operation that couples the action described by [desc] with the selection state of the
     * enclosing [SelectBuilderImpl].
     *
     * @param desc descriptor of the action to perform.
     * @param select when `true`, a successful operation leaves the instance selected; when `false`,
     * the instance is returned to the "not selected" state afterwards.
     */
    private inner class AtomicSelectOp(
        @JvmField val desc: AtomicDesc,
        @JvmField val select: Boolean
    ) : AtomicOp<Any?>() {
        override fun prepare(affected: Any?): Any? {
            // only originator of operation makes preparation move of installing descriptor into this selector's state
            // helpers should never do it, or risk ruining progress when they come late
            if (affected == null) {
                // we are originator (affected reference is not null if helping)
                prepareIfNotSelected()?.let { return it }
            }
            return desc.prepare(this)
        }

        // Settle the selection state first, then let the descriptor complete its own part.
        override fun complete(affected: Any?, failure: Any?) {
            completeSelect(failure)
            desc.complete(this, failure)
        }

        // Installs this operation into the selector's state slot. Returns null when the operation is
        // installed (now or previously) and ALREADY_SELECTED when the selector is already selected.
        fun prepareIfNotSelected(): Any? {
            _state.loop { state ->
                when {
                    state === this@AtomicSelectOp -> return null // already in progress
                    state is OpDescriptor -> state.perform(this@SelectBuilderImpl) // help
                    state === this@SelectBuilderImpl -> {
                        if (_state.compareAndSet(this@SelectBuilderImpl, this@AtomicSelectOp))
                            return null // success
                    }
                    else -> return ALREADY_SELECTED
                }
            }
        }

        // Replaces this operation in the state slot with the final state: null (selected, no marker) when
        // selecting succeeded, or the selector itself (not selected) otherwise. The post-selection cleanup
        // runs only for the caller whose compare-and-set actually made the transition.
        private fun completeSelect(failure: Any?) {
            val selectSuccess = select && failure == null
            val update = if (selectSuccess) null else this@SelectBuilderImpl
            if (_state.compareAndSet(this@AtomicSelectOp, update)) {
                if (selectSuccess)
                    doAfterSelect()
            }
        }
    }

    // SelectBuilder operators: each simply registers the clause with this instance.
    override fun SelectClause0.invoke(block: suspend () -> R) {
        registerSelectClause0(this@SelectBuilderImpl, block)
    }

    override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
        registerSelectClause1(this@SelectBuilderImpl, block)
    }

    override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
        registerSelectClause2(this@SelectBuilderImpl, param, block)
    }

    /**
     * Timeout clause. A non-positive [timeMillis] tries to select immediately and, on success, starts
     * [block] without interception. Otherwise a timer action is scheduled through the context's delay
     * implementation; the returned handle is registered with [disposeOnSelect] so that it is disposed
     * once any clause is selected.
     */
    override fun onTimeout(timeMillis: Long, block: suspend () -> R) {
        if (timeMillis <= 0L) {
            if (trySelect(null))
                block.startCoroutineUnintercepted(completion)
            return
        }
        val action = Runnable {
            // todo: we could have replaced startCoroutine with startCoroutineUndispatched
            // But we need a way to know that Delay.invokeOnTimeout had used the right thread
            if (trySelect(null))
                block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
        }
        disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action))
    }

    // List node that carries a handle to dispose when the select instance becomes selected.
    private class DisposeNode(
        @JvmField val handle: DisposableHandle
    ) : LockFreeLinkedListNode()
}