Skip to content

Wrap all Incomplete final state into box (and unbox it where necessar… #837

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private class DispatchedCoroutine<in T>(
fun getResult(): Any? {
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
Expand Down
38 changes: 26 additions & 12 deletions common/kotlinx-coroutines-core-common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}

// ------------ state query ------------

/**
* Returns current state of this job.
* @suppress **This is unstable API and it is subject to change.**
* If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
* and should be [unboxed][unboxState] before returning to user code.
*/
internal val state: Any? get() {
_state.loop { state -> // helper loop on state (complete in-progress atomic operations)
Expand Down Expand Up @@ -192,7 +192,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// Finalizes Finishing -> Completed (terminal state) transition.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
/*
* Note: proposed state can be Incompleted, e.g.
* async {
* smth.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
* }
*/
require(this.state === state) // consistency check -- it cannot change
require(!state.isSealed) // consistency check -- cannot be sealed yet
require(state.isCompleting) // consistency check -- must be marked as completing
Expand Down Expand Up @@ -220,7 +225,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
handleJobException(finalException)
}
// Then CAS to completed state -> it must succeed
require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
require(_state.compareAndSet(state, finalState.boxIncomplete())) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
// And process all post-completion actions
completeStateFinalization(state, finalState, mode, suppressed)
return true
Expand Down Expand Up @@ -254,7 +259,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
check(update !is CompletedExceptionally) // only for normal completion
if (!_state.compareAndSet(state, update)) return false
if (!_state.compareAndSet(state, update.boxIncomplete())) return false
completeStateFinalization(state, update, mode, false)
return true
}
Expand Down Expand Up @@ -1066,7 +1071,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
val state = this.state
check(state !is Incomplete) { "This job has not completed yet" }
if (state is CompletedExceptionally) throw state.cause
return state
return state.unboxState()
}

/**
Expand All @@ -1079,7 +1084,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
if (state !is Incomplete) {
// already complete -- just return result
if (state is CompletedExceptionally) throw state.cause
return state
return state.unboxState()

}
if (startInternal(state) >= 0) break // break unless needs to retry
Expand Down Expand Up @@ -1111,10 +1116,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
if (state !is Incomplete) {
// already complete -- select result
if (select.trySelect(null)) {
if (state is CompletedExceptionally)
if (state is CompletedExceptionally) {
select.resumeSelectCancellableWithException(state.cause)
else
block.startCoroutineUnintercepted(state as T, select.completion)
}
else {
block.startCoroutineUnintercepted(state.unboxState() as T, select.completion)
}
}
return
}
Expand All @@ -1136,10 +1143,17 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
if (state is CompletedExceptionally)
select.resumeSelectCancellableWithException(state.cause)
else
block.startCoroutineCancellable(state as T, select.completion)
block.startCoroutineCancellable(state.unboxState() as T, select.completion)
}
}

/*
* Class to represent object as the final state of the Job
*/
private class IncompleteStateBox(@JvmField val state: Incomplete)
private fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this

// --------------- helper classes & constants for job implementation

private const val COMPLETING_ALREADY_COMPLETING = 0
Expand Down Expand Up @@ -1238,7 +1252,7 @@ private class ResumeAwaitOnCompletion<T>(
} else {
// Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
@Suppress("UNCHECKED_CAST")
continuation.resume(state as T)
continuation.resume(state.unboxState() as T)
}
}
override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private inline fun <T> AbstractCoroutine<T>.undispatchedResult(
else -> result
}
} else {
state
state.unboxState()
}
}
else -> COROUTINE_SUSPENDED
Expand Down
31 changes: 30 additions & 1 deletion common/kotlinx-coroutines-core-common/test/AsyncTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED", "UNREACHABLE_CODE") // KT-21913
@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED", "UNREACHABLE_CODE", "USELESS_IS_CHECK") // KT-21913

package kotlinx.coroutines

Expand Down Expand Up @@ -238,4 +238,33 @@ class AsyncTest : TestBase() {
finish(3)
}
}

@Test
fun testIncompleteAsyncState() = runTest {
val deferred = async {
coroutineContext[Job]!!.invokeOnCompletion { }
}

deferred.await().dispose()
assertTrue(deferred.getCompleted() is DisposableHandle)
assertNull(deferred.getCompletionExceptionOrNull())
assertTrue(deferred.isCompleted)
assertFalse(deferred.isActive)
assertFalse(deferred.isCancelled)
}

