Skip to content

Wrap all Incomplete final state into box (and unbox it where necessar… #837

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 28, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private class DispatchedCoroutine<in T>(
fun getResult(): Any? {
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
Expand Down
35 changes: 23 additions & 12 deletions common/kotlinx-coroutines-core-common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}

// ------------ state query ------------

/**
* Returns current state of this job.
* @suppress **This is unstable API and it is subject to change.**
* If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
* and should be [unboxed][unboxState] before returning to user code.
*/
internal val state: Any? get() {
_state.loop { state -> // helper loop on state (complete in-progress atomic operations)
Expand Down Expand Up @@ -192,7 +192,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// Finalizes Finishing -> Completed (terminal state) transition.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
/*
* Note: proposed state can be Incompleted, e.g.
* async {
* smth.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
* }
*/
require(this.state === state) // consistency check -- it cannot change
require(!state.isSealed) // consistency check -- cannot be sealed yet
require(state.isCompleting) // consistency check -- must be marked as completing
Expand Down Expand Up @@ -220,7 +225,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
handleJobException(finalException)
}
// Then CAS to completed state -> it must succeed
require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
require(_state.compareAndSet(state, finalState.boxIncomplete())) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
// And process all post-completion actions
completeStateFinalization(state, finalState, mode, suppressed)
return true
Expand Down Expand Up @@ -254,7 +259,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
check(update !is CompletedExceptionally) // only for normal completion
if (!_state.compareAndSet(state, update)) return false
if (!_state.compareAndSet(state, update.boxIncomplete())) return false
completeStateFinalization(state, update, mode, false)
return true
}
Expand Down Expand Up @@ -1029,7 +1034,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private val job: JobSupport
) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
override fun getContinuationCancellationCause(parent: Job): Throwable {
val state = job.state
val state = job.state.unboxState()
/*
* When the job we are waiting for had already completely completed exceptionally or
* is failing, we shall use its root/completion cause for await's result.
Expand All @@ -1054,7 +1059,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally

public fun getCompletionExceptionOrNull(): Throwable? {
val state = this.state
val state = this.state.unboxState()
check(state !is Incomplete) { "This job has not completed yet" }
return state.exceptionOrNull
}
Expand All @@ -1063,7 +1068,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* @suppress **This is unstable API and it is subject to change.**
*/
internal fun getCompletedInternal(): Any? {
val state = this.state
val state = this.state.unboxState()
check(state !is Incomplete) { "This job has not completed yet" }
if (state is CompletedExceptionally) throw state.cause
return state
Expand All @@ -1075,7 +1080,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
internal suspend fun awaitInternal(): Any? {
// fast-path -- check state (avoid extra object creation)
while (true) { // lock-free loop on state
val state = this.state
val state = this.state.unboxState()
if (state !is Incomplete) {
// already complete -- just return result
if (state is CompletedExceptionally) throw state.cause
Expand Down Expand Up @@ -1131,7 +1136,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
*/
@Suppress("UNCHECKED_CAST")
internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
val state = this.state
val state = this.state.unboxState()
// Note: await is non-atomic (can be cancelled while dispatched)
if (state is CompletedExceptionally)
select.resumeSelectCancellableWithException(state.cause)
Expand All @@ -1140,6 +1145,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
}

/*
* Class to represent object as the final state of the Job
*/
private class IncompleteStateBox(@JvmField val state: Incomplete)
private fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this

// --------------- helper classes & constants for job implementation

private const val COMPLETING_ALREADY_COMPLETING = 0
Expand Down Expand Up @@ -1230,8 +1242,7 @@ private class ResumeAwaitOnCompletion<T>(
private val continuation: AbstractContinuation<T>
) : JobNode<JobSupport>(job) {
override fun invoke(cause: Throwable?) {
val state = job.state
check(state !is Incomplete)
val state = job.state.unboxState()
if (state is CompletedExceptionally) {
// Resume with exception in atomic way to preserve exception
continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private inline fun <T> AbstractCoroutine<T>.undispatchedResult(
return when {
result === COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
makeCompletingOnce(result, MODE_IGNORE) -> {
val state = state
val state = state.unboxState()
if (state is CompletedExceptionally) {
when {
shouldThrow(state.cause) -> throw state.cause
Expand Down
12 changes: 12 additions & 0 deletions common/kotlinx-coroutines-core-common/test/AsyncTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,16 @@ class AsyncTest : TestBase() {
finish(3)
}
}

@Test
fun testIncompleteAsyncState() = runTest {
val job = async {
coroutineContext[Job]!!.invokeOnCompletion { }
}

job.await().dispose()
assertTrue(job.isCompleted)
assertFalse(job.isActive)
assertFalse(job.isCancelled)
}
}
14 changes: 14 additions & 0 deletions common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,20 @@ class CoroutineScopeTest : TestBase() {
assertSame(Dispatchers.Unconfined, scopePlusContext(Dispatchers.Unconfined, Dispatchers.Unconfined))
}

@Test
fun testIncompleteScopeState() = runTest {
lateinit var scopeJob: Job
coroutineScope {
scopeJob = coroutineContext[Job]!!
scopeJob.invokeOnCompletion { }
}

scopeJob.join()
assertTrue(scopeJob.isCompleted)
assertFalse(scopeJob.isActive)
assertFalse(scopeJob.isCancelled)
}

private fun scopePlusContext(c1: CoroutineContext, c2: CoroutineContext) =
(ContextScope(c1) + c2).coroutineContext
}
28 changes: 28 additions & 0 deletions common/kotlinx-coroutines-core-common/test/JobTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("DEPRECATION")

package kotlinx.coroutines

import kotlin.test.*
Expand Down Expand Up @@ -205,4 +207,30 @@ class JobTest : TestBase() {
assertTrue(job.isCancelled)
assertTrue(parent.isCancelled)
}

@Test
fun testIncompleteJobState() = runTest {
val job = launch {
coroutineContext[Job]!!.invokeOnCompletion { }
}

job.join()
assertTrue(job.isCompleted)
assertFalse(job.isActive)
assertFalse(job.isCancelled)
}

@Test
fun testChildrenWithIncompleteState() = runTest {
val job = async { Wrapper() }
job.join()
assertTrue(job.children.toList().isEmpty())
}

private class Wrapper : Incomplete {
override val isActive: Boolean
get() = error("")
override val list: NodeList?
get() = error("")
}
}
46 changes: 38 additions & 8 deletions common/kotlinx-coroutines-core-common/test/WithContextTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class WithContextTest : TestBase() {
expect(2)
val result = withContext(coroutineContext) { // same context!
expect(3) // still here
"OK"
}
"OK".wrap()
}.unwrap()
assertEquals("OK", result)
expect(4)
// will wait for the first coroutine
Expand All @@ -70,8 +70,8 @@ class WithContextTest : TestBase() {
expect(3) // still here
yield() // now yields to launch!
expect(5)
"OK"
}
"OK".wrap()
}.unwrap()
assertEquals("OK", result)
finish(6)
}
Expand All @@ -95,7 +95,7 @@ class WithContextTest : TestBase() {
} catch (e: CancellationException) {
expect(4)
}
"OK"
"OK".wrap()
}

expectUnreached()
Expand Down Expand Up @@ -126,7 +126,7 @@ class WithContextTest : TestBase() {
} catch (e: CancellationException) {
finish(6)
}
"OK"
"OK".wrap()
}
// still fails, because parent job was cancelled
expectUnreached()
Expand Down Expand Up @@ -240,7 +240,9 @@ class WithContextTest : TestBase() {
job!!.cancel() // cancel itself
require(job!!.cancel(AssertionError()))
require(!isActive)
"OK".wrap()
}
expectUnreached()
} catch (e: Throwable) {
expect(7)
// make sure JCE is thrown
Expand Down Expand Up @@ -269,7 +271,9 @@ class WithContextTest : TestBase() {
throw TestException()
}
expect(3)
"OK".wrap()
}
expectUnreached()
} catch (e: TestException) {
// ensure that we can catch exception outside of the scope
expect(5)
Expand All @@ -287,7 +291,8 @@ class WithContextTest : TestBase() {
expect(4) // waits before return
}
expect(3)
}
"OK".wrap()
}.unwrap()
finish(5)
}

Expand All @@ -301,7 +306,32 @@ class WithContextTest : TestBase() {
expect(4) // waits before return
}
expect(3)
}
"OK".wrap()
}.unwrap()
finish(5)
}

