Skip to content

Commit fe0f570

Browse files
committed
Introduce CoroutineContextThreadLocal API to integrate with thread-local sensitive code
* Debug thread name is redesigned in the style of "thread local" where the name of thread reflect the name of currently coroutine. * Intrinsics for startCoroutineUndispatched that correspond to CoroutineStart.UNDISPATCHED properly update coroutine context. * New intrinsics named startCoroutineUnintercepted are introduced. They do not update thread context. * withContext logic is fixed properly update context is various situations. * DebugThreadNameTest is introduced. * Reporting of unhandled errors in TestBase is improved. Its CoroutineExceptionHandler records but does not rethrow exception. This makes sure that failed tests actually fail and do not hang in recursive attempt to handle unhandled coroutine exception. Fixes #119
1 parent a608a33 commit fe0f570

File tree

23 files changed

+415
-74
lines changed

23 files changed

+415
-74
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+7-2
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,11 @@ public final class kotlinx/coroutines/experimental/CoroutineContextKt {
138138
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;)Lkotlin/coroutines/experimental/CoroutineContext;
139139
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;)Lkotlin/coroutines/experimental/CoroutineContext;
140140
public static synthetic fun newCoroutineContext$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;ILjava/lang/Object;)Lkotlin/coroutines/experimental/CoroutineContext;
141-
public static final fun restoreThreadContext (Ljava/lang/String;)V
142-
public static final fun updateThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;)Ljava/lang/String;
141+
}
142+
143+
public abstract interface class kotlinx/coroutines/experimental/CoroutineContextThreadLocal {
144+
public abstract fun restoreThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;Ljava/lang/Object;)V
145+
public abstract fun updateThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;)Ljava/lang/Object;
143146
}
144147

