-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathCancellableContinuationImpl.kt
378 lines (326 loc) · 14.6 KB
/
CancellableContinuationImpl.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
// Values of the `_decision` state machine that arbitrates between trySuspend and tryResume.
private const val UNDECIDED = 0 // initial value: neither trySuspend nor tryResume has won yet
private const val SUSPENDED = 1 // set by a successful trySuspend
private const val RESUMED = 2 // set by a successful tryResume
/**
* @suppress **This is unstable API and it is subject to change.**
*/
@PublishedApi
internal open class CancellableContinuationImpl<in T>(
final override val delegate: Continuation<T>,
resumeMode: Int
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
public override val context: CoroutineContext = delegate.context
/*
* Implementation notes
*
* This continuation is a subset of Job with the following limitations:
* 1) It can have only cancellation listeners
* 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
* 3) It can have at most one cancellation listener
* 4) Its cancellation listeners cannot be deregistered
* As a consequence it has a much simpler state machine, more lightweight machinery and
* fewer dependencies.
*/
/* decision state machine

    +-----------+   trySuspend   +-----------+
    | UNDECIDED | -------------> | SUSPENDED |
    +-----------+                +-----------+
          |
          | tryResume
          V
    +-----------+
    |  RESUMED  |
    +-----------+

    Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
*/
private val _decision = atomic(UNDECIDED)
/*
   === Internal states ===
   name        state class            public state   description
   ------      ------------           ------------   -----------
   ACTIVE      Active                 : Active       active, no listeners
   SINGLE_A    CancelHandler          : Active       active, one cancellation listener
   CANCELLED   CancelledContinuation  : Cancelled    cancelled (final state)
   COMPLETED   any                    : Completed    produced some result or threw an exception (final state)
*/
private val _state = atomic<Any?>(Active)
// Handle of the listener registered on the parent Job. It is assigned by installParentCancellationHandler
// and replaced with NonDisposableHandle once it has been disposed.
@Volatile
private var parentHandle: DisposableHandle? = null
// Snapshot of the current raw state object held in _state.
internal val state: Any? get() = _state.value
// Active while the state is still a NotCompleted marker.
public override val isActive: Boolean get() = state is NotCompleted
// Completed as soon as the state is anything other than a NotCompleted marker (this includes cancellation).
public override val isCompleted: Boolean get() = state !is NotCompleted
// Cancelled only when the state is a CancelledContinuation.
public override val isCancelled: Boolean get() = state is CancelledContinuation
/**
 * Does nothing. Retained solely for binary compatibility with old compiled code.
 */
public override fun initCancellability() = Unit
/**
 * Subscribes this continuation to the cancellation of the parent [Job] found in the delegate's context.
 *
 * It is called only from the internal getResult function, so it cannot run twice.
 */
private fun installParentCancellationHandler() {
    // Fast path 1: an already completed continuation needs no listener.
    if (isCompleted) return
    // Fast path 2: without a parent job there is nothing to listen to.
    val parentJob = delegate.context[Job] ?: return
    // Make sure the parent is started before attaching to it.
    parentJob.start()
    val registration = parentJob.invokeOnCompletion(
        onCancelling = true,
        handler = ChildContinuation(parentJob, this).asHandler
    )
    parentHandle = registration
    // Re-check the state after registering: completion may have raced with the registration.
    if (!isCompleted) return
    registration.dispose() // disposing twice (here and in disposeParentHandle) is fine
    parentHandle = NonDisposableHandle // drop the reference just in case, to aid GC
}
// The delegate continuation doubles as the caller frame when it is a CoroutineStackFrame; otherwise null
public override val callerFrame: CoroutineStackFrame?
    get() {
        return delegate as? CoroutineStackFrame
    }
// This frame contributes no stack trace element of its own
public override fun getStackTraceElement(): StackTraceElement? {
    return null
}
// Hands out the current state object as is
override fun takeState(): Any? {
    return state
}
/**
 * Runs the `onCancellation` callback carried by a [CompletedWithCancellation] state, passing it [cause].
 * Any other kind of [state] is ignored.
 */
override fun cancelResult(state: Any?, cause: Throwable) {
    val completed = state as? CompletedWithCancellation ?: return
    invokeHandlerSafely { completed.onCancellation(cause) }
}
/**
 * Cancels this continuation with an optional [cause].
 *
 * Returns `true` when this call moved the continuation into the cancelled state, and `false`
 * when it was no longer in a [NotCompleted] state.
 */
public override fun cancel(cause: Throwable?): Boolean {
    _state.loop { current ->
        // Already complete or cancelling -- nothing to do.
        if (current !is NotCompleted) return false
        // Still active -- try to publish the final cancelled state.
        val cancelled = CancelledContinuation(this, cause, handled = current is CancelHandler)
        if (_state.compareAndSet(current, cancelled)) {
            // Invoke the cancel handler if one was installed.
            if (current is CancelHandler) invokeHandlerSafely { current.invoke(cause) }
            // Finish the state update.
            disposeParentHandle()
            dispatchResume(mode = MODE_ATOMIC_DEFAULT)
            return true
        }
        // CAS lost: the loop re-reads the state and retries.
    }
}
/**
 * Runs [block] and reports anything it throws as an unhandled coroutine exception
 * wrapped into a [CompletionHandlerException]; handlers are never supposed to fail.
 */
private inline fun invokeHandlerSafely(block: () -> Unit) {
    try {
        block()
    } catch (failure: Throwable) {
        val wrapped = CompletionHandlerException("Exception in cancellation handler for $this", failure)
        handleCoroutineException(context, wrapped)
    }
}
/**
 * Supplies the cause with which this continuation is cancelled when its [parent] job is cancelled.
 * This implementation returns the parent's cancellation exception.
 */
open fun getContinuationCancellationCause(parent: Job): Throwable {
    return parent.getCancellationException()
}
/**
 * Tries to move the decision from UNDECIDED to SUSPENDED.
 *
 * Returns `true` when the transition succeeded and `false` when the decision is already RESUMED.
 * Fails with "Already suspended" for any other decision value.
 */
private fun trySuspend(): Boolean {
    _decision.loop { current ->
        if (current == RESUMED) return false
        if (current != UNDECIDED) error("Already suspended")
        if (_decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
        // CAS lost: re-read the decision and try again.
    }
}
/**
 * Tries to move the decision from UNDECIDED to RESUMED.
 *
 * Returns `true` when the transition succeeded and `false` when the decision is already SUSPENDED.
 * Fails with "Already resumed" for any other decision value.
 */
private fun tryResume(): Boolean {
    _decision.loop { current ->
        if (current == SUSPENDED) return false
        if (current != UNDECIDED) error("Already resumed")
        if (_decision.compareAndSet(UNDECIDED, RESUMED)) return true
        // CAS lost: re-read the decision and try again.
    }
}
/**
 * Returns [COROUTINE_SUSPENDED] when the caller has to suspend. Otherwise returns the result
 * already stored in the state, or throws its exception with a recovered stack trace.
 */
@PublishedApi
internal fun getResult(): Any? {
    installParentCancellationHandler()
    if (trySuspend()) return COROUTINE_SUSPENDED
    // trySuspend lost, so tryResume has already won and the outcome is in the state.
    val outcome = this.state
    if (outcome !is CompletedExceptionally) return getSuccessfulResult(outcome)
    throw recoverStackTrace(outcome.cause, this)
}
/**
 * Resumes this continuation with [result], converted to an internal state object, using the default resume mode.
 */
override fun resumeWith(result: Result<T>) {
    val update = result.toState()
    resumeImpl(update, resumeMode)
}
/**
 * Resumes with [value]. If the continuation turns out to be cancelled already,
 * [onCancellation] is invoked with the cancellation cause instead.
 */
override fun resume(value: T, onCancellation: (cause: Throwable) -> Unit) {
    val update = CompletedWithCancellation(value, onCancellation)
    // A null result means the resumption was accepted.
    val cancelled = resumeImpl(update, resumeMode) ?: return
    // Too late to resume (was cancelled) -- call the handler.
    invokeHandlerSafely { onCancellation(cancelled.cause) }
}
/**
 * Resumes with [exception] using an explicitly supplied dispatch [mode].
 * The result is whatever resumeImpl reports for this attempt.
 */
internal fun resumeWithExceptionMode(exception: Throwable, mode: Int): CancelledContinuation? {
    val failure = CompletedExceptionally(exception)
    return resumeImpl(failure, mode)
}
/**
 * Registers [handler] to be invoked when this continuation is cancelled.
 *
 * - While the state is Active, the handler is installed with a CAS that is retried on contention.
 * - If a CancelHandler is already installed, registration fails via multipleHandlersError.
 * - If the continuation is already cancelled, the handler is invoked right away through
 *   invokeHandlerSafely, unless makeHandled() reports that a handler was seen before.
 * - In any other (completed) state the call is ignored.
 */
public override fun invokeOnCancellation(handler: CompletionHandler) {
// Wrapper node created on the first attempt and reused if the CAS below has to be retried
var handleCache: CancelHandler? = null
_state.loop { state ->
when (state) {
is Active -> {
val node = handleCache ?: makeHandler(handler).also { handleCache = it }
if (_state.compareAndSet(state, node)) return // quit on cas success
}
is CancelHandler -> multipleHandlersError(handler, state)
is CancelledContinuation -> {
/*
* Continuation was already cancelled, invoke directly.
* NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
* so we check to make sure that handler was installed just once.
*/
if (!state.makeHandled()) multipleHandlersError(handler, state)
/*
* :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
* because we play type tricks on Kotlin/JS and handler is not necessarily a function there
*/
invokeHandlerSafely { handler.invokeIt((state as? CompletedExceptionally)?.cause) }
return
}
else -> {
/*
* Continuation was already completed, do nothing.
* NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
* but we have no way to check that it was installed just once in this case.
*/
return
}
}
}
}
// Fails with IllegalStateException: only one cancellation handler may be registered per continuation
private fun multipleHandlersError(handler: CompletionHandler, state: Any?) {
    val message = "It's prohibited to register multiple handlers, tried to register $handler, already has $state"
    error(message)
}
// Uses the handler as is when it already is a CancelHandler, otherwise wraps it into InvokeOnCancel
private fun makeHandler(handler: CompletionHandler): CancelHandler {
    return when (handler) {
        is CancelHandler -> handler
        else -> InvokeOnCancel(handler)
    }
}
/**
 * Dispatches the resumption with [mode] unless tryResume wins the decision. Winning means
 * completion happened before getResult was invoked, so there is nothing to dispatch.
 */
private fun dispatchResume(mode: Int) {
    // A failed tryResume means getResult has already commenced, so the resumption is dispatched.
    if (!tryResume()) dispatch(mode)
}
/**
 * Publishes [proposedUpdate] as the final state and dispatches the resumption with [resumeMode].
 *
 * Returns `null` when the resumption was accepted and dispatched. Returns the [CancelledContinuation]
 * when it is too late because the continuation was already cancelled and its makeResumed() succeeded.
 * Every other situation is reported through alreadyResumedError.
 */
private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): CancelledContinuation? {
    _state.loop { current ->
        if (current is NotCompleted) {
            if (_state.compareAndSet(current, proposedUpdate)) {
                disposeParentHandle()
                dispatchResume(resumeMode)
                return null
            }
            return@loop // CAS lost -- retry with a fresh state
        }
        /*
         * If continuation was cancelled, then resume attempt must be ignored,
         * because cancellation is asynchronous and may race with resume.
         * Racy exceptions will be lost, too.
         */
        if (current is CancelledContinuation && current.makeResumed()) {
            return current // tried to resume just once, but was cancelled
        }
        alreadyResumedError(proposedUpdate) // otherwise -- an error (second resume attempt)
    }
}
// Fails with IllegalStateException: the continuation has been resumed before
private fun alreadyResumedError(proposedUpdate: Any?) {
    val message = "Already resumed, but proposed with update $proposedUpdate"
    error(message)
}
// Unregisters from the parent job, if a registration exists
private fun disposeParentHandle() {
    val handle = parentHandle ?: return // single volatile read of parentHandle
    handle.dispose()
    parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
/**
 * Attempts to move this continuation into a completed state holding [value], without dispatching.
 *
 * Returns the previous NotCompleted state as a token when the state was updated. Returns the stored
 * token when the state already holds a CompletedIdempotentResult created with the very same
 * [idempotent] marker; in that case a different [value] fails the check with "Non-idempotent resume".
 * Returns `null` in every other case.
 */
override fun tryResume(value: T, idempotent: Any?): Any? {
_state.loop { state ->
when (state) {
is NotCompleted -> {
// Without an idempotent marker the raw value becomes the state; otherwise the value, the marker and
// the previous state (kept as the token) are stored together
val update: Any? = if (idempotent == null) value else
CompletedIdempotentResult(idempotent, value, state)
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
disposeParentHandle()
return state
}
is CompletedIdempotentResult -> {
// Repeated attempt: succeeds only for the same marker instance (compared by identity)
return if (state.idempotentResume === idempotent) {
check(state.result === value) { "Non-idempotent resume" }
state.token
} else {
null
}
}
else -> return null // cannot resume -- not active anymore
}
}
}
/**
 * Attempts to complete this continuation exceptionally with [exception], without dispatching.
 *
 * Returns the previous [NotCompleted] state as a token on success,
 * or `null` when the continuation is not active anymore.
 */
override fun tryResumeWithException(exception: Throwable): Any? {
    _state.loop { current ->
        // Cannot resume -- not active anymore.
        if (current !is NotCompleted) return null
        val failure = CompletedExceptionally(exception)
        if (_state.compareAndSet(current, failure)) {
            disposeParentHandle()
            return current
        }
        // CAS lost: retry with a fresh state.
    }
}
/**
 * Dispatches the resumption with the default resume mode.
 * The [token] is not used anymore, because the handler needs to be invoked on cancellation only.
 */
override fun completeResume(token: Any) = dispatchResume(resumeMode)
/**
 * Resumes with [value]. The undispatched mode is used only when the delegate is a
 * DispatchedContinuation bound to this very dispatcher; otherwise the default resume mode applies.
 */
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
    val dispatched = delegate as? DispatchedContinuation
    val mode = if (dispatched?.dispatcher === this) MODE_UNDISPATCHED else resumeMode
    resumeImpl(value, mode)
}
/**
 * Resumes with [exception]. The undispatched mode is used only when the delegate is a
 * DispatchedContinuation bound to this very dispatcher; otherwise the default resume mode applies.
 */
override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
    val dispatched = delegate as? DispatchedContinuation
    val mode = if (dispatched?.dispatcher === this) MODE_UNDISPATCHED else resumeMode
    resumeImpl(CompletedExceptionally(exception), mode)
}
/**
 * Extracts the user-visible result from a successful [state]: wrapper states are unwrapped
 * to their `result`, any other state is the result itself.
 */
