Skip to content

Commit ac48070

Browse files
committed
Optimize creation of DispatchTask for cancellable continuations
1 parent 5d94a26 commit ac48070

File tree

4 files changed

+41
-4
lines changed

4 files changed

+41
-4
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ private const val RESUMED = 2
2828
/**
2929
* @suppress **This is unstable API and it is subject to change.**
3030
*/
31+
// Note: it also works directly as DispatchTask for this delegate
3132
internal abstract class AbstractContinuation<in T>(
3233
@JvmField protected val delegate: Continuation<T>,
3334
@JvmField protected val resumeMode: Int
34-
) : JobSupport(true), Continuation<T> {
35+
) : JobSupport(true), Continuation<T>, Runnable {
3536
private val _decision = atomic(UNDECIDED)
3637

3738
/* decision state machine
@@ -81,10 +82,23 @@ internal abstract class AbstractContinuation<in T>(
8182
override fun afterCompletion(state: Any?, mode: Int) {
8283
if (tryResume()) return // completed before getResult invocation -- bail out
8384
// otherwise, getResult has already commenced, i.e. completed later or in other thread
85+
var useMode = mode
86+
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
87+
// dispatch directly using this instance's Runnable implementation
88+
val dispatcher = delegate.dispatcher
89+
val context = delegate.context
90+
if (dispatcher.isDispatchNeeded(context)) {
91+
dispatcher.dispatch(context, this)
92+
return // and that's it -- dispatched via fast-path
93+
} else {
94+
useMode = MODE_UNDISPATCHED
95+
}
96+
}
97+
// slow-path - use delegate
8498
if (state is CompletedExceptionally) {
85-
delegate.resumeWithExceptionMode(state.exception, mode)
99+
delegate.resumeWithExceptionMode(state.exception, useMode)
86100
} else {
87-
delegate.resumeMode(getSuccessfulResult(state), mode)
101+
delegate.resumeMode(getSuccessfulResult(state), useMode)
88102
}
89103
}
90104

@@ -118,4 +132,23 @@ internal abstract class AbstractContinuation<in T>(
118132
override fun handleException(exception: Throwable) {
119133
handleCoroutineException(context, exception)
120134
}
135+
136+
// see all DispatchTask.run with the same logic
137+
override fun run() {
138+
check(delegate is DispatchedContinuation)
139+
try {
140+
val context = delegate.context
141+
val job = if (resumeMode.isCancellableMode) context[Job] else null
142+
val state = this.state
143+
withCoroutineContext(context) {
144+
when {
145+
job != null && !job.isActive -> delegate.resumeWithException(job.getCancellationException())
146+
state is CompletedExceptionally -> delegate.resumeWithException(state.exception)
147+
else -> delegate.resume(getSuccessfulResult(state))
148+
}
149+
}
150+
} catch (e: Throwable) {
151+
throw RuntimeException("Unexpected exception running $this", e)
152+
}
153+
}
121154
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ private class RemoveOnCancel(
214214
internal class CancellableContinuationImpl<in T>(
215215
delegate: Continuation<T>,
216216
resumeMode: Int
217-
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T> {
217+
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T>, Runnable {
218218
@Volatile // just in case -- we don't want an extra data race, even benign one
219219
private var _context: CoroutineContext? = null // created on first need
220220

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ public actual abstract class CoroutineDispatcher actual constructor() :
110110
public actual typealias Runnable = java.lang.Runnable
111111

112112
// named class for ease of debugging, better stack-traces and optimize the number of anonymous classes
113+
// note that CancellableContinuationImpl directly works as DispatchTask
113114
internal class DispatchTask<in T>(
114115
private val continuation: Continuation<T>,
115116
private val value: Any?, // T | Throwable

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ResumeMode.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import kotlin.coroutines.experimental.Continuation
2424
@PublishedApi internal const val MODE_UNDISPATCHED = 3 // when the thread is right, but need to mark it with current coroutine
2525
@PublishedApi internal const val MODE_IGNORE = 4 // don't do anything
2626

27+
internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE
28+
internal val Int.isDispatchedMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE
29+
2730
internal fun <T> Continuation<T>.resumeMode(value: T, mode: Int) {
2831
when (mode) {
2932
MODE_ATOMIC_DEFAULT -> resume(value)

0 commit comments

Comments
 (0)