-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathBuilders.common.kt
382 lines (350 loc) · 17.5 KB
/
Builders.common.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
@file:JvmMultifileClass
@file:JvmName("BuildersKt")
@file:OptIn(ExperimentalContracts::class)
@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND")
package kotlinx.coroutines
import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.concurrent.Volatile
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
// --------------- launch ---------------
/**
 * Starts a new coroutine without blocking the current thread and returns a [Job] that refers to it.
 * Cancelling the returned job (see [Job.cancel]) cancels the coroutine.
 *
 * The coroutine inherits its context from the receiver [CoroutineScope]; extra context elements may be supplied
 * through the [context] argument. When the resulting context has neither a dispatcher nor any other
 * [ContinuationInterceptor], [Dispatchers.Default] is used. The parent job is taken from the [CoroutineScope]
 * as well, unless it is overridden by a corresponding element of [context].
 *
 * By default, the coroutine is immediately scheduled for execution. Other start modes are selected with the
 * [start] parameter; see [CoroutineStart] for details. In particular, [CoroutineStart.LAZY] starts the coroutine
 * _lazily_: its [Job] is created in the _new_ state, can be started explicitly with [start][Job.start],
 * and is started implicitly by the first invocation of [join][Job.join].
 *
 * An uncaught exception in this coroutine cancels the parent job in the context by default
 * (unless a [CoroutineExceptionHandler] is explicitly specified). Consequently, when `launch` is used with
 * the context of another coroutine, any uncaught exception leads to the cancellation of that parent coroutine.
 *
 * See [newCoroutineContext] for a description of the debugging facilities available to a newly created coroutine.
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext] for the new coroutine.
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param block the coroutine code, which is invoked in the context of the provided scope.
 * @return the [Job] of the new coroutine.
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val combinedContext = newCoroutineContext(context)
    // A lazy start needs the variant that keeps the block until the job is actually started.
    val job: StandaloneCoroutine = when {
        start.isLazy -> LazyStandaloneCoroutine(combinedContext, block)
        else -> StandaloneCoroutine(combinedContext, active = true)
    }
    job.start(start, job, block)
    return job
}
// --------------- async ---------------
/**
 * Creates a coroutine and returns its future result as an implementation of [Deferred].
 * Cancelling the resulting deferred (see [Job.cancel]) cancels the running coroutine.
 *
 * Unlike similar primitives in other languages and frameworks, the resulting coroutine cancels the parent job
 * (or outer scope) on failure in order to enforce the *structured concurrency* paradigm.
 * A supervising parent ([SupervisorJob] or [supervisorScope]) can be used to change that behaviour.
 *
 * The coroutine context is inherited from the receiver [CoroutineScope]; additional context elements can be
 * specified with the [context] argument. When the context has neither a dispatcher nor any other
 * [ContinuationInterceptor], [Dispatchers.Default] is used. The parent job is inherited from the [CoroutineScope]
 * as well, unless it is overridden by a corresponding element of [context].
 *
 * By default, the coroutine is immediately scheduled for execution. Other start modes are selected with the
 * [start] parameter; see [CoroutineStart] for details. In particular, [CoroutineStart.LAZY] starts the coroutine
 * _lazily_: the resulting [Deferred] is created in the _new_ state, can be started explicitly with
 * [start][Job.start], and is started implicitly by the first invocation of [join][Job.join],
 * [await][Deferred.await] or [awaitAll].
 *
 * @param context context elements added on top of the scope's context for the new coroutine.
 * @param start coroutine start option; defaults to [CoroutineStart.DEFAULT].
 * @param block the coroutine code.
 * @return a [Deferred] representing the future result of [block].
 */
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val combinedContext = newCoroutineContext(context)
    // A lazy start needs the variant that keeps the block until the deferred is actually started.
    val deferred: DeferredCoroutine<T> = when {
        start.isLazy -> LazyDeferredCoroutine(combinedContext, block)
        else -> DeferredCoroutine<T>(combinedContext, active = true)
    }
    deferred.start(start, deferred, block)
    return deferred
}
/**
 * Coroutine created by [async]: an [AbstractCoroutine] that additionally implements [Deferred].
 *
 * Each [Deferred] member delegates to the matching `*Internal` member and casts the untyped result to [T]
 * (hence the class-wide `UNCHECKED_CAST` suppression).
 */
