Skip to content

Commit 15bf5b7

Browse files
committed
Introduce JobSupport.cancelCoroutine for AbstractCoroutine-derived classes
1 parent 092480c commit 15bf5b7

File tree

8 files changed

+14
-10
lines changed

8 files changed

+14
-10
lines changed

integration/kotlinx-coroutines-jdk8/test/examples/ExplicitJob-example.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ fun main(args: Array<String>) {
2626
log("g should not execute this line")
2727
}
2828
log("Started futures f && g... will not wait -- cancel them!!!")
29-
job.cancel(CancellationException("I don't want it"))
29+
job.cancel()
3030
check(f.isCancelled)
3131
check(g.isCancelled)
3232
log("f result = ${Try<Unit> { f.get() }}")

kotlinx-coroutines-core/common/src/JobSupport.kt

+4
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
584584
public open fun childCancelled(cause: Throwable): Boolean =
585585
cancelImpl(cause) && handlesException
586586

587+
// For AbstractCoroutine implementations
588+
protected fun cancelCoroutine(cause: Throwable?) =
589+
cancelImpl(cause)
590+
587591
// cause is Throwable or ParentJob when cancelChild was invoked
588592
// returns true is exception was handled, false otherwise
589593
private fun cancelImpl(cause: Any?): Boolean {

kotlinx-coroutines-core/common/src/Timeout.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private open class TimeoutCoroutine<U, in T: U>(
9191

9292
@Suppress("LeakingThis", "Deprecation")
9393
override fun run() {
94-
cancel(TimeoutCancellationException(time, this))
94+
cancelCoroutine(TimeoutCancellationException(time, this))
9595
}
9696

9797
@Suppress("UNCHECKED_CAST")

kotlinx-coroutines-core/common/src/channels/Broadcast.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private open class BroadcastCoroutine<E>(
9898
override fun cancel(cause: Throwable?): Boolean {
9999
val wasCancelled = _channel.cancel(cause)
100100
@Suppress("DEPRECATION")
101-
if (wasCancelled) super.cancel(cause) // cancel the job
101+
if (wasCancelled) cancelCoroutine(cause) // cancel the job
102102
return wasCancelled
103103
}
104104

kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ internal open class ChannelCoroutine<E>(
2323

2424
override fun cancel(cause: Throwable?): Boolean {
2525
val wasCancelled = _channel.cancel(cause)
26-
if (wasCancelled) super.cancel(cause) // cancel the job
26+
if (wasCancelled) cancelCoroutine(cause) // cancel the job
2727
return wasCancelled
2828
}
2929
}

kotlinx-coroutines-core/jvm/src/Builders.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private class BlockingCoroutine<T>(
7575
try {
7676
while (true) {
7777
@Suppress("DEPRECATION")
78-
if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
78+
if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
7979
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
8080
// note: process next even may loose unpark flag, so check if completed before parking
8181
if (isCompleted) break

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ private class PublisherCoroutine<in T>(
7272

7373
override val isClosedForSend: Boolean get() = isCompleted
7474
override val isFull: Boolean = mutex.isLocked
75-
override fun close(cause: Throwable?): Boolean = cancel(cause)
75+
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
7676
override fun invokeOnClose(handler: (Throwable?) -> Unit) =
7777
throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")
7878

@@ -134,7 +134,7 @@ private class PublisherCoroutine<in T>(
134134
// If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
135135
// to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
136136
// this failure is essentially equivalent to a failure of a child coroutine.
137-
childCancelled(e)
137+
cancelCoroutine(e)
138138
unlockAndCheckCompleted()
139139
throw e
140140
}
@@ -201,7 +201,7 @@ private class PublisherCoroutine<in T>(
201201
override fun request(n: Long) {
202202
if (n <= 0) {
203203
// Specification requires IAE for n <= 0
204-
cancel(IllegalArgumentException("non-positive subscription request $n"))
204+
cancelCoroutine(IllegalArgumentException("non-positive subscription request $n"))
205205
return
206206
}
207207
while (true) { // lock-free loop for nRequested

reactive/kotlinx-coroutines-rx2/src/RxObservable.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ private class RxObservableCoroutine<T: Any>(
6565

6666
override val isClosedForSend: Boolean get() = isCompleted
6767
override val isFull: Boolean = mutex.isLocked
68-
override fun close(cause: Throwable?): Boolean = cancel(cause)
68+
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
6969
override fun invokeOnClose(handler: (Throwable?) -> Unit) =
7070
throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
7171

@@ -113,7 +113,7 @@ private class RxObservableCoroutine<T: Any>(
113113
// If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
114114
// to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
115115
// this failure is essentially equivalent to a failure of a child coroutine.
116-
childCancelled(e)
116+
cancelCoroutine(e)
117117
doLockedSignalCompleted()
118118
throw e
119119
}

0 commit comments

Comments
 (0)