forked from Kotlin/kotlinx.coroutines
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathObservableSourceAsFlowStressTest.kt
89 lines (71 loc) · 2.8 KB
/
ObservableSourceAsFlowStressTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.rx3
import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.exceptions.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.*
import java.util.concurrent.*
class ObservableSourceAsFlowStressTest : TestBase() {
private val maxIterations = 25
private val collectorJobCancelDelay = 1_000L
private val nextIterationDelay = collectorJobCancelDelay * 2
private var jobCancelledException: Throwable? = null
private val exceptionHandler = { throwable: Throwable ->
jobCancelledException = extractJobCancelledException(throwable)
}
@Before
fun setup() {
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
}
@Test
fun testObservableSourceAsFlow_doesntThrowJobCancelledException() = withExceptionHandler(exceptionHandler) {
val collectorThread = newSingleThreadContext("Collector Thread")
val cancellerThread = newSingleThreadContext("Canceller Thread")
val scope = CoroutineScope(Job())
var iteration = 0
while (jobCancelledException == null && iteration < maxIterations) {
scope.runIteration(collectorThread, cancellerThread)
iteration += 1
Thread.sleep(nextIterationDelay)
collectorThread.cancel()
cancellerThread.cancel()
}
collectorThread.close()
cancellerThread.close()
scope.cancel()
jobCancelledException?.also {
throw error("ObservableSource.asFlow() cancellation caused exception in iteration # $iteration", it)
}
}
private fun CoroutineScope.runIteration(
collectorThread: ExecutorCoroutineDispatcher,
cancellerThread: ExecutorCoroutineDispatcher
) {
val outerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS)
val innerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS)
val collectorJob = launch(collectorThread) {
outerObservable
.asFlow()
.flatMapLatest {
innerObservable.asFlow()
}
.collect { delay(100) }
}
launch(cancellerThread) {
delay(collectorJobCancelDelay)
collectorJob.cancel()
}
}
private fun extractJobCancelledException(throwable: Throwable): Throwable? {
if (throwable is UndeliverableException) {
if (throwable.cause !is InterruptedException) return throwable.cause
}
if (throwable is InterruptedException) {
if (throwable.cause !is InterruptedException) return throwable.cause
}
return throwable
}
}