Skip to content

Commit 3af370f

Browse files
committed
Save more code switching to resumeWith idiom
This change also makes TimeoutCoroutine extends ScopedCoroutines. In addition to avoid code duplication it also enables to do emit in flow from withing withTimeout { ... } sections, which would be properly considered scope. This change also save more JVM stack entries before resume calls.
1 parent 030fdc9 commit 3af370f

File tree

7 files changed

+45
-125
lines changed

7 files changed

+45
-125
lines changed

kotlinx-coroutines-core/common/src/ResumeMode.kt

-43
Original file line numberDiff line numberDiff line change
@@ -4,53 +4,10 @@
44

55
package kotlinx.coroutines
66

7-
import kotlin.coroutines.*
8-
import kotlin.coroutines.intrinsics.*
9-
107
@PublishedApi internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
118
@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
129
@PublishedApi internal const val MODE_DIRECT = 2 // when the context is right just invoke the delegate continuation direct
1310
@PublishedApi internal const val MODE_UNDISPATCHED = 3 // when the thread is right, but need to mark it with current coroutine
1411

1512
internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE
1613
internal val Int.isDispatchedMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE
17-
18-
internal fun <T> Continuation<T>.resumeMode(value: T, mode: Int) {
19-
when (mode) {
20-
MODE_ATOMIC_DEFAULT -> resume(value)
21-
MODE_CANCELLABLE -> resumeCancellable(value)
22-
MODE_DIRECT -> resumeDirect(value)
23-
MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatched(value)
24-
else -> error("Invalid mode $mode")
25-
}
26-
}
27-
28-
internal fun <T> Continuation<T>.resumeWithExceptionMode(exception: Throwable, mode: Int) {
29-
when (mode) {
30-
MODE_ATOMIC_DEFAULT -> resumeWithException(exception)
31-
MODE_CANCELLABLE -> resumeCancellableWithException(exception)
32-
MODE_DIRECT -> resumeDirectWithException(exception)
33-
MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatchedWithException(exception)
34-
else -> error("Invalid mode $mode")
35-
}
36-
}
37-
38-
internal fun <T> Continuation<T>.resumeUninterceptedMode(value: T, mode: Int) {
39-
when (mode) {
40-
MODE_ATOMIC_DEFAULT -> intercepted().resume(value)
41-
MODE_CANCELLABLE -> intercepted().resumeCancellable(value)
42-
MODE_DIRECT -> resume(value)
43-
MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resume(value) }
44-
else -> error("Invalid mode $mode")
45-
}
46-
}
47-
48-
internal fun <T> Continuation<T>.resumeUninterceptedWithExceptionMode(exception: Throwable, mode: Int) {
49-
when (mode) {
50-
MODE_ATOMIC_DEFAULT -> intercepted().resumeWithException(exception)
51-
MODE_CANCELLABLE -> intercepted().resumeCancellableWithException(exception)
52-
MODE_DIRECT -> resumeWithException(exception)
53-
MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resumeWithException(exception) }
54-
else -> error("Invalid mode $mode")
55-
}
56-
}

kotlinx-coroutines-core/common/src/Timeout.kt

