-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathAbstractContinuation.kt
292 lines (248 loc) · 10.9 KB
/
AbstractContinuation.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
// Values stored in AbstractContinuation._decision, which arbitrates the race between suspending and resuming.
private const val UNDECIDED = 0 // initial value: neither trySuspend nor tryResume has won yet
private const val SUSPENDED = 1 // set by a successful trySuspend
private const val RESUMED = 2 // set by a successful tryResume
/**
 * Base class for a continuation that wraps [delegate], reaches a final state at most once and
 * supports cancellation with a single cancellation handler.
 *
 * Two independent atomic values are maintained: the decision (suspend vs. resume race) and the
 * state (a [NotCompleted] marker or the final outcome). See the implementation notes in the body.
 *
 * @param delegate the wrapped continuation, exposed as the [delegate] property.
 * @param resumeMode the resume mode passed to the [DispatchedTask] superclass constructor.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal abstract class AbstractContinuation<in T>(
    public final override val delegate: Continuation<T>,
    resumeMode: Int
) : DispatchedTask<T>(resumeMode), Continuation<T> {
    /*
     * Implementation notes
     *
     * AbstractContinuation is a subset of Job with following limitations:
     * 1) It can have only cancellation listeners
     * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
     * 3) It can have at most one cancellation listener
     * 4) Its cancellation listeners cannot be deregistered
     * As a consequence it has much simpler state machine, more lightweight machinery and
     * fewer dependencies.
     */

    /* decision state machine

        +-----------+   trySuspend   +-----------+
        | UNDECIDED | -------------> | SUSPENDED |
        +-----------+                +-----------+
              |
              | tryResume
              V
        +-----------+
        |  RESUMED  |
        +-----------+

        Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
    */
    private val _decision = atomic(UNDECIDED)

    /*
       === Internal states ===
       name        state class          public state    description
       ------      ------------         ------------    -----------
       ACTIVE      Active               : Active        active, no listeners
       SINGLE_A    CancelHandler        : Active        active, one cancellation listener
       CANCELLED   Cancelled            : Cancelled     cancelled (final state)
       COMPLETED   any                  : Completed     produced some result or threw an exception (final state)
     */
    private val _state = atomic<Any?>(ACTIVE)

    // Handle of the registration in the parent job; stays null until initParentJobInternal is invoked
    @Volatile
    private var parentHandle: DisposableHandle? = null

    /** Current value of the internal state: a [NotCompleted] marker or the final outcome. */
    internal val state: Any? get() = _state.value

    /** `true` while the state is still a [NotCompleted] marker. */
    public val isActive: Boolean get() = state is NotCompleted

    /** `true` once the state is no longer a [NotCompleted] marker (this includes cancellation). */
    public val isCompleted: Boolean get() = state !is NotCompleted

    /** `true` when the state is a [CancelledContinuation]. */
    public val isCancelled: Boolean get() = state is CancelledContinuation

    /**
     * Initializes the parent handle. With a `null` [parent] the handle is set to [NonDisposableHandle];
     * otherwise the parent is started, a [ChildContinuation] handler is registered on it
     * (with `onCancelling = true`) and the returned handle is remembered so that it can be disposed
     * when this continuation reaches a final state.
     *
     * @throws IllegalStateException if the parent handle was already initialized.
     */
    internal fun initParentJobInternal(parent: Job?) {
        check(parentHandle == null) { "Parent handle is already initialized" }
        if (parent == null) {
            parentHandle = NonDisposableHandle
            return
        }
        parent.start() // make sure the parent is started
        val handle = parent.invokeOnCompletion(
            onCancelling = true,
            handler = ChildContinuation(parent, this).asHandler
        )
        parentHandle = handle
        // now check our state _after_ registering (see updateStateToFinal order of actions)
        if (isCompleted) {
            handle.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    }

    /** Returns the current [state]. */
    override fun takeState(): Any? = state

    /**
     * Cancels this continuation with the given [cause]; delegates to [cancelImpl].
     *
     * @return `true` if this call moved the continuation to the cancelled state,
     * `false` if it had already reached a final state.
     */
    public fun cancel(cause: Throwable?): Boolean =
        cancelImpl(cause)

    /**
     * Repeatedly tries to replace the current [NotCompleted] state with a [CancelledContinuation]
     * built from [cause], retrying when the state changes concurrently.
     *
     * @return `true` if the transition succeeded, `false` if the state was already final.
     */
    fun cancelImpl(cause: Throwable?): Boolean {
        loopOnState { state ->
            if (state !is NotCompleted) return false // quit if already complete
            val update = CancelledContinuation(this, cause)
            if (updateStateToFinal(state, update, mode = MODE_ATOMIC_DEFAULT)) return true
        }
    }

    /**
     * It is used when parent is cancelled to get the cancellation cause for this continuation.
     */
    open fun getContinuationCancellationCause(parent: Job): Throwable =
        parent.getCancellationException()

    /**
     * Tries to move the decision from UNDECIDED to SUSPENDED.
     *
     * @return `true` on success, `false` if the decision is already RESUMED.
     * @throws IllegalStateException if the decision is already SUSPENDED.
     */
    private fun trySuspend(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false
                else -> error("Already suspended")
            }
        }
    }

    /**
     * Tries to move the decision from UNDECIDED to RESUMED.
     *
     * @return `true` on success, `false` if the decision is already SUSPENDED.
     * @throws IllegalStateException if the decision is already RESUMED.
     */
    private fun tryResume(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false
                else -> error("Already resumed")
            }
        }
    }

    /**
     * Returns [COROUTINE_SUSPENDED] if the suspend decision wins. Otherwise the final state is
     * already available: its exception is rethrown (after `recoverStackTrace`) when the state is
     * a [CompletedExceptionally], else the successful result is extracted from the state.
     */
    @PublishedApi
    internal fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        // otherwise, dispatchResume has already won the decision via tryResume, and the result is in the state
        val state = this.state
        if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
        return getSuccessfulResult(state)
    }

    /** Completes this continuation with [result], converted to a state object, using `resumeMode`. */
    override fun resumeWith(result: Result<T>) =
        resumeImpl(result.toState(), resumeMode)

    /** Completes this continuation exceptionally with [exception] using the explicitly given [mode]. */
    internal fun resumeWithExceptionMode(exception: Throwable, mode: Int) =
        resumeImpl(CompletedExceptionally(exception), mode)

    /**
     * Registers the single cancellation [handler] of this continuation.
     *
     * - While the continuation is active without a handler, the handler is installed into the state.
     * - If the continuation is already cancelled, the handler is invoked immediately; an exception
     *   thrown by it is reported through the coroutine exception handler, exactly as it is when
     *   the handler is invoked by a later cancellation, so the outcome does not depend on timing.
     * - If the continuation has completed in any other way, the handler is ignored.
     *
     * @throws IllegalStateException if a handler is already registered on an active continuation.
     */
    public fun invokeOnCancellation(handler: CompletionHandler) {
        var handleCache: CancelHandler? = null
        loopOnState { state ->
            when (state) {
                is Active -> {
                    // allocate the wrapper at most once even if the CAS has to be retried
                    val node = handleCache ?: makeHandler(handler).also { handleCache = it }
                    if (_state.compareAndSet(state, node)) {
                        return
                    }
                }
                is CancelHandler -> error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
                is CancelledContinuation -> {
                    /*
                     * Continuation is complete, invoke directly.
                     * NOTE: multiple invokeOnCancellation calls with different handlers are allowed on cancelled continuation.
                     * It's inconsistent with running continuation, but currently, we have no mechanism to check
                     * whether any handler was registered during continuation lifecycle without additional overhead.
                     * This may be changed in the future.
                     *
                     * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                     * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                     */
                    try {
                        handler.invokeIt((state as? CompletedExceptionally)?.cause)
                    } catch (ex: Throwable) {
                        // same reporting path as in completeStateUpdate
                        reportHandlerException(handler, ex)
                    }
                    return
                }
                else -> return
            }
        }
    }

    /** Returns [handler] itself when it already is a [CancelHandler], otherwise wraps it. */
    private fun makeHandler(handler: CompletionHandler): CancelHandler =
        if (handler is CancelHandler) handler else InvokeOnCancel(handler)

    /** Dispatches the resumption with [mode] unless the resume decision wins the race with [getResult]. */
    private fun dispatchResume(mode: Int) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // otherwise, getResult has already commenced, i.e. completed later or in other thread
        dispatch(mode)
    }

    /**
     * Invokes [block] with the current [state] in an endless loop; the loop is left only by
     * a non-local return or an exception from [block].
     */
    protected inline fun loopOnState(block: (Any?) -> Unit): Nothing {
        while (true) {
            block(state)
        }
    }

    /**
     * Tries to complete this continuation with [proposedUpdate], retrying on concurrent state changes.
     *
     * @throws IllegalStateException if the continuation has already been completed by a previous resume.
     */
    protected fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
        loopOnState { state ->
            when (state) {
                is NotCompleted -> {
                    if (updateStateToFinal(state, proposedUpdate, resumeMode)) return
                }
                is CancelledContinuation -> {
                    /*
                     * If continuation was cancelled, then all further resumes must be
                     * ignored, because cancellation is asynchronous and may race with resume.
                     * Racy exceptions are reported so no exceptions are lost
                     *
                     * :todo: we should somehow remember the attempt to invoke resume and fail on the second attempt.
                     */
                    if (proposedUpdate is CompletedExceptionally) {
                        handleException(proposedUpdate.cause)
                    }
                    return
                }
                else -> error("Already resumed, but proposed with update $proposedUpdate")
            }
        }
    }

    /**
     * Tries to make transition from active to cancelled or completed state and invokes cancellation handler if necessary
     */
    private fun updateStateToFinal(expect: NotCompleted, proposedUpdate: Any?, mode: Int): Boolean {
        if (!tryUpdateStateToFinal(expect, proposedUpdate)) {
            return false
        }
        completeStateUpdate(expect, proposedUpdate, mode)
        return true
    }

    /**
     * Atomically replaces [expect] with the final [update] and, on success, disposes the parent handle.
     *
     * @return `true` if the state was updated, `false` if the current state was not [expect].
     * @throws IllegalArgumentException if [update] is itself a [NotCompleted] state.
     */
    protected fun tryUpdateStateToFinal(expect: NotCompleted, update: Any?): Boolean {
        require(update !is NotCompleted) { "Only NotCompleted -> completed transition is allowed" }
        if (!_state.compareAndSet(expect, update)) return false
        // Unregister from parent job
        parentHandle?.let {
            it.dispose() // volatile read parentHandle _after_ state was updated
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
        return true // continues in completeStateUpdate
    }

    /**
     * Finishes a successful state transition: invokes the cancellation handler when the update is
     * a cancellation and a handler was registered, then dispatches the resumption with [mode].
     */
    protected fun completeStateUpdate(expect: NotCompleted, update: Any?, mode: Int) {
        val exceptionally = update as? CompletedExceptionally
        if (update is CancelledContinuation && expect is CancelHandler) {
            try {
                expect.invoke(exceptionally?.cause)
            } catch (ex: Throwable) {
                reportHandlerException(expect, ex)
            }
        }
        // Notify all handlers before dispatching, otherwise behaviour will be timing-dependent
        // and confusing with Unconfined
        dispatchResume(mode)
    }

    /** Wraps an exception thrown by [handler] into a [CompletionHandlerException] and reports it. */
    private fun reportHandlerException(handler: Any?, exception: Throwable) {
        handleException(CompletionHandlerException("Exception in completion handler $handler for $this", exception))
    }

    /** Reports [exception] through `handleCoroutineException` using this continuation's context. */
    private fun handleException(exception: Throwable) {
        handleCoroutineException(context, exception)
    }

    // For nicer debugging
    public override fun toString(): String =
        "${nameString()}{${stateString()}}@$hexAddress"

    /** Name used as the prefix of [toString]; the simple class name by default. */
    protected open fun nameString(): String = classSimpleName

    /** Short human-readable name of the current state, used by [toString]. */
    private fun stateString(): String = when (state) {
        is NotCompleted -> "Active"
        is CancelledContinuation -> "Cancelled"
        is CompletedExceptionally -> "CompletedExceptionally"
        else -> "Completed"
    }
}
/**
 * Marker implemented by every state object of a continuation that has not reached a final state yet.
 */
internal interface NotCompleted

// State of an active continuation that has no cancellation handler registered
private class Active : NotCompleted

// Shared instance used as the initial continuation state
private val ACTIVE = Active()
/**
 * Base class for cancellation handlers of a continuation.
 * It is also a [NotCompleted] marker, so a handler instance can itself be stored as the state
 * of an active continuation that has one cancellation handler registered.
 */
internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted
/**
 * Adapts a plain [CompletionHandler] to [CancelHandler].
 * For the sake of performance [CancelHandler] can also be subclassed directly, bypassing this wrapper.
 */
private class InvokeOnCancel( // Clashes with InvokeOnCancellation
    private val handler: CompletionHandler
) : CancelHandler() {
    /** Forwards the cancellation [cause] to the wrapped handler. */
    override fun invoke(cause: Throwable?) {
        handler(cause)
    }

    /** Debug representation naming the wrapped handler's class and this wrapper's address. */
    override fun toString(): String = "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]"
}