145148
public abstract class kotlinx/coroutines/experimental/CoroutineDispatcher : kotlin/coroutines/experimental/AbstractCoroutineContextElement, kotlin/coroutines/experimental/ContinuationInterceptor {
@@ -881,6 +884,8 @@ public final class kotlinx/coroutines/experimental/intrinsics/CancellableKt {
881884
public final class kotlinx/coroutines/experimental/intrinsics/UndispatchedKt {
882885
public static final fun startCoroutineUndispatched (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)V
883886
public static final fun startCoroutineUndispatched (Lkotlin/jvm/functions/Function2;Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)V
887+
public static final fun startCoroutineUnintercepted (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)V
888+
public static final fun startCoroutineUnintercepted (Lkotlin/jvm/functions/Function2;Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)V
884889
public static final fun startUndispatchedOrReturn (Lkotlinx/coroutines/experimental/AbstractCoroutine;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
885890
public static final fun startUndispatchedOrReturn (Lkotlinx/coroutines/experimental/AbstractCoroutine;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
886891
}

build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,10 @@ configure(subprojects.findAll { !it.name.contains(sourceless) && it.name != "ben
105105
sourceSets {
106106
main.kotlin.srcDirs = ['src']
107107
test.kotlin.srcDirs = ['test']
108+
// todo: do we still need this workaround?
108109
if (!projectName.endsWith("-native")) {
109110
main.resources.srcDirs = ['resources']
111+
test.resources.srcDirs = ['test-resources']
110112
}
111113
}
112114
}

common/kotlinx-coroutines-core-common/src/Builders.common.kt

+20-6
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,11 @@ public suspend fun <T> withContext(
119119
// fast path #3 if the new dispatcher is the same as the old one.
120120
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
121121
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
122-
val newContinuation = RunContinuationDirect(newContext, uCont)
123-
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
122+
val newContinuation = RunContinuationUndispatched(newContext, uCont)
123+
// There are some other changes in the context, so this thread needs to be updated
124+
withCoroutineContext(newContext) {
125+
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
126+
}
124127
}
125128
// slowest path otherwise -- use new interceptor, sync to its result via a full-blown instance of RunCompletion
126129
require(!start.isLazy) { "$start start is not supported" }
@@ -130,7 +133,6 @@ public suspend fun <T> withContext(
130133
resumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE
131134
)
132135
completion.initParentJobInternal(newContext[Job]) // attach to job
133-
@Suppress("DEPRECATION")
134136
start(block, completion)
135137
completion.getResult()
136138
}
@@ -172,10 +174,22 @@ private class LazyStandaloneCoroutine(
172174
}
173175
}
174176

175-
private class RunContinuationDirect<in T>(
177+
private class RunContinuationUndispatched<in T>(
176178
override val context: CoroutineContext,
177-
continuation: Continuation<T>
178-
) : Continuation<T> by continuation
179+
private val continuation: Continuation<T>
180+
): Continuation<T> {
181+
override fun resume(value: T) {
182+
withCoroutineContext(continuation.context) {
183+
continuation.resume(value)
184+
}
185+
}
186+
187+
override fun resumeWithException(exception: Throwable) {
188+
withCoroutineContext(continuation.context) {
189+
continuation.resumeWithException(exception)
190+
}
191+
}
192+
}
179193

180194
@Suppress("UNCHECKED_CAST")
181195
private class RunCompletion<in T>(

common/kotlinx-coroutines-core-common/src/JobSupport.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
431431
// already complete -- select result
432432
if (select.trySelect(null)) {
433433
select.completion.context.checkCompletion() // always check for our completion
434-
block.startCoroutineUndispatched(select.completion)
434+
block.startCoroutineUnintercepted(select.completion)
435435
}
436436
return
437437
}
@@ -803,7 +803,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
803803
if (state is CompletedExceptionally)
804804
select.resumeSelectCancellableWithException(state.cause)
805805
else
806-
block.startCoroutineUndispatched(state as T, select.completion)
806+
block.startCoroutineUnintercepted(state as T, select.completion)
807807
}
808808
return
809809
}

common/kotlinx-coroutines-core-common/src/ResumeMode.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedMode(value: T, mode: Int) {
4343
MODE_ATOMIC_DEFAULT -> intercepted().resume(value)
4444
MODE_CANCELLABLE -> intercepted().resumeCancellable(value)
4545
MODE_DIRECT -> resume(value)
46-
MODE_UNDISPATCHED -> resume(value)
46+
MODE_UNDISPATCHED -> withCoroutineContext(context) { resume(value) }
4747
MODE_IGNORE -> {}
4848
else -> error("Invalid mode $mode")
4949
}
@@ -54,7 +54,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedWithExceptionMode(exception:
5454
MODE_ATOMIC_DEFAULT -> intercepted().resumeWithException(exception)
5555
MODE_CANCELLABLE -> intercepted().resumeCancellableWithException(exception)
5656
MODE_DIRECT -> resumeWithException(exception)
57-
MODE_UNDISPATCHED -> resumeWithException(exception)
57+
MODE_UNDISPATCHED -> withCoroutineContext(context) { resumeWithException(exception) }
5858
MODE_IGNORE -> {}
5959
else -> error("Invalid mode $mode")
6060
}

common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
415415
offerResult === ALREADY_SELECTED -> return
416416
offerResult === OFFER_FAILED -> {} // retry
417417
offerResult === OFFER_SUCCESS -> {
418-
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
418+
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
419419
return
420420
}
421421
offerResult is Closed<*> -> throw offerResult.sendException
@@ -754,7 +754,7 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
754754
pollResult === POLL_FAILED -> {} // retry
755755
pollResult is Closed<*> -> throw pollResult.receiveException
756756
else -> {
757-
block.startCoroutineUndispatched(pollResult as E, select.completion)
757+
block.startCoroutineUnintercepted(pollResult as E, select.completion)
758758
return
759759
}
760760
}
@@ -789,14 +789,14 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
789789
pollResult is Closed<*> -> {
790790
if (pollResult.closeCause == null) {
791791
if (select.trySelect(null))
792-
block.startCoroutineUndispatched(null, select.completion)
792+
block.startCoroutineUnintercepted(null, select.completion)
793793
return
794794
} else
795795
throw pollResult.closeCause
796796
}
797797
else -> {
798798
// selected successfully
799-
block.startCoroutineUndispatched(pollResult as E, select.completion)
799+
block.startCoroutineUnintercepted(pollResult as E, select.completion)
800800
return
801801
}
802802
}

common/kotlinx-coroutines-core-common/src/channels/ConflatedBroadcastChannel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
263263
select.resumeSelectCancellableWithException(it.sendException)
264264
return
265265
}
266-
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
266+
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
267267
}
268268

269269
@Suppress("DEPRECATION")

common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt

+46-15
Original file line numberDiff line numberDiff line change
@@ -9,38 +9,69 @@ import kotlin.coroutines.experimental.*
99
import kotlin.coroutines.experimental.intrinsics.*
1010

