Skip to content

Commit 5d0298f

Browse files
elizarovqwwdfsad
authored andcommitted
Restore thread context elements when directly resuming to parent
This fix solves the problem of restoring thread-context when returning to another context in undispatched way. It impacts suspend/resume performance of coroutines that use ThreadContextElement since we have to walk up the coroutine completion stack in search for parent UndispatchedCoroutine. However, there is a fast-path to ensure that there is no performance impact in cases when ThreadContextElement is not used by a coroutine. Fixes #985
1 parent 167c44e commit 5d0298f

File tree

9 files changed

+299
-21
lines changed

9 files changed

+299
-21
lines changed

kotlinx-coroutines-core/common/src/Builders.common.kt

+3-11
Original file line numberDiff line numberDiff line change
@@ -207,25 +207,17 @@ private class LazyStandaloneCoroutine(
207207
}
208208

209209
// Used by withContext when context changes, but dispatcher stays the same
210-
private class UndispatchedCoroutine<in T>(
210+
internal expect class UndispatchedCoroutine<in T>(
211211
context: CoroutineContext,
212212
uCont: Continuation<T>
213-
) : ScopeCoroutine<T>(context, uCont) {
214-
override fun afterResume(state: Any?) {
215-
// resume undispatched -- update context by stay on the same dispatcher
216-
val result = recoverResult(state, uCont)
217-
withCoroutineContext(uCont.context, null) {
218-
uCont.resumeWith(result)
219-
}
220-
}
221-
}
213+
) : ScopeCoroutine<T>
222214

223215
private const val UNDECIDED = 0
224216
private const val SUSPENDED = 1
225217
private const val RESUMED = 2
226218