@OptIn(InternalForInheritanceCoroutinesApi::class)
@Suppress("UNCHECKED_CAST")
private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, initParentJob = true, active = active), Deferred<T> {

    override fun getCompleted(): T {
        return getCompletedInternal() as T
    }

    override suspend fun await(): T {
        return awaitInternal() as T
    }

    override val onAwait: SelectClause1<T>
        get() {
            return onAwaitInternal as SelectClause1<T>
        }
}
/**
 * [DeferredCoroutine] that is created inactive (`active = false`) and keeps [block] as a prepared,
 * not yet started continuation until [onStart] is invoked.
 */
private class LazyDeferredCoroutine<T>(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> T
) : DeferredCoroutine<T>(parentContext, active = false) {
    // Built up front without interception; this coroutine is both the receiver and the completion of the block.
    private val pendingBody: Continuation<Unit> = block.createCoroutineUnintercepted(this, this)

    /** Starts the prepared continuation via `startCoroutineCancellable`, passing this coroutine to it. */
    override fun onStart() {
        pendingBody.startCoroutineCancellable(this)
    }
}
// --------------- withContext ---------------
/**
* Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
* the result.
*
* The resulting context for the [block] is derived by merging the current [coroutineContext] with the
* specified [context] using `coroutineContext + context` (see [CoroutineContext.plus]).
* This suspending function is cancellable. It immediately checks for cancellation of
* the resulting context and throws [CancellationException] if it is not [active][CoroutineContext.isActive].
*
* Calls to [withContext] whose [context] argument provides a [CoroutineDispatcher] that is
* different from the current one, by necessity, perform additional dispatches: the [block]
* can not be executed immediately and needs to be dispatched for execution on
* the passed [CoroutineDispatcher], and then when the [block] completes, the execution
* has to shift back to the original dispatcher.
*
* Note that the result of `withContext` invocation is dispatched into the original context in a cancellable way
* with a **prompt cancellation guarantee**, which means that if the original [coroutineContext]
* in which `withContext` was invoked is cancelled by the time its dispatcher starts to execute the code,
* it discards the result of `withContext` and throws [CancellationException].
*
* The cancellation behaviour described above is enabled if and only if the dispatcher is being changed.
* For example, when using `withContext(NonCancellable) { ... }` there is no change in dispatcher and
* this call will not be cancelled neither on entry to the block inside `withContext` nor on exit from it.
*
* @param context the context that is merged into the current [coroutineContext] for the execution of [block].
* @param block the suspending code to call in the resulting context.
* @return the result of [block].
*/
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
// Declare to the compiler that `block` is invoked in place, exactly once.
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
// Copy CopyableThreadContextElement if necessary
val newContext = oldContext.newCoroutineContext(context)
// always check for cancellation of new context
newContext.ensureActive()
// FAST PATH #1 -- new context is the same as the old one
// (reference equality: nothing changed, so the block is started undispatched in a plain ScopeCoroutine)
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see the `equals` implementation in wrapper contexts like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(coroutine.context, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
// Yields COROUTINE_SUSPENDED, or the result if the block has already completed
// (a failure recorded in the completed state is rethrown by getResult).
coroutine.getResult()
}
}
/**
 * Runs [block] with this [CoroutineDispatcher] as the coroutine context, suspends until the block
 * completes, and returns its result.
 *
 * This inline function simply delegates to [withContext].
 */