1111
/**
12-
* Use this function to restart coroutine directly from inside of [suspendCoroutine] in the same context.
12+
* Use this function to restart coroutine directly from inside of [suspendCoroutine],
13+
* when the code is already in the context of this coroutine.
14+
* It does not use [ContinuationInterceptor] and does not update context of the current thread.
1315
*/
14-
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNCHECKED_CAST")
15-
public fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
16-
val value = try {
16+
public fun <T> (suspend () -> T).startCoroutineUnintercepted(completion: Continuation<T>) {
17+
startDirect(completion) {
1718
startCoroutineUninterceptedOrReturn(completion)
18-
} catch (e: Throwable) {
19-
completion.resumeWithException(e)
20-
return
2119
}
22-
if (value !== COROUTINE_SUSPENDED)
23-
completion.resume(value as T)
2420
}
2521

2622
/**
27-
* Use this function to restart coroutine directly from inside of [suspendCoroutine] in the same context.
23+
* Use this function to restart coroutine directly from inside of [suspendCoroutine],
24+
* when the code is already in the context of this coroutine.
25+
* It does not use [ContinuationInterceptor] and does not update context of the current thread.
26+
*/
27+
public fun <R, T> (suspend (R) -> T).startCoroutineUnintercepted(receiver: R, completion: Continuation<T>) {
28+
startDirect(completion) {
29+
startCoroutineUninterceptedOrReturn(receiver, completion)
30+
}
31+
}
32+
33+
/**
34+
* Use this function to start new coroutine in [CoroutineStart.UNDISPATCHED] mode &mdash;
35+
* immediately execute coroutine in the current thread until next suspension.
36+
* It does not use [ContinuationInterceptor], but updates the context of the current thread for the new coroutine.
37+
*/
38+
public fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
39+
startDirect(completion) {
40+
withCoroutineContext(completion.context) {
41+
startCoroutineUninterceptedOrReturn(completion)
42+
}
43+
}
44+
}
45+
46+
/**
47+
* Use this function to start new coroutine in [CoroutineStart.UNDISPATCHED] mode &mdash;
48+
* immediately execute coroutine in the current thread until next suspension.
49+
* It does not use [ContinuationInterceptor], but updates the context of the current thread for the new coroutine.
2850
*/
29-
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNCHECKED_CAST")
3051
public fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, completion: Continuation<T>) {
52+
startDirect(completion) {
53+
withCoroutineContext(completion.context) {
54+
startCoroutineUninterceptedOrReturn(receiver, completion)
55+
}
56+
}
57+
}
58+
59+
private inline fun <T> startDirect(completion: Continuation<T>, block: () -> Any?) {
3160
val value = try {
32-
startCoroutineUninterceptedOrReturn(receiver, completion)
61+
block()
3362
} catch (e: Throwable) {
3463
completion.resumeWithException(e)
3564
return
3665
}
37-
if (value !== COROUTINE_SUSPENDED)
66+
if (value !== COROUTINE_SUSPENDED) {
67+
@Suppress("UNCHECKED_CAST")
3868
completion.resume(value as T)
69+
}
3970
}
4071

