Skip to content

Commit 877c70f

Browse files
committed
Move ThreadContextElement to common
1 parent be5be56 commit 877c70f

19 files changed

+600
-526
lines changed

Diff for: kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api

+5
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,11 @@ abstract interface <#A: kotlin/Any?> kotlinx.coroutines/CompletableDeferred : ko
186186
abstract fun completeExceptionally(kotlin/Throwable): kotlin/Boolean // kotlinx.coroutines/CompletableDeferred.completeExceptionally|completeExceptionally(kotlin.Throwable){}[0]
187187
}
188188

189+
abstract interface <#A: kotlin/Any?> kotlinx.coroutines/ThreadContextElement : kotlin.coroutines/CoroutineContext.Element { // kotlinx.coroutines/ThreadContextElement|null[0]
190+
abstract fun restoreThreadContext(kotlin.coroutines/CoroutineContext, #A) // kotlinx.coroutines/ThreadContextElement.restoreThreadContext|restoreThreadContext(kotlin.coroutines.CoroutineContext;1:0){}[0]
191+
abstract fun updateThreadContext(kotlin.coroutines/CoroutineContext): #A // kotlinx.coroutines/ThreadContextElement.updateThreadContext|updateThreadContext(kotlin.coroutines.CoroutineContext){}[0]
192+
}
193+
189194
abstract interface <#A: kotlin/Throwable & kotlinx.coroutines/CopyableThrowable<#A>> kotlinx.coroutines/CopyableThrowable { // kotlinx.coroutines/CopyableThrowable|null[0]
190195
abstract fun createCopy(): #A? // kotlinx.coroutines/CopyableThrowable.createCopy|createCopy(){}[0]
191196
}

Diff for: kotlinx-coroutines-core/common/src/Builders.common.kt

+108-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.atomicfu.*
88
import kotlinx.coroutines.internal.*
99
import kotlinx.coroutines.intrinsics.*
1010
import kotlinx.coroutines.selects.*
11+
import kotlin.concurrent.Volatile
1112
import kotlin.contracts.*
1213
import kotlin.coroutines.*
1314
import kotlin.coroutines.intrinsics.*
@@ -205,10 +206,115 @@ private class LazyStandaloneCoroutine(
205206
}
206207