227219
// Used by withContext when context dispatcher changes
228-
private class DispatchedCoroutine<in T>(
220+
internal class DispatchedCoroutine<in T>(
229221
context: CoroutineContext,
230222
uCont: Continuation<T>
231223
) : ScopeCoroutine<T>(context, uCont) {

kotlinx-coroutines-core/common/src/CoroutineContext.common.kt

+1
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,6 @@ internal expect val DefaultDelay: Delay
1919

2020
// countOrElement -- pre-cached value for ThreadContext.kt
2121
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
22+
internal expect inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T
2223
internal expect fun Continuation<*>.toDebugString(): String
2324
internal expect val CoroutineContext.coroutineName: String?

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ internal class DispatchedContinuation<in T>(
2323
@JvmField
2424
@Suppress("PropertyName")
2525
internal var _state: Any? = UNDEFINED
26-
override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
26+
override val callerFrame: CoroutineStackFrame? get() = continuation as? CoroutineStackFrame
2727
override fun getStackTraceElement(): StackTraceElement? = null
2828
@JvmField // pre-cached value to avoid ctx.fold on every resumption
2929
internal val countOrElement = threadContextElements(context)
@@ -235,7 +235,7 @@ internal class DispatchedContinuation<in T>(
235235

236236
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
237237
inline fun resumeUndispatchedWith(result: Result<T>) {
238-
withCoroutineContext(context, countOrElement) {
238+
withContinuationContext(continuation, countOrElement) {
239239
continuation.resumeWith(result)
240240
}
241241
}

kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ internal abstract class DispatchedTask<in T>(
8585
try {
8686
val delegate = delegate as DispatchedContinuation<T>
8787
val continuation = delegate.continuation
88-
val context = continuation.context
89-
val state = takeState() // NOTE: Must take state in any case, even if cancelled
90-
withCoroutineContext(context, delegate.countOrElement) {
88+
withContinuationContext(continuation, delegate.countOrElement) {
89+
val context = continuation.context
90+
val state = takeState() // NOTE: Must take state in any case, even if cancelled
9191
val exception = getExceptionalResult(state)
9292
/*
9393
* Check whether continuation was originally resumed with an exception.

kotlinx-coroutines-core/js/src/CoroutineContext.kt

+9
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.internal.*
78
import kotlin.browser.*
89
import kotlin.coroutines.*
910

@@ -49,5 +50,13 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
4950

5051
// No debugging facilities on JS
5152
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
53+
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T = block()
5254
internal actual fun Continuation<*>.toDebugString(): String = toString()
5355
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS
56+
57+
internal actual class UndispatchedCoroutine<in T> actual constructor(
58+
context: CoroutineContext,
59+
uCont: Continuation<T>
60+
) : ScopeCoroutine<T>(context, uCont) {
61+
override fun afterResume(state: Any?) = uCont.resumeWith(recoverResult(state, uCont))
62+
}

kotlinx-coroutines-core/jvm/src/CoroutineContext.kt

+69-1
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5+
@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
6+
57
package kotlinx.coroutines
68

79
import kotlinx.coroutines.internal.*
810
import kotlinx.coroutines.scheduling.*
9-
import java.util.concurrent.atomic.*
1011
import kotlin.coroutines.*
12+
import kotlin.coroutines.jvm.internal.*
1113

1214
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"
1315

@@ -48,6 +50,72 @@ internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, c
4850
}
4951
}
5052

53+
/**
54+
* Executes a block using a context of a given continuation.
55+
*/
56+
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T {
57+
val context = continuation.context
58+
val oldValue = updateThreadContext(context, countOrElement)
59+
val undispatchedCompletion = if (oldValue !== NO_THREAD_ELEMENTS) {
60+
// Only if some values were replaced we'll go to the slow path of figuring out where/how to restore them
61+
continuation.undispatchedCompletion()
62+
} else
63+
null // fast path -- don't even try to find undispatchedCompletion as there's nothing to restore in the context
64+
undispatchedCompletion?.saveThreadContext(context, oldValue)
65+
try {
66+
return block()
67+
} finally {
68+
if (undispatchedCompletion == null || undispatchedCompletion.clearThreadContext())
69+
restoreThreadContext(context, oldValue)
70+
}
71+
}
72+
73+
internal tailrec fun Continuation<*>.undispatchedCompletion(): UndispatchedCoroutine<*>? {
74+
// Find direct completion of this continuation
75+
val completion: Continuation<*> = when (this) {
76+
is BaseContinuationImpl -> completion ?: return null // regular suspending function -- direct resume
77+
is DispatchedCoroutine -> return null // dispatches on resume
78+
is ScopeCoroutine -> uCont // other scoped coroutine -- direct resume
79+
else -> return null // something else -- not supported
80+
}
81+
if (completion is UndispatchedCoroutine<*>) return completion // found UndispatchedCoroutine!
82+
return completion.undispatchedCompletion() // walk up the call stack with tail call
83+
}
84+
85+
// Used by withContext when context changes, but dispatcher stays the same
86+
internal actual class UndispatchedCoroutine<in T> actual constructor(
87+
context: CoroutineContext,
88+
uCont: Continuation<T>
89+
) : ScopeCoroutine<T>(context, uCont) {
90+
private var savedContext: CoroutineContext? = null
91+
private var savedOldValue: Any? = null
92+
93+
fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
94+
savedContext = context
95+
savedOldValue = oldValue
96+
}
97+
98+
fun clearThreadContext(): Boolean {
99+
if (savedContext == null) return false
100+
savedContext = null
101+
savedOldValue = null
102+
return true
103+
}
104+
105+
override fun afterResume(state: Any?) {
106+
savedContext?.let { context ->
107+
restoreThreadContext(context, savedOldValue)
108+
savedContext = null
109+
savedOldValue = null
110+
}
111+
// resume undispatched -- update context but stay on the same dispatcher
112+
val result = recoverResult(state, uCont)
113+
withContinuationContext(uCont, null) {
114+
uCont.resumeWith(result)
115+
}
116+
}
117+
}
118+
51119
internal actual val CoroutineContext.coroutineName: String? get() {
52120
if (!DEBUG) return null
53121
val coroutineId = this[CoroutineId] ?: return null

kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ package kotlinx.coroutines.internal
77
import kotlinx.coroutines.*
88
import kotlin.coroutines.*
99

10-
11-
private val ZERO = Symbol("ZERO")
10+
@JvmField
11+
internal val NO_THREAD_ELEMENTS = Symbol("NO_THREAD_ELEMENTS")
1212

1313
// Used when there are >= 2 active elements in the context
1414
private class ThreadState(val context: CoroutineContext, n: Int) {
@@ -60,12 +60,13 @@ private val restoreState =
6060
internal actual fun threadContextElements(context: CoroutineContext): Any = context.fold(0, countAll)!!
6161

6262
// countOrElement is pre-cached in dispatched continuation
63+
// returns NO_THREAD_ELEMENTS if the contest does not have any ThreadContextElements
6364
internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?): Any? {
6465
@Suppress("NAME_SHADOWING")
6566
val countOrElement = countOrElement ?: threadContextElements(context)
6667
@Suppress("IMPLICIT_BOXING_IN_IDENTITY_EQUALS")
6768
return when {
68-
countOrElement === 0 -> ZERO // very fast path when there are no active ThreadContextElements
69+
countOrElement === 0 -> NO_THREAD_ELEMENTS // very fast path when there are no active ThreadContextElements
6970
// ^^^ identity comparison for speed, we know zero always has the same identity
7071
countOrElement is Int -> {
7172
// slow path for multiple active ThreadContextElements, allocates ThreadState for multiple old values
@@ -82,7 +83,7 @@ internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?
8283

8384
internal fun restoreThreadContext(context: CoroutineContext, oldState: Any?) {
8485
when {
85-
oldState === ZERO -> return // very fast path when there are no ThreadContextElements
86+
oldState === NO_THREAD_ELEMENTS -> return // very fast path when there are no ThreadContextElements
8687
oldState is ThreadState -> {
8788
// slow path with multiple stored ThreadContextElements
8889
oldState.start()

0 commit comments

Comments
 (0)