Skip to content

Commit ecfd227

Browse files
committed
Wrap all Incomplete final state into box (and unbox it where necessary) because otherwise job machinery treats such state as intermediate
Fixes #835
1 parent 06600a2 commit ecfd227

File tree

10 files changed

+143
-24
lines changed

10 files changed

+143
-24
lines changed

common/kotlinx-coroutines-core-common/src/Builders.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ private class DispatchedCoroutine<in T>(
228228
fun getResult(): Any? {
229229
if (trySuspend()) return COROUTINE_SUSPENDED
230230
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
231-
val state = this.state
231+
val state = this.state.unboxState()
232232
if (state is CompletedExceptionally) throw state.cause
233233
@Suppress("UNCHECKED_CAST")
234234
return state as T

common/kotlinx-coroutines-core-common/src/JobSupport.kt

+23-12
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
154154
}
155155

156156
// ------------ state query ------------
157-
158157
/**
159158
* Returns current state of this job.
160-
* @suppress **This is unstable API and it is subject to change.**
159+
* If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
160+
* and should be [unboxed][unboxState] before returning to user code.
161161
*/
162162
internal val state: Any? get() {
163163
_state.loop { state -> // helper loop on state (complete in-progress atomic operations)
@@ -192,7 +192,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
192192
// Finalizes Finishing -> Completed (terminal state) transition.
193193
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
194194
private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
195-
require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
195+
/*
196+
* Note: proposed state can be Incompleted, e.g.
197+
* async {
198+
* smth.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
199+
* }
200+
*/
196201
require(this.state === state) // consistency check -- it cannot change
197202
require(!state.isSealed) // consistency check -- cannot be sealed yet
198203
require(state.isCompleting) // consistency check -- must be marked as completing
@@ -220,7 +225,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
220225
handleJobException(finalException)
221226
}
222227
// Then CAS to completed state -> it must succeed
223-
require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
228+
require(_state.compareAndSet(state, finalState.boxIncomplete())) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
224229
// And process all post-completion actions
225230
completeStateFinalization(state, finalState, mode, suppressed)
226231
return true
@@ -254,7 +259,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
254259
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
255260
check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
256261
check(update !is CompletedExceptionally) // only for normal completion
257-
if (!_state.compareAndSet(state, update)) return false
262+
if (!_state.compareAndSet(state, update.boxIncomplete())) return false
258263
completeStateFinalization(state, update, mode, false)
259264
return true
260265
}
@@ -1029,7 +1034,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10291034
private val job: JobSupport
10301035
) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
10311036
override fun getContinuationCancellationCause(parent: Job): Throwable {
1032-
val state = job.state
1037+
val state = job.state.unboxState()
10331038
/*
10341039
* When the job we are waiting for had already completely completed exceptionally or
10351040
* is failing, we shall use its root/completion cause for await's result.
@@ -1054,7 +1059,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10541059
public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
10551060

10561061
public fun getCompletionExceptionOrNull(): Throwable? {
1057-
val state = this.state
1062+
val state = this.state.unboxState()
10581063
check(state !is Incomplete) { "This job has not completed yet" }
10591064
return state.exceptionOrNull
10601065
}
@@ -1063,7 +1068,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10631068
* @suppress **This is unstable API and it is subject to change.**
10641069
*/
10651070
internal fun getCompletedInternal(): Any? {
1066-
val state = this.state
1071+
val state = this.state.unboxState()
10671072
check(state !is Incomplete) { "This job has not completed yet" }
10681073
if (state is CompletedExceptionally) throw state.cause
10691074
return state
@@ -1075,7 +1080,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10751080
internal suspend fun awaitInternal(): Any? {
10761081
// fast-path -- check state (avoid extra object creation)
10771082
while (true) { // lock-free loop on state
1078-
val state = this.state
1083+
val state = this.state.unboxState()
10791084
if (state !is Incomplete) {
10801085
// already complete -- just return result
10811086
if (state is CompletedExceptionally) throw state.cause
@@ -1131,7 +1136,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
11311136
*/
11321137
@Suppress("UNCHECKED_CAST")
11331138
internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
1134-
val state = this.state
1139+
val state = this.state.unboxState()
11351140
// Note: await is non-atomic (can be cancelled while dispatched)
11361141
if (state is CompletedExceptionally)
11371142
select.resumeSelectCancellableWithException(state.cause)
@@ -1140,6 +1145,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
11401145
}
11411146
}
11421147

1148+
/*
1149+
* Class to represent object as the final state of the Job
1150+
*/
1151+
private class IncompleteStateBox(@JvmField val state: Incomplete)
1152+
private fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
1153+
internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this
1154+
11431155
// --------------- helper classes & constants for job implementation
11441156

11451157
private const val COMPLETING_ALREADY_COMPLETING = 0
@@ -1232,8 +1244,7 @@ private class ResumeAwaitOnCompletion<T>(
12321244
private val continuation: AbstractContinuation<T>
12331245
) : JobNode<JobSupport>(job) {
12341246
override fun invoke(cause: Throwable?) {
1235-
val state = job.state
1236-
check(state !is Incomplete)
1247+
val state = job.state.unboxState()
12371248
if (state is CompletedExceptionally) {
12381249
// Resume with exception in atomic way to preserve exception
12391250
continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)

common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ private inline fun <T> AbstractCoroutine<T>.undispatchedResult(
123123
return when {
124124
result === COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
125125
makeCompletingOnce(result, MODE_IGNORE) -> {
126-
val state = state
126+
val state = state.unboxState()
127127
if (state is CompletedExceptionally) {
128128
when {
129129
shouldThrow(state.cause) -> throw state.cause

common/kotlinx-coroutines-core-common/test/AsyncTest.kt

+12
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,16 @@ class AsyncTest : TestBase() {
239239
finish(3)
240240
}
241241
}
242+
243+
@Test
244+
fun testIncompleteAsyncState() = runTest {
245+
val job = async {
246+
coroutineContext[Job]!!.invokeOnCompletion { }
247+
}
248+
249+
job.await().dispose()
250+
assertTrue(job.isCompleted)
251+
assertFalse(job.isActive)
252+
assertFalse(job.isCancelled)
253+
}
242254
}

common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt

+14
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,20 @@ class CoroutineScopeTest : TestBase() {
256256
assertSame(Dispatchers.Unconfined, scopePlusContext(Dispatchers.Unconfined, Dispatchers.Unconfined))
257257
}
258258

259+
@Test
260+
fun testIncompleteScopeState() = runTest {
261+
lateinit var scopeJob: Job
262+
coroutineScope {
263+
scopeJob = coroutineContext[Job]!!
264+
scopeJob.invokeOnCompletion { }
265+
}
266+
267+
scopeJob.join()
268+
assertTrue(scopeJob.isCompleted)
269+
assertFalse(scopeJob.isActive)
270+
assertFalse(scopeJob.isCancelled)
271+
}
272+
259273
private fun scopePlusContext(c1: CoroutineContext, c2: CoroutineContext) =
260274
(ContextScope(c1) + c2).coroutineContext
261275
}

common/kotlinx-coroutines-core-common/test/JobTest.kt

+28
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5+
@file:Suppress("DEPRECATION")
6+
57
package kotlinx.coroutines
68

79
import kotlin.test.*
@@ -205,4 +207,30 @@ class JobTest : TestBase() {
205207
assertTrue(job.isCancelled)
206208
assertTrue(parent.isCancelled)
207209
}
210+
211+
@Test
212+
fun testIncompleteJobState() = runTest {
213+
val job = launch {
214+
coroutineContext[Job]!!.invokeOnCompletion { }
215+
}
216+
217+
job.join()
218+
assertTrue(job.isCompleted)
219+
assertFalse(job.isActive)
220+
assertFalse(job.isCancelled)
221+
}
222+
223+
@Test
224+
fun testChildrenWithIncompleteState() = runTest {
225+
val job = async { Wrapper() }
226+
job.join()
227+
assertTrue(job.children.toList().isEmpty())
228+
}
229+
230+
private class Wrapper : Incomplete {
231+
override val isActive: Boolean
232+
get() = error("")
233+
override val list: NodeList?
234+
get() = error("")
235+
}
208236
}

common/kotlinx-coroutines-core-common/test/WithContextTest.kt

+38-8
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ class WithContextTest : TestBase() {
5252
expect(2)
5353
val result = withContext(coroutineContext) { // same context!
5454
expect(3) // still here
55-
"OK"
56-
}
55+
"OK".wrap()
56+
}.unwrap()
5757
assertEquals("OK", result)
5858
expect(4)
5959
// will wait for the first coroutine
@@ -70,8 +70,8 @@ class WithContextTest : TestBase() {
7070
expect(3) // still here
7171
yield() // now yields to launch!
7272
expect(5)
73-
"OK"
74-
}
73+
"OK".wrap()
74+
}.unwrap()
7575
assertEquals("OK", result)
7676
finish(6)
7777
}
@@ -95,7 +95,7 @@ class WithContextTest : TestBase() {
9595
} catch (e: CancellationException) {
9696
expect(4)
9797
}
98-
"OK"
98+
"OK".wrap()
9999
}
100100

101101
expectUnreached()
@@ -126,7 +126,7 @@ class WithContextTest : TestBase() {
126126
} catch (e: CancellationException) {
127127
finish(6)
128128
}
129-
"OK"
129+
"OK".wrap()
130130
}
131131
// still fails, because parent job was cancelled
132132
expectUnreached()
@@ -240,7 +240,9 @@ class WithContextTest : TestBase() {
240240
job!!.cancel() // cancel itself
241241
require(job!!.cancel(AssertionError()))
242242
require(!isActive)
243+
"OK".wrap()
243244
}
245+
expectUnreached()
244246
} catch (e: Throwable) {
245247
expect(7)
246248
// make sure JCE is thrown
@@ -269,7 +271,9 @@ class WithContextTest : TestBase() {
269271
throw TestException()
270272
}
271273
expect(3)
274+
"OK".wrap()
272275
}
276+
expectUnreached()
273277
} catch (e: TestException) {
274278
// ensure that we can catch exception outside of the scope
275279
expect(5)
@@ -287,7 +291,8 @@ class WithContextTest : TestBase() {
287291
expect(4) // waits before return
288292
}
289293
expect(3)
290-
}
294+
"OK".wrap()
295+
}.unwrap()
291296
finish(5)
292297
}
293298