207208
// Used by withContext when context changes, but dispatcher stays the same
208-
internal expect class UndispatchedCoroutine<in T>(
209+
internal class UndispatchedCoroutine<in T>(
209210
context: CoroutineContext,
210211
uCont: Continuation<T>
211-
) : ScopeCoroutine<T>
212+
) : ScopeCoroutine<T>(if (context[UndispatchedMarker] == null) context + UndispatchedMarker else context, uCont) {
213+
214+
/**
215+
* The state of [ThreadContextElement]s associated with the current undispatched coroutine.
216+
* It is stored in a thread local because this coroutine can be used concurrently in suspend-resume race scenario.
217+
* See the followin, boiled down example with inlined `withContinuationContext` body:
218+
* ```
219+
* val state = saveThreadContext(ctx)
220+
* try {
221+
* invokeSmthWithThisCoroutineAsCompletion() // Completion implies that 'afterResume' will be called
222+
* // COROUTINE_SUSPENDED is returned
223+
* } finally {
224+
* thisCoroutine().clearThreadContext() // Concurrently the "smth" could've been already resumed on a different thread
225+
* // and it also calls saveThreadContext and clearThreadContext
226+
* }
227+
* ```
228+
*
229+
* Usage note:
230+
*
231+
* This part of the code is performance-sensitive.
232+
* It is a well-established pattern to wrap various activities into system-specific undispatched
233+
* `withContext` for the sake of logging, MDC, tracing etc., meaning that there exists thousands of
234+
* undispatched coroutines.
235+
* Each access to [CommonThreadLocal] on JVM leaves a footprint in the corresponding Thread's `ThreadLocalMap`
236+
* that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected.
237+
* When such coroutines are promoted to old generation, `ThreadLocalMap`s become bloated and an arbitrary accesses to thread locals
238+
* start to consume significant amount of CPU because these maps are open-addressed and cleaned up incrementally on each access.
239+
* (You can read more about this effect as "GC nepotism").
240+
*
241+
* To avoid that, we attempt to narrow down the lifetime of this thread local as much as possible:
242+
* - It's never accessed when we are sure there are no thread context elements
243+
* - It's cleaned up via [CommonThreadLocal.remove] as soon as the coroutine is suspended or finished.
244+
*/
245+
private val threadStateToRecover = commonThreadLocal<Pair<CoroutineContext, Any?>?>(Symbol("UndispatchedCoroutine"))
246+
247+
/*
248+
* Indicates that a coroutine has at least one thread context element associated with it
249+
* and that 'threadStateToRecover' is going to be set in case of dispatchhing in order to preserve them.
250+
* Better than nullable thread-local for easier debugging.
251+
*
252+
* It is used as a performance optimization to avoid 'threadStateToRecover' initialization
253+
* (note: tl.get() initializes thread local),
254+
* and is prone to false-positives as it is never reset: otherwise
255+
* it may lead to logical data races between suspensions point where
256+
* coroutine is yet being suspended in one thread while already being resumed
257+
* in another.
258+
*/
259+
@Volatile
260+
private var threadLocalIsSet = false
261+
262+
init {
263+
/*
264+
* This is a hack for a very specific case in #2930 unless #3253 is implemented.
265+
* 'ThreadLocalStressTest' covers this change properly.
266+
*
267+
* The scenario this change covers is the following:
268+
* 1) The coroutine is being started as plain non kotlinx.coroutines related suspend function,
269+
* e.g. `suspend fun main` or, more importantly, Ktor `SuspendFunGun`, that is invoking
270+
* `withContext(tlElement)` which creates `UndispatchedCoroutine`.
271+
* 2) It (original continuation) is then not wrapped into `DispatchedContinuation` via `intercept()`
272+
* and goes neither through `DC.run` nor through `resumeUndispatchedWith` that both
273+
* do thread context element tracking.
274+
* 3) So thread locals never got chance to get properly set up via `saveThreadContext`,
275+
* but when `withContext` finishes, it attempts to recover thread locals in its `afterResume`.
276+
*
277+
* Here we detect precisely this situation and properly setup context to recover later.
278+
*
279+
*/
280+
if (uCont.context[ContinuationInterceptor] !is CoroutineDispatcher) {
281+
/*
282+
* We cannot just "read" the elements as there is no such API,
283+
* so we update-restore it immediately and use the intermediate value
284+
* as the initial state, leveraging the fact that thread context element
285+
* is idempotent and such situations are increasingly rare.
286+
*/
287+
val values = updateThreadContext(context, null)
288+
restoreThreadContext(context, values)
289+
saveThreadContext(context, values)
290+
}
291+
}
292+
293+
fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
294+
threadLocalIsSet = true // Specify that thread-local is touched at all
295+
threadStateToRecover.set(context to oldValue)
296+
}
297+
298+
fun clearThreadContext(): Boolean {
299+
return !(threadLocalIsSet && threadStateToRecover.get() == null).also {
300+
threadStateToRecover.remove()
301+
}
302+
}
303+
304+
override fun afterResume(state: Any?) {
305+
if (threadLocalIsSet) {
306+
threadStateToRecover.get()?.let { (ctx, value) ->
307+
restoreThreadContext(ctx, value)
308+
}
309+
threadStateToRecover.remove()
310+
}
311+
// resume undispatched -- update context but stay on the same dispatcher
312+
val result = recoverResult(state, uCont)
313+
withContinuationContext(uCont, null) {
314+
uCont.resumeWith(result)
315+
}
316+
}
317+
}
212318

213319
private const val UNDECIDED = 0
214320
private const val SUSPENDED = 1

Diff for: kotlinx-coroutines-core/common/src/CoroutineContext.common.kt

