-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Fix await/asDeferred for MinimalState implementations #2457
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -105,16 +105,19 @@ private fun Job.setupCancellation(future: CompletableFuture<*>) { | |
} | ||
|
||
/** | ||
* Converts this completion stage to an instance of [Deferred]. | ||
* When this completion stage is an instance of [Future], then it is cancelled when | ||
* the resulting deferred is cancelled. | ||
* Converts this [CompletionStage] to an instance of [Deferred]. | ||
* | ||
* The [CompletableFuture] that corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture]) | ||
* is cancelled when the resulting deferred is cancelled. | ||
*/ | ||
@Suppress("DeferredIsResult") | ||
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> { | ||
val future = toCompletableFuture() // retrieve the future | ||
// Fast path if already completed | ||
if (this is Future<*> && isDone()){ | ||
if (future.isDone) { | ||
return try { | ||
@Suppress("UNCHECKED_CAST") | ||
CompletableDeferred(get() as T) | ||
CompletableDeferred(future.get() as T) | ||
} catch (e: Throwable) { | ||
// unwrap original cause from ExecutionException | ||
val original = (e as? ExecutionException)?.cause ?: e | ||
|
@@ -132,25 +135,28 @@ public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> { | |
result.completeExceptionally((exception as? CompletionException)?.cause ?: exception) | ||
} | ||
} | ||
if (this is Future<*>) result.cancelFutureOnCompletion(this) | ||
result.cancelFutureOnCompletion(future) | ||
return result | ||
} | ||
|
||
/** | ||
* Awaits for completion of the completion stage without blocking a thread. | ||
* Awaits for completion of [CompletionStage] without blocking a thread. | ||
* | ||
* This suspending function is cancellable. | ||
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function | ||
* stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException]. | ||
* This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of [CompletableFuture]. | ||
* If cancelling given stage is undesired, `stage.asDeferred().await()` should be used instead. | ||
* | ||
* This method is intended to be used with one-shot futures, so on coroutine cancellation the [CompletableFuture] that | ||
* corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture]) | ||
* is cancelled. If cancelling the given stage is undesired, `stage.asDeferred().await()` should be used instead. | ||
*/ | ||
public suspend fun <T> CompletionStage<T>.await(): T { | ||
val future = toCompletableFuture() // retrieve the future | ||
// fast path when CompletableFuture is already done (does not suspend) | ||
if (this is Future<*> && isDone()) { | ||
if (future.isDone) { | ||
try { | ||
@Suppress("UNCHECKED_CAST") | ||
return get() as T | ||
@Suppress("UNCHECKED_CAST", "BlockingMethodInNonBlockingContext") | ||
return future.get() as T | ||
} catch (e: ExecutionException) { | ||
throw e.cause ?: e // unwrap original cause from ExecutionException | ||
} | ||
|
@@ -159,10 +165,14 @@ public suspend fun <T> CompletionStage<T>.await(): T { | |
return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> | ||
val consumer = ContinuationConsumer(cont) | ||
whenComplete(consumer) | ||
cont.invokeOnCancellation { | ||
// mayInterruptIfRunning is not used | ||
(this as? CompletableFuture<T>)?.cancel(false) | ||
consumer.cont = null // shall clear reference to continuation to aid GC | ||
if (cont.isActive) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not a big fan of optimizations that work 0.01% of the time (when future completes after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep. It is indeed not needed anymore, where we (again) have fast-path via |
||
// avoid creation of a lambda when continuation was already resumed during whenComplete call | ||
// TODO: In a major release this lambda can be made to extend CancelFutureOnCompletion class from core module | ||
// This will further save one allocated object here. | ||
cont.invokeOnCancellation { | ||
future.cancel(false) | ||
consumer.cont = null // shall clear reference to continuation to aid GC | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filed https://youtrack.jetbrains.com/issue/KT-44092