@@ -301,7 +306,32 @@ class WithContextTest : TestBase() {
301306
expect(4) // waits before return
302307
}
303308
expect(3)
304-
}
309+
"OK".wrap()
310+
}.unwrap()
305311
finish(5)
306312
}
313+
314+
@Test
315+
fun testIncompleteWithContextState() = runTest {
316+
lateinit var ctxJob: Job
317+
withContext(wrapperDispatcher(coroutineContext)) {
318+
ctxJob = coroutineContext[Job]!!
319+
ctxJob.invokeOnCompletion { }
320+
}
321+
322+
ctxJob.join()
323+
assertTrue(ctxJob.isCompleted)
324+
assertFalse(ctxJob.isActive)
325+
assertFalse(ctxJob.isCancelled)
326+
}
327+
328+
private class Wrapper(val value: String) : Incomplete {
329+
override val isActive: Boolean
330+
get() = error("")
331+
override val list: NodeList?
332+
get() = error("")
333+
}
334+
335+
private fun String.wrap() = Wrapper(this)
336+
private fun Wrapper.unwrap() = value
307337
}

common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt

+15-1
Original file line numberDiff line numberDiff line change
@@ -195,5 +195,19 @@ class WithTimeoutTest : TestBase() {
195195
finish(4)
196196
}
197197
}
198-
}
199198

