Skip to content

Add way to convert CoroutineDispatcher to Scheduler #1923

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 76 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
abc2158
created DispatcherScheduler and added first test
Apr 19, 2020
f016ca8
some refactoring to better handle cancellation
Apr 19, 2020
f7f318e
added test for shutdown
Apr 19, 2020
cc423ea
remove redundant isActive check
Apr 19, 2020
7bb4530
remove try finally
Apr 19, 2020
81e65c7
remove comment
Apr 19, 2020
39df4ed
add test for scheduler with SchedulerCoroutineDispatcher
Apr 19, 2020
7d0d475
remove formatting changes
Apr 19, 2020
99f3e6c
add missing expectUnreached call in one test
Apr 19, 2020
136260c
add a small kdoc
Apr 19, 2020
23085cb
improve scheduleDirect with delay.
Apr 19, 2020
0e936fe
optimize imports
Apr 20, 2020
a9b99eb
formatting changes
Apr 20, 2020
3a27d4b
using job from launch for worker
Apr 20, 2020
2733fd8
remove spaces
Apr 21, 2020
6fa3ad1
simplify job handling, thus fix OOM
Apr 21, 2020
9c9e867
remove back ticks from test names
Apr 21, 2020
56919ff
call into overload
Apr 21, 2020
695d553
now handling non positive delays
Apr 21, 2020
566e2f4
formatting
Apr 21, 2020
7b8c5cf
using supervisorscope
Apr 21, 2020
f5306e2
using if expressions for return
Apr 21, 2020
56d09c9
create test that verifies sequential ordering
Apr 23, 2020
dd61f82
make rx scheduler worker sequential
Apr 23, 2020
af07533
trying out using channel to schedule sequentially
Apr 23, 2020
eb18274
made queue processing easier. also fixed unit tests
Apr 24, 2020
a3d02f3
return DispatcherScheduler dispatcher in asCoroutineDispatcher
Apr 24, 2020
04a1fef
add tests to verify conversions between dispatcher and scheduler
Apr 24, 2020
6beb046
small comment
Apr 24, 2020
a086d62
moving stress tests to another class. ignoring for now
Apr 25, 2020
debaa09
checking in current state of tests
Apr 26, 2020
e346b93
fix sequential ordering if we're using a dispatcher with thread pool
Apr 26, 2020
621388c
simplify isActive check
Apr 26, 2020
0c62da3
let channel suspend while empty
Apr 26, 2020
ef71887
change test name
Apr 26, 2020
9bd16a7
now running blocks directly in scheduleDirect
Apr 26, 2020
2e9f1f1
test with observables
Apr 26, 2020
09e990b
running blocks more directly inside DispatcherScheduler. also fixing …
Apr 26, 2020
de113d3
make asDisposable private
Apr 28, 2020
89eceae
fix dispose logic
Apr 28, 2020
ebdaf8e
move queue job processing into constructor
Apr 28, 2020
bbe3edb
making schedule block defer to override with delay param
Apr 28, 2020
9cf2c13
reduce if nesting
Apr 28, 2020
7686e2f
call RxJavaPlugins before delay
Apr 28, 2020
435faee
rx plugins for worker too
Apr 28, 2020
e40083b
add top level CoroutineExceptionHandler
Apr 28, 2020
9ab3762
adding if check to schedule direct
May 2, 2020
fda0e7a
now using coroutines test to assert rxjavaplugins call
May 2, 2020
9d8b4ae
remove queue processing job
May 2, 2020
4d5dd17
improving how jobs are added to the channel
May 4, 2020
37bff66
adding comment about keepMe function
May 4, 2020
8b90b5b
use run blocking test for tests that call delay
May 4, 2020
44c9ebe
removing redundant thread tests
May 4, 2020
537de18
separate plugin tests
May 4, 2020
eb3da9b
adding more tests for worker tasks
May 4, 2020
a100fb0
make worker job private
May 4, 2020
554ef38
check for job cancelled
May 4, 2020
33d5c67
remove println
May 4, 2020
9e100c2
refactor tests to be more generic for worker tests in next commit
May 5, 2020
446c787
wrote tests for scheduler worker and made fixes based off tests
May 5, 2020
631110b
updating scheduler stress tests to include worker tests
May 5, 2020
e0d70d4
make sure we're waiting for delayJob properly
May 7, 2020
cb506a6
refactor delay logic (again :)) to delay first before enqueuing work
May 10, 2020
0477dfd
remove redundant expect
May 10, 2020
53e0122
remove redundant channel close
May 10, 2020
7b361b3
using one job for both non-delayed and delayed. also fix redundant jo…
May 12, 2020
26c9e1e
remove redundant modifier
Sep 9, 2020
e5e86f1
api dump
Sep 9, 2020
64b3743
use start function and reuse JobDisposable
Oct 16, 2020
78457b3
remove addTaskToQueue function
Oct 16, 2020
5f5e637
fix typo in tests
Oct 16, 2020
b634a8c
unit test for multiple workers
Oct 17, 2020
8f204bf
maintain binary compat. also formatting
Oct 20, 2020
a829462
api dump
Oct 20, 2020
15f96b3
change if statements
Oct 25, 2020
2aab409
add asScheduler extension for rx3
recheej Dec 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public final class kotlinx/coroutines/rx2/RxObservableKt {
}

