Skip to content

Commit 4fe1801

Browse files
committed
run is optimized with fast-path case and no longer has CoroutineScope in its block
1 parent f84f99f commit 4fe1801

File tree

3 files changed

+159
-9
lines changed

3 files changed

+159
-9
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

+31-8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package kotlinx.coroutines.experimental
1818

1919
import java.util.concurrent.locks.LockSupport
2020
import kotlin.coroutines.experimental.*
21+
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
22+
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2123

2224
// --------------- basic coroutine builders ---------------
2325

@@ -58,10 +60,28 @@ fun launch(context: CoroutineContext, start: Boolean = true, block: suspend Coro
5860
* different thread inside the block, and back when it completes.
5961
* The specified [context] is added onto the current coroutine context for the execution of the block.
6062
*/
61-
public suspend fun <T> run(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T =
62-
suspendCoroutine { cont ->
63-
// new don't invoke `newCoroutineContext`, but consider this being the same coroutine in the new context
64-
InnerCoroutine(cont.context + context, cont).also { block.startCoroutine(it, it) }
63+
public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
64+
suspendCoroutineOrReturn sc@ { cont ->
65+
val oldContext = cont.context
66+
// fast path #1 if there is no change in the actual context:
67+
if (context === oldContext || context is CoroutineContext.Element && oldContext[context.key] === context)
68+
return@sc block.startCoroutineUninterceptedOrReturn(cont)
69+
// compute new context
70+
val newContext = oldContext + context
71+
// fast path #2 if the result is actually the same
72+
if (newContext === oldContext)
73+
return@sc block.startCoroutineUninterceptedOrReturn(cont)
74+
// fast path #3 if the new dispatcher is the same as the old one
75+
if (newContext[ContinuationInterceptor] === oldContext[ContinuationInterceptor]) {
76+
val newContinuation = RunContinuationDirect(newContext, cont)
77+
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
78+
}
79+
// slowest path otherwise -- use new interceptor, sync to its result via a
80+
// full-blown instance of CancellableContinuation
81+
val newContinuation = RunContinuationCoroutine(newContext, cont)
82+
newContinuation.initCancellability()
83+
block.startCoroutine(newContinuation)
84+
newContinuation.getResult()
6585
}
6686

6787
/**
@@ -111,12 +131,15 @@ private class LazyStandaloneCoroutine(
111131
}
112132
}
113133

114-
private class InnerCoroutine<in T>(
134+
private class RunContinuationDirect<in T>(
115135
override val context: CoroutineContext,
116136
continuation: Continuation<T>
117-
) : Continuation<T> by continuation, CoroutineScope {
118-
override val isActive: Boolean = context[Job]?.isActive ?: true
119-
}
137+
) : Continuation<T> by continuation
138+
139+
private class RunContinuationCoroutine<in T>(
140+
override val parentContext: CoroutineContext,
141+
continuation: Continuation<T>
142+
) : CancellableContinuationImpl<T>(continuation, active = true)
120143

121144
private class BlockingCoroutine<T>(
122145
override val parentContext: CoroutineContext,

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ internal open class CancellableContinuationImpl<in T>(
179179
}
180180

181181
override fun initCancellability() {
182-
initParentJob(delegate.context[Job])
182+
initParentJob(parentContext[Job])
183183
}
184184

185185
@PublishedApi
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import org.hamcrest.MatcherAssert.assertThat
20+
import org.hamcrest.core.IsEqual
21+
import org.junit.Test
22+
23+
class RunTest : TestBase() {
24+
@Test
25+
fun testSameContextNoSuspend() = runBlocking<Unit> {
26+
expect(1)
27+
launch(context) { // make sure there is not early dispatch here
28+
finish(5)
29+
}
30+
expect(2)
31+
val result = run(context) { // same context!
32+
expect(3) // still here
33+
"OK"
34+
}
35+
assertThat(result, IsEqual("OK"))
36+
expect(4)
37+
}
38+
39+
@Test
40+
fun testSameContextWithSuspend() = runBlocking<Unit> {
41+
expect(1)
42+
launch(context) { // make sure there is not early dispatch here
43+
expect(4)
44+
}
45+
expect(2)
46+
val result = run(context) { // same context!
47+
expect(3) // still here
48+
yield() // now yields to launch!
49+
expect(5)
50+
"OK"
51+
}
52+
assertThat(result, IsEqual("OK"))
53+
finish(6)
54+
}
55+
56+
@Test
57+
fun testCancelWithJobNoSuspend() = runBlocking<Unit> {
58+
expect(1)
59+
launch(context) { // make sure there is not early dispatch to here
60+
finish(6)
61+
}
62+
expect(2)
63+
val job = Job()
64+
val result = run(context + job) { // same context + new job
65+
expect(3) // still here
66+
job.cancel() // cancel out job!
67+
try {
68+
yield() // shall throw CancellationException
69+
expectUnreached()
70+
} catch (e: CancellationException) {
71+
expect(4)
72+
}
73+
"OK"
74+
}
75+
assertThat(result, IsEqual("OK"))
76+
expect(5)
77+
}
78+
79+
@Test
80+
fun testCancelWithJobWithSuspend() = runBlocking<Unit> {
81+
expect(1)
82+
launch(context) { // make sure there is not early dispatch to here
83+
expect(4)
84+
}
85+
expect(2)
86+
val job = Job()
87+
val result = run(context + job) { // same context + new job
88+
expect(3) // still here
89+
yield() // now yields to launch!
90+
expect(5)
91+
job.cancel() // cancel out job!
92+
try {
93+
yield() // shall throw CancellationExpcetion
94+
expectUnreached()
95+
} catch (e: CancellationException) {
96+
expect(6)
97+
}
98+
"OK"
99+
}
100+
assertThat(result, IsEqual("OK"))
101+
finish(7)
102+
}
103+
104+
@Test
105+
fun testCommonPoolNoSuspend() = runBlocking<Unit> {
106+
expect(1)
107+
val result = run(CommonPool) {
108+
expect(2)
109+
"OK"
110+
}
111+
assertThat(result, IsEqual("OK"))
112+
finish(3)
113+
}
114+
115+
@Test
116+
fun testCommonPoolWithSuspend() = runBlocking<Unit> {
117+
expect(1)
118+
val result = run(CommonPool) {
119+
expect(2)
120+
delay(100)
121+
expect(3)
122+
"OK"
123+
}
124+
assertThat(result, IsEqual("OK"))
125+
finish(4)
126+
}
127+
}

0 commit comments

Comments
 (0)