@Test
fun testIncompleteWithContextState() = runTest {
lateinit var ctxJob: Job
withContext(wrapperDispatcher(coroutineContext)) {
ctxJob = coroutineContext[Job]!!
ctxJob.invokeOnCompletion { }
}

ctxJob.join()
assertTrue(ctxJob.isCompleted)
assertFalse(ctxJob.isActive)
assertFalse(ctxJob.isCancelled)
}

private class Wrapper(val value: String) : Incomplete {
override val isActive: Boolean
get() = error("")
override val list: NodeList?
get() = error("")
}

private fun String.wrap() = Wrapper(this)
private fun Wrapper.unwrap() = value
}
16 changes: 15 additions & 1 deletion common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -195,5 +195,19 @@ class WithTimeoutTest : TestBase() {
finish(4)
}
}
}

@Test
fun testIncompleteWithTimeoutState() = runTest {
lateinit var timeoutJob: Job
val handle = withTimeout(Long.MAX_VALUE) {
timeoutJob = coroutineContext[Job]!!
timeoutJob.invokeOnCompletion { }
}

handle.dispose()
timeoutJob.join()
assertTrue(timeoutJob.isCompleted)
assertFalse(timeoutJob.isActive)
assertFalse(timeoutJob.isCancelled)
}
}
2 changes: 1 addition & 1 deletion core/kotlinx-coroutines-core/src/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private class BlockingCoroutine<T>(
}
timeSource.unregisterTimeLoopThread()
// now return result
val state = this.state
val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}
Expand Down
10 changes: 10 additions & 0 deletions core/kotlinx-coroutines-core/test/RunBlockingTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,14 @@ class RunBlockingTest : TestBase() {

assertEquals(1, value)
}

@Test
fun testIncompleteState() {
val handle = runBlocking {
// See #835
coroutineContext[Job]!!.invokeOnCompletion { }
}

handle.dispose()
}
}