Skip to content

Commit 3e9f244

Browse files
committed
Review and optimize usage of CancellableContinuation.invokeOnCancellation
1 parent dbd9e1c commit 3e9f244

File tree

9 files changed

+39
-29
lines changed
  • common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental
  • integration/kotlinx-coroutines-nio/src/main/kotlin/kotlinx/coroutines/experimental/nio
  • js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental
  • reactive
    • kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive
    • kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1
    • kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2

9 files changed

+39
-29
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt

+7-2
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,17 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
7474
it.start() // To properly await lazily started deferreds
7575
it.invokeOnCompletion(AwaitAllNode(cont, it).asHandler)
7676
}
77-
cont.invokeOnCancellation {
77+
cont.invokeOnCancellation(handler = DisposeHandlersOnCancel(handlers).asHandler)
78+
}
79+
80+
private class DisposeHandlersOnCancel(private val handlers: List<DisposableHandle>) : CancelHandler() {
81+
override fun invoke(cause: Throwable?) {
7882
handlers.forEach { it.dispose() }
7983
}
84+
override fun toString(): String = "DisposeHandlersOnCancel[$handlers]"
8085
}
8186

82-
inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
87+
private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
8388
override fun invoke(cause: Throwable?) {
8489
if (cause != null) {
8590
val token = continuation.tryResumeWithException(cause)

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

+6-2
Original file line numberDiff line numberDiff line change
@@ -769,11 +769,15 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
769769

770770
// ------ private ------
771771

772-
private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) {
773-
cont.invokeOnCancellation {
772+
private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) =
773+
cont.invokeOnCancellation(handler = RemoveReceiveOnCancel(receive).asHandler)
774+
775+
private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : CancelHandler() {
776+
override fun invoke(cause: Throwable?) {
774777
if (receive.remove())
775778
onReceiveDequeued()
776779
}
780+
override fun toString(): String = "RemoveReceiveOnCancel[$receive]"
777781
}
778782

779783
private class Itr<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {

integration/kotlinx-coroutines-nio/src/main/kotlin/kotlinx/coroutines/experimental/nio/Nio.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,12 @@ suspend fun AsynchronousSocketChannel.aWrite(
137137

138138
private fun Channel.closeOnCancel(cont: CancellableContinuation<*>) {
139139
cont.invokeOnCancellation {
140-
try {
141-
close()
142-
} catch (ex: Throwable) {
143-
// Specification says that it is Ok to call it any time, but reality is different,
144-
// so we have just to ignore exception
145-
}
140+
try {
141+
close()
142+
} catch (ex: Throwable) {
143+
// Specification says that it is Ok to call it any time, but reality is different,
144+
// so we have just to ignore exception
145+
}
146146
}
147147
}
148148

js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt

+8-6
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,18 @@ internal class NodeDispatcher : CoroutineDispatcher(), Delay {
2828
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
2929
val handle = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.toIntMillis(unit))
3030
// Actually on cancellation, but clearTimeout is idempotent
31-
continuation.invokeOnCancellation { clearTimeout(handle) }
31+
continuation.invokeOnCancellation(handler = ClearTimeout(handle).asHandler)
32+
}
33+
34+
private class ClearTimeout(private val handle: Int) : CancelHandler(), DisposableHandle {
35+
override fun dispose() { clearTimeout(handle) }
36+
override fun invoke(cause: Throwable?) { dispose() }
37+
override fun toString(): String = "ClearTimeout[$handle]"
3238
}
3339

3440
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
3541
val handle = setTimeout({ block.run() }, time.toIntMillis(unit))
36-
return object : DisposableHandle {
37-
override fun dispose() {
38-
clearTimeout(handle)
39-
}
40-
}
42+
return ClearTimeout(handle)
4143
}
4244
}
4345

reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,15 @@ private suspend fun <T> Publisher<T>.awaitOne(
120120
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
121121
if (!seenValue) {
122122
seenValue = true
123-
cont.resume(t)
124123
subscription.cancel()
124+
cont.resume(t)
125125
}
126126
}
127127
Mode.LAST, Mode.SINGLE -> {
128128
if (mode == Mode.SINGLE && seenValue) {
129+
subscription.cancel()
129130
if (cont.isActive)
130131
cont.resumeWithException(IllegalArgumentException("More that one onNext value for $mode"))
131-
subscription.cancel()
132132
} else {
133133
value = t
134134
seenValue = true

reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt

+4-5
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import rx.*
3333
*/
3434
public suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
3535
subscribe(object : CompletableSubscriber {
36-
override fun onSubscribe(s: Subscription) { cont.unsubscribeOnCompletion(s) }
36+
override fun onSubscribe(s: Subscription) { cont.unsubscribeOnCancellation(s) }
3737
override fun onCompleted() { cont.resume(Unit) }
3838
override fun onError(e: Throwable) { cont.resumeWithException(e) }
3939
})
@@ -50,7 +50,7 @@ public suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCorout
5050
* immediately resumes with [CancellationException].
5151
*/
5252
public suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
53-
cont.unsubscribeOnCompletion(subscribe(object : SingleSubscriber<T>() {
53+
cont.unsubscribeOnCancellation(subscribe(object : SingleSubscriber<T>() {
5454
override fun onSuccess(t: T) { cont.resume(t) }
5555
override fun onError(error: Throwable) { cont.resumeWithException(error) }
5656
}))
@@ -128,14 +128,13 @@ public suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
128128
// ------------------------ private ------------------------
129129

130130
private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
131-
cont.unsubscribeOnCompletion(subscribe(object : Subscriber<T>() {
131+
cont.unsubscribeOnCancellation(subscribe(object : Subscriber<T>() {
132132
override fun onStart() { request(1) }
133133
override fun onNext(t: T) { cont.resume(t) }
134134
override fun onCompleted() { if (cont.isActive) cont.resumeWithException(IllegalStateException("Should have invoked onNext")) }
135135
override fun onError(e: Throwable) { cont.resumeWithException(e) }
136136
}))
137137
}
138138

139-
internal fun <T> CancellableContinuation<T>.unsubscribeOnCompletion(sub: Subscription) {
139+
internal fun <T> CancellableContinuation<T>.unsubscribeOnCancellation(sub: Subscription) =
140140
invokeOnCancellation { sub.unsubscribe() }
141-
}

reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxScheduler.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class SchedulerCoroutineDispatcher(private val scheduler: Scheduler) : Co
4646
with(continuation) { resumeUndispatched(Unit) }
4747
}, time, unit)
4848
.let { subscription ->
49-
continuation.unsubscribeOnCompletion(subscription)
49+
continuation.unsubscribeOnCancellation(subscription)
5050
}
5151

5252
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import kotlinx.coroutines.experimental.suspendCancellableCoroutine
3434
*/
3535
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
3636
subscribe(object : CompletableObserver {
37-
override fun onSubscribe(d: Disposable) { cont.disposeOnCompletion(d) }
37+
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
3838
override fun onComplete() { cont.resume(Unit) }
3939
override fun onError(e: Throwable) { cont.resumeWithException(e) }
4040
})
@@ -65,7 +65,7 @@ public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).aw
6565
*/
6666
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
6767
subscribe(object : MaybeObserver<T> {
68-
override fun onSubscribe(d: Disposable) { cont.disposeOnCompletion(d) }
68+
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
6969
override fun onComplete() { cont.resume(default) }
7070
override fun onSuccess(t: T) { cont.resume(t) }
7171
override fun onError(error: Throwable) { cont.resumeWithException(error) }
@@ -84,7 +84,7 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCan
8484
*/
8585
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
8686
subscribe(object : SingleObserver<T> {
87-
override fun onSubscribe(d: Disposable) { cont.disposeOnCompletion(d) }
87+
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
8888
override fun onSuccess(t: T) { cont.resume(t) }
8989
override fun onError(error: Throwable) { cont.resumeWithException(error) }
9090
})
@@ -161,7 +161,7 @@ public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SING
161161

162162
// ------------------------ private ------------------------
163163

164-
internal fun CancellableContinuation<*>.disposeOnCompletion(d: Disposable) =
164+
internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
165165
invokeOnCancellation { d.dispose() }
166166

167167
private enum class Mode(val s: String) {

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxScheduler.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class SchedulerCoroutineDispatcher(private val scheduler: Scheduler) : Co
4343
val disposable = scheduler.scheduleDirect({
4444
with(continuation) { resumeUndispatched(Unit) }
4545
}, time, unit)
46-
continuation.disposeOnCompletion(disposable)
46+
continuation.disposeOnCancellation(disposable)
4747
}
4848

4949
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {

0 commit comments

Comments
 (0)