Skip to content

Commit 1cebb93

Browse files
committed
Restore thread context elements when directly resuming to parent (WIP)
Note, that this fix has potentially severe performance impact since it always walk up the coroutine completion stack in search for parent UndispatchedCoroutine. :todo: figure out how to optimize it Fixes #985
1 parent 12a0318 commit 1cebb93

File tree

9 files changed

+304
-27
lines changed

9 files changed

+304
-27
lines changed

kotlinx-coroutines-core/common/src/Builders.common.kt

+3-11
Original file line numberDiff line numberDiff line change
@@ -196,25 +196,17 @@ private class LazyStandaloneCoroutine(
196196
}
197197

198198
// Used by withContext when context changes, but dispatcher stays the same
199-
private class UndispatchedCoroutine<in T>(
199+
internal expect class UndispatchedCoroutine<in T>(
200200
context: CoroutineContext,
201201
uCont: Continuation<T>
202-
) : ScopeCoroutine<T>(context, uCont) {
203-
override fun afterResume(state: Any?) {
204-
// resume undispatched -- update context by stay on the same dispatcher
205-
val result = recoverResult(state, uCont)
206-
withCoroutineContext(uCont.context, null) {
207-
uCont.resumeWith(result)
208-
}
209-
}
210-
}
202+
) : ScopeCoroutine<T>
211203

212204
private const val UNDECIDED = 0
213205
private const val SUSPENDED = 1
214206
private const val RESUMED = 2
215207

