-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Repaired some of ListenableFuture.kt's cancellation corner cases #1347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This fixes: - Cancellation without an untrapped CancellationException propagating through a Callback; isCancelled() is the correct way to check for cancellation - Bidirectional propagation of cancellation through `asListenableFuture()` - The cause getting lost in the `asListenableFuture()` future when cancelling its `Deferred` parent with a cause This also: - Extensively documents the package and the contracts created by the promise-creating extension methods and `future()` - Uses `getUninterruptibly()` for speed - Uses `AbstractFuture` to make as certain as possible that `Future.cancel()` will return `true` at most once - Should clear up rare spooky race conditions around cancellation/interruption in hybrid Coroutines/Guava Futures codebases There are probably a few more interesting corner cases hiding in here, but this should be a good start improving the correctness of `.guava`'s adapters.
Should resolve #1346 |
… on completed deferred in StackTraceRecoveryTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job, thanks for the contribution and detailed exploration of the cancellation.
} | ||
} | ||
|
||
val deferred = CompletableDeferred<T>() | ||
Futures.addCallback(this, object : FutureCallback<T> { | ||
override fun onSuccess(result: T?) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
override fun onSuccess(result: T?) {
deferred.complete(result!!)
}
will throw KNPE for nullable result
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True. Good catch.
In my first read, I thought the unchecked cast was equivalent to the !! assert. Re-reading, the previous behaviour was to catch and completeExceptionally()
with a KNPE. I've restored that behaviour with a version that won't try/catch for the null case. I've added a test and documentation to make sure the behaviour doesn't change unnoticed again.
The fundamental problem is that ListenableFuture<T>
can complete with a null value, and CompletableDeferred<T>
(and Deferred) cannot. Offhand, I don't think there's a better solution than this. Using the type LF<T?>
in the method seems like a high cost.
I started addressing cancellation, not nullability, but here we are with bonus test cases. :)
* Completion is non-atomic between the two promises. | ||
* | ||
* When either promise successfully completes, it will attempt to synchronously complete its | ||
* counterpart with the same value. This will succeed barring a race with cancellation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When either promise successfully completes
This is a bit misleading statement as resulting Deferred
cannot be completed (either with value or exception) independently, it can only be cancelled.
Yes, someone can cast resulting deferred to CompleteableDeferred
, but in that case, the behaviour is beyond public API scope. Let's keep documentation as short and concise as possible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By "completes" I mean "reaches Cancelled or Completed state" for a Deferred
, or "finishes" for a Future
. I'm referring to the promise ending, not its being completed in the sense of calling CompletableDeferred.complete()
.
Is there another way I can phrase this to make that intention clearer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say something like:
If the given future completes (either exceptionally or normally) it will try to complete the resulting deferred with the same value. This will succeed barring a race with cancellation.
If the resulting deferred is manually [cancelled][Deferred.cancel()], it will try to propagate the cancellation to the original future
Additionally, it would be nice to move paragraphs below "Cancellation is propagated bidirectionally." to the method body from the public KDoc. This behaviour (at least now, thanks for your fix!) now is natural and intuitive, so it's not worth to have a long KDocs that may scare away newcomers.
And there always will be a huge comment in the implementation explaining all the details for curious ones as it is done in e.g. most of the java.util.concurrent
classes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Moved the details inside the method as a block comment.
integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
Outdated
Show resolved
Hide resolved
integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
Outdated
Show resolved
Hide resolved
This anticipates a new mechanism in a future Android Gradle Plugin (3.6.0+) that enables version targeting of ProGuard/R8 rules. It does not change any behavior for current users of the library. Once the new plugin is used, the R8 optimization will be read automatically. It also contains tests. Co-authored-by: Wojtek Kaliciński <[email protected]> Co-authored-by: Jake Wharton <[email protected]>
* Mention in the guide that `-ea` turns it on. * Give a link to DEBUG_PROPERTY_NAME instead of newCoroutineContext. The later does not have details on its page anymore, since the details were only mentioned in JVM version. * Move description in of debugging facilities to DEBUG_PROPERTY_NAME in the code.
This represents a class in the RxJava library. Not saying US English is better, but it is. https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/subjects/BehaviorSubject.java
Concurrent work is already properly decomposed and does not expose an "partial cancellation" behaviour as other operators may do Fixes Kotlin#1334
…of users who depend on any reactive library and kotlinx.coroutines
These exclusions are obsolete since Kotlin 1.3.40. Resolves Kotlin#1405
* Implementation detail (launch on each value) is leaking into upstream behaviour * The overhead is negligible compared to launching a new coroutines and sending to channel, but it provides a much approachable mental model when no suspension in the upstream flow happens (note: upstream never sends elements to the channel) Fixes Kotlin#1392
Kotlin#1409) * Use setTimeout-based dispatcher when process is not available on the target runtime Fixes Kotlin#1404
* Improved documentation * Detailed error messages
Any further feedback? I'd like to get this finished up. |
Sorry, I'm busy with 1.3 release and internal activities, will return to your PR on Friday |
No worries! Just wanted to make sure it's not forgotten. |
This will make Flow API surface more orthogonal with less operators to remember. Both of them can be easily written without too much additional code and still produce quite readable and easy to understand code: delayFlow(time) = onStart { delay(time) } delayEach(time) = onEach { delay(time) } Fixes Kotlin#1429
…race recovery to properly deal with Android's minifier Fixes Kotlin#1416
Kotlin#1426) * Properly enforce flow invariant when flow is used from "suspend fun main" or artificially started coroutine (e.g. by block.startCoroutine(...)) Fixes Kotlin#1421
Kotlin#1420) Get rid of NonRecoverableThrowable, mention way to opt-out stacktrace recovery in debugging.md
…duce. Previously it was not possible due to not implemented Kotlin#1235
…n#1388) * Allocate underlying buffer in ArrayChannel in on-demand manner Rationale: Such change will allow us to use huge buffers in various flow operators without having a serious footprint in suspension-free scenarios
…Kotlin#1377) * Propagation of the coroutine context of await calls into Mono/Flux builder * Publisher.asFlow propagates coroutine context from `collect` call to the Publisher * Flow.asFlux transform * Optimized FlowSubscription * kotlinx.coroutines.reactor.flow package is replaced with kotlinx.coroutines.reactor Fixes Kotlin#284
Kotlin#1368) * Fully copy CoroutineInfo for DebugProbes.dumpCoroutinesInfo, it is required for IDEA integration (field is left as internal deliberately) * Make CoroutineInfo non-data class
* Operator renamed to combine * Introduced combineTransform operator with custom transformer * Decouple API and implementation details to improve user experience from IDE * combine(Iterable<Flow>) and combineTransform(Iterable<Flow>) are introduced Fixes Kotlin#1224 Fixes Kotlin#1262
…nd to have a consistent and meaningful naming scheme for the rest of the 'latest' operators * Make flatMapLatest pure, do not leak cancellation behaviour to downstream * Make *latest buffered by default to amortize constant re-dispatch cost * Introducing transformLatest * Introducing mapLatest Fixes Kotlin#1335
# Conflicts: # docs/coroutine-context-and-dispatchers.md
@@ -68,6 +282,7 @@ private class ListenableFutureCoroutine<T>( | |||
future.set(value) | |||
} | |||
|
|||
// TODO: This doesn't actually cancel the Future. There doesn't seem to be bidi cancellation? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it does not.
Potentially, we can cancel the future if the coroutine was canceled, though in that case an additional diagnostic message will be lost.
For example:
val future = scope.future { ... }
scope.cancel(CancellationException("Graceful shutdown"))
// or even subtype, e.g. scope.cancel(TimeoutCancellationException())
Now resulting future will be cancelled, but get()
will throw a plain old CancellationException
without additional diagnostics.
Internally, LF already has a mechanism to supply a proper cause (Cancellation
class), but public API for supplying cause or a custom CE is missing and it would be a nice API to have.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To have a bidirectional cancellation, something like this can be done in ListenableFutureCoroutine
:
private var wasCancelled = false
override fun cancelInternal(cause: Throwable?): Boolean {
val cancelled = super.cancelInternal(cause)
wasCancelled = wasCancelled || cancelled
return cancelled
}
override fun onCancelled(cause: Throwable, handled: Boolean) {
// cause check is required because some of the children of the coroutine could have failed during cancellation process
if (wasCancelled && cause is CancellationException) {
future.cancel(true)
return
} // or false ?
if (!future.setException(cause) && !handled) {
// prevents loss of exception that was not handled by parent & could not be set to SettableFuture
handleCoroutineException(context, cause)
}
}
* Completion is non-atomic between the two promises. | ||
* | ||
* When either promise successfully completes, it will attempt to synchronously complete its | ||
* counterpart with the same value. This will succeed barring a race with cancellation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say something like:
If the given future completes (either exceptionally or normally) it will try to complete the resulting deferred with the same value. This will succeed barring a race with cancellation.
If the resulting deferred is manually [cancelled][Deferred.cancel()], it will try to propagate the cancellation to the original future
Additionally, it would be nice to move paragraphs below "Cancellation is propagated bidirectionally." to the method body from the public KDoc. This behaviour (at least now, thanks for your fix!) now is natural and intuitive, so it's not worth to have a long KDocs that may scare away newcomers.
And there always will be a huge comment in the implementation explaining all the details for curious ones as it is done in e.g. most of the java.util.concurrent
classes
*/ | ||
public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> { | ||
// Fast path if already completed. | ||
// TODO: There's a faster fast path - InternalFutureFailureAccess - in Guava >= 27.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Master and develop is already on Guava 28.
It would be nice if you've changed the target branch to develop
and rebased your branch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Will send another PR with the latest requested changes.
Let's see if I still know how to hold git right...
public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> { | ||
// Fast path if already completed. | ||
// TODO: There's a faster fast path - InternalFutureFailureAccess - in Guava >= 27.0 | ||
if (isDone && !isCancelled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is cancellation not part of the fast-path just for the sake of simplicity?
* - Fully correct cancellation and listener happens-after obeying [Future] and | ||
* [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve. | ||
* The best way to be correct, especially given the fun corner cases from | ||
* [AsyncFuture.setAsync], is to just use an [AsyncFuture]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no such class as AsyncFuture
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AbstractFuture. Not sure how I got that one wrong.
Fixed.
Almost there :) |
This fixes: - Cancellation without an untrapped CancellationException propagating through a Callback; isCancelled() is the correct way to check for cancellation - Bidirectional propagation of cancellation through `asListenableFuture()` - The cause getting lost in the `asListenableFuture()` future when cancelling its `Deferred` parent with a cause This also: - Extensively documents the package and the contracts created by the promise-creating extension methods and `future()` - Uses `getUninterruptibly()` for speed - Uses `AbstractFuture` to make as certain as possible that `Future.cancel()` will return `true` at most once - Should clear up rare spooky race conditions around cancellation/interruption in hybrid Coroutines/Guava Futures codebases There are probably a few more interesting corner cases hiding in here, but this should be a good start improving the correctness of `.guava`'s adapters. This is a squash commit of kotlin/pr/1347, rebased on develop: - Incorporated first-round feedback. - Merged CancellationToCoroutine into ListenableFutureCoroutine to save an allocation. - Documented and tested for null completion of asDeferred()'s parent Future. - Renamed a cancellation test case for clarity of purpose. - Split asDeferred() documentation between KDoc/details
Rebase on develop w/ some improvements: #1441 |
This fixes:
through a Callback; isCancelled() is the correct way to check for
cancellation
asListenableFuture()
asListenableFuture()
future whencancelling its
Deferred
parent with a causeThis also:
promise-creating extension methods and
future()
getUninterruptibly()
for speedAbstractFuture
to make as certain as possible thatFuture.cancel()
will returntrue
at most oncecancellation/interruption in hybrid Coroutines/Guava Futures
codebases
There are probably a few more interesting corner cases hiding in here,
but this should be a good start improving the correctness of
.guava
'sadapters.