@Suppress("UNCHECKED_CAST")
override fun <T> getSuccessfulResult(state: Any?): T {
    val payload = when (state) {
        is CompletedIdempotentResult -> state.result
        is CompletedWithCancellation -> state.result
        else -> state
    }
    return payload as T
}
// Debug-friendly rendering: name, delegate, current state and the hex address
public override fun toString(): String {
    val name = nameString()
    val target = delegate.toDebugString()
    return "$name($target){$state}@$hexAddress"
}
// Name used as the prefix of toString(); open so that subclasses can supply their own
protected open fun nameString(): String {
    return "CancellableContinuation"
}
}
// Marker for states of a continuation that is still active (has neither completed nor been cancelled).
// It is implemented by Active and by CancelHandler.
internal interface NotCompleted
// Initial state of a continuation: active, with no cancellation handler installed
private object Active : NotCompleted {
    override fun toString(): String {
        return "Active"
    }
}
// Base class of cancellation handlers. An installed handler is itself the state of the continuation,
// and that state still counts as NotCompleted.
internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted
// Adapter that turns a plain handler into a CancelHandler; for the performance sake
// CancelHandler can be subclassed directly instead
private class InvokeOnCancel( // Clashes with InvokeOnCancellation
    private val handler: CompletionHandler
) : CancelHandler() {
    override fun invoke(cause: Throwable?) = handler.invoke(cause)

    override fun toString(): String {
        return "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]"
    }
}
// Completed state that keeps the idempotent marker and the token next to the result
private class CompletedIdempotentResult(
    @JvmField val idempotentResume: Any?,
    @JvmField val result: Any?,
    @JvmField val token: NotCompleted
) {
    override fun toString(): String {
        return "CompletedIdempotentResult[$result]"
    }
}
// Completed state that carries the result together with its `onCancellation` callback
private class CompletedWithCancellation(
    @JvmField val result: Any?,
    @JvmField val onCancellation: (cause: Throwable) -> Unit
) {
    override fun toString(): String {
        return "CompletedWithCancellation[$result]"
    }
}