Skip to content

Commit d4fafc9

Browse files
committed
Fix await/asDeferred for MinimalState implementations
Fixes #2456
1 parent fa30140 commit d4fafc9

File tree

4 files changed

+177
-27
lines changed

4 files changed

+177
-27
lines changed

integration/kotlinx-coroutines-jdk8/api/kotlinx-coroutines-jdk8.api

+2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
public final class kotlinx/coroutines/future/FutureKt {
22
public static final fun asCompletableFuture (Lkotlinx/coroutines/Deferred;)Ljava/util/concurrent/CompletableFuture;
33
public static final fun asCompletableFuture (Lkotlinx/coroutines/Job;)Ljava/util/concurrent/CompletableFuture;
4+
public static final fun asDeferred (Ljava/util/concurrent/CompletableFuture;)Lkotlinx/coroutines/Deferred;
45
public static final fun asDeferred (Ljava/util/concurrent/CompletionStage;)Lkotlinx/coroutines/Deferred;
6+
public static final fun await (Ljava/util/concurrent/CompletableFuture;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
57
public static final fun await (Ljava/util/concurrent/CompletionStage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
68
public static final fun future (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/CompletableFuture;
79
public static synthetic fun future$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/CompletableFuture;

integration/kotlinx-coroutines-jdk8/src/future/Future.kt

+91-26
Original file line numberDiff line numberDiff line change
@@ -105,22 +105,23 @@ private fun Job.setupCancellation(future: CompletableFuture<*>) {
105105
}
106106

107107
/**
108-
* Converts this completion stage to an instance of [Deferred].
108+
* Converts this [CompletionStage] to an instance of [Deferred].
109109
* When this completion stage is an instance of [Future], then it is cancelled when
110110
* the resulting deferred is cancelled.
111+
*
112+
* ### Implementation details
113+
*
114+
* [CompletionStage] does not extend a [Future] and does not provide future-like methods to check for completion and
115+
* to retrieve a resulting value, so this implementation always takes a slow path of installing a callback with
116+
* [CompletionStage.whenComplete]. For cases when [CompletionStage] is statically known to be an instance
117+
* of [CompletableFuture] there is an overload of this `asDeferred` function with a [CompletableFuture] receiver that
118+
* has a fast-path for a future that is already complete. Note, that it is not safe to dynamically check if an instance
119+
* of a [CompletionStage] is an instance of a [CompletableFuture], because JDK functions return instances that do
120+
* implement a [CompletableFuture], yet throw an [UnsupportedOperationException] on most of the future-like methods in it.
121+
* For the purpose of cancelling the the future, the corresponding [UnsupportedOperationException] is ignored in this case.
111122
*/
123+
@Suppress("DeferredIsResult")
112124
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
113-
// Fast path if already completed
114-
if (this is Future<*> && isDone()){
115-
return try {
116-
@Suppress("UNCHECKED_CAST")
117-
CompletableDeferred(get() as T)
118-
} catch (e: Throwable) {
119-
// unwrap original cause from ExecutionException
120-
val original = (e as? ExecutionException)?.cause ?: e
121-
CompletableDeferred<T>().also { it.completeExceptionally(original) }
122-
}
123-
}
124125
val result = CompletableDeferred<T>()
125126
whenComplete { value, exception ->
126127
if (exception == null) {
@@ -137,34 +138,98 @@ public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
137138
}
138139

139140
/**
140-
* Awaits for completion of the completion stage without blocking a thread.
141+
* Converts this [CompletableFuture] to an instance of [Deferred].
142+
* The future is cancelled when the resulting deferred is cancelled.
143+
*
144+
* ### Implementation details
145+
*
146+
* This implementation has a fast-path for a case of a future that [isDone][CompletableFuture.isDone].
147+
*/
148+
@Suppress("DeferredIsResult")
149+
public fun <T> CompletableFuture<T>.asDeferred(): Deferred<T> {
150+
// Fast path if already completed
151+
if (isDone) {
152+
return try {
153+
@Suppress("UNCHECKED_CAST")
154+
CompletableDeferred(get() as T)
155+
} catch (e: Throwable) {
156+
// unwrap original cause from ExecutionException
157+
val original = (e as? ExecutionException)?.cause ?: e
158+
CompletableDeferred<T>().also { it.completeExceptionally(original) }
159+
}
160+
}
161+
// slow-path
162+
return (this as CompletionStage<T>).asDeferred()
163+
}
164+
/**
165+
* Awaits for completion of [CompletionStage] without blocking a thread.
141166
*
142167
* This suspending function is cancellable.
143168
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
144169
* stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
145-
* This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of [CompletableFuture].
170+
* This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of a [Future].
146171
* If cancelling given stage is undesired, `stage.asDeferred().await()` should be used instead.
172+
*
173+
* ### Implementation details
174+
*
175+
* [CompletionStage] does not extend a [Future] and does not provide future-like methods to check for completion and
176+
* to retrieve a resulting value, so this implementation always takes a slow path of installing a callback with
177+
* [CompletionStage.whenComplete]. For cases when [CompletionStage] is statically known to be an instance
178+
* of [CompletableFuture] there is an overload of this `await` function with a [CompletableFuture] receiver that
179+
* has a fast-path for a future that is already complete. Note, that it is not safe to dynamically check if an instance
180+
* of a [CompletionStage] is an instance of a [CompletableFuture], because JDK functions return instances that do
181+
* implement a [CompletableFuture], yet throw an exception on most of the future-like methods in it.
182+
* For the purpose of cancelling the the future, the corresponding [UnsupportedOperationException] is ignored in this case.
147183
*/
148-
public suspend fun <T> CompletionStage<T>.await(): T {
184+
public suspend fun <T> CompletionStage<T>.await(): T =
185+
suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
186+
val consumer = ContinuationConsumer(cont)
187+
whenComplete(consumer)
188+
if (cont.isActive) {
189+
// avoid creation of a lambda when continuation was already resumed during whenComplete call
190+
// TODO: In a major release this lambda can be made to extend CancelFutureOnCompletion class from core module
191+
// This will further save one allocated object here.
192+
cont.invokeOnCancellation {
193+
if (this is Future<*>) {
194+
// mayInterruptIfRunning is not used
195+
try {
196+
cancel(false)
197+
} catch (e: UnsupportedOperationException) {
198+
// Internal JDK implementation of a Future can throw an UnsupportedOperationException here.
199+
// We simply ignore it for the purpose of cancellation
200+
// See https://github.com/Kotlin/kotlinx.coroutines/issues/2456
201+
}
202+
}
203+
consumer.cont = null // shall clear reference to continuation to aid GC
204+
}
205+
}
206+
}
207+
208+
/**
209+
* Awaits for completion of [CompletableFuture] without blocking a thread.
210+
*
211+
* This suspending function is cancellable.
212+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
213+
* stops waiting for the completable future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
214+
* This method is intended to be used with one-shot futures, so on coroutine cancellation the completable future is cancelled.
215+
* If cancelling the future is undesired, `future.asDeferred().await()` should be used instead.
216+
*
217+
* ### Implementation details
218+
*
219+
* This implementation has a fast-path for a case of a future that [isDone][CompletableFuture.isDone].
220+
*/
221+
public suspend fun <T> CompletableFuture<T>.await(): T {
149222
// fast path when CompletableFuture is already done (does not suspend)
150-
if (this is Future<*> && isDone()) {
223+
if (isDone) {
151224
try {
152-
@Suppress("UNCHECKED_CAST")
225+
@Suppress("UNCHECKED_CAST", "BlockingMethodInNonBlockingContext")
153226
return get() as T
154227
} catch (e: ExecutionException) {
155228
throw e.cause ?: e // unwrap original cause from ExecutionException
156229
}
157230
}
158231
// slow path -- suspend
159-
return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
160-
val consumer = ContinuationConsumer(cont)
161-
whenComplete(consumer)
162-
cont.invokeOnCancellation {
163-
// mayInterruptIfRunning is not used
164-
(this as? CompletableFuture<T>)?.cancel(false)
165-
consumer.cont = null // shall clear reference to continuation to aid GC
166-
}
167-
}
232+
return (this as CompletionStage<T>).await()
168233
}
169234

170235
private class ContinuationConsumer<T>(

integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt

+77
Original file line numberDiff line numberDiff line change
@@ -490,4 +490,81 @@ class FutureTest : TestBase() {
490490
}
491491
}
492492
}
493+
494+
/**
495+
* https://github.com/Kotlin/kotlinx.coroutines/issues/2456
496+
*/
497+
@Test
498+
fun testCompletedStageAwait() = runTest {
499+
val stage = CompletableFuture.completedStage("OK")
500+
assertEquals("OK", stage.await())
501+
}
502+
503+
/**
504+
* https://github.com/Kotlin/kotlinx.coroutines/issues/2456
505+
*/
506+
@Test
507+
fun testCompletedStageAsDeferredAwait() = runTest {
508+
val stage = CompletableFuture.completedStage("OK")
509+
val deferred = stage.asDeferred()
510+
assertEquals("OK", deferred.await())
511+
}
512+
513+
@Test
514+
fun testCompletedStateThenApplyAwait() = runTest {
515+
expect(1)
516+
val cf = CompletableFuture<String>()
517+
launch {
518+
expect(3)
519+
cf.complete("O")
520+
}
521+
expect(2)
522+
val stage = cf.thenApply { it + "K" }
523+
assertEquals("OK", stage.await())
524+
finish(4)
525+
}
526+
527+
@Test
528+
fun testCompletedStateThenApplyAwaitCancel() = runTest {
529+
expect(1)
530+
val cf = CompletableFuture<String>()
531+
launch {
532+
expect(3)
533+
cf.cancel(false)
534+
}
535+
expect(2)
536+
val stage = cf.thenApply { it + "K" }
537+
assertFailsWith<CancellationException> { stage.await() }
538+
finish(4)
539+
}
540+
541+
@Test
542+
fun testCompletedStateThenApplyAsDeferredAwait() = runTest {
543+
expect(1)
544+
val cf = CompletableFuture<String>()
545+
launch {
546+
expect(3)
547+
cf.complete("O")
548+
}
549+
expect(2)
550+
val stage = cf.thenApply { it + "K" }
551+
val deferred = stage.asDeferred()
552+
assertEquals("OK", deferred.await())
553+
finish(4)
554+
}
555+
556+
@Test
557+
fun testCompletedStateThenApplyAsDeferredAwaitCancel() = runTest {
558+
expect(1)
559+
val cf = CompletableFuture<String>()
560+
expect(2)
561+
val stage = cf.thenApply { it + "K" }
562+
val deferred = stage.asDeferred()
563+
launch {
564+
expect(3)
565+
deferred.cancel() // cancel the deferred!
566+
}
567+
assertFailsWith<CancellationException> { stage.await() }
568+
finish(4)
569+
}
493570
}

kotlinx-coroutines-core/jvm/src/Future.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,13 @@ private class CancelFutureOnCompletion(
3838
override fun invoke(cause: Throwable?) {
3939
// Don't interrupt when cancelling future on completion, because no one is going to reset this
4040
// interruption flag and it will cause spurious failures elsewhere
41-
future.cancel(false)
41+
try {
42+
future.cancel(false)
43+
} catch (e: UnsupportedOperationException) {
44+
// Internal JDK implementation of a Future can throw an UnsupportedOperationException here.
45+
// We simply ignore it for the purpose of cancellation
46+
// See https://github.com/Kotlin/kotlinx.coroutines/issues/2456
47+
}
4248
}
4349
}
4450

0 commit comments

Comments
 (0)