|
5 | 5 | package kotlinx.coroutines.rx3
|
6 | 6 |
|
7 | 7 | import io.reactivex.rxjava3.core.*
|
8 |
| -import io.reactivex.rxjava3.plugins.* |
| 8 | +import io.reactivex.rxjava3.exceptions.* |
9 | 9 | import kotlinx.coroutines.*
|
10 | 10 | import kotlinx.coroutines.CancellationException
|
11 | 11 | import org.junit.*
|
12 | 12 | import org.junit.Test
|
13 | 13 | import java.util.concurrent.*
|
| 14 | +import java.util.concurrent.atomic.* |
14 | 15 | import kotlin.test.*
|
15 | 16 |
|
16 | 17 | class ObservableTest : TestBase() {
|
@@ -140,24 +141,26 @@ class ObservableTest : TestBase() {
|
140 | 141 |
|
141 | 142 | @Test
|
142 | 143 | fun testExceptionAfterCancellation() {
|
143 |
| - // Test that no exceptions were reported to the global EH (it will fail the test if so) |
144 |
| - val handler = { e: Throwable -> |
145 |
| - assertFalse(e is CancellationException) |
| 144 | + // Test that no exceptions were reported to the coroutine EH (it will fail the test if so) |
| 145 | + val coroutineExceptionHandler = CoroutineExceptionHandler { _, _ -> expectUnreached() } |
| 146 | + val rxExceptionHandlerInvocations = AtomicInteger() |
| 147 | + val rxExceptionHandler: (Throwable) -> Unit = { error -> |
| 148 | + assertTrue(error is UndeliverableException) |
| 149 | + assertTrue(error.cause is TestException) |
| 150 | + rxExceptionHandlerInvocations.getAndIncrement() |
146 | 151 | }
|
147 |
| - withExceptionHandler(handler) { |
148 |
| - RxJavaPlugins.setErrorHandler { |
149 |
| - require(it !is CancellationException) |
150 |
| - } |
| 152 | + withExceptionHandler(rxExceptionHandler) { |
151 | 153 | Observable
|
152 | 154 | .interval(1, TimeUnit.MILLISECONDS)
|
153 | 155 | .take(1000)
|
154 | 156 | .switchMapSingle {
|
155 |
| - rxSingle { |
| 157 | + rxSingle(coroutineExceptionHandler) { |
156 | 158 | timeBomb().await()
|
157 | 159 | }
|
158 | 160 | }
|
159 | 161 | .blockingSubscribe({}, {})
|
160 | 162 | }
|
| 163 | + assertTrue(rxExceptionHandlerInvocations.get() > 0) |
161 | 164 | }
|
162 | 165 |
|
163 | 166 | private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() }
|
|
0 commit comments