Skip to content

Commit b10287e

Browse files
qwwdfsadelizarov
authored andcommitted
Change order of handlers in JobSupport and CancellableContinuation
1) Invoke handlers before dispatching in CancellableContinuation to make behaviour timing-independent and Unconfined doesn't produce unexpected results 2) Invoke onCancellation -> handlers -> onCompletion in Job to make behaviour timing-independent Fixes #415
1 parent 06ec98c commit b10287e

File tree

4 files changed

+55
-10
lines changed

4 files changed

+55
-10
lines changed

common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ internal abstract class AbstractContinuation<in T>(
188188
return updateStateToFinal(state, CancelledContinuation(this, cause), mode = MODE_ATOMIC_DEFAULT)
189189
}
190190

191-
private fun onCompletionInternal(mode: Int) {
191+
private fun dispatchResume(mode: Int) {
192192
if (tryResume()) return // completed before getResult invocation -- bail out
193193
// otherwise, getResult has already commenced, i.e. completed later or in other thread
194194
dispatch(mode)
@@ -326,16 +326,18 @@ internal abstract class AbstractContinuation<in T>(
326326

327327
protected fun completeStateUpdate(expect: NotCompleted, update: Any?, mode: Int) {
328328
val exceptionally = update as? CompletedExceptionally
329-
onCompletionInternal(mode)
330329

331-
// Invoke cancellation handlers only if necessary
332330
if (update is CancelledContinuation && expect is CancelHandler) {
333331
try {
334332
expect.invoke(exceptionally?.cause)
335333
} catch (ex: Throwable) {
336334
handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
337335
}
338336
}
337+
338+
// Notify all handlers before dispatching, otherwise behaviour will be timing-dependent
339+
// and confusing with Unconfined
340+
dispatchResume(mode)
339341
}
340342

341343
private fun handleException(exception: Throwable) {

common/kotlinx-coroutines-core-common/src/JobSupport.kt

+13-2
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,17 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
207207
private fun completeUpdateState(expect: Incomplete, update: Any?, mode: Int) {
208208
val exceptionally = update as? CompletedExceptionally
209209
// Do overridable processing before completion handlers
210-
if (!expect.isCancelling) onCancellationInternal(exceptionally) // only notify when was not cancelling before
211-
onCompletionInternal(update, mode)
210+
211+
/*
212+
* 1) Invoke onCancellationInternal: exception handling, parent/resource cancellation etc.
213+
* 2) Invoke completion handlers: .join(), callbacks etc. It's important to invoke them only AFTER exception handling, see #208
214+
* 3) Invoke onCompletionInternal: onNext(), timeout deregistration etc. I should be last so all callbacks observe consistent state
215+
* of the job which doesn't depend on callback scheduling
216+
*
217+
* Only notify on cancellation once (expect.isCancelling)
218+
*/
219+
if (!expect.isCancelling) onCancellationInternal(exceptionally)
220+
212221
// Invoke completion handlers
213222
val cause = exceptionally?.cause
214223
if (expect is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
@@ -220,6 +229,8 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
220229
} else {
221230
expect.list?.notifyCompletion(cause)
222231
}
232+
233+
onCompletionInternal(update, mode)
223234
}
224235

225236
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {

common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt

+7-5
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,22 @@ class AbstractCoroutineTest : TestBase() {
2424

2525
override fun onCompleted(value: String) {
2626
assertEquals("OK", value)
27-
expect(6)
27+
expect(8)
2828
}
2929

3030
override fun onCompletedExceptionally(exception: Throwable) {
3131
expectUnreached()
3232
}
3333
}
34+
3435
coroutine.invokeOnCompletion(onCancelling = true) {
3536
assertTrue(it == null)
36-
expect(7)
37+
expect(6)
3738
}
39+
3840
coroutine.invokeOnCompletion {
3941
assertTrue(it == null)
40-
expect(8)
42+
expect(7)
4143
}
4244
expect(2)
4345
coroutine.start()
@@ -66,7 +68,7 @@ class AbstractCoroutineTest : TestBase() {
6668

6769
override fun onCompletedExceptionally(exception: Throwable) {
6870
assertTrue(exception is TestException1)
69-
expect(8)
71+
expect(9)
7072
}
7173
}
7274
coroutine.invokeOnCompletion(onCancelling = true) {
@@ -75,7 +77,7 @@ class AbstractCoroutineTest : TestBase() {
7577
}
7678
coroutine.invokeOnCompletion {
7779
assertTrue(it is TestException1)
78-
expect(9)
80+
expect(8)
7981
}
8082
expect(2)
8183
coroutine.start()

common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt

+30
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import kotlin.coroutines.experimental.*
99
import kotlin.test.*
1010

1111
class ProduceTest : TestBase() {
12+
1213
@Test
1314
fun testBasic() = runTest {
1415
val c = produce(coroutineContext) {
@@ -81,5 +82,34 @@ class ProduceTest : TestBase() {
8182
}
8283
}
8384

85+
@Test
86+
fun testCancelOnCompletionUnconfined() = runTest {
87+
cancelOnCompletion(Unconfined)
88+
}
89+
90+
@Test
91+
fun testCancelOnCompletion() = runTest {
92+
cancelOnCompletion(coroutineContext)
93+
}
94+
95+
private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) {
96+
val source = Channel<Int>()
97+
expect(1)
98+
val produced = produce<Int>(coroutineContext, onCompletion = source.consumes()) {
99+
expect(2)
100+
source.receive()
101+
}
102+
103+
yield()
104+
expect(3)
105+
produced.cancel()
106+
try {
107+
source.receive()
108+
// TODO shouldn't it be ClosedReceiveChannelException ?
109+
} catch (e: JobCancellationException) {
110+
finish(4)
111+
}
112+
}
113+
84114
private class TestException : Exception()
85115
}

0 commit comments

Comments
 (0)