Skip to content

Commit 937180f

Browse files
authored
Prevent StackOverflowError in CompletableFuture.asDeferred and proper… (#2731)
Prevent StackOverflowError in CompletableFuture.asDeferred and properly report exceptions from completion handlers * It turned out that 'cancel' on completed future tries to help and invoke 'whenComplete' handlers that also invoke 'cancel' on the very same future * Use top-level exception handler as a last resort to deliver an exception Fixes #2730
1 parent 5121005 commit 937180f

File tree

7 files changed

+144
-14
lines changed

7 files changed

+144
-14
lines changed

integration/kotlinx-coroutines-guava/src/ListenableFuture.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,13 @@ public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
136136
override fun onSuccess(result: T?) {
137137
// Here we work with flexible types, so we unchecked cast to trick the type system
138138
@Suppress("UNCHECKED_CAST")
139-
deferred.complete(result as T)
139+
runCatching { deferred.complete(result as T) }
140+
.onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
140141
}
141142

142143
override fun onFailure(t: Throwable) {
143-
deferred.completeExceptionally(t)
144+
runCatching { deferred.completeExceptionally(t) }
145+
.onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
144146
}
145147
}, MoreExecutors.directExecutor())
146148

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.guava
6+
7+
import com.google.common.util.concurrent.*
8+
import kotlinx.coroutines.*
9+
import org.junit.*
10+
import org.junit.Test
11+
import kotlin.test.*
12+
13+
class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() {
14+
15+
// This is a separate test in order to avoid interference with uncaught exception handlers in other tests
16+
private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler()
17+
private lateinit var caughtException: Throwable
18+
19+
@Before
20+
fun setUp() {
21+
Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e }
22+
}
23+
24+
@After
25+
fun tearDown() {
26+
Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
27+
}
28+
29+
@Test
30+
fun testLostExceptionOnSuccess() = runTest {
31+
val future = SettableFuture.create<Int>()
32+
val deferred = future.asDeferred()
33+
deferred.invokeOnCompletion { throw TestException() }
34+
future.set(1)
35+
assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
36+
}
37+
38+
@Test
39+
fun testLostExceptionOnFailure() = runTest {
40+
val future = SettableFuture.create<Int>()
41+
val deferred = future.asDeferred()
42+
deferred.invokeOnCompletion { throw TestException() }
43+
future.setException(TestException2())
44+
assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
45+
}
46+
}

integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import org.junit.Ignore
1111
import org.junit.Test
1212
import java.util.concurrent.*
1313
import java.util.concurrent.CancellationException
14+
import java.util.concurrent.atomic.*
1415
import kotlin.test.*
1516

1617
class ListenableFutureTest : TestBase() {
@@ -755,4 +756,23 @@ class ListenableFutureTest : TestBase() {
755756
future(start = CoroutineStart.ATOMIC) { }
756757
future(start = CoroutineStart.UNDISPATCHED) { }
757758
}
759+
760+
@Test
761+
fun testStackOverflow() = runTest {
762+
val future = SettableFuture.create<Int>()
763+
val completed = AtomicLong()
764+
val count = 10000L
765+
val children = ArrayList<Job>()
766+
for (i in 0 until count) {
767+
children += launch(Dispatchers.Default) {
768+
future.asDeferred().await()
769+
completed.incrementAndGet()
770+
}
771+
}
772+
future.set(1)
773+
withTimeout(60_000) {
774+
children.forEach { it.join() }
775+
assertEquals(count, completed.get())
776+
}
777+
}
758778
}

