Skip to content

Commit 1408d7c

Browse files
committed
Introduce ThreadContextElement API to integrate with thread-local sensitive code
* Debug thread name is redesigned using ThreadContextElement API where the name of thread reflects the name of currently coroutine. * Intrinsics for startCoroutineUndispatched that correspond to CoroutineStart.UNDISPATCHED properly update coroutine context. * New intrinsics named startCoroutineUnintercepted are introduced. They do not update thread context. * withContext logic is fixed properly update context is various situations. * DebugThreadNameTest is introduced. * Reporting of unhandled errors in TestBase is improved. Its CoroutineExceptionHandler records but does not rethrow exception. This makes sure that failed tests actually fail and do not hang in recursive attempt to handle unhandled coroutine exception. Fixes #119
1 parent be065fd commit 1408d7c

File tree

21 files changed

+469
-74
lines changed

21 files changed

+469
-74
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+14-2
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,6 @@ public final class kotlinx/coroutines/experimental/CoroutineContextKt {
136136
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;)Lkotlin/coroutines/experimental/CoroutineContext;
137137
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;)Lkotlin/coroutines/experimental/CoroutineContext;
138138
public static synthetic fun newCoroutineContext$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;ILjava/lang/Object;)Lkotlin/coroutines/experimental/CoroutineContext;
139-
public static final fun restoreThreadContext (Ljava/lang/String;)V
140-
public static final fun updateThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;)Ljava/lang/String;
141139
}
142140

