Skip to content

Commit 9d9dbff

Browse files
committed
Restore thread context elements when directly resuming to parent (WIP)
Note, that this fix has potentially severe performance impact since it always walk up the coroutine completion stack in search for parent UndispatchedCoroutine. :todo: figure out how to optimize it Fixes #985
1 parent f7656ea commit 9d9dbff

9 files changed

+301
-27
lines changed

kotlinx-coroutines-core/common/src/Builders.common.kt

+3-11
Original file line numberDiff line numberDiff line change
@@ -196,25 +196,17 @@ private class LazyStandaloneCoroutine(
196196
}
197197

198198
// Used by withContext when context changes, but dispatcher stays the same
199-
private class UndispatchedCoroutine<in T>(
199+
internal expect class UndispatchedCoroutine<in T>(
200200
context: CoroutineContext,
201201
uCont: Continuation<T>
202-
) : ScopeCoroutine<T>(context, uCont) {
203-
override fun afterResume(state: Any?) {
204-
// resume undispatched -- update context by stay on the same dispatcher
205-
val result = recoverResult(state, uCont)
206-
withCoroutineContext(uCont.context, null) {
207-
uCont.resumeWith(result)
208-
}
209-
}
210-
}
202+
) : ScopeCoroutine<T>
211203

212204
private const val UNDECIDED = 0
213205
private const val SUSPENDED = 1
214206
private const val RESUMED = 2
215207

216208
// Used by withContext when context dispatcher changes
217-
private class DispatchedCoroutine<in T>(
209+
internal class DispatchedCoroutine<in T>(
218210
context: CoroutineContext,
219211
uCont: Continuation<T>
220212
) : ScopeCoroutine<T>(context, uCont) {

kotlinx-coroutines-core/common/src/CoroutineContext.common.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -19,5 +19,6 @@ internal expect val DefaultDelay: Delay
1919

2020
// countOrElement -- pre-cached value for ThreadContext.kt
2121
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
22+
internal expect inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T
2223
internal expect fun Continuation<*>.toDebugString(): String
2324
internal expect val CoroutineContext.coroutineName: String?

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ internal class DispatchedContinuation<in T>(
203203

204204
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
205205
inline fun resumeUndispatchedWith(result: Result<T>) {
206-
withCoroutineContext(context, countOrElement) {
206+
withContinuationContext(continuation, countOrElement) {
207207
continuation.resumeWith(result)
208208
}
209209
}

kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,10 @@ internal abstract class DispatchedTask<in T>(
3737
try {
3838
val delegate = delegate as DispatchedContinuation<T>
3939
val continuation = delegate.continuation
40-
val context = continuation.context
41-
val state = takeState() // NOTE: Must take state in any case, even if cancelled
42-
withCoroutineContext(context, delegate.countOrElement) {
40+
withContinuationContext(continuation, delegate.countOrElement) {
41+
val state = takeState() // NOTE: Must take state in any case, even if cancelled
4342
val exception = getExceptionalResult(state)
44-
val job = if (resumeMode.isCancellableMode) context[Job] else null
43+
val job = if (resumeMode.isCancellableMode) continuation.context[Job] else null
4544
/*
4645
* Check whether continuation was originally resumed with an exception.
4746
* If so, it dominates cancellation, otherwise the original exception

kotlinx-coroutines-core/common/test/TestBase.common.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
@file:Suppress("unused")
@@ -71,9 +71,8 @@ public class RecoverableTestCancellationException(message: String? = null) : Can
7171
public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
7272
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
7373
return object : CoroutineDispatcher() {
74-
override fun dispatch(context: CoroutineContext, block: Runnable) {
75-
dispatcher.dispatch(context, block)
76-
}
74+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)
75+
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatcher.dispatch(context, block)
7776
}
7877
}
7978

kotlinx-coroutines-core/js/src/CoroutineContext.kt

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.internal.*
78
import kotlin.browser.*
89
import kotlin.coroutines.*
910

@@ -49,5 +50,13 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
4950

5051
// No debugging facilities on JS
5152
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
53+
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T = block()
5254
internal actual fun Continuation<*>.toDebugString(): String = toString()
53-
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS
55+
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS
56+
57+
internal actual class UndispatchedCoroutine<in T> actual constructor(
58+
context: CoroutineContext,
59+
uCont: Continuation<T>
60+
) : ScopeCoroutine<T>(context, uCont) {
61+
override fun afterResume(state: Any?) = uCont.resumeWith(recoverResult(state, uCont))
62+
}

kotlinx-coroutines-core/jvm/src/CoroutineContext.kt

+69-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5+
@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
6+
57
package kotlinx.coroutines
68

79
import kotlinx.coroutines.internal.*
810
import kotlinx.coroutines.scheduling.*
9-
import java.util.concurrent.atomic.*
1011
import kotlin.coroutines.*
12+
import kotlin.coroutines.jvm.internal.*
1113

1214
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"
1315

@@ -48,6 +50,71 @@ internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, c
4850
}
4951
}
5052

53+
/**
54+
* Executes a block using a context of a given continuation.
55+
*/
56+
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T {
57+
val context = continuation.context
58+
val oldValue = updateThreadContext(context, countOrElement)
59+
val undispatchedCompletion = continuation.undispatchedCompletion()
60+
undispatchedCompletion?.saveThreadContext(context, oldValue)
61+
try {
62+
return block()
63+
} finally {
64+
if (undispatchedCompletion == null || undispatchedCompletion.clearThreadContext())
65+
restoreThreadContext(context, oldValue)
66+
}
67+
}
68+
69+
internal tailrec fun Continuation<*>.undispatchedCompletion(): UndispatchedCoroutine<*>? {
70+
val completion = directCompletion() ?: return null
71+
if (completion is UndispatchedCoroutine<*>) return completion
72+
return completion.undispatchedCompletion()
73+
}
74+
75+
private fun Continuation<*>.directCompletion(): Continuation<*>? = when (this) {
76+
is BaseContinuationImpl -> completion // regular suspending function -- direct resume
77+
is DispatchedCoroutine -> null // dispatches on resume
78+
is ScopeCoroutine -> uCont // other scoped coroutine -- direct resume
79+
else -> null
80+
}
81+
82+
// Used by withContext when context changes, but dispatcher stays the same
83+
internal actual class UndispatchedCoroutine<in T> actual constructor(
84+
context: CoroutineContext,
85+
uCont: Continuation<T>
86+
) : ScopeCoroutine<T>(context, uCont) {
87+
private var savedContext: CoroutineContext? = null
88+
private var savedOldValue: Any? = null
89+
90+
fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
91+
savedContext = context
92+
savedOldValue = oldValue
93+
}
94+
95+
fun clearThreadContext(): Boolean {
96+
if (savedContext == null) return false
97+
savedContext = null
98+
savedOldValue = null
99+
return true
100+
}
101+
102+
override fun afterResume(state: Any?) {
103+
savedContext?.let { context ->
104+
restoreThreadContext(context, savedOldValue)
105+
savedContext = null
106+
savedOldValue = null
107+
}
108+
// resume undispatched -- update context but stay on the same dispatcher
109+
val result = recoverResult(state, uCont)
110+
withContinuationContext(uCont, null) {
111+
uCont.resumeWith(result)
112+
}
113+
}
114+
115+
}
116+
117+
51118
internal actual val CoroutineContext.coroutineName: String? get() {
52119
if (!DEBUG) return null
53120
val coroutineId = this[CoroutineId] ?: return null

0 commit comments

Comments
 (0)