integration/kotlinx-coroutines-jdk8/src/future/Future.kt

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,18 @@ public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
126126
}
127127
val result = CompletableDeferred<T>()
128128
whenComplete { value, exception ->
129-
if (exception == null) {
130-
// the future has completed normally
131-
result.complete(value)
132-
} else {
133-
// the future has completed with an exception, unwrap it consistently with fast path
134-
// Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping
135-
result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
129+
try {
130+
if (exception == null) {
131+
// the future has completed normally
132+
result.complete(value)
133+
} else {
134+
// the future has completed with an exception, unwrap it consistently with fast path
135+
// Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping
136+
result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
137+
}
138+
} catch (e: Throwable) {
139+
// We come here iff the internals of Deferred threw an exception during its completion
140+
handleCoroutineException(EmptyCoroutineContext, e)
136141
}
137142
}
138143
result.cancelFutureOnCompletion(future)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package future
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.future.*
9+
import org.junit.*
10+
import org.junit.Test
11+
import java.util.concurrent.*
12+
import kotlin.test.*
13+
14+
class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() {
15+
16+
// This is a separate test in order to avoid interference with uncaught exception handlers in other tests
17+
private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler()
18+
private lateinit var caughtException: Throwable
19+
20+
@Before
21+
fun setUp() {
22+
Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e }
23+
}
24+
25+
@After
26+
fun tearDown() {
27+
Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
28+
}
29+
30+
@Test
31+
fun testLostException() = runTest {
32+
val future = CompletableFuture<Int>()
33+
val deferred = future.asDeferred()
34+
deferred.invokeOnCompletion { throw TestException() }
35+
future.complete(1)
36+
assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
37+
}
38+
}

integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,4 +575,23 @@ class FutureTest : TestBase() {
575575
future(start = CoroutineStart.ATOMIC) { }
576576
future(start = CoroutineStart.UNDISPATCHED) { }
577577
}
578+
579+
@Test
580+
fun testStackOverflow() = runTest {
581+
val future = CompletableFuture<Int>()
582+
val completed = AtomicLong()
583+
val count = 10000L
584+
val children = ArrayList<Job>()
585+
for (i in 0 until count) {
586+
children += launch(Dispatchers.Default) {
587+
future.asDeferred().await()
588+
completed.incrementAndGet()
589+
}
590+
}
591+
future.complete(1)
592+
withTimeout(60_000) {
593+
children.forEach { it.join() }
594+
assertEquals(count, completed.get())
595+
}
596+
}
578597
}

kotlinx-coroutines-core/jvm/src/Future.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,20 @@ import java.util.concurrent.*
1313
* Cancels a specified [future] when this job is cancelled.
1414
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
1515
* ```
16-
* invokeOnCompletion { future.cancel(false) }
16+
* invokeOnCompletion { if (it != null) future.cancel(false) }
1717
* ```
1818
*
1919
* @suppress **This an internal API and should not be used from general code.**
2020
*/
2121
@InternalCoroutinesApi
2222
public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle =
23-
invokeOnCompletion(handler = CancelFutureOnCompletion(future)) // TODO make it work only on cancellation as well?
23+
invokeOnCompletion(handler = CancelFutureOnCompletion(future))
2424

2525
/**
2626
* Cancels a specified [future] when this job is cancelled.
2727
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
2828
* ```
29-
* invokeOnCancellation { future.cancel(false) }
29+
* invokeOnCancellation { if (it != null) future.cancel(false) }
3030
* ```
3131
*/
3232
public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit =
@@ -38,15 +38,15 @@ private class CancelFutureOnCompletion(
3838
override fun invoke(cause: Throwable?) {
3939
// Don't interrupt when cancelling future on completion, because no one is going to reset this
4040
// interruption flag and it will cause spurious failures elsewhere
41-
future.cancel(false)
41+
if (cause != null) future.cancel(false)
4242
}
4343
}
4444

4545
private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler() {
4646
override fun invoke(cause: Throwable?) {
4747
// Don't interrupt when cancelling future on completion, because no one is going to reset this
4848
// interruption flag and it will cause spurious failures elsewhere
49-
future.cancel(false)
49+
if (cause != null) future.cancel(false)
5050
}
5151
override fun toString() = "CancelFutureOnCancel[$future]"
5252
}

0 commit comments

Comments
 (0)