143141
public abstract class kotlinx/coroutines/experimental/CoroutineDispatcher : kotlin/coroutines/experimental/AbstractCoroutineContextElement, kotlin/coroutines/experimental/ContinuationInterceptor {
@@ -436,6 +434,18 @@ public final class kotlinx/coroutines/experimental/ScheduledKt {
436434
public static synthetic fun withTimeoutOrNull$default (JLjava/util/concurrent/TimeUnit;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/experimental/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
437435
}
438436

437+
public abstract interface class kotlinx/coroutines/experimental/ThreadContextElement : kotlin/coroutines/experimental/CoroutineContext$Element {
438+
public abstract fun restoreThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;Ljava/lang/Object;)V
439+
public abstract fun updateThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;)Ljava/lang/Object;
440+
}
441+
442+
public final class kotlinx/coroutines/experimental/ThreadContextElement$DefaultImpls {
443+
public static fun fold (Lkotlinx/coroutines/experimental/ThreadContextElement;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
444+
public static fun get (Lkotlinx/coroutines/experimental/ThreadContextElement;Lkotlin/coroutines/experimental/CoroutineContext$Key;)Lkotlin/coroutines/experimental/CoroutineContext$Element;
445+
public static fun minusKey (Lkotlinx/coroutines/experimental/ThreadContextElement;Lkotlin/coroutines/experimental/CoroutineContext$Key;)Lkotlin/coroutines/experimental/CoroutineContext;
446+
public static fun plus (Lkotlinx/coroutines/experimental/ThreadContextElement;Lkotlin/coroutines/experimental/CoroutineContext;)Lkotlin/coroutines/experimental/CoroutineContext;
447+
}
448+
439449
public final class kotlinx/coroutines/experimental/ThreadPoolDispatcher : kotlinx/coroutines/experimental/ExecutorCoroutineDispatcherBase {
440450
public fun close ()V
441451
public fun getExecutor ()Ljava/util/concurrent/Executor;
@@ -939,6 +949,8 @@ public final class kotlinx/coroutines/experimental/intrinsics/CancellableKt {
939949
public final class kotlinx/coroutines/experimental/intrinsics/UndispatchedKt {
940950
public static final fun startCoroutineUndispatched (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)V
941951
public static final fun startCoroutineUndispatched (Lkotlin/jvm/functions/Function2;Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)V
952+
public static final fun startCoroutineUnintercepted (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)V
953+
public static final fun startCoroutineUnintercepted (Lkotlin/jvm/functions/Function2;Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)V
942954
public static final fun startUndispatchedOrReturn (Lkotlinx/coroutines/experimental/AbstractCoroutine;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
943955
public static final fun startUndispatchedOrReturn (Lkotlinx/coroutines/experimental/AbstractCoroutine;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
944956
}

build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,10 @@ configure(subprojects.findAll { !it.name.contains(sourceless) && it.name != "ben
110110
sourceSets {
111111
main.kotlin.srcDirs = ['src']
112112
test.kotlin.srcDirs = ['test']
113+
// todo: do we still need this workaround?
113114
if (!projectName.endsWith("-native")) {
114115
main.resources.srcDirs = ['resources']
116+
test.resources.srcDirs = ['test-resources']
115117
}
116118
}
117119
}

common/kotlinx-coroutines-core-common/src/Builders.common.kt

+20-6
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,11 @@ public suspend fun <T> withContext(
119119
// fast path #3 if the new dispatcher is the same as the old one.
120120
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
121121
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
122-
val newContinuation = RunContinuationDirect(newContext, uCont)
123-
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
122+
val newContinuation = RunContinuationUndispatched(newContext, uCont)
123+
// There are some other changes in the context, so this thread needs to be updated
124+
withCoroutineContext(newContext) {
125+
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
126+
}
124127
}
125128
// slowest path otherwise -- use new interceptor, sync to its result via a full-blown instance of RunCompletion
126129
require(!start.isLazy) { "$start start is not supported" }
@@ -130,7 +133,6 @@ public suspend fun <T> withContext(
130133
resumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE
131134
)
132135
completion.initParentJobInternal(newContext[Job]) // attach to job
133-
@Suppress("DEPRECATION")
134136
start(block, completion)
135137
completion.getResult()
136138
}
@@ -178,10 +180,22 @@ private class LazyStandaloneCoroutine(
178180
}
179181
}
180182

181-
private class RunContinuationDirect<in T>(
183+
private class RunContinuationUndispatched<in T>(
182184
override val context: CoroutineContext,
183-
continuation: Continuation<T>
184-
) : Continuation<T> by continuation
185+
private val continuation: Continuation<T>
186+
): Continuation<T> {
187+
override fun resume(value: T) {
188+
withCoroutineContext(continuation.context) {
189+
continuation.resume(value)
190+
}
191+
}
192+
193+
override fun resumeWithException(exception: Throwable) {
194+
withCoroutineContext(continuation.context) {
195+
continuation.resumeWithException(exception)
196+
}
197+
}
198+
}
185199

186200
@Suppress("UNCHECKED_CAST")
187201
private class RunCompletion<in T>(

common/kotlinx-coroutines-core-common/src/JobSupport.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
531531
// already complete -- select result
532532
if (select.trySelect(null)) {
533533
select.completion.context.checkCompletion() // always check for our completion
534-
block.startCoroutineUndispatched(select.completion)
534+
block.startCoroutineUnintercepted(select.completion)
535535
}
536536
return
537537
}
@@ -992,7 +992,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
992992
if (state is CompletedExceptionally)
993993
select.resumeSelectCancellableWithException(state.cause)
994994
else
995-
block.startCoroutineUndispatched(state as T, select.completion)
995+
block.startCoroutineUnintercepted(state as T, select.completion)
996996
}
997997
return
998998
}

common/kotlinx-coroutines-core-common/src/ResumeMode.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedMode(value: T, mode: Int) {
4343
MODE_ATOMIC_DEFAULT -> intercepted().resume(value)
4444
MODE_CANCELLABLE -> intercepted().resumeCancellable(value)
4545
MODE_DIRECT -> resume(value)
46-
MODE_UNDISPATCHED -> resume(value)
46+
MODE_UNDISPATCHED -> withCoroutineContext(context) { resume(value) }
4747
MODE_IGNORE -> {}
4848
else -> error("Invalid mode $mode")
4949
}
@@ -54,7 +54,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedWithExceptionMode(exception:
5454
MODE_ATOMIC_DEFAULT -> intercepted().resumeWithException(exception)
5555
MODE_CANCELLABLE -> intercepted().resumeCancellableWithException(exception)
5656
MODE_DIRECT -> resumeWithException(exception)
57-
MODE_UNDISPATCHED -> resumeWithException(exception)
57+
MODE_UNDISPATCHED -> withCoroutineContext(context) { resumeWithException(exception) }
5858
MODE_IGNORE -> {}
5959
else -> error("Invalid mode $mode")
6060
}

common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
414414
offerResult === ALREADY_SELECTED -> return
415415
offerResult === OFFER_FAILED -> {} // retry
416416
offerResult === OFFER_SUCCESS -> {
417-
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
417+
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
418418
return
419419
}
420420
offerResult is Closed<*> -> throw offerResult.sendException
@@ -753,7 +753,7 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
753753
pollResult === POLL_FAILED -> {} // retry
754754
pollResult is Closed<*> -> throw pollResult.receiveException
755755
else -> {
756-
block.startCoroutineUndispatched(pollResult as E, select.completion)
756+
block.startCoroutineUnintercepted(pollResult as E, select.completion)
757757
return
758758
}
759759
}
@@ -788,14 +788,14 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
788788
pollResult is Closed<*> -> {
789789
if (pollResult.closeCause == null) {
790790
if (select.trySelect(null))
791-
block.startCoroutineUndispatched(null, select.completion)
791+
block.startCoroutineUnintercepted(null, select.completion)
792792
return
793793
} else
794794
throw pollResult.closeCause
795795
}
796796
else -> {
797797
// selected successfully
798-
block.startCoroutineUndispatched(pollResult as E, select.completion)
798+
block.startCoroutineUnintercepted(pollResult as E, select.completion)
799799
return
800800
}
801801
}

