Skip to content

Commit ae85797

Browse files
committed
Cache result of ThreadContext#foldAll in the field of DispatchedCoroutine to avoid context fold in tight loops
Fixes #537
1 parent ca98207 commit ae85797

File tree

13 files changed

+80
-26
lines changed

13 files changed

+80
-26
lines changed

benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt

+26-6
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,39 @@ import org.openjdk.jmh.annotations.*
1010
import java.util.concurrent.*
1111
import kotlin.coroutines.*
1212

13-
@Warmup(iterations = 10, time = 1)
14-
@Measurement(iterations = 10, time = 1)
13+
@Warmup(iterations = 5, time = 1)
14+
@Measurement(iterations = 5, time = 1)
1515
@BenchmarkMode(Mode.AverageTime)
1616
@OutputTimeUnit(TimeUnit.MILLISECONDS)
1717
@State(Scope.Benchmark)
18-
@Fork(2)
18+
@Fork(1)
1919
open class ChannelSinkBenchmark {
20+
private val tl = ThreadLocal.withInitial({ 42 })
21+
private val tl2 = ThreadLocal.withInitial({ 239 })
22+
23+
private val unconfined = Dispatchers.Unconfined
24+
private val unconfinedOneElement = Dispatchers.Unconfined + tl.asContextElement()
25+
private val unconfinedTwoElements = Dispatchers.Unconfined + tl.asContextElement() + tl2.asContextElement()
2026

2127
@Benchmark
2228
fun channelPipeline(): Int = runBlocking {
23-
Channel
24-
.range(1, 1_000_000, Dispatchers.Unconfined)
25-
.filter(Dispatchers.Unconfined) { it % 4 == 0 }
29+
run(unconfined)
30+
}
31+
32+
@Benchmark
33+
fun channelPipelineOneThreadLocal(): Int = runBlocking {
34+
run(unconfinedOneElement)
35+
}
36+
37+
@Benchmark
38+
fun channelPipelineTwoThreadLocals(): Int = runBlocking {
39+
run(unconfinedTwoElements)
40+
}
41+
42+
private suspend inline fun run(context: CoroutineContext): Int {
43+
return Channel
44+
.range(1, 1_000_000, context)
45+
.filter(context) { it % 4 == 0 }
2646
.fold(0) { a, b -> a + b }
2747
}
2848

common/kotlinx-coroutines-core-common/src/Builders.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public suspend fun <T> withContext(
138138
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
139139
val coroutine = UndispatchedCoroutine(newContext, uCont) // MODE_UNDISPATCHED
140140
// There are changes in the context, so this thread needs to be updated
141-
withCoroutineContext(newContext) {
141+
withCoroutineContext(newContext, null) {
142142
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
143143
}
144144
}

common/kotlinx-coroutines-core-common/src/CoroutineContext.common.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ internal expect fun createDefaultDispatcher(): CoroutineDispatcher
1717
@Suppress("PropertyName")
1818
internal expect val DefaultDelay: Delay
1919

20-
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T
20+
// countOrElement -- pre-cached value for ThreadContext.kt
21+
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
2122
internal expect fun Continuation<*>.toDebugString(): String
2223
internal expect val CoroutineContext.coroutineName: String?

common/kotlinx-coroutines-core-common/src/Dispatched.kt

+6-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ internal class DispatchedContinuation<in T>(
1717
) : Continuation<T> by continuation, DispatchedTask<T> {
1818
private var _state: Any? = UNDEFINED
1919
public override var resumeMode: Int = 0
20+
@JvmField // pre-cached value to avoid ctx.fold on every resumption
21+
internal val countOrElement = threadContextElements(context)
2022

2123
override fun takeState(): Any? {
2224
val state = _state
@@ -80,21 +82,21 @@ internal class DispatchedContinuation<in T>(
8082

8183
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
8284
inline fun resumeUndispatchedWith(result: Result<T>) {
83-
withCoroutineContext(context) {
85+
withCoroutineContext(context, countOrElement) {
8486
continuation.resumeWith(result)
8587
}
8688
}
8789

8890
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
8991
inline fun resumeUndispatched(value: T) {
90-
withCoroutineContext(context) {
92+
withCoroutineContext(context, countOrElement) {
9193
continuation.resume(value)
9294
}
9395
}
9496

9597
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
9698
inline fun resumeUndispatchedWithException(exception: Throwable) {
97-
withCoroutineContext(context) {
99+
withCoroutineContext(context, countOrElement) {
98100
continuation.resumeWithException(exception)
99101
}
100102
}
@@ -151,7 +153,7 @@ internal interface DispatchedTask<in T> : Runnable {
151153
val context = continuation.context
152154
val job = if (resumeMode.isCancellableMode) context[Job] else null
153155
val state = takeState() // NOTE: Must take state in any case, even if cancelled
154-
withCoroutineContext(context) {
156+
withCoroutineContext(context, delegate.countOrElement) {
155157
if (job != null && !job.isActive)
156158
continuation.resumeWithException(job.getCancellationException())
157159
else {

common/kotlinx-coroutines-core-common/src/ResumeMode.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedMode(value: T, mode: Int) {
4343
MODE_ATOMIC_DEFAULT -> intercepted().resume(value)
4444
MODE_CANCELLABLE -> intercepted().resumeCancellable(value)
4545
MODE_DIRECT -> resume(value)
46-
MODE_UNDISPATCHED -> withCoroutineContext(context) { resume(value) }
46+
MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resume(value) }
4747
MODE_IGNORE -> {}
4848
else -> error("Invalid mode $mode")
4949
}
@@ -54,7 +54,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedWithExceptionMode(exception:
5454
MODE_ATOMIC_DEFAULT -> intercepted().resumeWithException(exception)
5555
MODE_CANCELLABLE -> intercepted().resumeCancellableWithException(exception)
5656
MODE_DIRECT -> resumeWithException(exception)
57-
MODE_UNDISPATCHED -> withCoroutineContext(context) { resumeWithException(exception) }
57+
MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resumeWithException(exception) }
5858
MODE_IGNORE -> {}
5959
else -> error("Invalid mode $mode")
6060
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlin.coroutines.*
8+
9+
internal expect fun threadContextElements(context: CoroutineContext): Any

common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ internal fun <R, T> (suspend (R) -> T).startCoroutineUnintercepted(receiver: R,
3737
*/
3838
internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
3939
startDirect(completion) {
40-
withCoroutineContext(completion.context) {
40+
withCoroutineContext(completion.context, null) {
4141
startCoroutineUninterceptedOrReturn(completion)
4242
}
4343
}
@@ -50,7 +50,7 @@ internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Contin
5050
*/
5151
internal fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, completion: Continuation<T>) {
5252
startDirect(completion) {
53-
withCoroutineContext(completion.context) {
53+
withCoroutineContext(completion.context, null) {
5454
startCoroutineUninterceptedOrReturn(receiver, completion)
5555
}
5656
}

core/kotlinx-coroutines-core/src/CoroutineContext.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
6363
/**
6464
* Executes a block using a given coroutine context.
6565
*/
66-
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T {
67-
val oldValue = updateThreadContext(context)
66+
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
67+
val oldValue = updateThreadContext(context, countOrElement)
6868
try {
6969
return block()
7070
} finally {

core/kotlinx-coroutines-core/src/internal/ThreadContext.kt

+10-6
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,24 @@ private val restoreState =
5757
return state
5858
}
5959

60-
internal fun updateThreadContext(context: CoroutineContext): Any? {
61-
val count = context.fold(0, countAll)
60+
internal actual fun threadContextElements(context: CoroutineContext): Any = context.fold(0, countAll)!!
61+
62+
// countOrElement is pre-cached in dispatched continuation
63+
internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?): Any? {
64+
@Suppress("NAME_SHADOWING")
65+
val countOrElement = countOrElement ?: threadContextElements(context)
6266
@Suppress("IMPLICIT_BOXING_IN_IDENTITY_EQUALS")
6367
return when {
64-
count === 0 -> ZERO // very fast path when there are no active ThreadContextElements
68+
countOrElement === 0 -> ZERO // very fast path when there are no active ThreadContextElements
6569
// ^^^ identity comparison for speed, we know zero always has the same identity
66-
count is Int -> {
70+
countOrElement is Int -> {
6771
// slow path for multiple active ThreadContextElements, allocates ThreadState for multiple old values
68-
context.fold(ThreadState(context, count), updateState)
72+
context.fold(ThreadState(context, countOrElement), updateState)
6973
}
7074
else -> {
7175
// fast path for one ThreadContextElement (no allocations, no additional context scan)
7276
@Suppress("UNCHECKED_CAST")
73-
val element = count as ThreadContextElement<Any?>
77+
val element = countOrElement as ThreadContextElement<Any?>
7478
element.updateThreadContext(context)
7579
}
7680
}

js/kotlinx-coroutines-core-js/src/CoroutineContext.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,6 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
3434
}
3535

3636
// No debugging facilities on JS
37-
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T = block()
37+
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
3838
internal actual fun Continuation<*>.toDebugString(): String = toString()
3939
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlin.coroutines.*
8+
9+
internal actual fun threadContextElements(context: CoroutineContext): Any = 0

native/kotlinx-coroutines-core-native/src/CoroutineContext.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,6 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
4444
}
4545

4646
// No debugging facilities on native
47-
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T = block()
47+
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
4848
internal actual fun Continuation<*>.toDebugString(): String = toString()
4949
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on native
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlin.coroutines.*
8+
9+
internal actual fun threadContextElements(context: CoroutineContext): Any = 0

0 commit comments

Comments
 (0)