From d4fafc958f0a12a7b41ab1853d7f5039a1733808 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 18 Dec 2020 12:29:08 +0300 Subject: [PATCH 1/3] Fix await/asDeferred for MinimalState implementations Fixes #2456 --- .../api/kotlinx-coroutines-jdk8.api | 2 + .../src/future/Future.kt | 117 ++++++++++++++---- .../test/future/FutureTest.kt | 77 ++++++++++++ kotlinx-coroutines-core/jvm/src/Future.kt | 8 +- 4 files changed, 177 insertions(+), 27 deletions(-) diff --git a/integration/kotlinx-coroutines-jdk8/api/kotlinx-coroutines-jdk8.api b/integration/kotlinx-coroutines-jdk8/api/kotlinx-coroutines-jdk8.api index 4ee57845b2..524426bb46 100644 --- a/integration/kotlinx-coroutines-jdk8/api/kotlinx-coroutines-jdk8.api +++ b/integration/kotlinx-coroutines-jdk8/api/kotlinx-coroutines-jdk8.api @@ -1,7 +1,9 @@ public final class kotlinx/coroutines/future/FutureKt { public static final fun asCompletableFuture (Lkotlinx/coroutines/Deferred;)Ljava/util/concurrent/CompletableFuture; public static final fun asCompletableFuture (Lkotlinx/coroutines/Job;)Ljava/util/concurrent/CompletableFuture; + public static final fun asDeferred (Ljava/util/concurrent/CompletableFuture;)Lkotlinx/coroutines/Deferred; public static final fun asDeferred (Ljava/util/concurrent/CompletionStage;)Lkotlinx/coroutines/Deferred; + public static final fun await (Ljava/util/concurrent/CompletableFuture;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun await (Ljava/util/concurrent/CompletionStage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun future (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/CompletableFuture; public static synthetic fun future$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/CompletableFuture; diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt index f7fdba5f69..0fb82ce87b 100644 --- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt +++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt @@ -105,22 +105,23 @@ private fun Job.setupCancellation(future: CompletableFuture<*>) { } /** - * Converts this completion stage to an instance of [Deferred]. + * Converts this [CompletionStage] to an instance of [Deferred]. * When this completion stage is an instance of [Future], then it is cancelled when * the resulting deferred is cancelled. + * + * ### Implementation details + * + * [CompletionStage] does not extend a [Future] and does not provide future-like methods to check for completion and + * to retrieve a resulting value, so this implementation always takes a slow path of installing a callback with + * [CompletionStage.whenComplete]. For cases when [CompletionStage] is statically known to be an instance + * of [CompletableFuture] there is an overload of this `asDeferred` function with a [CompletableFuture] receiver that + * has a fast-path for a future that is already complete. Note, that it is not safe to dynamically check if an instance + * of a [CompletionStage] is an instance of a [CompletableFuture], because JDK functions return instances that do + * implement a [CompletableFuture], yet throw an [UnsupportedOperationException] on most of the future-like methods in it. + * For the purpose of cancelling the the future, the corresponding [UnsupportedOperationException] is ignored in this case. */ +@Suppress("DeferredIsResult") public fun CompletionStage.asDeferred(): Deferred { - // Fast path if already completed - if (this is Future<*> && isDone()){ - return try { - @Suppress("UNCHECKED_CAST") - CompletableDeferred(get() as T) - } catch (e: Throwable) { - // unwrap original cause from ExecutionException - val original = (e as? ExecutionException)?.cause ?: e - CompletableDeferred().also { it.completeExceptionally(original) } - } - } val result = CompletableDeferred() whenComplete { value, exception -> if (exception == null) { @@ -137,34 +138,98 @@ public fun CompletionStage.asDeferred(): Deferred { } /** - * Awaits for completion of the completion stage without blocking a thread. + * Converts this [CompletableFuture] to an instance of [Deferred]. + * The future is cancelled when the resulting deferred is cancelled. + * + * ### Implementation details + * + * This implementation has a fast-path for a case of a future that [isDone][CompletableFuture.isDone]. + */ +@Suppress("DeferredIsResult") +public fun CompletableFuture.asDeferred(): Deferred { + // Fast path if already completed + if (isDone) { + return try { + @Suppress("UNCHECKED_CAST") + CompletableDeferred(get() as T) + } catch (e: Throwable) { + // unwrap original cause from ExecutionException + val original = (e as? ExecutionException)?.cause ?: e + CompletableDeferred().also { it.completeExceptionally(original) } + } + } + // slow-path + return (this as CompletionStage).asDeferred() +} +/** + * Awaits for completion of [CompletionStage] without blocking a thread. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException]. - * This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of [CompletableFuture]. + * This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of a [Future]. * If cancelling given stage is undesired, `stage.asDeferred().await()` should be used instead. + * + * ### Implementation details + * + * [CompletionStage] does not extend a [Future] and does not provide future-like methods to check for completion and + * to retrieve a resulting value, so this implementation always takes a slow path of installing a callback with + * [CompletionStage.whenComplete]. For cases when [CompletionStage] is statically known to be an instance + * of [CompletableFuture] there is an overload of this `await` function with a [CompletableFuture] receiver that + * has a fast-path for a future that is already complete. Note, that it is not safe to dynamically check if an instance + * of a [CompletionStage] is an instance of a [CompletableFuture], because JDK functions return instances that do + * implement a [CompletableFuture], yet throw an exception on most of the future-like methods in it. + * For the purpose of cancelling the the future, the corresponding [UnsupportedOperationException] is ignored in this case. */ -public suspend fun CompletionStage.await(): T { +public suspend fun CompletionStage.await(): T = + suspendCancellableCoroutine { cont: CancellableContinuation -> + val consumer = ContinuationConsumer(cont) + whenComplete(consumer) + if (cont.isActive) { + // avoid creation of a lambda when continuation was already resumed during whenComplete call + // TODO: In a major release this lambda can be made to extend CancelFutureOnCompletion class from core module + // This will further save one allocated object here. + cont.invokeOnCancellation { + if (this is Future<*>) { + // mayInterruptIfRunning is not used + try { + cancel(false) + } catch (e: UnsupportedOperationException) { + // Internal JDK implementation of a Future can throw an UnsupportedOperationException here. + // We simply ignore it for the purpose of cancellation + // See https://github.com/Kotlin/kotlinx.coroutines/issues/2456 + } + } + consumer.cont = null // shall clear reference to continuation to aid GC + } + } + } + +/** + * Awaits for completion of [CompletableFuture] without blocking a thread. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * stops waiting for the completable future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException]. + * This method is intended to be used with one-shot futures, so on coroutine cancellation the completable future is cancelled. + * If cancelling the future is undesired, `future.asDeferred().await()` should be used instead. + * + * ### Implementation details + * + * This implementation has a fast-path for a case of a future that [isDone][CompletableFuture.isDone]. + */ +public suspend fun CompletableFuture.await(): T { // fast path when CompletableFuture is already done (does not suspend) - if (this is Future<*> && isDone()) { + if (isDone) { try { - @Suppress("UNCHECKED_CAST") + @Suppress("UNCHECKED_CAST", "BlockingMethodInNonBlockingContext") return get() as T } catch (e: ExecutionException) { throw e.cause ?: e // unwrap original cause from ExecutionException } } // slow path -- suspend - return suspendCancellableCoroutine { cont: CancellableContinuation -> - val consumer = ContinuationConsumer(cont) - whenComplete(consumer) - cont.invokeOnCancellation { - // mayInterruptIfRunning is not used - (this as? CompletableFuture)?.cancel(false) - consumer.cont = null // shall clear reference to continuation to aid GC - } - } + return (this as CompletionStage).await() } private class ContinuationConsumer( diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt index f75c96746c..998aaa0835 100644 --- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt @@ -490,4 +490,81 @@ class FutureTest : TestBase() { } } } + + /** + * https://github.com/Kotlin/kotlinx.coroutines/issues/2456 + */ + @Test + fun testCompletedStageAwait() = runTest { + val stage = CompletableFuture.completedStage("OK") + assertEquals("OK", stage.await()) + } + + /** + * https://github.com/Kotlin/kotlinx.coroutines/issues/2456 + */ + @Test + fun testCompletedStageAsDeferredAwait() = runTest { + val stage = CompletableFuture.completedStage("OK") + val deferred = stage.asDeferred() + assertEquals("OK", deferred.await()) + } + + @Test + fun testCompletedStateThenApplyAwait() = runTest { + expect(1) + val cf = CompletableFuture() + launch { + expect(3) + cf.complete("O") + } + expect(2) + val stage = cf.thenApply { it + "K" } + assertEquals("OK", stage.await()) + finish(4) + } + + @Test + fun testCompletedStateThenApplyAwaitCancel() = runTest { + expect(1) + val cf = CompletableFuture() + launch { + expect(3) + cf.cancel(false) + } + expect(2) + val stage = cf.thenApply { it + "K" } + assertFailsWith { stage.await() } + finish(4) + } + + @Test + fun testCompletedStateThenApplyAsDeferredAwait() = runTest { + expect(1) + val cf = CompletableFuture() + launch { + expect(3) + cf.complete("O") + } + expect(2) + val stage = cf.thenApply { it + "K" } + val deferred = stage.asDeferred() + assertEquals("OK", deferred.await()) + finish(4) + } + + @Test + fun testCompletedStateThenApplyAsDeferredAwaitCancel() = runTest { + expect(1) + val cf = CompletableFuture() + expect(2) + val stage = cf.thenApply { it + "K" } + val deferred = stage.asDeferred() + launch { + expect(3) + deferred.cancel() // cancel the deferred! + } + assertFailsWith { stage.await() } + finish(4) + } } diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index 9b6bce5ce4..ecbdc523bc 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -38,7 +38,13 @@ private class CancelFutureOnCompletion( override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - future.cancel(false) + try { + future.cancel(false) + } catch (e: UnsupportedOperationException) { + // Internal JDK implementation of a Future can throw an UnsupportedOperationException here. + // We simply ignore it for the purpose of cancellation + // See https://github.com/Kotlin/kotlinx.coroutines/issues/2456 + } } } From 0472d69482b0dcda21c9c759eb12262819b67b9c Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Thu, 24 Dec 2020 18:54:50 +0300 Subject: [PATCH 2/3] ~ Better implementation via CompletionStage.toCompletableFuture --- .../api/kotlinx-coroutines-jdk8.api | 2 - .../src/future/Future.kt | 119 +++++------------- kotlinx-coroutines-core/jvm/src/Future.kt | 8 +- 3 files changed, 33 insertions(+), 96 deletions(-) diff --git a/integration/kotlinx-coroutines-jdk8/api/kotlinx-coroutines-jdk8.api b/integration/kotlinx-coroutines-jdk8/api/kotlinx-coroutines-jdk8.api index 524426bb46..4ee57845b2 100644 --- a/integration/kotlinx-coroutines-jdk8/api/kotlinx-coroutines-jdk8.api +++ b/integration/kotlinx-coroutines-jdk8/api/kotlinx-coroutines-jdk8.api @@ -1,9 +1,7 @@ public final class kotlinx/coroutines/future/FutureKt { public static final fun asCompletableFuture (Lkotlinx/coroutines/Deferred;)Ljava/util/concurrent/CompletableFuture; public static final fun asCompletableFuture (Lkotlinx/coroutines/Job;)Ljava/util/concurrent/CompletableFuture; - public static final fun asDeferred (Ljava/util/concurrent/CompletableFuture;)Lkotlinx/coroutines/Deferred; public static final fun asDeferred (Ljava/util/concurrent/CompletionStage;)Lkotlinx/coroutines/Deferred; - public static final fun await (Ljava/util/concurrent/CompletableFuture;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun await (Ljava/util/concurrent/CompletionStage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun future (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/CompletableFuture; public static synthetic fun future$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/CompletableFuture; diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt index 0fb82ce87b..8755e07c31 100644 --- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt +++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt @@ -106,22 +106,24 @@ private fun Job.setupCancellation(future: CompletableFuture<*>) { /** * Converts this [CompletionStage] to an instance of [Deferred]. - * When this completion stage is an instance of [Future], then it is cancelled when - * the resulting deferred is cancelled. * - * ### Implementation details - * - * [CompletionStage] does not extend a [Future] and does not provide future-like methods to check for completion and - * to retrieve a resulting value, so this implementation always takes a slow path of installing a callback with - * [CompletionStage.whenComplete]. For cases when [CompletionStage] is statically known to be an instance - * of [CompletableFuture] there is an overload of this `asDeferred` function with a [CompletableFuture] receiver that - * has a fast-path for a future that is already complete. Note, that it is not safe to dynamically check if an instance - * of a [CompletionStage] is an instance of a [CompletableFuture], because JDK functions return instances that do - * implement a [CompletableFuture], yet throw an [UnsupportedOperationException] on most of the future-like methods in it. - * For the purpose of cancelling the the future, the corresponding [UnsupportedOperationException] is ignored in this case. + * The [CompletableFuture] that corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture]) + * is cancelled when the resulting deferred is cancelled. */ @Suppress("DeferredIsResult") public fun CompletionStage.asDeferred(): Deferred { + val future = toCompletableFuture() // retrieve the future + // Fast path if already completed + if (future.isDone) { + return try { + @Suppress("UNCHECKED_CAST") + CompletableDeferred(future.get() as T) + } catch (e: Throwable) { + // unwrap original cause from ExecutionException + val original = (e as? ExecutionException)?.cause ?: e + CompletableDeferred().also { it.completeExceptionally(original) } + } + } val result = CompletableDeferred() whenComplete { value, exception -> if (exception == null) { @@ -133,56 +135,34 @@ public fun CompletionStage.asDeferred(): Deferred { result.completeExceptionally((exception as? CompletionException)?.cause ?: exception) } } - if (this is Future<*>) result.cancelFutureOnCompletion(this) + result.cancelFutureOnCompletion(future) return result } -/** - * Converts this [CompletableFuture] to an instance of [Deferred]. - * The future is cancelled when the resulting deferred is cancelled. - * - * ### Implementation details - * - * This implementation has a fast-path for a case of a future that [isDone][CompletableFuture.isDone]. - */ -@Suppress("DeferredIsResult") -public fun CompletableFuture.asDeferred(): Deferred { - // Fast path if already completed - if (isDone) { - return try { - @Suppress("UNCHECKED_CAST") - CompletableDeferred(get() as T) - } catch (e: Throwable) { - // unwrap original cause from ExecutionException - val original = (e as? ExecutionException)?.cause ?: e - CompletableDeferred().also { it.completeExceptionally(original) } - } - } - // slow-path - return (this as CompletionStage).asDeferred() -} /** * Awaits for completion of [CompletionStage] without blocking a thread. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException]. - * This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of a [Future]. - * If cancelling given stage is undesired, `stage.asDeferred().await()` should be used instead. * - * ### Implementation details - * - * [CompletionStage] does not extend a [Future] and does not provide future-like methods to check for completion and - * to retrieve a resulting value, so this implementation always takes a slow path of installing a callback with - * [CompletionStage.whenComplete]. For cases when [CompletionStage] is statically known to be an instance - * of [CompletableFuture] there is an overload of this `await` function with a [CompletableFuture] receiver that - * has a fast-path for a future that is already complete. Note, that it is not safe to dynamically check if an instance - * of a [CompletionStage] is an instance of a [CompletableFuture], because JDK functions return instances that do - * implement a [CompletableFuture], yet throw an exception on most of the future-like methods in it. - * For the purpose of cancelling the the future, the corresponding [UnsupportedOperationException] is ignored in this case. + * This method is intended to be used with one-shot futures, so on coroutine cancellation the [CompletableFuture] that + * corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture]) + * is cancelled. If cancelling the given stage is undesired, `stage.asDeferred().await()` should be used instead. */ -public suspend fun CompletionStage.await(): T = - suspendCancellableCoroutine { cont: CancellableContinuation -> +public suspend fun CompletionStage.await(): T { + val future = toCompletableFuture() // retrieve the future + // fast path when CompletableFuture is already done (does not suspend) + if (future.isDone) { + try { + @Suppress("UNCHECKED_CAST", "BlockingMethodInNonBlockingContext") + return future.get() as T + } catch (e: ExecutionException) { + throw e.cause ?: e // unwrap original cause from ExecutionException + } + } + // slow path -- suspend + return suspendCancellableCoroutine { cont: CancellableContinuation -> val consumer = ContinuationConsumer(cont) whenComplete(consumer) if (cont.isActive) { @@ -190,46 +170,11 @@ public suspend fun CompletionStage.await(): T = // TODO: In a major release this lambda can be made to extend CancelFutureOnCompletion class from core module // This will further save one allocated object here. cont.invokeOnCancellation { - if (this is Future<*>) { - // mayInterruptIfRunning is not used - try { - cancel(false) - } catch (e: UnsupportedOperationException) { - // Internal JDK implementation of a Future can throw an UnsupportedOperationException here. - // We simply ignore it for the purpose of cancellation - // See https://github.com/Kotlin/kotlinx.coroutines/issues/2456 - } - } + future.cancel(false) consumer.cont = null // shall clear reference to continuation to aid GC } } } - -/** - * Awaits for completion of [CompletableFuture] without blocking a thread. - * - * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * stops waiting for the completable future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException]. - * This method is intended to be used with one-shot futures, so on coroutine cancellation the completable future is cancelled. - * If cancelling the future is undesired, `future.asDeferred().await()` should be used instead. - * - * ### Implementation details - * - * This implementation has a fast-path for a case of a future that [isDone][CompletableFuture.isDone]. - */ -public suspend fun CompletableFuture.await(): T { - // fast path when CompletableFuture is already done (does not suspend) - if (isDone) { - try { - @Suppress("UNCHECKED_CAST", "BlockingMethodInNonBlockingContext") - return get() as T - } catch (e: ExecutionException) { - throw e.cause ?: e // unwrap original cause from ExecutionException - } - } - // slow path -- suspend - return (this as CompletionStage).await() } private class ContinuationConsumer( diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index ecbdc523bc..9b6bce5ce4 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -38,13 +38,7 @@ private class CancelFutureOnCompletion( override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - try { - future.cancel(false) - } catch (e: UnsupportedOperationException) { - // Internal JDK implementation of a Future can throw an UnsupportedOperationException here. - // We simply ignore it for the purpose of cancellation - // See https://github.com/Kotlin/kotlinx.coroutines/issues/2456 - } + future.cancel(false) } } From f4beeebd7f6817dff63937f168060de5d6ece427 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 25 Dec 2020 15:05:26 +0300 Subject: [PATCH 3/3] ~ Remove unneeded optimization --- .../kotlinx-coroutines-jdk8/src/future/Future.kt | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt index 8755e07c31..bb5f426a28 100644 --- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt +++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt @@ -165,14 +165,9 @@ public suspend fun CompletionStage.await(): T { return suspendCancellableCoroutine { cont: CancellableContinuation -> val consumer = ContinuationConsumer(cont) whenComplete(consumer) - if (cont.isActive) { - // avoid creation of a lambda when continuation was already resumed during whenComplete call - // TODO: In a major release this lambda can be made to extend CancelFutureOnCompletion class from core module - // This will further save one allocated object here. - cont.invokeOnCancellation { - future.cancel(false) - consumer.cont = null // shall clear reference to continuation to aid GC - } + cont.invokeOnCancellation { + future.cancel(false) + consumer.cont = null // shall clear reference to continuation to aid GC } } }