+76-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package kotlinx.coroutines
22

3+
import kotlinx.coroutines.internal.*
34
import kotlin.coroutines.*
45

56
/**
@@ -20,8 +21,80 @@ public expect fun CoroutineContext.newCoroutineContext(addedContext: CoroutineCo
2021
@Suppress("PropertyName")
2122
internal expect val DefaultDelay: Delay
2223

23-
// countOrElement -- pre-cached value for ThreadContext.kt
24-
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
25-
internal expect inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T
2624
internal expect fun Continuation<*>.toDebugString(): String
2725
internal expect val CoroutineContext.coroutineName: String?
26+
27+
/**
28+
* Executes a block using a given coroutine context.
29+
* @param countOrElement pre-cached value for [updateThreadContext]
30+
*/
31+
internal inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
32+
val oldValue = updateThreadContext(context, countOrElement)
33+
try {
34+
return block()
35+
} finally {
36+
restoreThreadContext(context, oldValue)
37+
}
38+
}
39+
40+
/**
41+
* Executes a block using a context of a given continuation.
42+
* @param countOrElement pre-cached value for [updateThreadContext]
43+
*/
44+
internal inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T {
45+
val context = continuation.context
46+
val oldValue = updateThreadContext(context, countOrElement)
47+
val undispatchedCompletion = if (oldValue !== NO_THREAD_ELEMENTS) {
48+
// Only if some values were replaced we'll go to the slow path of figuring out where/how to restore them
49+
continuation.updateUndispatchedCompletion(context, oldValue)
50+
} else {
51+
null // fast path -- don't even try to find undispatchedCompletion as there's nothing to restore in the context
52+
}
53+
try {
54+
return block()
55+
} finally {
56+
if (undispatchedCompletion == null || undispatchedCompletion.clearThreadContext()) {
57+
restoreThreadContext(context, oldValue)
58+
}
59+
}
60+
}
61+
62+
private fun Continuation<*>.updateUndispatchedCompletion(context: CoroutineContext, oldValue: Any?): UndispatchedCoroutine<*>? {
63+
if (this !is CoroutineStackFrame) return null
64+
/*
65+
* Fast-path to detect whether we have undispatched coroutine at all in our stack.
66+
*
67+
* Implementation note.
68+
* If we ever find that stackwalking for thread-locals is way too slow, here is another idea:
69+
* 1) Store undispatched coroutine right in the `UndispatchedMarker` instance
70+
* 2) To avoid issues with cross-dispatch boundary, remove `UndispatchedMarker`
71+
* from the context when creating dispatched coroutine in `withContext`.
72+
* Another option is to "unmark it" instead of removing to save an allocation.
73+
* Both options should work, but it requires more careful studying of the performance
74+
* and, mostly, maintainability impact.
75+
*/
76+
val potentiallyHasUndispatchedCoroutine = context[UndispatchedMarker] !== null
77+
if (!potentiallyHasUndispatchedCoroutine) return null
78+
val completion = undispatchedCompletion()
79+
completion?.saveThreadContext(context, oldValue)
80+
return completion
81+
}
82+
83+
private tailrec fun CoroutineStackFrame.undispatchedCompletion(): UndispatchedCoroutine<*>? {
84+
// Find direct completion of this continuation
85+
val completion: CoroutineStackFrame = when (this) {
86+
is DispatchedCoroutine<*> -> return null
87+
else -> callerFrame ?: return null // something else -- not supported
88+
}
89+
if (completion is UndispatchedCoroutine<*>) return completion // found UndispatchedCoroutine!
90+
return completion.undispatchedCompletion() // walk up the call stack with tail call
91+
}
92+
93+
/**
94+
* Marker indicating that [UndispatchedCoroutine] exists somewhere up in the stack.
95+
* Used as a performance optimization to avoid stack walking where it is not necessary.
96+
*/
97+
internal object UndispatchedMarker: CoroutineContext.Element, CoroutineContext.Key<UndispatchedMarker> {
98+
override val key: CoroutineContext.Key<*>
99+
get() = this
100+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package kotlinx.coroutines
2+
3+
import kotlin.coroutines.*
4+
5+
/**
6+
* Defines elements in [CoroutineContext] that are installed into thread context
7+
* every time the coroutine with this element in the context is resumed on a thread.
8+
*
9+
* Implementations of this interface define a type [S] of the thread-local state that they need to store on
10+
* resume of a coroutine and restore later on suspend. The infrastructure provides the corresponding storage.
11+
*
12+
* Example usage looks like this:
13+
*
14+
* ```
15+
* // Appends "name" of a coroutine to a current thread name when coroutine is executed
16+
* class CoroutineName(val name: String) : ThreadContextElement<String> {
17+
* // declare companion object for a key of this element in coroutine context
18+
* companion object Key : CoroutineContext.Key<CoroutineName>
19+
*
20+
* // provide the key of the corresponding context element
21+
* override val key: CoroutineContext.Key<CoroutineName>
22+
* get() = Key
23+
*
24+
* // this is invoked before coroutine is resumed on current thread
25+
* override fun updateThreadContext(context: CoroutineContext): String {
26+
* val previousName = Thread.currentThread().name
27+
* Thread.currentThread().name = "$previousName # $name"
28+
* return previousName
29+
* }
30+
*
31+
* // this is invoked after coroutine has suspended on current thread
32+
* override fun restoreThreadContext(context: CoroutineContext, oldState: String) {
33+
* Thread.currentThread().name = oldState
34+
* }
35+
* }
36+
*
37+
* // Usage
38+
* launch(Dispatchers.Main + CoroutineName("Progress bar coroutine")) { ... }
39+
* ```
40+
*
41+
* Every time this coroutine is resumed on a thread, UI thread name is updated to
42+
* "UI thread original name # Progress bar coroutine" and the thread name is restored to the original one when
43+
* this coroutine suspends.
44+
*
45+
* To use [ThreadLocal] variable within the coroutine use [ThreadLocal.asContextElement][asContextElement] function.
46+
*
47+
* ### Reentrancy and thread-safety
48+
*
49+
* Correct implementations of this interface must expect that calls to [restoreThreadContext]
50+
* may happen in parallel to the subsequent [updateThreadContext] and [restoreThreadContext] operations.
51+
* See [CopyableThreadContextElement] for advanced interleaving details.
52+
*
53+
* All implementations of [ThreadContextElement] should be thread-safe and guard their internal mutable state
54+
* within an element accordingly.
55+
*/
56+
public interface ThreadContextElement<S> : CoroutineContext.Element {
57+
/**
58+
* Updates context of the current thread.
59+
* This function is invoked before the coroutine in the specified [context] is resumed in the current thread
60+
* when the context of the coroutine this element.
61+
* The result of this function is the old value of the thread-local state that will be passed to [restoreThreadContext].
62+
* This method should handle its own exceptions and do not rethrow it. Thrown exceptions will leave coroutine which
63+
* context is updated in an undefined state and may crash an application.
64+
*
65+
* @param context the coroutine context.
66+
*/
67+
public fun updateThreadContext(context: CoroutineContext): S
68+
69+
/**
70+
* Restores context of the current thread.
71+
* This function is invoked after the coroutine in the specified [context] is suspended in the current thread
72+
* if [updateThreadContext] was previously invoked on resume of this coroutine.
73+
* The value of [oldState] is the result of the previous invocation of [updateThreadContext] and it should
74+
* be restored in the thread-local state by this function.
75+
* This method should handle its own exceptions and do not rethrow it. Thrown exceptions will leave coroutine which
76+
* context is updated in an undefined state and may crash an application.
77+
*
78+
* @param context the coroutine context.
79+
* @param oldState the value returned by the previous invocation of [updateThreadContext].
80+
*/
81+
public fun restoreThreadContext(context: CoroutineContext, oldState: S)
82+
}

0 commit comments

Comments
 (0)