Skip to content

Commit 8d6c1a9

Browse files
qwwdfsadelizarov
authored andcommitted
Propagate cancellation on awaiting Completable and Listenable futures, provide ListenableFuture.asDeferred
Rationale: in most common use-cases awaiting the future is required to integrate with future-based API, current coroutine is the only user of this future and after its cancellation this future result is no longer needed. For non-cancelling await future.asDeferred().await() should be used Fixes #515
1 parent 25b4388 commit 8d6c1a9

File tree

5 files changed

+166
-10
lines changed

5 files changed

+166
-10
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-guava.txt

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
public final class kotlinx/coroutines/experimental/guava/ListenableFutureKt {
2+
public static final fun asDeferred (Lcom/google/common/util/concurrent/ListenableFuture;)Lkotlinx/coroutines/experimental/Deferred;
23
public static final fun asListenableFuture (Lkotlinx/coroutines/experimental/Deferred;)Lcom/google/common/util/concurrent/ListenableFuture;
34
public static final fun await (Lcom/google/common/util/concurrent/ListenableFuture;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
45
public static final fun future (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/CoroutineStart;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lcom/google/common/util/concurrent/ListenableFuture;

integration/kotlinx-coroutines-guava/src/ListenableFuture.kt

+34-4
Original file line numberDiff line numberDiff line change
@@ -149,17 +149,46 @@ private class DeferredListenableFuture<T>(
149149
override fun interruptTask() { deferred.cancel() }
150150
}
151151

152+
/**
153+
* Converts this listenable future to an instance of [Deferred].
154+
* It is cancelled when the resulting deferred is cancelled.
155+
*/
156+
public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
157+
// Fast path if already completed
158+
if (isDone) {
159+
return try {
160+
@Suppress("UNCHECKED_CAST")
161+
CompletableDeferred(get() as T)
162+
} catch (e: Throwable) {
163+
// unwrap original cause from ExecutionException
164+
val original = (e as? ExecutionException)?.cause ?: e
165+
CompletableDeferred<T>().also { it.completeExceptionally(original) }
166+
}
167+
}
168+
val deferred = CompletableDeferred<T>()
169+
Futures.addCallback(this, object : FutureCallback<T> {
170+
override fun onSuccess(result: T?) {
171+
deferred.complete(result!!)
172+
}
173+
174+
override fun onFailure(t: Throwable) {
175+
deferred.completeExceptionally(t)
176+
}
177+
}, MoreExecutors.directExecutor())
178+
179+
deferred.invokeOnCompletion { cancel(false) }
180+
return deferred
181+
}
182+
152183
/**
153184
* Awaits for completion of the future without blocking a thread.
154185
*
155186
* This suspending function is cancellable.
156187
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
157188
* stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.experimental.CancellationException].
158189
*
159-
* Note, that `ListenableFuture` does not support removal of installed listeners, so on cancellation of this wait
160-
* a few small objects will remain in the `ListenableFuture` list of listeners until the future completes. However, the
161-
* care is taken to clear the reference to the waiting coroutine itself, so that its memory can be released even if
162-
* the future never completes.
190+
* This method is intended to be used with one-shot futures, so on coroutine cancellation future is cancelled as well.
191+
* If cancelling given future is undesired, `future.asDeferred().await()` should be used instead.
163192
*/
164193
public suspend fun <T> ListenableFuture<T>.await(): T {
165194
try {
@@ -172,6 +201,7 @@ public suspend fun <T> ListenableFuture<T>.await(): T {
172201
val callback = ContinuationCallback(cont)
173202
Futures.addCallback(this, callback, MoreExecutors.directExecutor())
174203
cont.invokeOnCancellation {
204+
cancel(false)
175205
callback.cont = null // clear the reference to continuation from the future's callback
176206
}
177207
}

integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt

+88
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ import kotlinx.coroutines.experimental.CancellationException
1010
import org.hamcrest.core.*
1111
import org.junit.*
1212
import org.junit.Assert.*
13+
import org.junit.Test
1314
import java.io.*
1415
import java.util.concurrent.*
16+
import kotlin.test.assertFailsWith
1517

1618
class ListenableFutureTest : TestBase() {
1719
@Before
@@ -187,4 +189,90 @@ class ListenableFutureTest : TestBase() {
187189
yield() // yield main thread to job
188190
finish(6)
189191
}
192+
193+
@Test
194+
fun testFutureCancellation() = runTest {
195+
val future = awaitFutureWithCancel(true)
196+
assertTrue(future.isCancelled)
197+
assertFailsWith<CancellationException> { future.get() }
198+
finish(4)
199+
}
200+
201+
@Test
202+
fun testNoFutureCancellation() = runTest {
203+
val future = awaitFutureWithCancel(false)
204+
assertFalse(future.isCancelled)
205+
assertEquals(42, future.get())
206+
finish(4)
207+
}
208+
209+
@Test
210+
fun testCompletedFutureAsDeferred() = runTest {
211+
val future = SettableFuture.create<Int>()
212+
val task = async {
213+
expect(2)
214+
assertEquals(42, future.asDeferred().await())
215+
expect(4)
216+
}
217+
218+
expect(1)
219+
yield()
220+
expect(3)
221+
future.set(42)
222+
task.join()
223+
finish(5)
224+
}
225+
226+
@Test
227+
fun testFailedFutureAsDeferred() = runTest {
228+
val future = SettableFuture.create<Int>().apply {
229+
setException(TestException())
230+
}
231+
val deferred = future.asDeferred()
232+
assertTrue(deferred.isCompletedExceptionally)
233+
val completionException = deferred.getCompletionExceptionOrNull()!!
234+
assertTrue(completionException is TestException)
235+
236+
try {
237+
deferred.await()
238+
expectUnreached()
239+
} catch (e: Exception) {
240+
assertTrue(e is TestException)
241+
}
242+
}
243+
244+
@Test
245+
fun testThrowingFutureAsDeferred() = runTest {
246+
val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
247+
val future = executor.submit(Callable { throw TestException() })
248+
val deferred = async {
249+
future.asDeferred().await()
250+
}
251+
252+
try {
253+
deferred.await()
254+
expectUnreached()
255+
} catch (e: Exception) {
256+
assertTrue(e is TestException)
257+
}
258+
}
259+
260+
private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): ListenableFuture<Int> {
261+
val latch = CountDownLatch(1)
262+
val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
263+
val future = executor.submit(Callable { latch.await(); 42 })
264+
val deferred = async {
265+
expect(2)
266+
if (cancellable) future.await()
267+
else future.asDeferred().await()
268+
}
269+
expect(1)
270+
yield()
271+
deferred.cancel()
272+
expect(3)
273+
latch.countDown()
274+
return future
275+
}
276+
277+
private class TestException : Exception()
190278
}

integration/kotlinx-coroutines-jdk8/src/future/Future.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,8 @@ public suspend fun <T> CompletableFuture<T>.await(): T =
190190
* This suspending function is cancellable.
191191
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
192192
* stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.experimental.CancellationException].
193-
*
194-
* Note, that `CompletionStage` implementation does not support prompt removal of installed listeners, so on cancellation of this wait
195-
* a few small objects will remain in the `CompletionStage` stack of completion actions until it completes itself.
196-
* However, the care is taken to clear the reference to the waiting coroutine itself, so that its memory can be
197-
* released even if the completion stage never completes.
193+
* This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of [CompletableFuture].
194+
* If cancelling given stage is undesired, `stage.asDeferred().await()` should be used instead.
198195
*/
199196
public suspend fun <T> CompletionStage<T>.await(): T {
200197
// fast path when CompletableFuture is already done (does not suspend)
@@ -211,7 +208,9 @@ public suspend fun <T> CompletionStage<T>.await(): T {
211208
val consumer = ContinuationConsumer(cont)
212209
whenComplete(consumer)
213210
cont.invokeOnCancellation {
214-
consumer.cont = null // shall clear reference to continuation
211+
// mayInterruptIfRunning is not used
212+
(this as? CompletableFuture<T>)?.cancel(false)
213+
consumer.cont = null // shall clear reference to continuation to aid GC
215214
}
216215
}
217216
}

integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt

+38
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ import kotlinx.coroutines.experimental.CancellationException
99
import org.hamcrest.core.*
1010
import org.junit.*
1111
import org.junit.Assert.*
12+
import org.junit.Test
1213
import java.util.concurrent.*
1314
import java.util.concurrent.atomic.*
1415
import java.util.concurrent.locks.*
1516
import java.util.function.*
1617
import kotlin.concurrent.*
1718
import kotlin.coroutines.experimental.*
19+
import kotlin.test.assertFailsWith
1820

1921
class FutureTest : TestBase() {
2022
@Before
@@ -332,6 +334,42 @@ class FutureTest : TestBase() {
332334
assertEquals("value", result)
333335
}
334336

337+
@Test
338+
fun testFutureCancellation() = runTest {
339+
val future = awaitFutureWithCancel(true)
340+
assertTrue(future.isCompletedExceptionally)
341+
assertFailsWith<CancellationException> { future.get() }
342+
finish(4)
343+
}
344+
345+
@Test
346+
fun testNoFutureCancellation() = runTest {
347+
val future = awaitFutureWithCancel(false)
348+
assertFalse(future.isCompletedExceptionally)
349+
assertEquals(239, future.get())
350+
finish(4)
351+
}
352+
353+
private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): CompletableFuture<Int> {
354+
val latch = CountDownLatch(1)
355+
val future = CompletableFuture.supplyAsync({
356+
latch.await()
357+
239
358+
})
359+
360+
val deferred = async {
361+
expect(2)
362+
if (cancellable) future.await()
363+
else future.asDeferred().await()
364+
}
365+
expect(1)
366+
yield()
367+
deferred.cancel()
368+
expect(3)
369+
latch.countDown()
370+
return future
371+
}
372+
335373
class TestException(message: String) : Exception(message)
336374

337375
private fun wrapContinuation(wrapper: (() -> Unit) -> Unit): CoroutineDispatcher = object : CoroutineDispatcher() {

0 commit comments

Comments
 (0)