Skip to content

Commit ee78090

Browse files
qwwdfsadLouisCAD
andauthored
Fix potential crash in Rx2 and Rx3 asFlow extension (#2333)
Fixes #2104 Fixes #2299 Co-authored-by: Louis CAD <[email protected]>
1 parent 8df6f5a commit ee78090

File tree

4 files changed

+85
-2
lines changed

4 files changed

+85
-2
lines changed

reactive/kotlinx-coroutines-rx2/src/RxConvert.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,13 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
8181
val observer = object : Observer<T> {
8282
override fun onComplete() { close() }
8383
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
84-
override fun onNext(t: T) { sendBlocking(t) }
84+
override fun onNext(t: T) {
85+
try {
86+
sendBlocking(t)
87+
} catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
88+
// Is handled by the downstream flow
89+
}
90+
}
8591
override fun onError(e: Throwable) { close(e) }
8692
}
8793

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.rx2
6+
7+
import io.reactivex.*
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.channels.*
10+
import kotlinx.coroutines.flow.*
11+
import org.junit.*
12+
import java.util.concurrent.*
13+
14+
class ObservableSourceAsFlowStressTest : TestBase() {
15+
16+
private val iterations = 100 * stressTestMultiplierSqrt
17+
18+
@Before
19+
fun setup() {
20+
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
21+
}
22+
23+
@Test
24+
fun testAsFlowCancellation() = runTest {
25+
repeat(iterations) {
26+
val latch = Channel<Unit>(1)
27+
var i = 0
28+
val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
29+
.doOnNext { if (++i > 100) latch.offer(Unit) }
30+
val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
31+
latch.receive()
32+
job.cancelAndJoin()
33+
}
34+
}
35+
}

reactive/kotlinx-coroutines-rx3/src/RxConvert.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,13 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
8181
val observer = object : Observer<T> {
8282
override fun onComplete() { close() }
8383
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
84-
override fun onNext(t: T) { sendBlocking(t) }
84+
override fun onNext(t: T) {
85+
try {
86+
sendBlocking(t)
87+
} catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
88+
// Is handled by the downstream flow
89+
}
90+
}
8591
override fun onError(e: Throwable) { close(e) }
8692
}
8793

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.rx3
6+
7+
import io.reactivex.rxjava3.core.*
8+
import io.reactivex.rxjava3.exceptions.*
9+
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.channels.*
11+
import kotlinx.coroutines.flow.*
12+
import org.junit.*
13+
import java.util.concurrent.*
14+
15+
class ObservableSourceAsFlowStressTest : TestBase() {
16+
17+
private val iterations = 100 * stressTestMultiplierSqrt
18+
19+
@Before
20+
fun setup() {
21+
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
22+
}
23+
24+
@Test
25+
fun testAsFlowCancellation() = runTest {
26+
repeat(iterations) {
27+
val latch = Channel<Unit>(1)
28+
var i = 0
29+
val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
30+
.doOnNext { if (++i > 100) latch.offer(Unit) }
31+
val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
32+
latch.receive()
33+
job.cancelAndJoin()
34+
}
35+
}
36+
}

0 commit comments

Comments
 (0)