common/kotlinx-coroutines-core-common/src/channels/ConflatedBroadcastChannel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
262262
select.resumeSelectCancellableWithException(it.sendException)
263263
return
264264
}
265-
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
265+
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
266266
}
267267

268268
@Suppress("DEPRECATION")

common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt

+46-15
Original file line numberDiff line numberDiff line change
@@ -9,38 +9,69 @@ import kotlin.coroutines.experimental.*
99
import kotlin.coroutines.experimental.intrinsics.*
1010

1111
/**
12-
* Use this function to restart coroutine directly from inside of [suspendCoroutine] in the same context.
12+
* Use this function to restart coroutine directly from inside of [suspendCoroutine],
13+
* when the code is already in the context of this coroutine.
14+
* It does not use [ContinuationInterceptor] and does not update context of the current thread.
1315
*/
14-
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNCHECKED_CAST")
15-
public fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
16-
val value = try {
16+
public fun <T> (suspend () -> T).startCoroutineUnintercepted(completion: Continuation<T>) {
17+
startDirect(completion) {
1718
startCoroutineUninterceptedOrReturn(completion)
18-
} catch (e: Throwable) {
19-
completion.resumeWithException(e)
20-
return
2119
}
22-
if (value !== COROUTINE_SUSPENDED)
23-
completion.resume(value as T)
2420
}
2521

2622
/**
27-
* Use this function to restart coroutine directly from inside of [suspendCoroutine] in the same context.
23+
* Use this function to restart coroutine directly from inside of [suspendCoroutine],
24+
* when the code is already in the context of this coroutine.
25+
* It does not use [ContinuationInterceptor] and does not update context of the current thread.
26+
*/
27+
public fun <R, T> (suspend (R) -> T).startCoroutineUnintercepted(receiver: R, completion: Continuation<T>) {
28+
startDirect(completion) {
29+
startCoroutineUninterceptedOrReturn(receiver, completion)
30+
}
31+
}
32+
33+
/**
34+
* Use this function to start new coroutine in [CoroutineStart.UNDISPATCHED] mode &mdash;
35+
* immediately execute coroutine in the current thread until next suspension.
36+
* It does not use [ContinuationInterceptor], but updates the context of the current thread for the new coroutine.
37+
*/
38+
public fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
39+
startDirect(completion) {
40+
withCoroutineContext(completion.context) {
41+
startCoroutineUninterceptedOrReturn(completion)
42+
}
43+
}
44+
}
45+
46+
/**
47+
* Use this function to start new coroutine in [CoroutineStart.UNDISPATCHED] mode &mdash;
48+
* immediately execute coroutine in the current thread until next suspension.
49+
* It does not use [ContinuationInterceptor], but updates the context of the current thread for the new coroutine.
2850
*/
29-
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNCHECKED_CAST")
3051
public fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, completion: Continuation<T>) {
52+
startDirect(completion) {
53+
withCoroutineContext(completion.context) {
54+
startCoroutineUninterceptedOrReturn(receiver, completion)
55+
}
56+
}
57+
}
58+
59+
private inline fun <T> startDirect(completion: Continuation<T>, block: () -> Any?) {
3160
val value = try {
32-
startCoroutineUninterceptedOrReturn(receiver, completion)
61+
block()
3362
} catch (e: Throwable) {
3463
completion.resumeWithException(e)
3564
return
3665
}
37-
if (value !== COROUTINE_SUSPENDED)
66+
if (value !== COROUTINE_SUSPENDED) {
67+
@Suppress("UNCHECKED_CAST")
3868
completion.resume(value as T)
69+
}
3970
}
4071

