Skip to content

Commit df42b60

Browse files
committed
Eagerly create coroutine for lazily-started coroutine builders in order to trigger DebugProbes.probeCoroutineCreated
Fixes #1544
1 parent f62d35b commit df42b60

File tree

5 files changed

+35
-15
lines changed

5 files changed

+35
-15
lines changed

kotlinx-coroutines-core/common/src/Builders.common.kt

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,10 @@ private class LazyDeferredCoroutine<T>(
106106
parentContext: CoroutineContext,
107107
block: suspend CoroutineScope.() -> T
108108
) : DeferredCoroutine<T>(parentContext, active = false) {
109-
private var block: (suspend CoroutineScope.() -> T)? = block
109+
private val continuation = block.createCoroutineUnintercepted(this, this)
110110

111111
override fun onStart() {
112-
val block = checkNotNull(this.block) { "Already started" }
113-
this.block = null
114-
block.startCoroutineCancellable(this, this)
112+
continuation.startCoroutineCancellable(this)
115113
}
116114
}
117115

@@ -190,12 +188,10 @@ private class LazyStandaloneCoroutine(
190188
parentContext: CoroutineContext,
191189
block: suspend CoroutineScope.() -> Unit
192190
) : StandaloneCoroutine(parentContext, active = false) {
193-
private var block: (suspend CoroutineScope.() -> Unit)? = block
191+
private val continuation = block.createCoroutineUnintercepted(this, this)
194192

195193
override fun onStart() {
196-
val block = checkNotNull(this.block) { "Already started" }
197-
this.block = null
198-
block.startCoroutineCancellable(this, this)
194+
continuation.startCoroutineCancellable(this)
199195
}
200196
}
201197

kotlinx-coroutines-core/common/src/channels/Broadcast.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
99
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1010
import kotlinx.coroutines.intrinsics.*
1111
import kotlin.coroutines.*
12+
import kotlin.coroutines.intrinsics.*
1213

1314
/**
1415
* Broadcasts all elements of the channel.
@@ -124,7 +125,7 @@ private class LazyBroadcastCoroutine<E>(
124125
channel: BroadcastChannel<E>,
125126
block: suspend ProducerScope<E>.() -> Unit
126127
) : BroadcastCoroutine<E>(parentContext, channel, active = false) {
127-
private var block: (suspend ProducerScope<E>.() -> Unit)? = block
128+
private val continuation = block.createCoroutineUnintercepted(this, this)
128129

129130
override fun openSubscription(): ReceiveChannel<E> {
130131
// open subscription _first_
@@ -135,8 +136,6 @@ private class LazyBroadcastCoroutine<E>(
135136
}
136137

137138
override fun onStart() {
138-
val block = checkNotNull(this.block) { "Already started" }
139-
this.block = null
140-
block.startCoroutineCancellable(this, this)
139+
continuation.startCoroutineCancellable(this)
141140
}
142141
}

kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,15 @@ internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, co
2626
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
2727
}
2828

29+
/**
30+
* Similar to [startCoroutineCancellable], but for already created coroutine.
31+
* [fatalCompletion] is used only when interception machinery throws an exception
32+
*/
33+
internal fun Continuation<Unit>.startCoroutineCancellable(fatalCompletion: Continuation<*>) =
34+
runSafely(fatalCompletion) {
35+
intercepted().resumeCancellable(Unit)
36+
}
37+
2938
/**
3039
* Runs given block and completes completion with its exception if it occurs.
3140
* Rationale: [startCoroutineCancellable] is invoked when we are about to run coroutine asynchronously in its own dispatcher.

kotlinx-coroutines-core/jvm/src/channels/Actor.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.intrinsics.*
99
import kotlinx.coroutines.selects.*
1010
import kotlin.coroutines.*
11+
import kotlin.coroutines.intrinsics.*
1112

1213
/**
1314
* Scope for [actor][GlobalScope.actor] coroutine builder.
@@ -143,11 +144,14 @@ private open class ActorCoroutine<E>(
143144
private class LazyActorCoroutine<E>(
144145
parentContext: CoroutineContext,
145146
channel: Channel<E>,
146-
private val block: suspend ActorScope<E>.() -> Unit
147+
block: suspend ActorScope<E>.() -> Unit
147148
) : ActorCoroutine<E>(parentContext, channel, active = false),
148149
SelectClause2<E, SendChannel<E>> {
150+
151+
private var continuation = block.createCoroutineUnintercepted(this, this)
152+
149153
override fun onStart() {
150-
block.startCoroutineCancellable(this, this)
154+
continuation.startCoroutineCancellable(this)
151155
}
152156

153157
override suspend fun send(element: E) {

kotlinx-coroutines-debug/test/StartModeProbesTest.kt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package kotlinx.coroutines.debug
66

77
import kotlinx.coroutines.*
8-
import org.junit.*
8+
import kotlinx.coroutines.channels.*
99
import org.junit.Test
1010
import kotlin.test.*
1111

@@ -138,4 +138,16 @@ class StartModeProbesTest : DebugTestBase() {
138138
delay(Long.MAX_VALUE)
139139
}
140140
}
141+
142+
@Test
143+
fun testLazy() = runTest({ it is CancellationException }) {
144+
launch(start = CoroutineStart.LAZY) { }
145+
actor<Int>(start = CoroutineStart.LAZY) { }
146+
broadcast<Int>(start = CoroutineStart.LAZY) { }
147+
async(start = CoroutineStart.LAZY) { 1 }
148+
verifyPartialDump(5, "BlockingCoroutine",
149+
"LazyStandaloneCoroutine", "LazyActorCoroutine",
150+
"LazyBroadcastCoroutine", "LazyDeferredCoroutine")
151+
cancel()
152+
}
141153
}

0 commit comments

Comments
 (0)