@Test
fun testIncompleteAsyncFastPath() = runTest {
val deferred = async(Dispatchers.Unconfined) {
coroutineContext[Job]!!.invokeOnCompletion { }
}

deferred.await().dispose()
assertTrue(deferred.getCompleted() is DisposableHandle)
assertNull(deferred.getCompletionExceptionOrNull())
assertTrue(deferred.isCompleted)
assertFalse(deferred.isActive)
assertFalse(deferred.isCancelled)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,41 @@ class CompletableDeferredTest : TestBase() {
checkFresh(c)
}

private fun checkFresh(c: CompletableDeferred<String>) {
assertEquals(true, c.isActive)
assertEquals(false, c.isCancelled)
assertEquals(false, c.isCompleted)
assertThrows<IllegalStateException> { c.getCancellationException() }
assertThrows<IllegalStateException> { c.getCompleted() }
assertThrows<IllegalStateException> { c.getCompletionExceptionOrNull() }
}

@Test
fun testComplete() {
val c = CompletableDeferred<String>()
assertEquals(true, c.complete("OK"))
checkCompleteOk(c)
assertEquals("OK", c.getCompleted())
assertEquals(false, c.complete("OK"))
checkCompleteOk(c)
assertEquals("OK", c.getCompleted())
}

@Test
fun testCompleteWithIncompleteResult() {
val c = CompletableDeferred<DisposableHandle>()
assertEquals(true, c.complete(c.invokeOnCompletion { }))
checkCompleteOk(c)
assertEquals(false, c.complete(c.invokeOnCompletion { }))
checkCompleteOk(c)
assertTrue(c.getCompleted() is Incomplete)
}

private fun checkFresh(c: CompletableDeferred<*>) {
assertEquals(true, c.isActive)
assertEquals(false, c.isCancelled)
assertEquals(false, c.isCompleted)
assertThrows<IllegalStateException> { c.getCancellationException() }
assertThrows<IllegalStateException> { c.getCompleted() }
assertThrows<IllegalStateException> { c.getCompletionExceptionOrNull() }
}

private fun checkCompleteOk(c: CompletableDeferred<String>) {
private fun checkCompleteOk(c: CompletableDeferred<*>) {
assertEquals(false, c.isActive)
assertEquals(false, c.isCancelled)
assertEquals(true, c.isCompleted)
assertTrue(c.getCancellationException() is JobCancellationException)
assertEquals("OK", c.getCompleted())
assertEquals(null, c.getCompletionExceptionOrNull())
}

Expand Down
14 changes: 14 additions & 0 deletions common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,20 @@ class CoroutineScopeTest : TestBase() {
assertSame(Dispatchers.Unconfined, scopePlusContext(Dispatchers.Unconfined, Dispatchers.Unconfined))
}

@Test
fun testIncompleteScopeState() = runTest {
lateinit var scopeJob: Job
coroutineScope {
scopeJob = coroutineContext[Job]!!
scopeJob.invokeOnCompletion { }
}

scopeJob.join()
assertTrue(scopeJob.isCompleted)
assertFalse(scopeJob.isActive)
assertFalse(scopeJob.isCancelled)
}

private fun scopePlusContext(c1: CoroutineContext, c2: CoroutineContext) =
(ContextScope(c1) + c2).coroutineContext
}
28 changes: 28 additions & 0 deletions common/kotlinx-coroutines-core-common/test/JobTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("DEPRECATION")

package kotlinx.coroutines

import kotlin.test.*
Expand Down Expand Up @@ -205,4 +207,30 @@ class JobTest : TestBase() {
assertTrue(job.isCancelled)
assertTrue(parent.isCancelled)
}

@Test
fun testIncompleteJobState() = runTest {
val job = launch {
coroutineContext[Job]!!.invokeOnCompletion { }
}

job.join()
assertTrue(job.isCompleted)
assertFalse(job.isActive)
assertFalse(job.isCancelled)
}

@Test
fun testChildrenWithIncompleteState() = runTest {
val job = async { Wrapper() }
job.join()
assertTrue(job.children.toList().isEmpty())
}

private class Wrapper : Incomplete {
override val isActive: Boolean
get() = error("")
override val list: NodeList?
get() = error("")
}
}
46 changes: 38 additions & 8 deletions common/kotlinx-coroutines-core-common/test/WithContextTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class WithContextTest : TestBase() {
expect(2)
val result = withContext(coroutineContext) { // same context!
expect(3) // still here
"OK"
}
"OK".wrap()
}.unwrap()
assertEquals("OK", result)
expect(4)
// will wait for the first coroutine
Expand All @@ -70,8 +70,8 @@ class WithContextTest : TestBase() {
expect(3) // still here
yield() // now yields to launch!
expect(5)
"OK"
}
"OK".wrap()
}.unwrap()
assertEquals("OK", result)
finish(6)
}
Expand All @@ -95,7 +95,7 @@ class WithContextTest : TestBase() {
} catch (e: CancellationException) {
expect(4)
}
"OK"
"OK".wrap()
}

expectUnreached()
Expand Down Expand Up @@ -126,7 +126,7 @@ class WithContextTest : TestBase() {
} catch (e: CancellationException) {
finish(6)
}
"OK"
"OK".wrap()
}
// still fails, because parent job was cancelled
expectUnreached()
Expand Down Expand Up @@ -240,7 +240,9 @@ class WithContextTest : TestBase() {
job!!.cancel() // cancel itself
require(job!!.cancel(AssertionError()))
require(!isActive)
"OK".wrap()
}
expectUnreached()
} catch (e: Throwable) {
expect(7)
// make sure JCE is thrown
Expand Down Expand Up @@ -269,7 +271,9 @@ class WithContextTest : TestBase() {
throw TestException()
}
expect(3)
"OK".wrap()
}
expectUnreached()
} catch (e: TestException) {
// ensure that we can catch exception outside of the scope
expect(5)
Expand All @@ -287,7 +291,8 @@ class WithContextTest : TestBase() {
expect(4) // waits before return
}
expect(3)
}
"OK".wrap()
}.unwrap()
finish(5)
}

Expand All @@ -301,7 +306,32 @@ class WithContextTest : TestBase() {
expect(4) // waits before return
}
expect(3)
}
"OK".wrap()
}.unwrap()
finish(5)
}

@Test
fun testIncompleteWithContextState() = runTest {
lateinit var ctxJob: Job
withContext(wrapperDispatcher(coroutineContext)) {
ctxJob = coroutineContext[Job]!!
ctxJob.invokeOnCompletion { }
}

ctxJob.join()
assertTrue(ctxJob.isCompleted)
assertFalse(ctxJob.isActive)
assertFalse(ctxJob.isCancelled)
}

private class Wrapper(val value: String) : Incomplete {
override val isActive: Boolean
get() = error("")
override val list: NodeList?
get() = error("")
}

private fun String.wrap() = Wrapper(this)
private fun Wrapper.unwrap() = value
}
Loading