public final class kotlinx/coroutines/rx2/RxSchedulerKt {
public static final fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/rx2/SchedulerCoroutineDispatcher;
public static final fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/CoroutineDispatcher;
public static final synthetic fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/rx2/SchedulerCoroutineDispatcher;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@elizarov is this "synthetic" ok?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That is because it is HIDDEN. Old code that was compiled against the previous version of the library will still work.

public static final fun asScheduler (Lkotlinx/coroutines/CoroutineDispatcher;)Lio/reactivex/Scheduler;
}

public final class kotlinx/coroutines/rx2/RxSingleKt {
Expand Down
1 change: 1 addition & 0 deletions reactive/kotlinx-coroutines-rx2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
dependencies {
compile project(':kotlinx-coroutines-reactive')
testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output
testCompile project(':kotlinx-coroutines-test')
testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
compile "io.reactivex.rxjava2:rxjava:$rxjava2_version"
}
Expand Down
138 changes: 134 additions & 4 deletions reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,136 @@

package kotlinx.coroutines.rx2

import io.reactivex.Scheduler
import io.reactivex.*
import io.reactivex.disposables.*
import io.reactivex.plugins.*
import kotlinx.coroutines.*
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.channels.*
import java.util.concurrent.*
import kotlin.coroutines.*

/**
* Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
* and provides native support of [delay] and [withTimeout].
*/
public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher =
if (this is DispatcherScheduler) {
dispatcher
} else {
SchedulerCoroutineDispatcher(this)
}

@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
@JvmName("asCoroutineDispatcher")
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher =
SchedulerCoroutineDispatcher(this)

/**
* Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler].
*/
public fun CoroutineDispatcher.asScheduler(): Scheduler =
if (this is SchedulerCoroutineDispatcher) {
scheduler
} else {
DispatcherScheduler(this)
}

private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Scheduler() {

private val schedulerJob = SupervisorJob()

private val scope = CoroutineScope(schedulerJob + dispatcher + CoroutineExceptionHandler { _, throwable ->
RxJavaPlugins.onError(throwable)
})

override fun scheduleDirect(block: Runnable): Disposable =
scheduleDirect(block, 0, TimeUnit.MILLISECONDS)

override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
if (!scope.isActive) return Disposables.disposed()
val newBlock = RxJavaPlugins.onSchedule(block)
return scope.launch {
delay(unit.toMillis(delay))
newBlock.run()
}.asDisposable()
}

override fun createWorker(): Worker =
DispatcherWorker(dispatcher, schedulerJob).also {
it.start()
}

override fun shutdown() {
scope.cancel()
}

private class DispatcherWorker(dispatcher: CoroutineDispatcher, parentJob: Job) : Worker() {

private val workerJob = SupervisorJob(parentJob)
private val workerScope = CoroutineScope(workerJob + dispatcher)
private val blockChannel = Channel<SchedulerChannelTask>(Channel.UNLIMITED)

fun start() {
workerScope.launch {
while (isActive) {
val task = blockChannel.receive()
task.execute()
}
}
}

override fun isDisposed(): Boolean = !workerScope.isActive

override fun schedule(block: Runnable): Disposable =
schedule(block, 0, TimeUnit.MILLISECONDS)

override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
if (!workerScope.isActive) return Disposables.disposed()

val newBlock = RxJavaPlugins.onSchedule(block)

val taskJob = Job(workerJob)
val task = SchedulerChannelTask(newBlock, taskJob)

if (delay <= 0L) {
blockChannel.offer(task)
} else {
// Use `taskJob` as the parent here so the delay will also get cancelled if the Disposable
// is disposed.
workerScope.launch(taskJob) {
// Delay *before* enqueuing the task, so other tasks (e.g. via schedule without delay)
// aren't blocked by the delay.
delay(unit.toMillis(delay))
// Once the task is ready to run, it still needs to be executed via the queue to comply
// with the Scheduler contract of running all worker tasks in a non-overlapping manner.
blockChannel.offer(task)
}
}

return taskJob.asDisposable()
}

override fun dispose() {
workerScope.cancel()
}
}
}

