Skip to content

Cache result of ThreadContext.foldAll in the field of DispatchedCorou… #711

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,39 @@ import org.openjdk.jmh.annotations.*
import java.util.concurrent.*
import kotlin.coroutines.*

@Warmup(iterations = 10, time = 1)
@Measurement(iterations = 10, time = 1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(2)
@Fork(1)
open class ChannelSinkBenchmark {
private val tl = ThreadLocal.withInitial({ 42 })
private val tl2 = ThreadLocal.withInitial({ 239 })

private val unconfined = Dispatchers.Unconfined
private val unconfinedOneElement = Dispatchers.Unconfined + tl.asContextElement()
private val unconfinedTwoElements = Dispatchers.Unconfined + tl.asContextElement() + tl2.asContextElement()

@Benchmark
fun channelPipeline(): Int = runBlocking {
Channel
.range(1, 1_000_000, Dispatchers.Unconfined)
.filter(Dispatchers.Unconfined) { it % 4 == 0 }
run(unconfined)
}

@Benchmark
fun channelPipelineOneThreadLocal(): Int = runBlocking {
run(unconfinedOneElement)
}

@Benchmark
fun channelPipelineTwoThreadLocals(): Int = runBlocking {
run(unconfinedTwoElements)
}

private suspend inline fun run(context: CoroutineContext): Int {
return Channel
.range(1, 1_000_000, context)
.filter(context) { it % 4 == 0 }
.fold(0) { a, b -> a + b }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public suspend fun <T> withContext(
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont) // MODE_UNDISPATCHED
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext) {
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal expect fun createDefaultDispatcher(): CoroutineDispatcher
@Suppress("PropertyName")
internal expect val DefaultDelay: Delay

internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T
// countOrElement -- pre-cached value for ThreadContext.kt
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
internal expect fun Continuation<*>.toDebugString(): String
internal expect val CoroutineContext.coroutineName: String?
10 changes: 6 additions & 4 deletions common/kotlinx-coroutines-core-common/src/Dispatched.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ internal class DispatchedContinuation<in T>(
) : Continuation<T> by continuation, DispatchedTask<T> {
private var _state: Any? = UNDEFINED
public override var resumeMode: Int = 0
@JvmField // pre-cached value to avoid ctx.fold on every resumption
internal val countOrElement = threadContextElements(context)

override fun takeState(): Any? {
val state = _state
Expand Down Expand Up @@ -80,21 +82,21 @@ internal class DispatchedContinuation<in T>(

@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWith(result: Result<T>) {
withCoroutineContext(context) {
withCoroutineContext(context, countOrElement) {
continuation.resumeWith(result)
}
}

@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatched(value: T) {
withCoroutineContext(context) {
withCoroutineContext(context, countOrElement) {
continuation.resume(value)
}
}

@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWithException(exception: Throwable) {
withCoroutineContext(context) {
withCoroutineContext(context, countOrElement) {
continuation.resumeWithException(exception)
}
}
Expand Down Expand Up @@ -151,7 +153,7 @@ internal interface DispatchedTask<in T> : Runnable {
val context = continuation.context
val job = if (resumeMode.isCancellableMode) context[Job] else null
val state = takeState() // NOTE: Must take state in any case, even if cancelled
withCoroutineContext(context) {
withCoroutineContext(context, delegate.countOrElement) {
if (job != null && !job.isActive)
continuation.resumeWithException(job.getCancellationException())
else {
Expand Down
4 changes: 2 additions & 2 deletions common/kotlinx-coroutines-core-common/src/ResumeMode.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedMode(value: T, mode: Int) {
MODE_ATOMIC_DEFAULT -> intercepted().resume(value)
MODE_CANCELLABLE -> intercepted().resumeCancellable(value)
MODE_DIRECT -> resume(value)
MODE_UNDISPATCHED -> withCoroutineContext(context) { resume(value) }
MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resume(value) }
MODE_IGNORE -> {}
else -> error("Invalid mode $mode")
}
Expand All @@ -54,7 +54,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedWithExceptionMode(exception:
MODE_ATOMIC_DEFAULT -> intercepted().resumeWithException(exception)
MODE_CANCELLABLE -> intercepted().resumeCancellableWithException(exception)
MODE_DIRECT -> resumeWithException(exception)
MODE_UNDISPATCHED -> withCoroutineContext(context) { resumeWithException(exception) }
MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resumeWithException(exception) }
MODE_IGNORE -> {}
else -> error("Invalid mode $mode")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal

import kotlin.coroutines.*

internal expect fun threadContextElements(context: CoroutineContext): Any
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ internal fun <R, T> (suspend (R) -> T).startCoroutineUnintercepted(receiver: R,
*/
internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
startDirect(completion) {
withCoroutineContext(completion.context) {
withCoroutineContext(completion.context, null) {
startCoroutineUninterceptedOrReturn(completion)
}
}
Expand All @@ -50,7 +50,7 @@ internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Contin
*/
internal fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, completion: Continuation<T>) {
startDirect(completion) {
withCoroutineContext(completion.context) {
withCoroutineContext(completion.context, null) {
startCoroutineUninterceptedOrReturn(receiver, completion)
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/kotlinx-coroutines-core/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
/**
* Executes a block using a given coroutine context.
*/
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T {
val oldValue = updateThreadContext(context)
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
val oldValue = updateThreadContext(context, countOrElement)
try {
return block()
} finally {
Expand Down
16 changes: 10 additions & 6 deletions core/kotlinx-coroutines-core/src/internal/ThreadContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,24 @@ private val restoreState =
return state
}

internal fun updateThreadContext(context: CoroutineContext): Any? {
val count = context.fold(0, countAll)
internal actual fun threadContextElements(context: CoroutineContext): Any = context.fold(0, countAll)!!

// countOrElement is pre-cached in dispatched continuation
internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?): Any? {
@Suppress("NAME_SHADOWING")
val countOrElement = countOrElement ?: threadContextElements(context)
@Suppress("IMPLICIT_BOXING_IN_IDENTITY_EQUALS")
return when {
count === 0 -> ZERO // very fast path when there are no active ThreadContextElements
countOrElement === 0 -> ZERO // very fast path when there are no active ThreadContextElements
// ^^^ identity comparison for speed, we know zero always has the same identity
count is Int -> {
countOrElement is Int -> {
// slow path for multiple active ThreadContextElements, allocates ThreadState for multiple old values
context.fold(ThreadState(context, count), updateState)
context.fold(ThreadState(context, countOrElement), updateState)
}
else -> {
// fast path for one ThreadContextElement (no allocations, no additional context scan)
@Suppress("UNCHECKED_CAST")
val element = count as ThreadContextElement<Any?>
val element = countOrElement as ThreadContextElement<Any?>
element.updateThreadContext(context)
}
}
Expand Down
2 changes: 1 addition & 1 deletion js/kotlinx-coroutines-core-js/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
}

// No debugging facilities on JS
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T = block()
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
internal actual fun Continuation<*>.toDebugString(): String = toString()
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS
9 changes: 9 additions & 0 deletions js/kotlinx-coroutines-core-js/src/internal/ThreadContext.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal

import kotlin.coroutines.*

internal actual fun threadContextElements(context: CoroutineContext): Any = 0
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
}

// No debugging facilities on native
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T = block()
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
internal actual fun Continuation<*>.toDebugString(): String = toString()
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on native
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal

import kotlin.coroutines.*

internal actual fun threadContextElements(context: CoroutineContext): Any = 0