public suspend inline operator fun <T> CoroutineDispatcher.invoke(
    noinline block: suspend CoroutineScope.() -> T
): T {
    return withContext(context = this, block = block)
}
// --------------- implementation ---------------
/**
 * Coroutine created by [launch]: an [AbstractCoroutine] that produces no value ([Unit]).
 *
 * @param parentContext the context the coroutine is created in.
 * @param active whether the coroutine is created already active (`false` is used by the lazy variant).
 */
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {

    /**
     * Passes [exception] to `handleCoroutineException` together with this coroutine's context
     * and always reports it as handled by returning `true`.
     */
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
/**
 * [StandaloneCoroutine] that is created inactive (`active = false`) and keeps [block] as a prepared,
 * not yet started continuation until [onStart] is invoked.
 */
private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
    // Built up front without interception; this coroutine is both the receiver and the completion of the block.
    private val pendingBody: Continuation<Unit> = block.createCoroutineUnintercepted(this, this)

    /** Starts the prepared continuation via `startCoroutineCancellable`, passing this coroutine to it. */
    override fun onStart() {
        pendingBody.startCoroutineCancellable(this)
    }
}
// Used by withContext when context changes, but dispatcher stays the same.
// The context handed to ScopeCoroutine always contains UndispatchedMarker: it is added here when it is missing.
internal class UndispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(if (context[UndispatchedMarker] == null) context + UndispatchedMarker else context, uCont) {
/**
* The state of [ThreadContextElement]s associated with the current undispatched coroutine.
* It is stored in a thread local because this coroutine can be used concurrently in suspend-resume race scenario.
* See the following, boiled down example with inlined `withContinuationContext` body:
* ```
* val state = saveThreadContext(ctx)
* try {
* invokeSmthWithThisCoroutineAsCompletion() // Completion implies that 'afterResume' will be called
* // COROUTINE_SUSPENDED is returned
* } finally {
* thisCoroutine().clearThreadContext() // Concurrently the "smth" could've been already resumed on a different thread
* // and it also calls saveThreadContext and clearThreadContext
* }
* ```
*
* Usage note:
*
* This part of the code is performance-sensitive.
* It is a well-established pattern to wrap various activities into system-specific undispatched
* `withContext` for the sake of logging, MDC, tracing etc., meaning that there exist thousands of
* undispatched coroutines.
* Each access to Java's [ThreadLocal] leaves a footprint in the corresponding Thread's `ThreadLocalMap`
* that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected
* when either the corresponding thread is GC'ed or it cleans up its stale entries on other TL accesses.
* When such coroutines are promoted to old generation, `ThreadLocalMap`s become bloated and arbitrary accesses to thread locals
* start to consume significant amount of CPU because these maps are open-addressed and cleaned up incrementally on each access.
* (You can read more about this effect as "GC nepotism").
*
* To avoid that, we attempt to narrow down the lifetime of this thread local as much as possible:
* - It's never accessed when we are sure there are no thread context elements
* - It's cleaned up via [ThreadLocal.remove] as soon as the coroutine is suspended or finished.
*/
private val threadStateToRecover = commonThreadLocal<Pair<CoroutineContext, Any?>?>(Symbol("UndispatchedCoroutine"))
/*
* Indicates that a coroutine has at least one thread context element associated with it
* and that 'threadStateToRecover' is going to be set in case of dispatching in order to preserve them.
* Better than nullable thread-local for easier debugging.
*
* It is used as a performance optimization to avoid 'threadStateToRecover' initialization
* (note: tl.get() initializes thread local),
* and is prone to false-positives as it is never reset: otherwise
* it may lead to logical data races between suspensions point where
* coroutine is yet being suspended in one thread while already being resumed
* in another.
*/
@Volatile
private var threadLocalIsSet = false
init {
/*
* This is a hack for a very specific case in #2930 unless #3253 is implemented.
* 'ThreadLocalStressTest' covers this change properly.
*
* The scenario this change covers is the following:
* 1) The coroutine is being started as plain non kotlinx.coroutines related suspend function,
* e.g. `suspend fun main` or, more importantly, Ktor `SuspendFunGun`, that is invoking
* `withContext(tlElement)` which creates `UndispatchedCoroutine`.
* 2) It (original continuation) is then not wrapped into `DispatchedContinuation` via `intercept()`
* and goes neither through `DC.run` nor through `resumeUndispatchedWith` that both
* do thread context element tracking.
* 3) So thread locals never got chance to get properly set up via `saveThreadContext`,
* but when `withContext` finishes, it attempts to recover thread locals in its `afterResume`.
*
* Here we detect precisely this situation and properly setup context to recover later.
*
*/
if (uCont.context[ContinuationInterceptor] !is CoroutineDispatcher) {
/*
* We cannot just "read" the elements as there is no such API,
* so we update-restore it immediately and use the intermediate value
* as the initial state, leveraging the fact that thread context element
* is idempotent and such situations are increasingly rare.
*/
val values = updateThreadContext(context, null)
restoreThreadContext(context, values)
saveThreadContext(context, values)
}
}
/**
* Stores the pair of [context] and [oldValue] in [threadStateToRecover] for the current thread
* and sets [threadLocalIsSet], which is never reset afterwards.
*/
fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
threadLocalIsSet = true // Specify that thread-local is touched at all
threadStateToRecover.set(context to oldValue)
}
/**
* Removes the current thread's entry from [threadStateToRecover].
*
* Returns `false` only when [threadLocalIsSet] is `true` and the thread local holds no value on the current
* thread at the time of the check (the check is performed before the removal); returns `true` otherwise.
*/
fun clearThreadContext(): Boolean {
return !(threadLocalIsSet && threadStateToRecover.get() == null).also {
threadStateToRecover.remove()
}
}
/** Restores and clears the saved thread state (see [clearThreadLocal]). */
override fun afterCompletionUndispatched() {
clearThreadLocal()
}
/**
* Restores and clears the saved thread state (see [clearThreadLocal]), then resumes [uCont] with the
* recovered result inside `withContinuationContext`.
*/
override fun afterResume(state: Any?) {
clearThreadLocal()
// resume undispatched -- update context but stay on the same dispatcher
val result = recoverResult(state, uCont)
withContinuationContext(uCont, null) {
uCont.resumeWith(result)
}
}
/**
* Does nothing unless [threadLocalIsSet] is `true`. Otherwise, if the current thread has a saved
* (context, value) pair, passes it to `restoreThreadContext`, and then removes the thread-local entry.
*/
private fun clearThreadLocal() {
if (threadLocalIsSet) {
threadStateToRecover.get()?.let { (ctx, value) ->
restoreThreadContext(ctx, value)
}
threadStateToRecover.remove()
}
}
}
// Values of DispatchedCoroutine._decision:
// UNDECIDED -- the initial value: neither trySuspend nor tryResume has succeeded yet
private const val UNDECIDED = 0
// SUSPENDED -- trySuspend succeeded first
private const val SUSPENDED = 1
// RESUMED -- tryResume succeeded first
private const val RESUMED = 2
// Used by withContext when context dispatcher changes
internal class DispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
// this is copy-and-paste of a decision state machine inside AbstractionContinuation
// todo: we may some-how abstract it via inline class
private val _decision = atomic(UNDECIDED)
/**
* Tries to move the decision from UNDECIDED to SUSPENDED, retrying when the compare-and-set fails.
* Returns `true` on success and `false` if the decision is already RESUMED;
* fails via `error("Already suspended")` for any other value.
*/
private fun trySuspend(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}
/**
* Tries to move the decision from UNDECIDED to RESUMED, retrying when the compare-and-set fails.
* Returns `true` on success and `false` if the decision is already SUSPENDED;
* fails via `error("Already resumed")` for any other value.
*/
private fun tryResume(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
SUSPENDED -> return false
else -> error("Already resumed")
}
}
}
/** Delegates to [afterResume] with the same [state]. */
override fun afterCompletion(state: Any?) {
// Call afterResume from afterCompletion and not vice-versa, because stack-size is more
// important for afterResume implementation
afterResume(state)
}
/**
* If [tryResume] wins, returns without resuming [uCont]: in that case [getResult] reads the result
* from the coroutine state. Otherwise resumes the intercepted [uCont] in a cancellable way
* with the result recovered from [state].
*/
override fun afterResume(state: Any?) {
if (tryResume()) return // completed before getResult invocation -- bail out
// Resume in a cancellable way because we have to switch back to the original dispatcher
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
/**
* Returns `COROUTINE_SUSPENDED` if [trySuspend] succeeds. Otherwise reads the unboxed coroutine state:
* throws its cause when the state is `CompletedExceptionally`, or returns the state cast to [T].
*/
internal fun getResult(): Any? {
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
}
}