diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt index f7fdba5f69..bb5f426a28 100644 --- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt +++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt @@ -105,16 +105,19 @@ private fun Job.setupCancellation(future: CompletableFuture<*>) { } /** - * Converts this completion stage to an instance of [Deferred]. - * When this completion stage is an instance of [Future], then it is cancelled when - * the resulting deferred is cancelled. + * Converts this [CompletionStage] to an instance of [Deferred]. + * + * The [CompletableFuture] that corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture]) + * is cancelled when the resulting deferred is cancelled. */ +@Suppress("DeferredIsResult") public fun CompletionStage.asDeferred(): Deferred { + val future = toCompletableFuture() // retrieve the future // Fast path if already completed - if (this is Future<*> && isDone()){ + if (future.isDone) { return try { @Suppress("UNCHECKED_CAST") - CompletableDeferred(get() as T) + CompletableDeferred(future.get() as T) } catch (e: Throwable) { // unwrap original cause from ExecutionException val original = (e as? ExecutionException)?.cause ?: e @@ -132,25 +135,28 @@ public fun CompletionStage.asDeferred(): Deferred { result.completeExceptionally((exception as? CompletionException)?.cause ?: exception) } } - if (this is Future<*>) result.cancelFutureOnCompletion(this) + result.cancelFutureOnCompletion(future) return result } /** - * Awaits for completion of the completion stage without blocking a thread. + * Awaits for completion of [CompletionStage] without blocking a thread. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException]. - * This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of [CompletableFuture]. - * If cancelling given stage is undesired, `stage.asDeferred().await()` should be used instead. + * + * This method is intended to be used with one-shot futures, so on coroutine cancellation the [CompletableFuture] that + * corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture]) + * is cancelled. If cancelling the given stage is undesired, `stage.asDeferred().await()` should be used instead. */ public suspend fun CompletionStage.await(): T { + val future = toCompletableFuture() // retrieve the future // fast path when CompletableFuture is already done (does not suspend) - if (this is Future<*> && isDone()) { + if (future.isDone) { try { - @Suppress("UNCHECKED_CAST") - return get() as T + @Suppress("UNCHECKED_CAST", "BlockingMethodInNonBlockingContext") + return future.get() as T } catch (e: ExecutionException) { throw e.cause ?: e // unwrap original cause from ExecutionException } @@ -160,8 +166,7 @@ public suspend fun CompletionStage.await(): T { val consumer = ContinuationConsumer(cont) whenComplete(consumer) cont.invokeOnCancellation { - // mayInterruptIfRunning is not used - (this as? CompletableFuture)?.cancel(false) + future.cancel(false) consumer.cont = null // shall clear reference to continuation to aid GC } } diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt index f75c96746c..998aaa0835 100644 --- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt @@ -490,4 +490,81 @@ class FutureTest : TestBase() { } } } + + /** + * https://github.com/Kotlin/kotlinx.coroutines/issues/2456 + */ + @Test + fun testCompletedStageAwait() = runTest { + val stage = CompletableFuture.completedStage("OK") + assertEquals("OK", stage.await()) + } + + /** + * https://github.com/Kotlin/kotlinx.coroutines/issues/2456 + */ + @Test + fun testCompletedStageAsDeferredAwait() = runTest { + val stage = CompletableFuture.completedStage("OK") + val deferred = stage.asDeferred() + assertEquals("OK", deferred.await()) + } + + @Test + fun testCompletedStateThenApplyAwait() = runTest { + expect(1) + val cf = CompletableFuture() + launch { + expect(3) + cf.complete("O") + } + expect(2) + val stage = cf.thenApply { it + "K" } + assertEquals("OK", stage.await()) + finish(4) + } + + @Test + fun testCompletedStateThenApplyAwaitCancel() = runTest { + expect(1) + val cf = CompletableFuture() + launch { + expect(3) + cf.cancel(false) + } + expect(2) + val stage = cf.thenApply { it + "K" } + assertFailsWith { stage.await() } + finish(4) + } + + @Test + fun testCompletedStateThenApplyAsDeferredAwait() = runTest { + expect(1) + val cf = CompletableFuture() + launch { + expect(3) + cf.complete("O") + } + expect(2) + val stage = cf.thenApply { it + "K" } + val deferred = stage.asDeferred() + assertEquals("OK", deferred.await()) + finish(4) + } + + @Test + fun testCompletedStateThenApplyAsDeferredAwaitCancel() = runTest { + expect(1) + val cf = CompletableFuture() + expect(2) + val stage = cf.thenApply { it + "K" } + val deferred = stage.asDeferred() + launch { + expect(3) + deferred.cancel() // cancel the deferred! + } + assertFailsWith { stage.await() } + finish(4) + } }