+3-17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -80,26 +80,12 @@ private fun <U, T: U> setupTimeout(
8080

8181
private open class TimeoutCoroutine<U, in T: U>(
8282
@JvmField val time: Long,
83-
@JvmField val uCont: Continuation<U> // unintercepted continuation
84-
) : AbstractCoroutine<T>(uCont.context, active = true), Runnable, Continuation<T>, CoroutineStackFrame {
85-
override val defaultResumeMode: Int get() = MODE_DIRECT
86-
override val callerFrame: CoroutineStackFrame? get() = (uCont as? CoroutineStackFrame)
87-
override fun getStackTraceElement(): StackTraceElement? = null
88-
override val isScopedCoroutine: Boolean get() = true
89-
90-
@Suppress("LeakingThis", "Deprecation")
83+
uCont: Continuation<U> // unintercepted continuation
84+
) : ScopeCoroutine<T>(uCont.context, uCont), Runnable {
9185
override fun run() {
9286
cancelCoroutine(TimeoutCancellationException(time, this))
9387
}
9488

95-
@Suppress("UNCHECKED_CAST")
96-
override fun afterCompletionInternal(state: Any?, mode: Int) {
97-
if (state is CompletedExceptionally)
98-
uCont.resumeUninterceptedWithExceptionMode(state.cause, mode)
99-
else
100-
uCont.resumeUninterceptedMode(state as T, mode)
101-
}
102-
10389
override fun nameString(): String =
10490
"${super.nameString()}(timeMillis=$time)"
10591
}

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+12-29
Original file line numberDiff line numberDiff line change
@@ -175,32 +175,16 @@ internal class DispatchedContinuation<in T>(
175175
}
176176

177177
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
178-
inline fun resumeCancellable(value: T) {
179-
if (dispatcher.isDispatchNeeded(context)) {
180-
_state = value
181-
resumeMode = MODE_CANCELLABLE
182-
dispatcher.dispatch(context, this)
183-
} else {
184-
executeUnconfined(value, MODE_CANCELLABLE) {
185-
if (!resumeCancelled()) {
186-
resumeUndispatched(value)
187-
}
188-
}
189-
}
190-
}
191-
192-
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
193-
inline fun resumeCancellableWithException(exception: Throwable) {
194-
val context = continuation.context
195-
val state = CompletedExceptionally(exception)
178+
inline fun resumeCancellableWith(result: Result<T>) {
179+
val state = result.toState()
196180
if (dispatcher.isDispatchNeeded(context)) {
197-
_state = CompletedExceptionally(exception)
181+
_state = state
198182
resumeMode = MODE_CANCELLABLE
199183
dispatcher.dispatch(context, this)
200184
} else {
201185
executeUnconfined(state, MODE_CANCELLABLE) {
202186
if (!resumeCancelled()) {
203-
resumeUndispatchedWithException(exception)
187+
resumeUndispatchedWith(result)
204188
}
205189
}
206190
}
@@ -218,16 +202,9 @@ internal class DispatchedContinuation<in T>(
218202
}
219203

220204
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
221-
inline fun resumeUndispatched(value: T) {
222-
withCoroutineContext(context, countOrElement) {
223-
continuation.resume(value)
224-
}
225-
}
226-
227-
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
228-
inline fun resumeUndispatchedWithException(exception: Throwable) {
205+
inline fun resumeUndispatchedWith(result: Result<T>) {
229206
withCoroutineContext(context, countOrElement) {
230-
continuation.resumeWithStackTrace(exception)
207+
continuation.resumeWith(result)
231208
}
232209
}
233210

@@ -243,6 +220,12 @@ internal class DispatchedContinuation<in T>(
243220
"DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
244221
}
245222

223+
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
224+
internal inline fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) {
225+
is DispatchedContinuation -> resumeCancellableWith(result)
226+
else -> resumeWith(result)
227+
}
228+
246229
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
247230
executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
248231
run()

kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt

+14-27
Original file line numberDiff line numberDiff line change
@@ -105,21 +105,29 @@ internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
105105
}
106106
}
107107

108+
@Suppress("UNCHECKED_CAST")
108109
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
109110
// slow-path - use delegate
110111
val state = takeState()
111-
val exception = getExceptionalResult(state)
112-
if (exception != null) {
112+
val exception = getExceptionalResult(state)?.let {
113113
/*
114114
* Recover stacktrace for non-dispatched tasks.
115115
* We usually do not recover stacktrace in a `resume` as all resumes go through `DispatchedTask.run`
116116
* and we recover stacktraces there, but this is not the case for a `suspend fun main()` that knows nothing about
117117
* kotlinx.coroutines and DispatchedTask
118118
*/
119-
val recovered = if (delegate is DispatchedTask<*>) exception else recoverStackTrace(exception, delegate)
120-
delegate.resumeWithExceptionMode(recovered, useMode)
121-
} else {
122-
delegate.resumeMode(getSuccessfulResult(state), useMode)
119+
if (delegate is DispatchedTask<*>) it else recoverStackTrace(it, delegate)
120+
}
121+
val result = if (exception != null)
122+
Result.failure(exception)
123+
else
124+
Result.success(state as T)
125+
when (useMode) {
126+
MODE_ATOMIC_DEFAULT -> delegate.resumeWith(result)
127+
MODE_CANCELLABLE -> delegate.resumeCancellableWith(result)
128+
MODE_DIRECT -> ((delegate as? DispatchedContinuation)?.continuation ?: delegate).resumeWith(result)
129+
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
130+
else -> error("Invalid mode $useMode")
123131
}
124132
}
125133

@@ -158,27 +166,6 @@ internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
158166
}
159167
}
160168

