From ae85797805f93f98bfad64f5f91ba97e4f2c1fbc Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 16 Oct 2018 15:47:26 +0300 Subject: [PATCH] Cache result of ThreadContext#foldAll in the field of DispatchedCoroutine to avoid context fold in tight loops Fixes #537 --- .../kotlin/benchmarks/ChannelSinkBenchmark.kt | 32 +++++++++++++++---- .../src/Builders.common.kt | 2 +- .../src/CoroutineContext.common.kt | 3 +- .../src/Dispatched.kt | 10 +++--- .../src/ResumeMode.kt | 4 +-- .../src/internal/ThreadContext.common.kt | 9 ++++++ .../src/intrinsics/Undispatched.kt | 4 +-- .../src/CoroutineContext.kt | 4 +-- .../src/internal/ThreadContext.kt | 16 ++++++---- .../src/CoroutineContext.kt | 2 +- .../src/internal/ThreadContext.kt | 9 ++++++ .../src/CoroutineContext.kt | 2 +- .../src/internal/ThreadContext.kt | 9 ++++++ 13 files changed, 80 insertions(+), 26 deletions(-) create mode 100644 common/kotlinx-coroutines-core-common/src/internal/ThreadContext.common.kt create mode 100644 js/kotlinx-coroutines-core-js/src/internal/ThreadContext.kt create mode 100644 native/kotlinx-coroutines-core-native/src/internal/ThreadContext.kt diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt index 020af22522..8b5e90aaf5 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt @@ -10,19 +10,39 @@ import org.openjdk.jmh.annotations.* import java.util.concurrent.* import kotlin.coroutines.* -@Warmup(iterations = 10, time = 1) -@Measurement(iterations = 10, time = 1) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 5, time = 1) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Benchmark) -@Fork(2) +@Fork(1) open class ChannelSinkBenchmark { + private val tl = ThreadLocal.withInitial({ 42 }) + private val tl2 = ThreadLocal.withInitial({ 239 }) + + private val unconfined = Dispatchers.Unconfined + private val unconfinedOneElement = Dispatchers.Unconfined + tl.asContextElement() + private val unconfinedTwoElements = Dispatchers.Unconfined + tl.asContextElement() + tl2.asContextElement() @Benchmark fun channelPipeline(): Int = runBlocking { - Channel - .range(1, 1_000_000, Dispatchers.Unconfined) - .filter(Dispatchers.Unconfined) { it % 4 == 0 } + run(unconfined) + } + + @Benchmark + fun channelPipelineOneThreadLocal(): Int = runBlocking { + run(unconfinedOneElement) + } + + @Benchmark + fun channelPipelineTwoThreadLocals(): Int = runBlocking { + run(unconfinedTwoElements) + } + + private suspend inline fun run(context: CoroutineContext): Int { + return Channel + .range(1, 1_000_000, context) + .filter(context) { it % 4 == 0 } .fold(0) { a, b -> a + b } } diff --git a/common/kotlinx-coroutines-core-common/src/Builders.common.kt b/common/kotlinx-coroutines-core-common/src/Builders.common.kt index 4a4616a17f..b077b41d51 100644 --- a/common/kotlinx-coroutines-core-common/src/Builders.common.kt +++ b/common/kotlinx-coroutines-core-common/src/Builders.common.kt @@ -138,7 +138,7 @@ public suspend fun withContext( if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { val coroutine = UndispatchedCoroutine(newContext, uCont) // MODE_UNDISPATCHED // There are changes in the context, so this thread needs to be updated - withCoroutineContext(newContext) { + withCoroutineContext(newContext, null) { return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } } diff --git a/common/kotlinx-coroutines-core-common/src/CoroutineContext.common.kt b/common/kotlinx-coroutines-core-common/src/CoroutineContext.common.kt index d831233539..785e8a7691 100644 --- a/common/kotlinx-coroutines-core-common/src/CoroutineContext.common.kt +++ b/common/kotlinx-coroutines-core-common/src/CoroutineContext.common.kt @@ -17,6 +17,7 @@ internal expect fun createDefaultDispatcher(): CoroutineDispatcher @Suppress("PropertyName") internal expect val DefaultDelay: Delay -internal expect inline fun withCoroutineContext(context: CoroutineContext, block: () -> T): T +// countOrElement -- pre-cached value for ThreadContext.kt +internal expect inline fun withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T internal expect fun Continuation<*>.toDebugString(): String internal expect val CoroutineContext.coroutineName: String? \ No newline at end of file diff --git a/common/kotlinx-coroutines-core-common/src/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/Dispatched.kt index a861d4dca6..811fe3e9fb 100644 --- a/common/kotlinx-coroutines-core-common/src/Dispatched.kt +++ b/common/kotlinx-coroutines-core-common/src/Dispatched.kt @@ -17,6 +17,8 @@ internal class DispatchedContinuation( ) : Continuation by continuation, DispatchedTask { private var _state: Any? = UNDEFINED public override var resumeMode: Int = 0 + @JvmField // pre-cached value to avoid ctx.fold on every resumption + internal val countOrElement = threadContextElements(context) override fun takeState(): Any? { val state = _state @@ -80,21 +82,21 @@ internal class DispatchedContinuation( @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack inline fun resumeUndispatchedWith(result: Result) { - withCoroutineContext(context) { + withCoroutineContext(context, countOrElement) { continuation.resumeWith(result) } } @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack inline fun resumeUndispatched(value: T) { - withCoroutineContext(context) { + withCoroutineContext(context, countOrElement) { continuation.resume(value) } } @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack inline fun resumeUndispatchedWithException(exception: Throwable) { - withCoroutineContext(context) { + withCoroutineContext(context, countOrElement) { continuation.resumeWithException(exception) } } @@ -151,7 +153,7 @@ internal interface DispatchedTask : Runnable { val context = continuation.context val job = if (resumeMode.isCancellableMode) context[Job] else null val state = takeState() // NOTE: Must take state in any case, even if cancelled - withCoroutineContext(context) { + withCoroutineContext(context, delegate.countOrElement) { if (job != null && !job.isActive) continuation.resumeWithException(job.getCancellationException()) else { diff --git a/common/kotlinx-coroutines-core-common/src/ResumeMode.kt b/common/kotlinx-coroutines-core-common/src/ResumeMode.kt index 885dfa3362..0afea98c7f 100644 --- a/common/kotlinx-coroutines-core-common/src/ResumeMode.kt +++ b/common/kotlinx-coroutines-core-common/src/ResumeMode.kt @@ -43,7 +43,7 @@ internal fun Continuation.resumeUninterceptedMode(value: T, mode: Int) { MODE_ATOMIC_DEFAULT -> intercepted().resume(value) MODE_CANCELLABLE -> intercepted().resumeCancellable(value) MODE_DIRECT -> resume(value) - MODE_UNDISPATCHED -> withCoroutineContext(context) { resume(value) } + MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resume(value) } MODE_IGNORE -> {} else -> error("Invalid mode $mode") } @@ -54,7 +54,7 @@ internal fun Continuation.resumeUninterceptedWithExceptionMode(exception: MODE_ATOMIC_DEFAULT -> intercepted().resumeWithException(exception) MODE_CANCELLABLE -> intercepted().resumeCancellableWithException(exception) MODE_DIRECT -> resumeWithException(exception) - MODE_UNDISPATCHED -> withCoroutineContext(context) { resumeWithException(exception) } + MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resumeWithException(exception) } MODE_IGNORE -> {} else -> error("Invalid mode $mode") } diff --git a/common/kotlinx-coroutines-core-common/src/internal/ThreadContext.common.kt b/common/kotlinx-coroutines-core-common/src/internal/ThreadContext.common.kt new file mode 100644 index 0000000000..43b5dbe652 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/internal/ThreadContext.common.kt @@ -0,0 +1,9 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import kotlin.coroutines.* + +internal expect fun threadContextElements(context: CoroutineContext): Any diff --git a/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt b/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt index b59bb44739..098aacc8e0 100644 --- a/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt +++ b/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt @@ -37,7 +37,7 @@ internal fun (suspend (R) -> T).startCoroutineUnintercepted(receiver: R, */ internal fun (suspend () -> T).startCoroutineUndispatched(completion: Continuation) { startDirect(completion) { - withCoroutineContext(completion.context) { + withCoroutineContext(completion.context, null) { startCoroutineUninterceptedOrReturn(completion) } } @@ -50,7 +50,7 @@ internal fun (suspend () -> T).startCoroutineUndispatched(completion: Contin */ internal fun (suspend (R) -> T).startCoroutineUndispatched(receiver: R, completion: Continuation) { startDirect(completion) { - withCoroutineContext(completion.context) { + withCoroutineContext(completion.context, null) { startCoroutineUninterceptedOrReturn(receiver, completion) } } diff --git a/core/kotlinx-coroutines-core/src/CoroutineContext.kt b/core/kotlinx-coroutines-core/src/CoroutineContext.kt index 4d0c08eb00..bd586d6e2a 100644 --- a/core/kotlinx-coroutines-core/src/CoroutineContext.kt +++ b/core/kotlinx-coroutines-core/src/CoroutineContext.kt @@ -63,8 +63,8 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): /** * Executes a block using a given coroutine context. */ -internal actual inline fun withCoroutineContext(context: CoroutineContext, block: () -> T): T { - val oldValue = updateThreadContext(context) +internal actual inline fun withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T { + val oldValue = updateThreadContext(context, countOrElement) try { return block() } finally { diff --git a/core/kotlinx-coroutines-core/src/internal/ThreadContext.kt b/core/kotlinx-coroutines-core/src/internal/ThreadContext.kt index c2d1663db5..7dafb4711f 100644 --- a/core/kotlinx-coroutines-core/src/internal/ThreadContext.kt +++ b/core/kotlinx-coroutines-core/src/internal/ThreadContext.kt @@ -57,20 +57,24 @@ private val restoreState = return state } -internal fun updateThreadContext(context: CoroutineContext): Any? { - val count = context.fold(0, countAll) +internal actual fun threadContextElements(context: CoroutineContext): Any = context.fold(0, countAll)!! + +// countOrElement is pre-cached in dispatched continuation +internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?): Any? { + @Suppress("NAME_SHADOWING") + val countOrElement = countOrElement ?: threadContextElements(context) @Suppress("IMPLICIT_BOXING_IN_IDENTITY_EQUALS") return when { - count === 0 -> ZERO // very fast path when there are no active ThreadContextElements + countOrElement === 0 -> ZERO // very fast path when there are no active ThreadContextElements // ^^^ identity comparison for speed, we know zero always has the same identity - count is Int -> { + countOrElement is Int -> { // slow path for multiple active ThreadContextElements, allocates ThreadState for multiple old values - context.fold(ThreadState(context, count), updateState) + context.fold(ThreadState(context, countOrElement), updateState) } else -> { // fast path for one ThreadContextElement (no allocations, no additional context scan) @Suppress("UNCHECKED_CAST") - val element = count as ThreadContextElement + val element = countOrElement as ThreadContextElement element.updateThreadContext(context) } } diff --git a/js/kotlinx-coroutines-core-js/src/CoroutineContext.kt b/js/kotlinx-coroutines-core-js/src/CoroutineContext.kt index 4c551a9c34..3f73823b4f 100644 --- a/js/kotlinx-coroutines-core-js/src/CoroutineContext.kt +++ b/js/kotlinx-coroutines-core-js/src/CoroutineContext.kt @@ -34,6 +34,6 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): } // No debugging facilities on JS -internal actual inline fun withCoroutineContext(context: CoroutineContext, block: () -> T): T = block() +internal actual inline fun withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block() internal actual fun Continuation<*>.toDebugString(): String = toString() internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS diff --git a/js/kotlinx-coroutines-core-js/src/internal/ThreadContext.kt b/js/kotlinx-coroutines-core-js/src/internal/ThreadContext.kt new file mode 100644 index 0000000000..d9daf256e3 --- /dev/null +++ b/js/kotlinx-coroutines-core-js/src/internal/ThreadContext.kt @@ -0,0 +1,9 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import kotlin.coroutines.* + +internal actual fun threadContextElements(context: CoroutineContext): Any = 0 diff --git a/native/kotlinx-coroutines-core-native/src/CoroutineContext.kt b/native/kotlinx-coroutines-core-native/src/CoroutineContext.kt index cc57b4ec7b..baa656bd77 100644 --- a/native/kotlinx-coroutines-core-native/src/CoroutineContext.kt +++ b/native/kotlinx-coroutines-core-native/src/CoroutineContext.kt @@ -44,6 +44,6 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): } // No debugging facilities on native -internal actual inline fun withCoroutineContext(context: CoroutineContext, block: () -> T): T = block() +internal actual inline fun withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block() internal actual fun Continuation<*>.toDebugString(): String = toString() internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on native diff --git a/native/kotlinx-coroutines-core-native/src/internal/ThreadContext.kt b/native/kotlinx-coroutines-core-native/src/internal/ThreadContext.kt new file mode 100644 index 0000000000..d9daf256e3 --- /dev/null +++ b/native/kotlinx-coroutines-core-native/src/internal/ThreadContext.kt @@ -0,0 +1,9 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import kotlin.coroutines.* + +internal actual fun threadContextElements(context: CoroutineContext): Any = 0