File tree 2 files changed +64
-4
lines changed
2 files changed +64
-4
lines changed Original file line number Diff line number Diff line change @@ -311,10 +311,17 @@ internal class SelectBuilderImpl<in R>(
311
311
internal fun handleBuilderException (e : Throwable ) {
312
312
if (trySelect(null )) {
313
313
resumeWithException(e)
314
- } else {
315
- // Cannot handle this exception -- builder was already resumed with a different exception,
316
- // so treat it as "unhandled exception"
317
- handleCoroutineException(context, e)
314
+ } else if (e !is CancellationException ) {
315
+ /*
316
+ * Cannot handle this exception -- builder was already resumed with a different exception,
317
+ * so treat it as "unhandled exception". But only if it is not the completion reason
318
+ * and it's not the cancellation. Otherwise, in the face of structured concurrency
319
+ * the same exception will be reported to theglobal exception handler.
320
+ */
321
+ val result = getResult()
322
+ if (result !is CompletedExceptionally || unwrap(result.cause) != = unwrap(e)) {
323
+ handleCoroutineException(context, e)
324
+ }
318
325
}
319
326
}
320
327
Original file line number Diff line number Diff line change
1
+ /*
2
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3
+ */
4
+
5
+ package kotlinx.coroutines.flow
6
+
7
+ import kotlinx.coroutines.*
8
+ import org.junit.*
9
+
10
+ class CombineStressTest : TestBase () {
11
+
12
+ @Test
13
+ public fun testCancellation () = runTest {
14
+ withContext(Dispatchers .Default + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
15
+ flow {
16
+ expect(1 )
17
+ repeat(1_000 * stressTestMultiplier) {
18
+ emit(it)
19
+ }
20
+ }.flatMapLatest {
21
+ combine(flowOf(it), flowOf(it)) { arr -> arr[0 ] }
22
+ }.collect()
23
+ finish(2 )
24
+ reset()
25
+ }
26
+ }
27
+
28
+ @Test
29
+ public fun testFailure () = runTest {
30
+ val innerIterations = 100 * stressTestMultiplierSqrt
31
+ val outerIterations = 10 * stressTestMultiplierSqrt
32
+ withContext(Dispatchers .Default + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
33
+ repeat(outerIterations) {
34
+ try {
35
+ flow {
36
+ expect(1 )
37
+ repeat(innerIterations) {
38
+ emit(it)
39
+ }
40
+ }.flatMapLatest {
41
+ combine(flowOf(it), flowOf(it)) { arr -> arr[0 ] }
42
+ }.onEach {
43
+ if (it >= innerIterations / 2 ) throw TestException ()
44
+ }.collect()
45
+ } catch (e: TestException ) {
46
+ expect(2 )
47
+ }
48
+ finish(3 )
49
+ reset()
50
+ }
51
+ }
52
+ }
53
+ }
You can’t perform that action at this time.
0 commit comments