From dac8c2432420bd272e4cce177ac4c38264d591b2 Mon Sep 17 00:00:00 2001 From: Vadim Semenov Date: Tue, 1 Dec 2020 10:59:32 +0000 Subject: [PATCH 1/4] Do not throw from JobListenableFuture.isCancelled This properly handles ExecutionException that can be thrown from getUninterruptibly. Fixed #2421. --- .../src/ListenableFuture.kt | 17 +++++++++++- .../test/ListenableFutureTest.kt | 27 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index 6d1fab3d69..9590dc0724 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -385,9 +385,24 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu // this Future hasn't itself been successfully cancelled, the Future will return // isCancelled() == false. This is the only discovered way to reconcile the two different // cancellation contracts. - return auxFuture.isCancelled || (isDone && Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled) + return auxFuture.isCancelled || auxFuture.completedWithCancellation } + /** + * Helper for [isCancelled] that takes into account that + * our auxiliary future can complete with [Cancelled] instance. + */ + private val SettableFuture<*>.completedWithCancellation: Boolean + get() = isDone && try { + Uninterruptibles.getUninterruptibly(this) is Cancelled + } catch (e: CancellationException) { + true + } catch (t: Throwable) { + // In theory appart from CancellationException, getUninterruptibly can only + // throw ExecutionException, but to be safe we catch Throwable here. + false + } + /** * Waits for [auxFuture] to complete by blocking, then uses its `result` * to get the `T` value `this` [ListenableFuture] is pointing to or throw a [CancellationException]. diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index dc2d99d7f7..2211295b76 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -680,6 +680,33 @@ class ListenableFutureTest : TestBase() { finish(5) } + @Test + fun testFutureCompletedExceptionally() = runTest { + val testException = TestException() + // NonCancellable to not propagate error to this scope. + val future = future(context = NonCancellable) { + throw testException + } + yield() + assertTrue(future.isDone) + assertFalse(future.isCancelled) + val thrown = assertFailsWith { future.get() } + assertEquals(testException, thrown.cause) + } + + @Test + fun testAsListenableFutureCompletedExceptionally() = runTest { + val testException = TestException() + val deferred = CompletableDeferred().apply { + completeExceptionally(testException) + } + val asListenableFuture = deferred.asListenableFuture() + assertTrue(asListenableFuture.isDone) + assertFalse(asListenableFuture.isCancelled) + val thrown = assertFailsWith { asListenableFuture.get() } + assertEquals(testException, thrown.cause) + } + private inline fun ListenableFuture<*>.checkFutureException() { val e = assertFailsWith { get() } val cause = e.cause!! From 19e10fdca2a5e4241ebeae7c452a6008d89fdd27 Mon Sep 17 00:00:00 2001 From: Vadim Semenov Date: Thu, 3 Dec 2020 12:56:27 +0000 Subject: [PATCH 2/4] Only handle exceptions that getUninterruptibly declares in the javadoc --- integration/kotlinx-coroutines-guava/src/ListenableFuture.kt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index 9590dc0724..31751d2a29 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -397,9 +397,7 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu Uninterruptibles.getUninterruptibly(this) is Cancelled } catch (e: CancellationException) { true - } catch (t: Throwable) { - // In theory appart from CancellationException, getUninterruptibly can only - // throw ExecutionException, but to be safe we catch Throwable here. + } catch (e: ExecutionException) { false } From 1aa1b70e365e77112b9596f417b1c1217a7c4349 Mon Sep 17 00:00:00 2001 From: Vadim Semenov Date: Fri, 11 Dec 2020 12:43:57 +0000 Subject: [PATCH 3/4] Stress test JobListenableFuture.isCancelled If we don't catch CancellationException, the test fails with the following stack trace: Task was cancelled. java.util.concurrent.CancellationException: Task was cancelled. at com.google.common.util.concurrent.AbstractFuture.cancellationExceptionWithCause(AbstractFuture.java:1349) at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:550) at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513) at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:90) at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:237) at kotlinx.coroutines.guava.JobListenableFuture.getCompletedWithCancellation(ListenableFuture.kt:397) at kotlinx.coroutines.guava.JobListenableFuture.isCancelled(ListenableFuture.kt:388) at kotlinx.coroutines.guava.ListenableFutureTest$stressTestFutureIsCancelledDoesNotThrow$1$1$1.invokeSuspend(ListenableFutureTest.kt:721) (Coroutine boundary) at kotlinx.coroutines.guava.ListenableFutureTest$stressTestFutureIsCancelledDoesNotThrow$1.invokeSuspend(ListenableFutureTest.kt:716) (Coroutine creation stacktrace) at kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122) at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30) at kotlinx.coroutines.guava.ListenableFutureTest.stressTestFutureIsCancelledDoesNotThrow(ListenableFutureTest.kt:711) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164) at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412) at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64) at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.CancellationException: Task was cancelled. at com.google.common.util.concurrent.AbstractFuture.cancellationExceptionWithCause(AbstractFuture.java:1349) at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:550) at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513) at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:90) at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:237) at kotlinx.coroutines.guava.JobListenableFuture.getCompletedWithCancellation(ListenableFuture.kt:397) at kotlinx.coroutines.guava.JobListenableFuture.isCancelled(ListenableFuture.kt:388) at kotlinx.coroutines.guava.ListenableFutureTest$stressTestFutureIsCancelledDoesNotThrow$1$1$1.invokeSuspend(ListenableFutureTest.kt:721) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665) --- .../test/ListenableFutureTest.kt | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index 2211295b76..9dca9e9b46 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -707,6 +707,23 @@ class ListenableFutureTest : TestBase() { assertEquals(testException, thrown.cause) } + @Test + fun stressTestJobListenableFutureIsCancelledDoesNotThrow() = runTest { + repeat(1000) { + val deferred = CompletableDeferred() + val asListenableFuture = deferred.asListenableFuture() + // We heed two threads to test a race condition. + withContext(Dispatchers.Default) { + val cancellationJob = launch { + asListenableFuture.cancel(false) + } + while (!cancellationJob.isCompleted) { + asListenableFuture.isCancelled // Shouldn't throw. + } + } + } + } + private inline fun ListenableFuture<*>.checkFutureException() { val e = assertFailsWith { get() } val cause = e.cause!! From ef5de1291bd7eb5706c98d18b233a93b187ee423 Mon Sep 17 00:00:00 2001 From: Vadim Semenov Date: Tue, 26 Jan 2021 19:00:11 +0000 Subject: [PATCH 4/4] Add an optimisation for JobListenableFuture.isCancelled `auxFutureIsFailed` is eventually set to `true` if `auxFuture` is completed with `ExecutionException`. We use `auxFutureIsFailed` to avoid unnecessary allocation of `ExecutionException` in `JobListenableFuture.isCancelled`. --- .../src/ListenableFuture.kt | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index 31751d2a29..299cfd7ec3 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -347,6 +347,17 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu */ private val auxFuture = SettableFuture.create() + /** + * `true` if [auxFuture.get][ListenableFuture.get] throws [ExecutionException]. + * + * Note: this is eventually consistent with the state of [auxFuture]. + * + * Unfortunately, there's no API to figure out if [ListenableFuture] throws [ExecutionException] + * apart from calling [ListenableFuture.get] on it. To avoid unnecessary [ExecutionException] allocation + * we use this field. + */ + private var auxFutureIsFailed: Boolean = false + /** * When the attached coroutine [isCompleted][Job.isCompleted] successfully * its outcome should be passed to this method. @@ -366,7 +377,8 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu // CancellationException is wrapped into `Cancelled` to preserve original cause and message. // All the other exceptions are delegated to SettableFuture.setException. fun completeExceptionallyOrCancel(t: Throwable): Boolean = - if (t is CancellationException) auxFuture.set(Cancelled(t)) else auxFuture.setException(t) + if (t is CancellationException) auxFuture.set(Cancelled(t)) + else auxFuture.setException(t).also { if (it) auxFutureIsFailed = true } /** * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to @@ -385,21 +397,17 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu // this Future hasn't itself been successfully cancelled, the Future will return // isCancelled() == false. This is the only discovered way to reconcile the two different // cancellation contracts. - return auxFuture.isCancelled || auxFuture.completedWithCancellation - } - - /** - * Helper for [isCancelled] that takes into account that - * our auxiliary future can complete with [Cancelled] instance. - */ - private val SettableFuture<*>.completedWithCancellation: Boolean - get() = isDone && try { - Uninterruptibles.getUninterruptibly(this) is Cancelled + return auxFuture.isCancelled || isDone && !auxFutureIsFailed && try { + Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled } catch (e: CancellationException) { + // `auxFuture` got cancelled right after `auxFuture.isCancelled` returned false. true } catch (e: ExecutionException) { + // `auxFutureIsFailed` hasn't been updated yet. + auxFutureIsFailed = true false } + } /** * Waits for [auxFuture] to complete by blocking, then uses its `result`