From 4fa676ffff18552fc67e1ca28a0d736c5bc1b80e Mon Sep 17 00:00:00 2001 From: Louis CAD Date: Tue, 20 Oct 2020 00:40:23 +0200 Subject: [PATCH 1/5] Fix potential crash in RX3 asFlow extension --- reactive/kotlinx-coroutines-rx3/src/RxConvert.kt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index 9bb38c088f..39eee18866 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -81,7 +81,12 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { val observer = object : Observer { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } - override fun onNext(t: T) { sendBlocking(t) } + override fun onNext(t: T) { + try { + sendBlocking(t) + } catch (ignored: Exception) { //TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 + } + } override fun onError(e: Throwable) { close(e) } } From d32a97f5c886d44c5303ba39bd2545392bc3ea32 Mon Sep 17 00:00:00 2001 From: Louis CAD Date: Tue, 20 Oct 2020 01:25:31 +0200 Subject: [PATCH 2/5] Fix potential crash in RX2 asFlow extension --- reactive/kotlinx-coroutines-rx2/src/RxConvert.kt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index cf73ef2ea8..eb0b4c6c3f 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -81,7 +81,12 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { val observer = object : Observer { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } - override fun onNext(t: T) { sendBlocking(t) } + override fun onNext(t: T) { + try { + sendBlocking(t) + } catch (ignored: Exception) { //TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 + } + } override fun onError(e: Throwable) { close(e) } } From 7b884f1b8348441c7aa9dbdb23d3982b3094fda2 Mon Sep 17 00:00:00 2001 From: Louis CAD Date: Tue, 20 Oct 2020 11:01:26 +0200 Subject: [PATCH 3/5] Catch Throwable instead of Exception in Rx2 asFlow --- reactive/kotlinx-coroutines-rx2/src/RxConvert.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index eb0b4c6c3f..abd13b9ed4 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -84,7 +84,7 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { override fun onNext(t: T) { try { sendBlocking(t) - } catch (ignored: Exception) { //TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 + } catch (ignored: Throwable) { //TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 } } override fun onError(e: Throwable) { close(e) } From f756f04b3f7ddc93c1aa04cdb815fbe7701e5ab4 Mon Sep 17 00:00:00 2001 From: Louis CAD Date: Tue, 20 Oct 2020 11:02:01 +0200 Subject: [PATCH 4/5] Catch Throwable instead of Exception in Rx3 asFlow --- reactive/kotlinx-coroutines-rx3/src/RxConvert.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index 39eee18866..f30146a704 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -84,7 +84,7 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { override fun onNext(t: T) { try { sendBlocking(t) - } catch (ignored: Exception) { //TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 + } catch (ignored: Throwable) { //TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 } } override fun onError(e: Throwable) { close(e) } From 305154b15284b0f1ac65437d18a04392f3e456f6 Mon Sep 17 00:00:00 2001 From: 5e32a505ad92310e881bdce2 <2978958+drinkthestars@users.noreply.github.com> Date: Tue, 20 Oct 2020 23:48:24 -0700 Subject: [PATCH 5/5] Added rx2//rx3 stress tests for ObservableSource.asFlow() --- .../test/ObservableSourceAsFlowStressTest.kt | 89 +++++++++++++++++++ .../test/ObservableSourceAsFlowStressTest.kt | 89 +++++++++++++++++++ 2 files changed, 178 insertions(+) create mode 100644 reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt create mode 100644 reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt new file mode 100644 index 0000000000..b187ab9b63 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import io.reactivex.* +import io.reactivex.exceptions.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.* +import java.util.concurrent.* + +class ObservableSourceAsFlowStressTest : TestBase() { + + private val maxIterations = 25 + private val collectorJobCancelDelay = 1_000L + private val nextIterationDelay = collectorJobCancelDelay * 2 + + private var jobCancelledException: Throwable? = null + private val exceptionHandler = { throwable: Throwable -> + jobCancelledException = extractJobCancelledException(throwable) + } + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + @Test + fun testObservableSourceAsFlow_doesntThrowJobCancelledException() = withExceptionHandler(exceptionHandler) { + val collectorThread = newSingleThreadContext("Collector Thread") + val cancellerThread = newSingleThreadContext("Canceller Thread") + val scope = CoroutineScope(Job()) + var iteration = 0 + + while (jobCancelledException == null && iteration < maxIterations) { + scope.runIteration(collectorThread, cancellerThread) + iteration += 1 + + Thread.sleep(nextIterationDelay) + + collectorThread.cancel() + cancellerThread.cancel() + } + + collectorThread.close() + cancellerThread.close() + scope.cancel() + + jobCancelledException?.also { + throw error("ObservableSource.asFlow() cancellation caused exception in iteration # $iteration", it) + } + } + + private fun CoroutineScope.runIteration( + collectorThread: ExecutorCoroutineDispatcher, + cancellerThread: ExecutorCoroutineDispatcher + ) { + val outerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS) + val innerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS) + + val collectorJob = launch(collectorThread) { + outerObservable + .asFlow() + .flatMapLatest { + innerObservable.asFlow() + } + .collect { delay(100) } + } + + launch(cancellerThread) { + delay(collectorJobCancelDelay) + collectorJob.cancel() + } + } + + private fun extractJobCancelledException(throwable: Throwable): Throwable? { + if (throwable is UndeliverableException) { + if (throwable.cause !is InterruptedException) return throwable.cause + } + + if (throwable is InterruptedException) { + if (throwable.cause !is InterruptedException) return throwable.cause + } + + return throwable + } +} diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt new file mode 100644 index 0000000000..b5e90286a4 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx3 + +import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.exceptions.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.* +import java.util.concurrent.* + +class ObservableSourceAsFlowStressTest : TestBase() { + + private val maxIterations = 25 + private val collectorJobCancelDelay = 1_000L + private val nextIterationDelay = collectorJobCancelDelay * 2 + + private var jobCancelledException: Throwable? = null + private val exceptionHandler = { throwable: Throwable -> + jobCancelledException = extractJobCancelledException(throwable) + } + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + @Test + fun testObservableSourceAsFlow_doesntThrowJobCancelledException() = withExceptionHandler(exceptionHandler) { + val collectorThread = newSingleThreadContext("Collector Thread") + val cancellerThread = newSingleThreadContext("Canceller Thread") + val scope = CoroutineScope(Job()) + var iteration = 0 + + while (jobCancelledException == null && iteration < maxIterations) { + scope.runIteration(collectorThread, cancellerThread) + iteration += 1 + + Thread.sleep(nextIterationDelay) + + collectorThread.cancel() + cancellerThread.cancel() + } + + collectorThread.close() + cancellerThread.close() + scope.cancel() + + jobCancelledException?.also { + throw error("ObservableSource.asFlow() cancellation caused exception in iteration # $iteration", it) + } + } + + private fun CoroutineScope.runIteration( + collectorThread: ExecutorCoroutineDispatcher, + cancellerThread: ExecutorCoroutineDispatcher + ) { + val outerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS) + val innerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS) + + val collectorJob = launch(collectorThread) { + outerObservable + .asFlow() + .flatMapLatest { + innerObservable.asFlow() + } + .collect { delay(100) } + } + + launch(cancellerThread) { + delay(collectorJobCancelDelay) + collectorJob.cancel() + } + } + + private fun extractJobCancelledException(throwable: Throwable): Throwable? { + if (throwable is UndeliverableException) { + if (throwable.cause !is InterruptedException) return throwable.cause + } + + if (throwable is InterruptedException) { + if (throwable.cause !is InterruptedException) return throwable.cause + } + + return throwable + } +}