Skip to content

Commit 5cbd971

Browse files
qwwdfsadpablobaxter
authored andcommitted
Cancel dispatched coroutine on Dispatchers.IO when the underlying Han… (Kotlin#2784)
Cancel dispatched coroutine on Dispatchers.IO when the underlying Handler is closed in Handler.asCoroutineDispatcher() Fixes Kotlin#2778
1 parent 1ebe6b1 commit 5cbd971

File tree

3 files changed

+96
-16
lines changed

3 files changed

+96
-16
lines changed

ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt

+27-11
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
package kotlinx.coroutines.android
88

99
import android.os.*
10-
import androidx.annotation.*
1110
import android.view.*
11+
import androidx.annotation.*
1212
import kotlinx.coroutines.*
1313
import kotlinx.coroutines.internal.*
1414
import java.lang.reflect.*
@@ -54,15 +54,22 @@ internal class AndroidDispatcherFactory : MainDispatcherFactory {
5454
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
5555
HandlerContext(Looper.getMainLooper().asHandler(async = true))
5656

57-
override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
57+
override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
5858

5959
override val loadPriority: Int
6060
get() = Int.MAX_VALUE / 2
6161
}
6262

6363
/**
64-
* Represents an arbitrary [Handler] as a implementation of [CoroutineDispatcher]
64+
* Represents an arbitrary [Handler] as an implementation of [CoroutineDispatcher]
6565
* with an optional [name] for nicer debugging
66+
*
67+
* ## Rejected execution
68+
*
69+
* If the underlying handler is closed and its message-scheduling methods start to return `false` on
70+
* an attempt to submit a continuation task to the resulting dispatcher,
71+
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
72+
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
6673
*/
6774
@JvmName("from") // this is for a nice Java API, see issue #255
6875
@JvmOverloads
@@ -129,24 +136,33 @@ internal class HandlerContext private constructor(
129136
}
130137

131138
override fun dispatch(context: CoroutineContext, block: Runnable) {
132-
handler.post(block)
139+
if (!handler.post(block)) {
140+
cancelOnRejection(context, block)
141+
}
133142
}
134143

135144
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
136145
val block = Runnable {
137146
with(continuation) { resumeUndispatched(Unit) }
138147
}
139-
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
140-
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
148+
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
149+
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
150+
} else {
151+
cancelOnRejection(continuation.context, block)
152+
}
141153
}
142154

143155
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
144-
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
145-
return object : DisposableHandle {
146-
override fun dispose() {
147-
handler.removeCallbacks(block)
148-
}
156+
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
157+
return DisposableHandle { handler.removeCallbacks(block) }
149158
}
159+
cancelOnRejection(context, block)
160+
return NonDisposableHandle
161+
}
162+
163+
private fun cancelOnRejection(context: CoroutineContext, block: Runnable) {
164+
context.cancel(CancellationException("The task was rejected, the handler underlying the dispatcher '${toString()}' was closed"))
165+
Dispatchers.IO.dispatch(context, block)
150166
}
151167

152168
override fun toString(): String = toStringInternalImpl() ?: run {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.android
6+
7+
import android.os.*
8+
import kotlinx.coroutines.*
9+
import org.junit.*
10+
import org.junit.runner.*
11+
import org.robolectric.*
12+
import org.robolectric.annotation.*
13+
14+
@RunWith(RobolectricTestRunner::class)
15+
@Config(manifest = Config.NONE, sdk = [28])
16+
class DisabledHandlerTest : TestBase() {
17+
18+
private var delegateToSuper = false
19+
private val disabledDispatcher = object : Handler() {
20+
override fun sendMessageAtTime(msg: Message?, uptimeMillis: Long): Boolean {
21+
if (delegateToSuper) return super.sendMessageAtTime(msg, uptimeMillis)
22+
return false
23+
}
24+
}.asCoroutineDispatcher()
25+
26+
@Test
27+
fun testRunBlocking() {
28+
expect(1)
29+
try {
30+
runBlocking(disabledDispatcher) {
31+
expectUnreached()
32+
}
33+
expectUnreached()
34+
} catch (e: CancellationException) {
35+
finish(2)
36+
}
37+
}
38+
39+
@Test
40+
fun testInvokeOnCancellation() = runTest {
41+
val job = launch(disabledDispatcher, start = CoroutineStart.LAZY) { expectUnreached() }
42+
job.invokeOnCompletion { if (it != null) expect(2) }
43+
yield()
44+
expect(1)
45+
job.join()
46+
finish(3)
47+
}
48+
49+
@Test
50+
fun testWithTimeout() = runTest {
51+
delegateToSuper = true
52+
try {
53+
withContext(disabledDispatcher) {
54+
expect(1)
55+
delegateToSuper = false
56+
delay(Long.MAX_VALUE - 1)
57+
expectUnreached()
58+
}
59+
expectUnreached()
60+
} catch (e: CancellationException) {
61+
finish(2)
62+
}
63+
}
64+
}

ui/kotlinx-coroutines-android/test/HandlerDispatcherTest.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class HandlerDispatcherTest : TestBase() {
2929
fun mainIsAsync() = runTest {
3030
ReflectionHelpers.setStaticField(Build.VERSION::class.java, "SDK_INT", 28)
3131

32-
val mainLooper = ShadowLooper.getShadowMainLooper()
32+
val mainLooper = shadowOf(Looper.getMainLooper())
3333
mainLooper.pause()
3434
val mainMessageQueue = shadowOf(Looper.getMainLooper().queue)
3535

@@ -48,7 +48,7 @@ class HandlerDispatcherTest : TestBase() {
4848

4949
val main = Looper.getMainLooper().asHandler(async = true).asCoroutineDispatcher()
5050

51-
val mainLooper = ShadowLooper.getShadowMainLooper()
51+
val mainLooper = shadowOf(Looper.getMainLooper())
5252
mainLooper.pause()
5353
val mainMessageQueue = shadowOf(Looper.getMainLooper().queue)
5454

@@ -67,7 +67,7 @@ class HandlerDispatcherTest : TestBase() {
6767

6868
val main = Looper.getMainLooper().asHandler(async = true).asCoroutineDispatcher()
6969

70-
val mainLooper = ShadowLooper.getShadowMainLooper()
70+
val mainLooper = shadowOf(Looper.getMainLooper())
7171
mainLooper.pause()
7272
val mainMessageQueue = shadowOf(Looper.getMainLooper().queue)
7373

@@ -86,7 +86,7 @@ class HandlerDispatcherTest : TestBase() {
8686

8787
val main = Looper.getMainLooper().asHandler(async = true).asCoroutineDispatcher()
8888

89-
val mainLooper = ShadowLooper.getShadowMainLooper()
89+
val mainLooper = shadowOf(Looper.getMainLooper())
9090
mainLooper.pause()
9191
val mainMessageQueue = shadowOf(Looper.getMainLooper().queue)
9292

@@ -105,7 +105,7 @@ class HandlerDispatcherTest : TestBase() {
105105

106106
val main = Looper.getMainLooper().asHandler(async = false).asCoroutineDispatcher()
107107

108-
val mainLooper = ShadowLooper.getShadowMainLooper()
108+
val mainLooper = shadowOf(Looper.getMainLooper())
109109
mainLooper.pause()
110110
val mainMessageQueue = shadowOf(Looper.getMainLooper().queue)
111111

0 commit comments

Comments
 (0)