/**
* Represents a task to be queued sequentially on a [Channel] for a [Scheduler.Worker].
*
* Delayed tasks do not block [Channel] from processing other tasks
*/
private class SchedulerChannelTask(
private val block: Runnable,
job: Job
) : JobDisposable(job) {
fun execute() {
if (job.isActive) {
block.run()
}
}
}

/**
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
Expand Down Expand Up @@ -50,3 +170,13 @@ public class SchedulerCoroutineDispatcher(
/** @suppress */
override fun hashCode(): Int = System.identityHashCode(scheduler)
}

private open class JobDisposable(protected val job: Job) : Disposable {
override fun isDisposed(): Boolean = !job.isActive

override fun dispose() {
job.cancel()
}
}

private fun Job.asDisposable(): Disposable = JobDisposable(this)
114 changes: 114 additions & 0 deletions reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.rx2

import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import org.junit.*
import java.util.concurrent.*

class SchedulerStressTest : TestBase() {
@Before
fun setup() {
ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
}

/**
* Test that we don't get an OOM if we schedule many jobs at once. It's expected that if you don't dispose that you'd
* see a OOM error.
*/
@Test
fun testSchedulerDisposed(): Unit = runTest {
val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()
testRunnableDisposed(scheduler::scheduleDirect)
}

@Test
fun testSchedulerWorkerDisposed(): Unit = runTest {
val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()
testRunnableDisposed(scheduler.createWorker()::schedule)
}

private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) {
expect(1)

val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()

val n = 2000 * stressTestMultiplier
coroutineScope {
repeat(n) { i ->
launch {
val a = ByteArray(1000000) //1MB
val disposable = block(Runnable {
expectUnreached()
runBlocking {
keepMe(a)
}
})
disposable.dispose()
expect(i + 2)
}
yield()
}
}

scheduler.shutdown()
finish(n + 2)
}

/**
* Test function that holds a reference. Used for testing OOM situations
*/
private suspend fun keepMe(a: ByteArray) {
delay(10)
}

/**
* Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd
* see a OOM error.
*/
@Test
fun testSchedulerDisposedDuringDelay(): Unit = runBlockingTest {
val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()
testRunnableDisposedDuringDelay(scheduler::scheduleDirect)
}

@Test
fun testSchedulerWorkerDisposedDuringDelay(): Unit = runBlockingTest {
val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()
testRunnableDisposedDuringDelay(scheduler.createWorker()::schedule)
}

private suspend fun TestCoroutineScope.testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) {
expect(1)

val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()

val n = 2000 * stressTestMultiplier
coroutineScope {
repeat(n) { i ->
val a = ByteArray(1000000) //1MB
val delayMillis: Long = 10
val disposable = block(Runnable {
runBlocking {
keepMe(a)
}
}, delayMillis, TimeUnit.MILLISECONDS)
disposable.dispose()
advanceTimeBy(delayMillis)
yield()
}
}

scheduler.shutdown()
finish(2)
}
}
Loading