4172
/**
4273
* Starts this coroutine with the given code [block] in the same context and returns result when it
43-
* completes without suspnesion.
74+
* completes without suspension.
4475
* This function shall be invoked at most once on this coroutine.
4576
*
4677
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
@@ -53,7 +84,7 @@ public fun <T> AbstractCoroutine<T>.startUndispatchedOrReturn(block: suspend ()
5384

5485
/**
5586
* Starts this coroutine with the given code [block] in the same context and returns result when it
56-
* completes without suspnesion.
87+
* completes without suspension.
5788
* This function shall be invoked at most once on this coroutine.
5889
*
5990
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it

common/kotlinx-coroutines-core-common/src/selects/Select.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ internal class SelectBuilderImpl<in R>(
407407
override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
408408
if (time <= 0L) {
409409
if (trySelect(null))
410-
block.startCoroutineUndispatched(completion)
410+
block.startCoroutineUnintercepted(completion)
411411
return
412412
}
413413
val action = Runnable {

common/kotlinx-coroutines-core-common/src/sync/Mutex.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
252252
val failure = select.performAtomicTrySelect(TryLockDesc(this, owner))
253253
when {
254254
failure == null -> { // success
255-
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
255+
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
256256
return
257257
}
258258
failure === ALREADY_SELECTED -> return // already selected -- bail out

common/kotlinx-coroutines-core-common/test/selects/SelectArrayChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,6 @@ class SelectArrayChannelTest : TestBase() {
289289
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
290290
this as SelectBuilderImpl // type assertion
291291
if (!trySelect(null)) return
292-
block.startCoroutineUndispatched(this)
292+
block.startCoroutineUnintercepted(this)
293293
}
294294
}

common/kotlinx-coroutines-core-common/test/selects/SelectRendezvousChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,6 @@ class SelectRendezvousChannelTest : TestBase() {
310310
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
311311
this as SelectBuilderImpl // type assertion
312312
if (!trySelect(null)) return
313-
block.startCoroutineUndispatched(this)
313+
block.startCoroutineUnintercepted(this)
314314
}
315315
}

core/kotlinx-coroutines-core/src/CoroutineContext.kt

+27-24
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.experimental
66

7+
import java.util.*
78
import kotlinx.coroutines.experimental.internal.*
89
import kotlinx.coroutines.experimental.scheduling.*
910
import java.util.concurrent.atomic.*
@@ -98,44 +99,46 @@ public actual fun newCoroutineContext(context: CoroutineContext, parent: Job? =
9899
* Executes a block using a given coroutine context.
99100
*/
100101
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T {
101-
val oldName = context.updateThreadContext()
102+
val oldValue = updateThreadContext(context)
102103
try {
103104
return block()
104105
} finally {
105-
restoreThreadContext(oldName)
106+
restoreThreadContext(context, oldValue)
106107
}
107108
}
108109

109-
@PublishedApi
110-
internal fun CoroutineContext.updateThreadContext(): String? {
111-
if (!DEBUG) return null
112-
val coroutineId = this[CoroutineId] ?: return null
113-
val coroutineName = this[CoroutineName]?.name ?: "coroutine"
114-
val currentThread = Thread.currentThread()
115-
val oldName = currentThread.name
116-
currentThread.name = buildString(oldName.length + coroutineName.length + 10) {
117-
append(oldName)
118-
append(" @")
119-
append(coroutineName)
120-
append('#')
121-
append(coroutineId.id)
122-
}
123-
return oldName
124-
}
125-
126110
internal actual val CoroutineContext.coroutineName: String? get() {
127111
if (!DEBUG) return null
128112
val coroutineId = this[CoroutineId] ?: return null
129113
val coroutineName = this[CoroutineName]?.name ?: "coroutine"
130114
return "$coroutineName#${coroutineId.id}"
131115
}
132116

133-
@PublishedApi
134-
internal fun restoreThreadContext(oldName: String?) {
135-
if (oldName != null) Thread.currentThread().name = oldName
136-
}
117+
private const val DEBUG_THREAD_NAME_SEPARATOR = " @"
137118

138-
private class CoroutineId(val id: Long) : AbstractCoroutineContextElement(CoroutineId) {
119+
internal data class CoroutineId(
120+
val id: Long
121+
) : ThreadContextElement<String>, AbstractCoroutineContextElement(CoroutineId) {
139122
companion object Key : CoroutineContext.Key<CoroutineId>
140123
override fun toString(): String = "CoroutineId($id)"
124+
125+
override fun updateThreadContext(context: CoroutineContext): String {
126+
val coroutineName = context[CoroutineName]?.name ?: "coroutine"
127+
val currentThread = Thread.currentThread()
128+
val oldName = currentThread.name
129+
var lastIndex = oldName.lastIndexOf(DEBUG_THREAD_NAME_SEPARATOR)
130+
if (lastIndex < 0) lastIndex = oldName.length
131+
currentThread.name = buildString(lastIndex + coroutineName.length + 10) {
132+
append(oldName.substring(0, lastIndex))
133+
append(DEBUG_THREAD_NAME_SEPARATOR)
134+
append(coroutineName)
135+
append('#')
136+
append(id)
137+
}
138+
return oldName
139+
}
140+
141+
override fun restoreThreadContext(context: CoroutineContext, oldState: String) {
142+
Thread.currentThread().name = oldState
143+
}
141144
}

0 commit comments

Comments
 (0)