199+
@Test
200+
fun testIncompleteWithTimeoutState() = runTest {
201+
lateinit var timeoutJob: Job
202+
val handle = withTimeout(Long.MAX_VALUE) {
203+
timeoutJob = coroutineContext[Job]!!
204+
timeoutJob.invokeOnCompletion { }
205+
}
206+
207+
handle.dispose()
208+
timeoutJob.join()
209+
assertTrue(timeoutJob.isCompleted)
210+
assertFalse(timeoutJob.isActive)
211+
assertFalse(timeoutJob.isCancelled)
212+
}
213+
}

core/kotlinx-coroutines-core/src/Builders.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ private class BlockingCoroutine<T>(
8181
}
8282
timeSource.unregisterTimeLoopThread()
8383
// now return result
84-
val state = this.state
84+
val state = this.state.unboxState()
8585
(state as? CompletedExceptionally)?.let { throw it.cause }
8686
return state as T
8787
}

core/kotlinx-coroutines-core/test/RunBlockingTest.kt

+10
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,14 @@ class RunBlockingTest : TestBase() {
152152

153153
assertEquals(1, value)
154154
}
155+
156+
@Test
157+
fun testIncompleteState() {
158+
val handle = runBlocking {
159+
// See #835
160+
coroutineContext[Job]!!.invokeOnCompletion { }
161+
}
162+
163+
handle.dispose()
164+
}
155165
}

0 commit comments

Comments
 (0)