diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api index 4370325f58..22bee8759f 100644 --- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api +++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api @@ -67,7 +67,9 @@ public final class kotlinx/coroutines/rx2/RxObservableKt { } public final class kotlinx/coroutines/rx2/RxSchedulerKt { - public static final fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/rx2/SchedulerCoroutineDispatcher; + public static final fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/CoroutineDispatcher; + public static final synthetic fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/rx2/SchedulerCoroutineDispatcher; + public static final fun asScheduler (Lkotlinx/coroutines/CoroutineDispatcher;)Lio/reactivex/Scheduler; } public final class kotlinx/coroutines/rx2/RxSingleKt { diff --git a/reactive/kotlinx-coroutines-rx2/build.gradle b/reactive/kotlinx-coroutines-rx2/build.gradle index 6d2c4abcc8..9995c9ba47 100644 --- a/reactive/kotlinx-coroutines-rx2/build.gradle +++ b/reactive/kotlinx-coroutines-rx2/build.gradle @@ -5,6 +5,7 @@ dependencies { compile project(':kotlinx-coroutines-reactive') testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output + testCompile project(':kotlinx-coroutines-test') testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version" compile "io.reactivex.rxjava2:rxjava:$rxjava2_version" } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt index 9952eb91a0..d4fb53ba8d 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt @@ -4,16 +4,136 @@ package kotlinx.coroutines.rx2 -import io.reactivex.Scheduler +import io.reactivex.* +import io.reactivex.disposables.* +import io.reactivex.plugins.* import kotlinx.coroutines.* -import java.util.concurrent.TimeUnit -import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.channels.* +import java.util.concurrent.* +import kotlin.coroutines.* /** * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher] * and provides native support of [delay] and [withTimeout]. */ -public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this) +public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher = + if (this is DispatcherScheduler) { + dispatcher + } else { + SchedulerCoroutineDispatcher(this) + } + +@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions") +@JvmName("asCoroutineDispatcher") +public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher = + SchedulerCoroutineDispatcher(this) + +/** + * Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler]. + */ +public fun CoroutineDispatcher.asScheduler(): Scheduler = + if (this is SchedulerCoroutineDispatcher) { + scheduler + } else { + DispatcherScheduler(this) + } + +private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Scheduler() { + + private val schedulerJob = SupervisorJob() + + private val scope = CoroutineScope(schedulerJob + dispatcher + CoroutineExceptionHandler { _, throwable -> + RxJavaPlugins.onError(throwable) + }) + + override fun scheduleDirect(block: Runnable): Disposable = + scheduleDirect(block, 0, TimeUnit.MILLISECONDS) + + override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable { + if (!scope.isActive) return Disposables.disposed() + val newBlock = RxJavaPlugins.onSchedule(block) + return scope.launch { + delay(unit.toMillis(delay)) + newBlock.run() + }.asDisposable() + } + + override fun createWorker(): Worker = + DispatcherWorker(dispatcher, schedulerJob).also { + it.start() + } + + override fun shutdown() { + scope.cancel() + } + + private class DispatcherWorker(dispatcher: CoroutineDispatcher, parentJob: Job) : Worker() { + + private val workerJob = SupervisorJob(parentJob) + private val workerScope = CoroutineScope(workerJob + dispatcher) + private val blockChannel = Channel(Channel.UNLIMITED) + + fun start() { + workerScope.launch { + while (isActive) { + val task = blockChannel.receive() + task.execute() + } + } + } + + override fun isDisposed(): Boolean = !workerScope.isActive + + override fun schedule(block: Runnable): Disposable = + schedule(block, 0, TimeUnit.MILLISECONDS) + + override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable { + if (!workerScope.isActive) return Disposables.disposed() + + val newBlock = RxJavaPlugins.onSchedule(block) + + val taskJob = Job(workerJob) + val task = SchedulerChannelTask(newBlock, taskJob) + + if (delay <= 0L) { + blockChannel.offer(task) + } else { + // Use `taskJob` as the parent here so the delay will also get cancelled if the Disposable + // is disposed. + workerScope.launch(taskJob) { + // Delay *before* enqueuing the task, so other tasks (e.g. via schedule without delay) + // aren't blocked by the delay. + delay(unit.toMillis(delay)) + // Once the task is ready to run, it still needs to be executed via the queue to comply + // with the Scheduler contract of running all worker tasks in a non-overlapping manner. + blockChannel.offer(task) + } + } + + return taskJob.asDisposable() + } + + override fun dispose() { + workerScope.cancel() + } + } +} + +/** + * Represents a task to be queued sequentially on a [Channel] for a [Scheduler.Worker]. + * + * Delayed tasks do not block [Channel] from processing other tasks + */ +private class SchedulerChannelTask( + private val block: Runnable, + job: Job +) : JobDisposable(job) { + fun execute() { + if (job.isActive) { + block.run() + } + } +} /** * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler]. @@ -50,3 +170,13 @@ public class SchedulerCoroutineDispatcher( /** @suppress */ override fun hashCode(): Int = System.identityHashCode(scheduler) } + +private open class JobDisposable(protected val job: Job) : Disposable { + override fun isDisposed(): Boolean = !job.isActive + + override fun dispose() { + job.cancel() + } +} + +private fun Job.asDisposable(): Disposable = JobDisposable(this) diff --git a/reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt new file mode 100644 index 0000000000..14afb3e396 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt @@ -0,0 +1,114 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import kotlinx.coroutines.* +import kotlinx.coroutines.test.* +import org.junit.* +import java.util.concurrent.* + +class SchedulerStressTest : TestBase() { + @Before + fun setup() { + ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + /** + * Test that we don't get an OOM if we schedule many jobs at once. It's expected that if you don't dispose that you'd + * see a OOM error. + */ + @Test + fun testSchedulerDisposed(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposed(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerDisposed(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposed(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) { + expect(1) + + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + + val n = 2000 * stressTestMultiplier + coroutineScope { + repeat(n) { i -> + launch { + val a = ByteArray(1000000) //1MB + val disposable = block(Runnable { + expectUnreached() + runBlocking { + keepMe(a) + } + }) + disposable.dispose() + expect(i + 2) + } + yield() + } + } + + scheduler.shutdown() + finish(n + 2) + } + + /** + * Test function that holds a reference. Used for testing OOM situations + */ + private suspend fun keepMe(a: ByteArray) { + delay(10) + } + + /** + * Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd + * see a OOM error. + */ + @Test + fun testSchedulerDisposedDuringDelay(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposedDuringDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerDisposedDuringDelay(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposedDuringDelay(scheduler.createWorker()::schedule) + } + + private suspend fun TestCoroutineScope.testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) { + expect(1) + + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + + val n = 2000 * stressTestMultiplier + coroutineScope { + repeat(n) { i -> + val a = ByteArray(1000000) //1MB + val delayMillis: Long = 10 + val disposable = block(Runnable { + runBlocking { + keepMe(a) + } + }, delayMillis, TimeUnit.MILLISECONDS) + disposable.dispose() + advanceTimeBy(delayMillis) + yield() + } + } + + scheduler.shutdown() + finish(2) + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt b/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt index 26dbe8f4cf..21771c7862 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt @@ -4,10 +4,18 @@ package kotlinx.coroutines.rx2 -import io.reactivex.schedulers.Schedulers +import io.reactivex.* +import io.reactivex.disposables.* +import io.reactivex.observers.* +import io.reactivex.plugins.* +import io.reactivex.schedulers.* import kotlinx.coroutines.* -import org.junit.Before +import kotlinx.coroutines.test.* +import org.junit.* import org.junit.Test +import java.lang.Runnable +import java.util.concurrent.* +import kotlin.coroutines.* import kotlin.test.* class SchedulerTest : TestBase() { @@ -17,7 +25,7 @@ class SchedulerTest : TestBase() { } @Test - fun testIoScheduler(): Unit = runBlocking { + fun testIoScheduler(): Unit = runTest { expect(1) val mainThread = Thread.currentThread() withContext(Schedulers.io().asCoroutineDispatcher()) { @@ -31,4 +39,438 @@ class SchedulerTest : TestBase() { } finish(4) } -} \ No newline at end of file + + @Test + fun testSchedulerWithNoDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithNoDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerWithNoDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithNoDelay(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableWithNoDelay(block: RxSchedulerBlockNoDelay) { + expect(1) + suspendCancellableCoroutine { + block(Runnable { + expect(2) + it.resume(Unit) + }) + } + yield() + finish(3) + } + + @Test + fun testSchedulerWithDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect, 300) + } + + @Test + fun testSchedulerWorkerWithDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule, 300) + } + + @Test + fun testSchedulerWithZeroDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerWithZeroDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableWithDelay(block: RxSchedulerBlockWithDelay, delayMillis: Long = 0) { + expect(1) + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + suspendCancellableCoroutine { + block(Runnable { + expect(2) + it.resume(Unit) + }, delayMillis, TimeUnit.MILLISECONDS) + } + + scheduler.shutdown() + finish(3) + } + + @Test + fun testAsSchedulerWithNegativeDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect, -1) + } + + @Test + fun testAsSchedulerWorkerWithNegativeDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule, -1) + } + + @Test + fun testSchedulerDisposeDuringDelay(): Unit = runBlockingTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableDisposeDuringDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerDisposeDuringDelay(): Unit = runBlockingTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableDisposeDuringDelay(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableDisposeDuringDelay(block: RxSchedulerBlockWithDelay) { + expect(1) + val delayMillis = 300L + val disposable = block(Runnable { + expectUnreached() + }, delayMillis, TimeUnit.MILLISECONDS) + delay(100) + expect(2) + disposable.dispose() + delay(300) + yield() + finish(3) + } + + @Test + fun testSchedulerImmediateDispose(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableImmediateDispose(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerImmediateDispose(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableImmediateDispose(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableImmediateDispose(block: RxSchedulerBlockNoDelay) { + expect(1) + val disposable = block(Runnable { + expectUnreached() + }) + disposable.dispose() + yield() + finish(2) + } + + @Test + fun testSchedulerWorksWithSchedulerCoroutineDispatcher(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + expect(1) + suspendCancellableCoroutine { + scheduler.scheduleDirect(Runnable { + expect(2) + it.resume(Unit) + }) + } + finish(3) + } + + @Test + fun testConvertDispatcherToOriginalScheduler(): Unit = runTest { + expect(1) + + val originalScheduler = Schedulers.io() + val dispatcher = originalScheduler.asCoroutineDispatcher() + val scheduler = dispatcher.asScheduler() + assertEquals(originalScheduler, scheduler) + + finish(2) + } + + @Test + fun testConvertSchedulerToOriginalDispatcher(): Unit = runTest { + expect(1) + + val originalDispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = originalDispatcher.asScheduler() + val dispatcher = scheduler.asCoroutineDispatcher() + assertEquals(originalDispatcher, dispatcher) + + finish(2) + } + + @Test + fun testSchedulerExpectRxPluginsCall(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCall(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerExpectRxPluginsCall(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCall(scheduler.createWorker()::schedule) + } + + private suspend fun TestCoroutineScope.testRunnableExpectRxPluginsCall(block: RxSchedulerBlockNoDelay) { + expect(1) + + fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) { + RxJavaPlugins.setScheduleHandler { + expect(expectedCountOnSchedule) + Runnable { + expect(expectCountOnRun) + it.run() + } + } + } + + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + + setScheduler(2, 4) + + pauseDispatcher { + suspendCancellableCoroutine { + block(Runnable { + expect(5) + it.resume(Unit) + }) + expect(3) + resumeDispatcher() + } + } + + RxJavaPlugins.setScheduleHandler(null) + scheduler.shutdown() + finish(6) + } + + @Test + fun testSchedulerExpectRxPluginsCallWithDelay(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCallDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerExpectRxPluginsCallWithDelay(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + val worker = scheduler.createWorker() + testRunnableExpectRxPluginsCallDelay(worker::schedule) + } + + private suspend fun TestCoroutineScope.testRunnableExpectRxPluginsCallDelay(block: RxSchedulerBlockWithDelay) { + expect(1) + + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + + setScheduler(2, 4) + + pauseDispatcher { + suspendCancellableCoroutine { + block(Runnable { + expect(5) + RxJavaPlugins.setScheduleHandler(null) + it.resume(Unit) + }, 300, TimeUnit.MILLISECONDS) + expect(3) + resumeDispatcher() + } + } + + RxJavaPlugins.setScheduleHandler(null) + scheduler.shutdown() + finish(6) + } + + private fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) { + RxJavaPlugins.setScheduleHandler { + expect(expectedCountOnSchedule) + Runnable { + expect(expectCountOnRun) + it.run() + } + } + } + + /** + * Let's test the [Scheduler.Worker] to make sure it satisfies the documented constraint of running all work + * sequentially + */ + @Test + fun testSchedulerWorkerSequentialOrdering(): Unit = runTest { + expect(1) + + val scheduler = Dispatchers.Default.asScheduler() + + val worker = scheduler.createWorker() + + val iterations = 2 + coroutineScope { + for (i in (0..iterations)) { + launch { + suspendCancellableCoroutine { + worker.schedule(Runnable { + expect(2 + i) + it.resume(Unit) + }) + } + yield() + } + } + } + yield() + finish((iterations + 2) + 1) + } + + /** + * @see [testSchedulerWorkerSequentialOrdering] + */ + @Test + fun testSchedulerWorkerSequentialOrderingDelayed(): Unit = runTest { + expect(1) + + val scheduler = Dispatchers.Default.asScheduler() + + val worker = scheduler.createWorker() + + val iterations = 2 + coroutineScope { + for (i in (0..iterations)) { + suspendCancellableCoroutine { + worker.schedule(Runnable { + expect(2 + i) + it.resume(Unit) + }, 10, TimeUnit.MILLISECONDS) + } + } + } + yield() + finish((iterations + 2) + 1) + } + + /** + * Let's test the [Scheduler.Worker] to make sure it satisfies the documented constraint of running all work + * sequentially using RxJava primitives + */ + @Test + fun testSchedulerWorkerSequentialWithObservables(): Unit = runBlockingTest { + expect(1) + + val scheduler = Dispatchers.Default.asScheduler() + + val testObservable = Observable + .create { + it.onNext(1) + it.onNext(2) + it.onComplete() + } + .observeOn(scheduler) + .map { + runBlocking { + if (it == 1) { + // delay by some time. we expect that even with delay this iteration should be first + delay(100) + } + it + 1 + } + } + .subscribeOn(scheduler) + + val testObserver = TestObserver() + testObservable.subscribe(testObserver) + testObservable.blockingSubscribe() + testObserver.apply { + assertValueCount(2) + assertResult(2, 3) + dispose() + } + finish(2) + } + + /** + * Test that ensures that delays are actually respected (tasks scheduled sooner in the future run before tasks scheduled later, + * even when the later task is submitted before the earlier one) + * + * NOTE: not using [runBlockingTest] because of infamous "this job has not completed yet" error: + * + * https://github.com/Kotlin/kotlinx.coroutines/issues/1204 + */ + @Test + fun testSchedulerRespectsDelays(): Unit = runTest { + val scheduler = Dispatchers.Default.asScheduler() + testRunnableRespectsDelays(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerRespectsDelays(): Unit = runTest { + val scheduler = Dispatchers.Default.asScheduler() + testRunnableRespectsDelays(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableRespectsDelays(block: RxSchedulerBlockWithDelay) { + expect(1) + + coroutineScope { + launch { + suspendCancellableCoroutine { + block(Runnable { + expect(3) + it.resume(Unit) + }, 100, TimeUnit.MILLISECONDS) + } + } + + launch { + suspendCancellableCoroutine { + block(Runnable { + expect(2) + it.resume(Unit) + }, 1, TimeUnit.MILLISECONDS) + } + } + } + + finish(4) + } + + /** + * Tests that cancelling a runnable in one worker doesn't affect work in another scheduler. + * + * This is part of expected behavior documented. + */ + @Test + fun testMultipleWorkerCancellation(): Unit = runTest { + expect(1) + + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + + coroutineScope { + suspendCancellableCoroutine { + val workerOne = scheduler.createWorker() + workerOne.schedule({ + expect(3) + it.resume(Unit) + }, 10, TimeUnit.MILLISECONDS) + + expect(2) + + val workerTwo = scheduler.createWorker() + workerTwo.schedule({ + expectUnreached() + }, 10, TimeUnit.MILLISECONDS) + workerTwo.dispose() + } + } + + finish(4) + } +} + +typealias RxSchedulerBlockNoDelay = (Runnable) -> Disposable +typealias RxSchedulerBlockWithDelay = (Runnable, Long, TimeUnit) -> Disposable \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api index 6d2dd63d2c..3d710eff93 100644 --- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api +++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api @@ -56,7 +56,9 @@ public final class kotlinx/coroutines/rx3/RxObservableKt { } public final class kotlinx/coroutines/rx3/RxSchedulerKt { - public static final fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/rx3/SchedulerCoroutineDispatcher; + public static final fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/CoroutineDispatcher; + public static final synthetic fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/rx3/SchedulerCoroutineDispatcher; + public static final fun asScheduler (Lkotlinx/coroutines/CoroutineDispatcher;)Lio/reactivex/rxjava3/core/Scheduler; } public final class kotlinx/coroutines/rx3/RxSingleKt { diff --git a/reactive/kotlinx-coroutines-rx3/build.gradle b/reactive/kotlinx-coroutines-rx3/build.gradle index ced694abd3..6f64069085 100644 --- a/reactive/kotlinx-coroutines-rx3/build.gradle +++ b/reactive/kotlinx-coroutines-rx3/build.gradle @@ -5,6 +5,7 @@ targetCompatibility = JavaVersion.VERSION_1_8 dependencies { compile project(':kotlinx-coroutines-reactive') + testCompile project(':kotlinx-coroutines-test') testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version" compile "io.reactivex.rxjava3:rxjava:$rxjava3_version" diff --git a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt index a426aea6ba..aa59431b24 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt @@ -4,16 +4,29 @@ package kotlinx.coroutines.rx3 -import io.reactivex.rxjava3.core.Scheduler +import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.disposables.* +import io.reactivex.rxjava3.plugins.* import kotlinx.coroutines.* -import java.util.concurrent.TimeUnit -import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.channels.* +import java.util.concurrent.* +import kotlin.coroutines.* /** * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher] * and provides native support of [delay] and [withTimeout]. */ -public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this) +public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher = + if (this is DispatcherScheduler) { + dispatcher + } else { + SchedulerCoroutineDispatcher(this) + } + +@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions") +@JvmName("asCoroutineDispatcher") +public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher = + SchedulerCoroutineDispatcher(this) /** * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler]. @@ -50,3 +63,121 @@ public class SchedulerCoroutineDispatcher( /** @suppress */ override fun hashCode(): Int = System.identityHashCode(scheduler) } + +/** + * Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler]. + */ +public fun CoroutineDispatcher.asScheduler(): Scheduler = + if (this is SchedulerCoroutineDispatcher) { + scheduler + } else { + DispatcherScheduler(this) + } + +private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Scheduler() { + + private val schedulerJob = SupervisorJob() + + private val scope = CoroutineScope(schedulerJob + dispatcher + CoroutineExceptionHandler { _, throwable -> + RxJavaPlugins.onError(throwable) + }) + + override fun scheduleDirect(block: Runnable): Disposable = + scheduleDirect(block, 0, TimeUnit.MILLISECONDS) + + override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable { + if (!scope.isActive) return Disposable.disposed() + val newBlock = RxJavaPlugins.onSchedule(block) + return scope.launch { + delay(unit.toMillis(delay)) + newBlock.run() + }.asDisposable() + } + + override fun createWorker(): Worker = + DispatcherWorker(dispatcher, schedulerJob).also { + it.start() + } + + override fun shutdown() { + scope.cancel() + } + + private class DispatcherWorker(dispatcher: CoroutineDispatcher, parentJob: Job) : Worker() { + + private val workerJob = SupervisorJob(parentJob) + private val workerScope = CoroutineScope(workerJob + dispatcher) + private val blockChannel = Channel(Channel.UNLIMITED) + + fun start() { + workerScope.launch { + while (isActive) { + val task = blockChannel.receive() + task.execute() + } + } + } + + override fun isDisposed(): Boolean = !workerScope.isActive + + override fun schedule(block: Runnable): Disposable = + schedule(block, 0, TimeUnit.MILLISECONDS) + + override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable { + if (!workerScope.isActive) return Disposable.disposed() + + val newBlock = RxJavaPlugins.onSchedule(block) + + val taskJob = Job(workerJob) + val task = SchedulerChannelTask(newBlock, taskJob) + + if (delay <= 0L) { + blockChannel.offer(task) + } else { + // Use `taskJob` as the parent here so the delay will also get cancelled if the Disposable + // is disposed. + workerScope.launch(taskJob) { + // Delay *before* enqueuing the task, so other tasks (e.g. via schedule without delay) + // aren't blocked by the delay. + delay(unit.toMillis(delay)) + // Once the task is ready to run, it still needs to be executed via the queue to comply + // with the Scheduler contract of running all worker tasks in a non-overlapping manner. + blockChannel.offer(task) + } + } + + return taskJob.asDisposable() + } + + override fun dispose() { + workerScope.cancel() + } + } +} + +/** + * Represents a task to be queued sequentially on a [Channel] for a [Scheduler.Worker]. + * + * Delayed tasks do not block [Channel] from processing other tasks + */ +private class SchedulerChannelTask( + private val block: Runnable, + job: Job +) : JobDisposable(job) { + fun execute() { + if (job.isActive) { + block.run() + } + } +} + +private open class JobDisposable(protected val job: Job) : Disposable { + override fun isDisposed(): Boolean = !job.isActive + + override fun dispose() { + job.cancel() + } +} + +private fun Job.asDisposable(): Disposable = JobDisposable(this) + diff --git a/reactive/kotlinx-coroutines-rx3/test/SchedulerStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/SchedulerStressTest.kt new file mode 100644 index 0000000000..ebab377561 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx3/test/SchedulerStressTest.kt @@ -0,0 +1,114 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx3 + +import kotlinx.coroutines.* +import kotlinx.coroutines.test.* +import org.junit.* +import java.util.concurrent.* + +class SchedulerStressTest : TestBase() { + @Before + fun setup() { + ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + /** + * Test that we don't get an OOM if we schedule many jobs at once. It's expected that if you don't dispose that you'd + * see a OOM error. + */ + @Test + fun testSchedulerDisposed(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposed(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerDisposed(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposed(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) { + expect(1) + + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + + val n = 2000 * stressTestMultiplier + coroutineScope { + repeat(n) { i -> + launch { + val a = ByteArray(1000000) //1MB + val disposable = block(Runnable { + expectUnreached() + runBlocking { + keepMe(a) + } + }) + disposable.dispose() + expect(i + 2) + } + yield() + } + } + + scheduler.shutdown() + finish(n + 2) + } + + /** + * Test function that holds a reference. Used for testing OOM situations + */ + private suspend fun keepMe(a: ByteArray) { + delay(10) + } + + /** + * Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd + * see a OOM error. + */ + @Test + fun testSchedulerDisposedDuringDelay(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposedDuringDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerDisposedDuringDelay(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposedDuringDelay(scheduler.createWorker()::schedule) + } + + private suspend fun TestCoroutineScope.testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) { + expect(1) + + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + + val n = 2000 * stressTestMultiplier + coroutineScope { + repeat(n) { i -> + val a = ByteArray(1000000) //1MB + val delayMillis: Long = 10 + val disposable = block(Runnable { + runBlocking { + keepMe(a) + } + }, delayMillis, TimeUnit.MILLISECONDS) + disposable.dispose() + advanceTimeBy(delayMillis) + yield() + } + } + + scheduler.shutdown() + finish(2) + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt b/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt index 9e95c213d0..f9b1a47f3d 100644 --- a/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt @@ -4,10 +4,17 @@ package kotlinx.coroutines.rx3 -import io.reactivex.rxjava3.schedulers.Schedulers +import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.disposables.* +import io.reactivex.rxjava3.observers.* +import io.reactivex.rxjava3.plugins.* +import io.reactivex.rxjava3.schedulers.* import kotlinx.coroutines.* -import org.junit.Before +import kotlinx.coroutines.test.* +import org.junit.* import org.junit.Test +import java.util.concurrent.* +import kotlin.coroutines.* import kotlin.test.* class SchedulerTest : TestBase() { @@ -17,7 +24,7 @@ class SchedulerTest : TestBase() { } @Test - fun testIoScheduler(): Unit = runBlocking { + fun testIoScheduler(): Unit = runTest { expect(1) val mainThread = Thread.currentThread() withContext(Schedulers.io().asCoroutineDispatcher()) { @@ -31,4 +38,438 @@ class SchedulerTest : TestBase() { } finish(4) } + + @Test + fun testSchedulerWithNoDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithNoDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerWithNoDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithNoDelay(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableWithNoDelay(block: RxSchedulerBlockNoDelay) { + expect(1) + suspendCancellableCoroutine { + block(Runnable { + expect(2) + it.resume(Unit) + }) + } + yield() + finish(3) + } + + @Test + fun testSchedulerWithDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect, 300) + } + + @Test + fun testSchedulerWorkerWithDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule, 300) + } + + @Test + fun testSchedulerWithZeroDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerWithZeroDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableWithDelay(block: RxSchedulerBlockWithDelay, delayMillis: Long = 0) { + expect(1) + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + suspendCancellableCoroutine { + block(Runnable { + expect(2) + it.resume(Unit) + }, delayMillis, TimeUnit.MILLISECONDS) + } + + scheduler.shutdown() + finish(3) + } + + @Test + fun testAsSchedulerWithNegativeDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect, -1) + } + + @Test + fun testAsSchedulerWorkerWithNegativeDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule, -1) + } + + @Test + fun testSchedulerDisposeDuringDelay(): Unit = runBlockingTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableDisposeDuringDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerDisposeDuringDelay(): Unit = runBlockingTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableDisposeDuringDelay(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableDisposeDuringDelay(block: RxSchedulerBlockWithDelay) { + expect(1) + val delayMillis = 300L + val disposable = block(Runnable { + expectUnreached() + }, delayMillis, TimeUnit.MILLISECONDS) + delay(100) + expect(2) + disposable.dispose() + delay(300) + yield() + finish(3) + } + + @Test + fun testSchedulerImmediateDispose(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableImmediateDispose(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerImmediateDispose(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableImmediateDispose(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableImmediateDispose(block: RxSchedulerBlockNoDelay) { + expect(1) + val disposable = block(Runnable { + expectUnreached() + }) + disposable.dispose() + yield() + finish(2) + } + + @Test + fun testSchedulerWorksWithSchedulerCoroutineDispatcher(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + expect(1) + suspendCancellableCoroutine { + scheduler.scheduleDirect(Runnable { + expect(2) + it.resume(Unit) + }) + } + finish(3) + } + + @Test + fun testConvertDispatcherToOriginalScheduler(): Unit = runTest { + expect(1) + + val originalScheduler = Schedulers.io() + val dispatcher = originalScheduler.asCoroutineDispatcher() + val scheduler = dispatcher.asScheduler() + assertEquals(originalScheduler, scheduler) + + finish(2) + } + + @Test + fun testConvertSchedulerToOriginalDispatcher(): Unit = runTest { + expect(1) + + val originalDispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = originalDispatcher.asScheduler() + val dispatcher = scheduler.asCoroutineDispatcher() + assertEquals(originalDispatcher, dispatcher) + + finish(2) + } + + @Test + fun testSchedulerExpectRxPluginsCall(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCall(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerExpectRxPluginsCall(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCall(scheduler.createWorker()::schedule) + } + + private suspend fun TestCoroutineScope.testRunnableExpectRxPluginsCall(block: RxSchedulerBlockNoDelay) { + expect(1) + + fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) { + RxJavaPlugins.setScheduleHandler { + expect(expectedCountOnSchedule) + Runnable { + expect(expectCountOnRun) + it.run() + } + } + } + + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + + setScheduler(2, 4) + + pauseDispatcher { + suspendCancellableCoroutine { + block(Runnable { + expect(5) + it.resume(Unit) + }) + expect(3) + resumeDispatcher() + } + } + + RxJavaPlugins.setScheduleHandler(null) + scheduler.shutdown() + finish(6) + } + + @Test + fun testSchedulerExpectRxPluginsCallWithDelay(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCallDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerExpectRxPluginsCallWithDelay(): Unit = runBlockingTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + val worker = scheduler.createWorker() + testRunnableExpectRxPluginsCallDelay(worker::schedule) + } + + private suspend fun TestCoroutineScope.testRunnableExpectRxPluginsCallDelay(block: RxSchedulerBlockWithDelay) { + expect(1) + + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + + setScheduler(2, 4) + + pauseDispatcher { + suspendCancellableCoroutine { + block(Runnable { + expect(5) + RxJavaPlugins.setScheduleHandler(null) + it.resume(Unit) + }, 300, TimeUnit.MILLISECONDS) + expect(3) + resumeDispatcher() + } + } + + RxJavaPlugins.setScheduleHandler(null) + scheduler.shutdown() + finish(6) + } + + private fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) { + RxJavaPlugins.setScheduleHandler { + expect(expectedCountOnSchedule) + Runnable { + expect(expectCountOnRun) + it.run() + } + } + } + + /** + * Let's test the [Scheduler.Worker] to make sure it satisfies the documented constraint of running all work + * sequentially + */ + @Test + fun testSchedulerWorkerSequentialOrdering(): Unit = runTest { + expect(1) + + val scheduler = Dispatchers.Default.asScheduler() + + val worker = scheduler.createWorker() + + val iterations = 2 + coroutineScope { + for (i in (0..iterations)) { + launch { + suspendCancellableCoroutine { + worker.schedule(Runnable { + expect(2 + i) + it.resume(Unit) + }) + } + yield() + } + } + } + yield() + finish((iterations + 2) + 1) + } + + /** + * @see [testSchedulerWorkerSequentialOrdering] + */ + @Test + fun testSchedulerWorkerSequentialOrderingDelayed(): Unit = runTest { + expect(1) + + val scheduler = Dispatchers.Default.asScheduler() + + val worker = scheduler.createWorker() + + val iterations = 2 + coroutineScope { + for (i in (0..iterations)) { + suspendCancellableCoroutine { + worker.schedule(Runnable { + expect(2 + i) + it.resume(Unit) + }, 10, TimeUnit.MILLISECONDS) + } + } + } + yield() + finish((iterations + 2) + 1) + } + + /** + * Let's test the [Scheduler.Worker] to make sure it satisfies the documented constraint of running all work + * sequentially using RxJava primitives + */ + @Test + fun testSchedulerWorkerSequentialWithObservables(): Unit = runBlockingTest { + expect(1) + + val scheduler = Dispatchers.Default.asScheduler() + + val testObservable = Observable + .create { + it.onNext(1) + it.onNext(2) + it.onComplete() + } + .observeOn(scheduler) + .map { + runBlocking { + if (it == 1) { + // delay by some time. we expect that even with delay this iteration should be first + delay(100) + } + it + 1 + } + } + .subscribeOn(scheduler) + + val testObserver = TestObserver() + testObservable.subscribe(testObserver) + testObservable.blockingSubscribe() + testObserver.apply { + assertValueCount(2) + assertResult(2, 3) + dispose() + } + finish(2) + } + + /** + * Test that ensures that delays are actually respected (tasks scheduled sooner in the future run before tasks scheduled later, + * even when the later task is submitted before the earlier one) + * + * NOTE: not using [runBlockingTest] because of infamous "this job has not completed yet" error: + * + * https://github.com/Kotlin/kotlinx.coroutines/issues/1204 + */ + @Test + fun testSchedulerRespectsDelays(): Unit = runTest { + val scheduler = Dispatchers.Default.asScheduler() + testRunnableRespectsDelays(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerRespectsDelays(): Unit = runTest { + val scheduler = Dispatchers.Default.asScheduler() + testRunnableRespectsDelays(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableRespectsDelays(block: RxSchedulerBlockWithDelay) { + expect(1) + + coroutineScope { + launch { + suspendCancellableCoroutine { + block(Runnable { + expect(3) + it.resume(Unit) + }, 100, TimeUnit.MILLISECONDS) + } + } + + launch { + suspendCancellableCoroutine { + block(Runnable { + expect(2) + it.resume(Unit) + }, 1, TimeUnit.MILLISECONDS) + } + } + } + + finish(4) + } + + /** + * Tests that cancelling a runnable in one worker doesn't affect work in another scheduler. + * + * This is part of expected behavior documented. + */ + @Test + fun testMultipleWorkerCancellation(): Unit = runTest { + expect(1) + + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + + coroutineScope { + suspendCancellableCoroutine { + val workerOne = scheduler.createWorker() + workerOne.schedule({ + expect(3) + it.resume(Unit) + }, 10, TimeUnit.MILLISECONDS) + + expect(2) + + val workerTwo = scheduler.createWorker() + workerTwo.schedule({ + expectUnreached() + }, 10, TimeUnit.MILLISECONDS) + workerTwo.dispose() + } + } + + finish(4) + } } + +typealias RxSchedulerBlockNoDelay = (java.lang.Runnable) -> Disposable +typealias RxSchedulerBlockWithDelay = (java.lang.Runnable, Long, TimeUnit) -> Disposable \ No newline at end of file