4172
/**
4273
* Starts this coroutine with the given code [block] in the same context and returns result when it
43-
* completes without suspnesion.
74+
* completes without suspension.
4475
* This function shall be invoked at most once on this coroutine.
4576
*
4677
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
@@ -53,7 +84,7 @@ public fun <T> AbstractCoroutine<T>.startUndispatchedOrReturn(block: suspend ()
5384

5485
/**
5586
* Starts this coroutine with the given code [block] in the same context and returns result when it
56-
* completes without suspnesion.
87+
* completes without suspension.
5788
* This function shall be invoked at most once on this coroutine.
5889
*
5990
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it

common/kotlinx-coroutines-core-common/src/selects/Select.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ internal class SelectBuilderImpl<in R>(
408408
override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
409409
if (time <= 0L) {
410410
if (trySelect(null))
411-
block.startCoroutineUndispatched(completion)
411+
block.startCoroutineUnintercepted(completion)
412412
return
413413
}
414414
val action = Runnable {

common/kotlinx-coroutines-core-common/src/sync/Mutex.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
253253
val failure = select.performAtomicTrySelect(TryLockDesc(this, owner))
254254
when {
255255
failure == null -> { // success
256-
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
256+
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
257257
return
258258
}
259259
failure === ALREADY_SELECTED -> return // already selected -- bail out

common/kotlinx-coroutines-core-common/test/selects/SelectArrayChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,6 @@ class SelectArrayChannelTest : TestBase() {
289289
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
290290
this as SelectBuilderImpl // type assertion
291291
if (!trySelect(null)) return
292-
block.startCoroutineUndispatched(this)
292+
block.startCoroutineUnintercepted(this)
293293
}
294294
}

common/kotlinx-coroutines-core-common/test/selects/SelectRendezvousChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,6 @@ class SelectRendezvousChannelTest : TestBase() {
310310
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
311311
this as SelectBuilderImpl // type assertion
312312
if (!trySelect(null)) return
313-
block.startCoroutineUndispatched(this)
313+
block.startCoroutineUnintercepted(this)
314314
}
315315
}

core/kotlinx-coroutines-core/src/CoroutineContext.kt

+37-22
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.experimental
66

7+
import java.util.*
78
import java.util.concurrent.atomic.AtomicLong
89
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
910
import kotlin.coroutines.experimental.ContinuationInterceptor
@@ -40,6 +41,17 @@ internal val DEBUG = run {
4041
}
4142
}
4243

44+
@Suppress("UNCHECKED_CAST")
45+
internal val coroutineContextThreadLocal: CoroutineContextThreadLocal<Any?>? = run {
46+
val services = ServiceLoader.load(CoroutineContextThreadLocal::class.java).toMutableList()
47+
if (DEBUG) services.add(0, DebugThreadName)
48+
when (services.size) {
49+
0 -> null
50+
1 -> services.single() as CoroutineContextThreadLocal<Any?>
51+
else -> CoroutineContextThreadLocalList((services as List<CoroutineContextThreadLocal<Any?>>).toTypedArray())
52+
}
53+
}
54+
4355
private val COROUTINE_ID = AtomicLong()
4456

4557
// for tests only
@@ -89,29 +101,37 @@ public actual fun newCoroutineContext(context: CoroutineContext, parent: Job? =
89101
* Executes a block using a given coroutine context.
90102
*/
91103
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T {
92-
val oldName = context.updateThreadContext()
104+
val oldValue = coroutineContextThreadLocal?.updateThreadContext(context)
93105
try {
94106
return block()
95107
} finally {
96-
restoreThreadContext(oldName)
108+
coroutineContextThreadLocal?.restoreThreadContext(context, oldValue)
97109
}
98110
}
99111

100-
@PublishedApi
101-
internal fun CoroutineContext.updateThreadContext(): String? {
102-
if (!DEBUG) return null
103-
val coroutineId = this[CoroutineId] ?: return null
104-
val coroutineName = this[CoroutineName]?.name ?: "coroutine"
105-
val currentThread = Thread.currentThread()
106-
val oldName = currentThread.name
107-
currentThread.name = buildString(oldName.length + coroutineName.length + 10) {
108-
append(oldName)
109-
append(" @")
110-
append(coroutineName)
111-
append('#')
112-
append(coroutineId.id)
112+
private const val DEBUG_THREAD_NAME_SEPARATOR = " @"
113+
114+
private object DebugThreadName : CoroutineContextThreadLocal<String?> {
115+
override fun updateThreadContext(context: CoroutineContext): String? {
116+
val coroutineId = context[CoroutineId] ?: return null
117+
val coroutineName = context[CoroutineName]?.name ?: "coroutine"
118+
val currentThread = Thread.currentThread()
119+
val oldName = currentThread.name
120+
var lastIndex = oldName.lastIndexOf(DEBUG_THREAD_NAME_SEPARATOR)
121+
if (lastIndex < 0) lastIndex = oldName.length
122+
currentThread.name = buildString(lastIndex + coroutineName.length + 10) {
123+
append(oldName.substring(0, lastIndex))
124+
append(DEBUG_THREAD_NAME_SEPARATOR)
125+
append(coroutineName)
126+
append('#')
127+
append(coroutineId.id)
128+
}
129+
return oldName
130+
}
131+
132+
override fun restoreThreadContext(context: CoroutineContext, oldValue: String?) {
133+
if (oldValue != null) Thread.currentThread().name = oldValue
113134
}
114-
return oldName
115135
}
116136

117137
internal actual val CoroutineContext.coroutineName: String? get() {
@@ -121,12 +141,7 @@ internal actual val CoroutineContext.coroutineName: String? get() {
121141
return "$coroutineName#${coroutineId.id}"
122142
}
123143

124-
@PublishedApi
125-
internal fun restoreThreadContext(oldName: String?) {
126-
if (oldName != null) Thread.currentThread().name = oldName
127-
}
128-
129-
private class CoroutineId(val id: Long) : AbstractCoroutineContextElement(CoroutineId) {
144+
internal data class CoroutineId(val id: Long) : AbstractCoroutineContextElement(CoroutineId) {
130145
companion object Key : CoroutineContext.Key<CoroutineId>
131146
override fun toString(): String = "CoroutineId($id)"
132147
}

0 commit comments

Comments
 (0)