@@ -8,44 +8,12 @@ import kotlinx.coroutines.internal.*
8
8
import kotlin.coroutines.*
9
9
import kotlin.jvm.*
10
10
11
- /* *
12
- * Non-cancellable dispatch mode.
13
- *
14
- * **DO NOT CHANGE THE CONSTANT VALUE**. It might be inlined into legacy user code that was calling
15
- * inline `suspendAtomicCancellableCoroutine` function and did not support reuse.
16
- */
17
- internal const val MODE_ATOMIC = 0
18
-
19
- /* *
20
- * Cancellable dispatch mode. It is used by user-facing [suspendCancellableCoroutine].
21
- * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension.
22
- *
23
- * **DO NOT CHANGE THE CONSTANT VALUE**. It is being into the user code from [suspendCancellableCoroutine].
24
- */
25
- @PublishedApi
26
- internal const val MODE_CANCELLABLE : Int = 1
27
-
28
- /* *
29
- * Cancellable dispatch mode for [suspendCancellableCoroutineReusable].
30
- * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension;
31
- * implementation of reuse checks mode via [Int.isReusableMode] extension.
32
- */
33
- internal const val MODE_CANCELLABLE_REUSABLE = 2
34
-
35
- /* *
36
- * Undispatched mode for [CancellableContinuation.resumeUndispatched].
37
- * It is used when the thread is right, but it needs to be marked with the current coroutine.
38
- */
39
- internal const val MODE_UNDISPATCHED = 4
11
+ @PublishedApi internal const val MODE_ATOMIC_DEFAULT : Int = 0 // schedule non-cancellable dispatch for suspendCoroutine
12
+ @PublishedApi internal const val MODE_CANCELLABLE : Int = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
13
+ @PublishedApi internal const val MODE_UNDISPATCHED : Int = 2 // when the thread is right, but need to mark it with current coroutine
40
14
41
- /* *
42
- * Initial mode for [DispatchedContinuation] implementation, should never be used for dispatch, because it is always
43
- * overwritten when continuation is resumed with the actual resume mode.
44
- */
45
- internal const val MODE_UNINITIALIZED = - 1
46
-
47
- internal val Int .isCancellableMode get() = this == MODE_CANCELLABLE || this == MODE_CANCELLABLE_REUSABLE
48
- internal val Int .isReusableMode get() = this == MODE_CANCELLABLE_REUSABLE
15
+ internal val Int .isCancellableMode get() = this == MODE_CANCELLABLE
16
+ internal val Int .isDispatchedMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE
49
17
50
18
internal abstract class DispatchedTask <in T >(
51
19
@JvmField public var resumeMode : Int
@@ -54,57 +22,38 @@ internal abstract class DispatchedTask<in T>(
54
22
55
23
internal abstract fun takeState (): Any?
56
24
57
- /* *
58
- * Called when this task was cancelled while it was being dispatched.
59
- */
60
- internal open fun cancelCompletedResult (takenState : Any? , cause : Throwable ) {}
25
+ internal open fun cancelResult (state : Any? , cause : Throwable ) {}
61
26
62
- /* *
63
- * There are two implementations of `DispatchedTask`:
64
- * * [DispatchedContinuation] keeps only simple values as successfully results.
65
- * * [CancellableContinuationImpl] keeps additional data with values and overrides this method to unwrap it.
66
- */
67
27
@Suppress(" UNCHECKED_CAST" )
68
28
internal open fun <T > getSuccessfulResult (state : Any? ): T =
69
29
state as T
70
30
71
- /* *
72
- * There are two implementations of `DispatchedTask`:
73
- * * [DispatchedContinuation] is just an intermediate storage that stores the exception that has its stack-trace
74
- * properly recovered and is ready to pass to the [delegate] continuation directly.
75
- * * [CancellableContinuationImpl] stores raw cause of the failure in its state; when it needs to be dispatched
76
- * its stack-trace has to be recovered, so it overrides this method for that purpose.
77
- */
78
- internal open fun getExceptionalResult (state : Any? ): Throwable ? =
31
+ internal fun getExceptionalResult (state : Any? ): Throwable ? =
79
32
(state as ? CompletedExceptionally )?.cause
80
33
81
34
public final override fun run () {
82
- assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
83
35
val taskContext = this .taskContext
84
36
var fatalException: Throwable ? = null
85
37
try {
86
- val delegate = delegate as DispatchedContinuation <T >
38
+ val delegate = delegate.useLocal() as DispatchedContinuation <T > // cast must succeed
87
39
val continuation = delegate.continuation
88
40
val context = continuation.context
89
41
val state = takeState() // NOTE: Must take state in any case, even if cancelled
90
42
withCoroutineContext(context, delegate.countOrElement) {
91
43
val exception = getExceptionalResult(state)
44
+ val job = if (resumeMode.isCancellableMode) context[Job ] else null
92
45
/*
93
46
* Check whether continuation was originally resumed with an exception.
94
47
* If so, it dominates cancellation, otherwise the original exception
95
48
* will be silently lost.
96
49
*/
97
- val job = if (exception == null && resumeMode.isCancellableMode) context[Job ] else null
98
- if (job != null && ! job.isActive) {
50
+ if (exception == null && job != null && ! job.isActive) {
99
51
val cause = job.getCancellationException()
100
- cancelCompletedResult (state, cause)
52
+ cancelResult (state, cause)
101
53
continuation.resumeWithStackTrace(cause)
102
54
} else {
103
- if (exception != null ) {
104
- continuation.resumeWithException(exception)
105
- } else {
106
- continuation.resume(getSuccessfulResult(state))
107
- }
55
+ if (exception != null ) continuation.resumeWithException(exception)
56
+ else continuation.resume(getSuccessfulResult(state))
108
57
}
109
58
}
110
59
} catch (e: Throwable ) {
@@ -147,47 +96,44 @@ internal abstract class DispatchedTask<in T>(
147
96
}
148
97
}
149
98
150
- internal fun <T > DispatchedTask<T>.dispatch (mode : Int ) {
151
- assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method
99
+ internal fun <T > CancellableContinuationImpl<T>.dispatch (mode : Int ) {
152
100
val delegate = this .delegate
153
- val undispatched = mode == MODE_UNDISPATCHED
154
- if (! undispatched && delegate is DispatchedContinuation <* > && mode.isCancellableMode == resumeMode.isCancellableMode) {
101
+ val local = delegate.asLocalOrNull() // go to shareableResume when called from wrong worker
102
+ if (mode.isDispatchedMode && local is DispatchedContinuation <* > && mode.isCancellableMode == resumeMode.isCancellableMode) {
155
103
// dispatch directly using this instance's Runnable implementation
156
- val dispatcher = delegate .dispatcher
157
- val context = delegate .context
104
+ val dispatcher = local .dispatcher
105
+ val context = local .context
158
106
if (dispatcher.isDispatchNeeded(context)) {
159
107
dispatcher.dispatch(context, this )
160
108
} else {
161
109
resumeUnconfined()
162
110
}
163
111
} else {
164
- // delegate is coming from 3rd-party interceptor implementation (and does not support cancellation)
165
- // or undispatched mode was requested
166
- resume(delegate, undispatched)
112
+ shareableResume(delegate, mode)
167
113
}
168
114
}
169
115
170
- @Suppress(" UNCHECKED_CAST" )
171
- internal fun <T > DispatchedTask<T>.resume (delegate : Continuation <T >, undispatched : Boolean ) {
172
- // This resume is never cancellable. The result is always delivered to delegate continuation.
116
+ internal fun <T > CancellableContinuationImpl<T>.resumeImpl (delegate : Continuation <T >, useMode : Int ) {
173
117
val state = takeState()
174
- val exception = getExceptionalResult(state)
118
+ val exception = getExceptionalResult(state)?. let { recoverStackTrace(it, delegate) }
175
119
val result = if (exception != null ) Result .failure(exception) else Result .success(getSuccessfulResult<T >(state))
176
- when {
177
- undispatched -> (delegate as DispatchedContinuation ).resumeUndispatchedWith(result)
178
- else -> delegate.resumeWith(result)
120
+ when (useMode) {
121
+ MODE_ATOMIC_DEFAULT -> delegate.resumeWith(result)
122
+ MODE_CANCELLABLE -> delegate.resumeCancellableWith(result)
123
+ MODE_UNDISPATCHED -> (delegate as DispatchedContinuation ).resumeUndispatchedWith(result)
124
+ else -> error(" Invalid mode $useMode " )
179
125
}
180
126
}
181
127
182
- private fun DispatchedTask < * >.resumeUnconfined () {
128
+ private fun < T > CancellableContinuationImpl<T >.resumeUnconfined () {
183
129
val eventLoop = ThreadLocalEventLoop .eventLoop
184
130
if (eventLoop.isUnconfinedLoopActive) {
185
131
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
186
132
eventLoop.dispatchUnconfined(this )
187
133
} else {
188
134
// Was not active -- run event loop until all unconfined tasks are executed
189
135
runUnconfinedEventLoop(eventLoop) {
190
- resume (delegate, undispatched = true )
136
+ resumeImpl (delegate.useLocal(), MODE_UNDISPATCHED )
191
137
}
192
138
}
193
139
}
0 commit comments