161-
162-
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
163-
is DispatchedContinuation -> resumeCancellable(value)
164-
else -> resume(value)
165-
}
166-
167-
internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) = when (this) {
168-
is DispatchedContinuation -> resumeCancellableWithException(exception)
169-
else -> resumeWithStackTrace(exception)
170-
}
171-
172-
internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {
173-
is DispatchedContinuation -> continuation.resume(value)
174-
else -> resume(value)
175-
}
176-
177-
internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable) = when (this) {
178-
is DispatchedContinuation -> continuation.resumeWithStackTrace(exception)
179-
else -> resumeWithStackTrace(exception)
180-
}
181-
182169
@Suppress("NOTHING_TO_INLINE")
183170
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
184171
resumeWith(Result.failure(recoverStackTrace(exception, this)))

kotlinx-coroutines-core/common/src/internal/Scopes.kt

+11-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.internal
66

77
import kotlinx.coroutines.*
88
import kotlin.coroutines.*
9+
import kotlin.coroutines.intrinsics.*
910
import kotlin.jvm.*
1011

1112
/**
@@ -25,10 +26,16 @@ internal open class ScopeCoroutine<in T>(
2526

2627
@Suppress("UNCHECKED_CAST")
2728
override fun afterCompletionInternal(state: Any?, mode: Int) {
28-
if (state is CompletedExceptionally) {
29-
uCont.resumeUninterceptedWithExceptionMode(recoverStackTrace(state.cause, uCont), mode)
30-
} else {
31-
uCont.resumeUninterceptedMode(state as T, mode)
29+
val result = if (state is CompletedExceptionally)
30+
Result.failure(recoverStackTrace(state.cause, uCont))
31+
else
32+
Result.success(state as T)
33+
when (mode) {
34+
MODE_ATOMIC_DEFAULT -> uCont.intercepted().resumeWith(result)
35+
MODE_CANCELLABLE -> uCont.intercepted().resumeCancellableWith(result)
36+
MODE_DIRECT -> uCont.resumeWith(result)
37+
MODE_UNDISPATCHED -> withCoroutineContext(uCont.context, null) { uCont.resumeWith(result) }
38+
else -> error("Invalid mode $mode")
3239
}
3340
}
3441
}

kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.intrinsics
@@ -14,7 +14,7 @@ import kotlin.coroutines.intrinsics.*
1414
*/
1515
@InternalCoroutinesApi
1616
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
17-
createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
17+
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
1818
}
1919

2020
/**
@@ -23,7 +23,7 @@ public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuat
2323
*/
2424
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
2525
runSafely(completion) {
26-
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
26+
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
2727
}
2828

2929
/**
@@ -32,7 +32,7 @@ internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, co
3232
*/
3333
internal fun Continuation<Unit>.startCoroutineCancellable(fatalCompletion: Continuation<*>) =
3434
runSafely(fatalCompletion) {
35-
intercepted().resumeCancellable(Unit)
35+
intercepted().resumeCancellableWith(Result.success(Unit))
3636
}
3737

3838
/**

kotlinx-coroutines-core/common/src/selects/Select.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ internal class SelectBuilderImpl<in R>(
285285
// Resumes in MODE_CANCELLABLE
286286
override fun resumeSelectCancellableWithException(exception: Throwable) {
287287
doResume({ CompletedExceptionally(exception) }) {
288-
uCont.intercepted().resumeCancellableWithException(exception)
288+
uCont.intercepted().resumeCancellableWith(Result.failure(exception))
289289
}
290290
}
291291

0 commit comments

Comments
 (0)