From a84575c1f5d2904b3f03a549c5be6eace966ea79 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 12 Aug 2019 15:02:13 +0300 Subject: [PATCH] Do not report already handled exception in select builder Fixes #1433 --- .../common/src/selects/Select.kt | 15 ++++-- .../jvm/test/flow/CombineStressTest.kt | 53 +++++++++++++++++++ 2 files changed, 64 insertions(+), 4 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/flow/CombineStressTest.kt diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index b42fde3e07..9555f2b9b9 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -311,10 +311,17 @@ internal class SelectBuilderImpl( internal fun handleBuilderException(e: Throwable) { if (trySelect(null)) { resumeWithException(e) - } else { - // Cannot handle this exception -- builder was already resumed with a different exception, - // so treat it as "unhandled exception" - handleCoroutineException(context, e) + } else if (e !is CancellationException) { + /* + * Cannot handle this exception -- builder was already resumed with a different exception, + * so treat it as "unhandled exception". But only if it is not the completion reason + * and it's not the cancellation. Otherwise, in the face of structured concurrency + * the same exception will be reported to theglobal exception handler. + */ + val result = getResult() + if (result !is CompletedExceptionally || unwrap(result.cause) !== unwrap(e)) { + handleCoroutineException(context, e) + } } } diff --git a/kotlinx-coroutines-core/jvm/test/flow/CombineStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CombineStressTest.kt new file mode 100644 index 0000000000..3b5c36f9e9 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/CombineStressTest.kt @@ -0,0 +1,53 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import org.junit.* + +class CombineStressTest : TestBase() { + + @Test + public fun testCancellation() = runTest { + withContext(Dispatchers.Default + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + flow { + expect(1) + repeat(1_000 * stressTestMultiplier) { + emit(it) + } + }.flatMapLatest { + combine(flowOf(it), flowOf(it)) { arr -> arr[0] } + }.collect() + finish(2) + reset() + } + } + + @Test + public fun testFailure() = runTest { + val innerIterations = 100 * stressTestMultiplierSqrt + val outerIterations = 10 * stressTestMultiplierSqrt + withContext(Dispatchers.Default + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + repeat(outerIterations) { + try { + flow { + expect(1) + repeat(innerIterations) { + emit(it) + } + }.flatMapLatest { + combine(flowOf(it), flowOf(it)) { arr -> arr[0] } + }.onEach { + if (it >= innerIterations / 2) throw TestException() + }.collect() + } catch (e: TestException) { + expect(2) + } + finish(3) + reset() + } + } + } +}