Skip to content

Remove ThreadLocal from ThreadLocalMap when finishing UndispatchedCor… #3593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 47 additions & 10 deletions kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,11 @@ internal actual class UndispatchedCoroutine<in T>actual constructor (
uCont: Continuation<T>
) : ScopeCoroutine<T>(if (context[UndispatchedMarker] == null) context + UndispatchedMarker else context, uCont) {

/*
* The state is thread-local because this coroutine can be used concurrently.
* Scenario of usage (withContinuationContext):
/**
* The state of [ThreadContextElement]s associated with the current undispatched coroutine.
* It is stored in a thread local because this coroutine can be used concurrently in suspend-resume race scenario.
* See the followin, boiled down example with inlined `withContinuationContext` body:
* ```
* val state = saveThreadContext(ctx)
* try {
* invokeSmthWithThisCoroutineAsCompletion() // Completion implies that 'afterResume' will be called
Expand All @@ -178,8 +180,40 @@ internal actual class UndispatchedCoroutine<in T>actual constructor (
* thisCoroutine().clearThreadContext() // Concurrently the "smth" could've been already resumed on a different thread
* // and it also calls saveThreadContext and clearThreadContext
* }
* ```
*
* Usage note:
*
* This part of the code is performance-sensitive.
* It is a well-established pattern to wrap various activities into system-specific undispatched
* `withContext` for the sake of logging, MDC, tracing etc., meaning that there exists thousands of
* undispatched coroutines.
* Each access to Java's [ThreadLocal] leaves a footprint in the corresponding Thread's `ThreadLocalMap`
* that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected.
* When such coroutines are promoted to old generation, `ThreadLocalMap`s become bloated and an arbitrary accesses to thread locals
* start to consume significant amount of CPU because these maps are open-addressed and cleaned up incrementally on each access.
* (You can read more about this effect as "GC nepotism").
*
* To avoid that, we attempt to narrow down the lifetime of this thread local as much as possible:
* * It's never accessed when we are sure there are no thread context elements
* * It's cleaned up via [ThreadLocal.remove] as soon as the coroutine is suspended or finished.
*/
private val threadStateToRecover = ThreadLocal<Pair<CoroutineContext, Any?>>()

/*
* Indicates that a coroutine has at least one thread context element associated with it
* and that 'threadStateToRecover' is going to be set in case of dispatchhing in order to preserve them.
* Better than nullable thread-local for easier debugging.
*
* It is used as a performance optimization to avoid 'threadStateToRecover' initialization
* (note: tl.get() initializes thread local),
* and is prone to false-positives as it is never reset: otherwise
* it may lead to logical data races between suspensions point where
* coroutine is yet being suspended in one thread while already being resumed
* in another.
*/
private var threadStateToRecover = ThreadLocal<Pair<CoroutineContext, Any?>>()
@Volatile
private var threadLocalIsSet = false

init {
/*
Expand Down Expand Up @@ -213,19 +247,22 @@ internal actual class UndispatchedCoroutine<in T>actual constructor (
}

fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
threadLocalIsSet = true // Specify that thread-local is touched at all
threadStateToRecover.set(context to oldValue)
}

fun clearThreadContext(): Boolean {
if (threadStateToRecover.get() == null) return false
threadStateToRecover.set(null)
return true
return !(threadLocalIsSet && threadStateToRecover.get() == null).also {
threadStateToRecover.remove()
}
}

override fun afterResume(state: Any?) {
threadStateToRecover.get()?.let { (ctx, value) ->
restoreThreadContext(ctx, value)
threadStateToRecover.set(null)
if (threadLocalIsSet) {
threadStateToRecover.get()?.let { (ctx, value) ->
restoreThreadContext(ctx, value)
}
threadStateToRecover.remove()
}
// resume undispatched -- update context but stay on the same dispatcher
val result = recoverResult(state, uCont)
Expand Down