Skip to content

Eagerly create coroutine for lazily-started coroutine builders in ord… #1548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,10 @@ private class LazyDeferredCoroutine<T>(
parentContext: CoroutineContext,
block: suspend CoroutineScope.() -> T
) : DeferredCoroutine<T>(parentContext, active = false) {
private var block: (suspend CoroutineScope.() -> T)? = block
private val continuation = block.createCoroutineUnintercepted(this, this)

override fun onStart() {
val block = checkNotNull(this.block) { "Already started" }
this.block = null
block.startCoroutineCancellable(this, this)
continuation.startCoroutineCancellable(this)
}
}

Expand Down Expand Up @@ -190,12 +188,10 @@ private class LazyStandaloneCoroutine(
parentContext: CoroutineContext,
block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
private var block: (suspend CoroutineScope.() -> Unit)? = block
private val continuation = block.createCoroutineUnintercepted(this, this)

override fun onStart() {
val block = checkNotNull(this.block) { "Already started" }
this.block = null
block.startCoroutineCancellable(this, this)
continuation.startCoroutineCancellable(this)
}
}

Expand Down
7 changes: 3 additions & 4 deletions kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

/**
* Broadcasts all elements of the channel.
Expand Down Expand Up @@ -124,7 +125,7 @@ private class LazyBroadcastCoroutine<E>(
channel: BroadcastChannel<E>,
block: suspend ProducerScope<E>.() -> Unit
) : BroadcastCoroutine<E>(parentContext, channel, active = false) {
private var block: (suspend ProducerScope<E>.() -> Unit)? = block
private val continuation = block.createCoroutineUnintercepted(this, this)

override fun openSubscription(): ReceiveChannel<E> {
// open subscription _first_
Expand All @@ -135,8 +136,6 @@ private class LazyBroadcastCoroutine<E>(
}

override fun onStart() {
val block = checkNotNull(this.block) { "Already started" }
this.block = null
block.startCoroutineCancellable(this, this)
continuation.startCoroutineCancellable(this)
}
}
9 changes: 9 additions & 0 deletions kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, co
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
}

/**
* Similar to [startCoroutineCancellable], but for already created coroutine.
* [fatalCompletion] is used only when interception machinery throws an exception
*/
internal fun Continuation<Unit>.startCoroutineCancellable(fatalCompletion: Continuation<*>) =
runSafely(fatalCompletion) {
intercepted().resumeCancellable(Unit)
}

/**
* Runs given block and completes completion with its exception if it occurs.
* Rationale: [startCoroutineCancellable] is invoked when we are about to run coroutine asynchronously in its own dispatcher.
Expand Down
8 changes: 6 additions & 2 deletions kotlinx-coroutines-core/jvm/src/channels/Actor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

/**
* Scope for [actor][GlobalScope.actor] coroutine builder.
Expand Down Expand Up @@ -143,11 +144,14 @@ private open class ActorCoroutine<E>(
private class LazyActorCoroutine<E>(
parentContext: CoroutineContext,
channel: Channel<E>,
private val block: suspend ActorScope<E>.() -> Unit
block: suspend ActorScope<E>.() -> Unit
) : ActorCoroutine<E>(parentContext, channel, active = false),
SelectClause2<E, SendChannel<E>> {

private var continuation = block.createCoroutineUnintercepted(this, this)

override fun onStart() {
block.startCoroutineCancellable(this, this)
continuation.startCoroutineCancellable(this)
}

override suspend fun send(element: E) {
Expand Down
14 changes: 13 additions & 1 deletion kotlinx-coroutines-debug/test/StartModeProbesTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package kotlinx.coroutines.debug

import kotlinx.coroutines.*
import org.junit.*
import kotlinx.coroutines.channels.*
import org.junit.Test
import kotlin.test.*

Expand Down Expand Up @@ -138,4 +138,16 @@ class StartModeProbesTest : DebugTestBase() {
delay(Long.MAX_VALUE)
}
}

@Test
fun testLazy() = runTest({ it is CancellationException }) {
launch(start = CoroutineStart.LAZY) { }
actor<Int>(start = CoroutineStart.LAZY) { }
broadcast<Int>(start = CoroutineStart.LAZY) { }
async(start = CoroutineStart.LAZY) { 1 }
verifyPartialDump(5, "BlockingCoroutine",
"LazyStandaloneCoroutine", "LazyActorCoroutine",
"LazyBroadcastCoroutine", "LazyDeferredCoroutine")
cancel()
}
}