216208
// Used by withContext when context dispatcher changes
217-
private class DispatchedCoroutine<in T>(
209+
internal class DispatchedCoroutine<in T>(
218210
context: CoroutineContext,
219211
uCont: Continuation<T>
220212
) : ScopeCoroutine<T>(context, uCont) {

kotlinx-coroutines-core/common/src/CoroutineContext.common.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -19,5 +19,6 @@ internal expect val DefaultDelay: Delay
1919

2020
// countOrElement -- pre-cached value for ThreadContext.kt
2121
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
22+
internal expect inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T
2223
internal expect fun Continuation<*>.toDebugString(): String
2324
internal expect val CoroutineContext.coroutineName: String?

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ internal class DispatchedContinuation<in T>(
2323
@JvmField
2424
@Suppress("PropertyName")
2525
internal var _state: Any? = UNDEFINED
26-
override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
26+
override val callerFrame: CoroutineStackFrame? get() = continuation as? CoroutineStackFrame
2727
override fun getStackTraceElement(): StackTraceElement? = null
2828
@JvmField // pre-cached value to avoid ctx.fold on every resumption
2929
internal val countOrElement = threadContextElements(context)
@@ -206,7 +206,7 @@ internal class DispatchedContinuation<in T>(
206206

207207
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
208208
inline fun resumeUndispatchedWith(result: Result<T>) {
209-
withCoroutineContext(context, countOrElement) {
209+
withContinuationContext(continuation, countOrElement) {
210210
continuation.resumeWith(result)
211211
}
212212
}

kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,10 @@ internal abstract class DispatchedTask<in T>(
3737
try {
3838
val delegate = delegate as DispatchedContinuation<T>
3939
val continuation = delegate.continuation
40-
val context = continuation.context
41-
val state = takeState() // NOTE: Must take state in any case, even if cancelled
42-
withCoroutineContext(context, delegate.countOrElement) {
40+
withContinuationContext(continuation, delegate.countOrElement) {
41+
val state = takeState() // NOTE: Must take state in any case, even if cancelled
4342
val exception = getExceptionalResult(state)
44-
val job = if (resumeMode.isCancellableMode) context[Job] else null
43+
val job = if (resumeMode.isCancellableMode) continuation.context[Job] else null
4544
/*
4645
* Check whether continuation was originally resumed with an exception.
4746
* If so, it dominates cancellation, otherwise the original exception

kotlinx-coroutines-core/js/src/CoroutineContext.kt

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.internal.*
78
import kotlin.browser.*
89
import kotlin.coroutines.*
910

@@ -49,5 +50,13 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
4950

5051
// No debugging facilities on JS
5152
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
53+
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T = block()
5254
internal actual fun Continuation<*>.toDebugString(): String = toString()
53-
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS
55+
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS
56+
57+
internal actual class UndispatchedCoroutine<in T> actual constructor(
58+
context: CoroutineContext,
59+
uCont: Continuation<T>
60+
) : ScopeCoroutine<T>(context, uCont) {
61+
override fun afterResume(state: Any?) = uCont.resumeWith(recoverResult(state, uCont))
62+
}

kotlinx-coroutines-core/jvm/src/CoroutineContext.kt

+70-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5+
@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
6+
57
package kotlinx.coroutines
68

79
import kotlinx.coroutines.internal.*
810
import kotlinx.coroutines.scheduling.*
9-
import java.util.concurrent.atomic.*
1011
import kotlin.coroutines.*
12+
import kotlin.coroutines.jvm.internal.*
1113

1214
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"
1315

@@ -48,6 +50,72 @@ internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, c
4850
}
4951
}
5052

53+
/**
54+
* Executes a block using a context of a given continuation.
55+
*/
56+
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T {
57+
val context = continuation.context
58+
val oldValue = updateThreadContext(context, countOrElement)
59+
val undispatchedCompletion = if (oldValue !== NO_THREAD_ELEMENTS) {
60+
// Only if some values were replaced we'll go to the slow path of figuring out where/how to restore them
61+
continuation.undispatchedCompletion()
62+
} else
63+
null // fast path -- don't even try to find undispatchedCompletion as there's nothing to restore in the context
64+
undispatchedCompletion?.saveThreadContext(context, oldValue)
65+
try {
66+
return block()
67+
} finally {
68+
if (undispatchedCompletion == null || undispatchedCompletion.clearThreadContext())
69+
restoreThreadContext(context, oldValue)
70+
}
71+
}
72+
73+
internal tailrec fun Continuation<*>.undispatchedCompletion(): UndispatchedCoroutine<*>? {
74+
// Find direct completion of this continuation
75+
val completion: Continuation<*> = when (this) {
76+
is BaseContinuationImpl -> completion ?: return null // regular suspending function -- direct resume
77+
is DispatchedCoroutine -> return null // dispatches on resume
78+
is ScopeCoroutine -> uCont // other scoped coroutine -- direct resume
79+
else -> return null // something else -- not supported
80+
}
81+
if (completion is UndispatchedCoroutine<*>) return completion // found UndispatchedCoroutine!
82+
return completion.undispatchedCompletion() // walk up the call stack with tail call
83+
}
84+
85+
// Used by withContext when context changes, but dispatcher stays the same
86+
internal actual class UndispatchedCoroutine<in T> actual constructor(
87+
context: CoroutineContext,
88+
uCont: Continuation<T>
89+
) : ScopeCoroutine<T>(context, uCont) {
90+
private var savedContext: CoroutineContext? = null
91+
private var savedOldValue: Any? = null
92+
93+
fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
94+
savedContext = context
95+
savedOldValue = oldValue
96+
}
97+
98+
fun clearThreadContext(): Boolean {
99+
if (savedContext == null) return false
100+
savedContext = null
101+
savedOldValue = null
102+
return true
103+
}
104+
105+
override fun afterResume(state: Any?) {
106+
savedContext?.let { context ->
107+
restoreThreadContext(context, savedOldValue)
108+
savedContext = null
109+
savedOldValue = null
110+
}
111+
// resume undispatched -- update context but stay on the same dispatcher
112+
val result = recoverResult(state, uCont)
113+
withContinuationContext(uCont, null) {
114+
uCont.resumeWith(result)
115+
}
116+
}
117+
}
118+
51119
internal actual val CoroutineContext.coroutineName: String? get() {
52120
if (!DEBUG) return null
53121
val coroutineId = this[CoroutineId] ?: return null

kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ package kotlinx.coroutines.internal
77
import kotlinx.coroutines.*
88
import kotlin.coroutines.*
99

10-
11-
private val ZERO = Symbol("ZERO")
10+
@JvmField
11+
internal val NO_THREAD_ELEMENTS = Symbol("NO_THREAD_ELEMENTS")
1212

1313
// Used when there are >= 2 active elements in the context
1414
private class ThreadState(val context: CoroutineContext, n: Int) {
@@ -60,12 +60,13 @@ private val restoreState =
6060
internal actual fun threadContextElements(context: CoroutineContext): Any = context.fold(0, countAll)!!
6161

6262
// countOrElement is pre-cached in dispatched continuation
63+
// returns NO_THREAD_ELEMENTS if the contest does not have any ThreadContextElements
6364
internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?): Any? {
6465
@Suppress("NAME_SHADOWING")
6566
val countOrElement = countOrElement ?: threadContextElements(context)
6667
@Suppress("IMPLICIT_BOXING_IN_IDENTITY_EQUALS")
6768
return when {
68-
countOrElement === 0 -> ZERO // very fast path when there are no active ThreadContextElements
69+
countOrElement === 0 -> NO_THREAD_ELEMENTS // very fast path when there are no active ThreadContextElements
6970
// ^^^ identity comparison for speed, we know zero always has the same identity
7071
countOrElement is Int -> {
7172
// slow path for multiple active ThreadContextElements, allocates ThreadState for multiple old values
@@ -82,7 +83,7 @@ internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?
8283

8384
internal fun restoreThreadContext(context: CoroutineContext, oldState: Any?) {
8485
when {
85-
oldState === ZERO -> return // very fast path when there are no ThreadContextElements
86+
oldState === NO_THREAD_ELEMENTS -> return // very fast path when there are no ThreadContextElements
8687
oldState is ThreadState -> {
8788
// slow path with multiple stored ThreadContextElements
8889
oldState.start()

0 commit comments

Comments
 (0)