Skip to content

Commit d474b59

Browse files
committed
add asScheduler extension for rx3
1 parent 35674d4 commit d474b59

File tree

7 files changed

+702
-14
lines changed

7 files changed

+702
-14
lines changed

reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher =
2323
SchedulerCoroutineDispatcher(this)
2424
}
2525

26-
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.0, binary compatibility with earlier versions")
26+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
2727
@JvmName("asCoroutineDispatcher")
2828
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher =
2929
SchedulerCoroutineDispatcher(this)

reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt

+4-5
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package kotlinx.coroutines.rx2
66

77
import io.reactivex.*
88
import io.reactivex.disposables.*
9-
import io.reactivex.functions.*
109
import io.reactivex.observers.*
1110
import io.reactivex.plugins.*
1211
import io.reactivex.schedulers.*
@@ -218,13 +217,13 @@ class SchedulerTest : TestBase() {
218217
expect(1)
219218

220219
fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) {
221-
RxJavaPlugins.setScheduleHandler(Function {
220+
RxJavaPlugins.setScheduleHandler {
222221
expect(expectedCountOnSchedule)
223222
Runnable {
224223
expect(expectCountOnRun)
225224
it.run()
226225
}
227-
})
226+
}
228227
}
229228

230229
val dispatcher = currentDispatcher() as CoroutineDispatcher
@@ -289,13 +288,13 @@ class SchedulerTest : TestBase() {
289288
}
290289

291290
private fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) {
292-
RxJavaPlugins.setScheduleHandler(Function {
291+
RxJavaPlugins.setScheduleHandler {
293292
expect(expectedCountOnSchedule)
294293
Runnable {
295294
expect(expectCountOnRun)
296295
it.run()
297296
}
298-
})
297+
}
299298
}
300299

301300
/**

reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api

+3-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ public final class kotlinx/coroutines/rx3/RxObservableKt {
5656
}
5757

5858
public final class kotlinx/coroutines/rx3/RxSchedulerKt {
59-
public static final fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/rx3/SchedulerCoroutineDispatcher;
59+
public static final fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/CoroutineDispatcher;
60+
public static final synthetic fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/rx3/SchedulerCoroutineDispatcher;
61+
public static final fun asScheduler (Lkotlinx/coroutines/CoroutineDispatcher;)Lio/reactivex/rxjava3/core/Scheduler;
6062
}
6163

6264
public final class kotlinx/coroutines/rx3/RxSingleKt {

reactive/kotlinx-coroutines-rx3/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ targetCompatibility = JavaVersion.VERSION_1_8
55

66
dependencies {
77
compile project(':kotlinx-coroutines-reactive')
8+
testCompile project(':kotlinx-coroutines-test')
89
testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output
910
testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
1011
compile "io.reactivex.rxjava3:rxjava:$rxjava3_version"

reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt

+135-4
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,29 @@
44

55
package kotlinx.coroutines.rx3
66

7-
import io.reactivex.rxjava3.core.Scheduler
7+
import io.reactivex.rxjava3.core.*
8+
import io.reactivex.rxjava3.disposables.*
9+
import io.reactivex.rxjava3.plugins.*
810
import kotlinx.coroutines.*
9-
import java.util.concurrent.TimeUnit
10-
import kotlin.coroutines.CoroutineContext
11+
import kotlinx.coroutines.channels.*
12+
import java.util.concurrent.*
13+
import kotlin.coroutines.*
1114

1215
/**
1316
* Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
1417
* and provides native support of [delay] and [withTimeout].
1518
*/
16-
public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)
19+
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher =
20+
if (this is DispatcherScheduler) {
21+
dispatcher
22+
} else {
23+
SchedulerCoroutineDispatcher(this)
24+
}
25+
26+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
27+
@JvmName("asCoroutineDispatcher")
28+
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher =
29+
SchedulerCoroutineDispatcher(this)
1730

1831
/**
1932
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
@@ -50,3 +63,121 @@ public class SchedulerCoroutineDispatcher(
5063
/** @suppress */
5164
override fun hashCode(): Int = System.identityHashCode(scheduler)
5265
}
66+
67+
/**
68+
* Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler].
69+
*/
70+
public fun CoroutineDispatcher.asScheduler(): Scheduler =
71+
if (this is SchedulerCoroutineDispatcher) {
72+
scheduler
73+
} else {
74+
DispatcherScheduler(this)
75+
}
76+
77+
private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Scheduler() {
78+
79+
private val schedulerJob = SupervisorJob()
80+
81+
private val scope = CoroutineScope(schedulerJob + dispatcher + CoroutineExceptionHandler { _, throwable ->
82+
RxJavaPlugins.onError(throwable)
83+
})
84+
85+
override fun scheduleDirect(block: Runnable): Disposable =
86+
scheduleDirect(block, 0, TimeUnit.MILLISECONDS)
87+
88+
override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
89+
if (!scope.isActive) return Disposable.disposed()
90+
val newBlock = RxJavaPlugins.onSchedule(block)
91+
return scope.launch {
92+
delay(unit.toMillis(delay))
93+
newBlock.run()
94+
}.asDisposable()
95+
}
96+
97+
override fun createWorker(): Worker =
98+
DispatcherWorker(dispatcher, schedulerJob).also {
99+
it.start()
100+
}
101+
102+
override fun shutdown() {
103+
scope.cancel()
104+
}
105+
106+
private class DispatcherWorker(dispatcher: CoroutineDispatcher, parentJob: Job) : Worker() {
107+
108+
private val workerJob = SupervisorJob(parentJob)
109+
private val workerScope = CoroutineScope(workerJob + dispatcher)
110+
private val blockChannel = Channel<SchedulerChannelTask>(Channel.UNLIMITED)
111+
112+
fun start() {
113+
workerScope.launch {
114+
while (isActive) {
115+
val task = blockChannel.receive()
116+
task.execute()
117+
}
118+
}
119+
}
120+
121+
override fun isDisposed(): Boolean = !workerScope.isActive
122+
123+
override fun schedule(block: Runnable): Disposable =
124+
schedule(block, 0, TimeUnit.MILLISECONDS)
125+
126+
override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
127+
if (!workerScope.isActive) return Disposable.disposed()
128+
129+
val newBlock = RxJavaPlugins.onSchedule(block)
130+
131+
val taskJob = Job(workerJob)
132+
val task = SchedulerChannelTask(newBlock, taskJob)
133+
134+
if (delay <= 0L) {
135+
blockChannel.offer(task)
136+
} else {
137+
// Use `taskJob` as the parent here so the delay will also get cancelled if the Disposable
138+
// is disposed.
139+
workerScope.launch(taskJob) {
140+
// Delay *before* enqueuing the task, so other tasks (e.g. via schedule without delay)
141+
// aren't blocked by the delay.
142+
delay(unit.toMillis(delay))
143+
// Once the task is ready to run, it still needs to be executed via the queue to comply
144+
// with the Scheduler contract of running all worker tasks in a non-overlapping manner.
145+
blockChannel.offer(task)
146+
}
147+
}
148+
149+
return taskJob.asDisposable()
150+
}
151+
152+
override fun dispose() {
153+
workerScope.cancel()
154+
}
155+
}
156+
}
157+
158+
/**
159+
* Represents a task to be queued sequentially on a [Channel] for a [Scheduler.Worker].
160+
*
161+
* Delayed tasks do not block [Channel] from processing other tasks
162+
*/
163+
private class SchedulerChannelTask(
164+
private val block: Runnable,
165+
job: Job
166+
) : JobDisposable(job) {
167+
fun execute() {
168+
if (job.isActive) {
169+
block.run()
170+
}
171+
}
172+
}
173+
174+
private open class JobDisposable(protected val job: Job) : Disposable {
175+
override fun isDisposed(): Boolean = !job.isActive
176+
177+
override fun dispose() {
178+
job.cancel()
179+
}
180+
}
181+
182+
private fun Job.asDisposable(): Disposable = JobDisposable(this)
183+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.rx3
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.test.*
9+
import org.junit.*
10+
import java.util.concurrent.*
11+
12+
class SchedulerStressTest : TestBase() {
13+
@Before
14+
fun setup() {
15+
ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
16+
}
17+
18+
/**
19+
* Test that we don't get an OOM if we schedule many jobs at once. It's expected that if you don't dispose that you'd
20+
* see a OOM error.
21+
*/
22+
@Test
23+
fun testSchedulerDisposed(): Unit = runTest {
24+
val dispatcher = currentDispatcher() as CoroutineDispatcher
25+
val scheduler = dispatcher.asScheduler()
26+
testRunnableDisposed(scheduler::scheduleDirect)
27+
}
28+
29+
@Test
30+
fun testSchedulerWorkerDisposed(): Unit = runTest {
31+
val dispatcher = currentDispatcher() as CoroutineDispatcher
32+
val scheduler = dispatcher.asScheduler()
33+
testRunnableDisposed(scheduler.createWorker()::schedule)
34+
}
35+
36+
private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) {
37+
expect(1)
38+
39+
val dispatcher = currentDispatcher() as CoroutineDispatcher
40+
val scheduler = dispatcher.asScheduler()
41+
42+
val n = 2000 * stressTestMultiplier
43+
coroutineScope {
44+
repeat(n) { i ->
45+
launch {
46+
val a = ByteArray(1000000) //1MB
47+
val disposable = block(Runnable {
48+
expectUnreached()
49+
runBlocking {
50+
keepMe(a)
51+
}
52+
})
53+
disposable.dispose()
54+
expect(i + 2)
55+
}
56+
yield()
57+
}
58+
}
59+
60+
scheduler.shutdown()
61+
finish(n + 2)
62+
}
63+
64+
/**
65+
* Test function that holds a reference. Used for testing OOM situations
66+
*/
67+
private suspend fun keepMe(a: ByteArray) {
68+
delay(10)
69+
}
70+
71+
/**
72+
* Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd
73+
* see a OOM error.
74+
*/
75+
@Test
76+
fun testSchedulerDisposedDuringDelay(): Unit = runBlockingTest {
77+
val dispatcher = currentDispatcher() as CoroutineDispatcher
78+
val scheduler = dispatcher.asScheduler()
79+
testRunnableDisposedDuringDelay(scheduler::scheduleDirect)
80+
}
81+
82+
@Test
83+
fun testSchedulerWorkerDisposedDuringDelay(): Unit = runBlockingTest {
84+
val dispatcher = currentDispatcher() as CoroutineDispatcher
85+
val scheduler = dispatcher.asScheduler()
86+
testRunnableDisposedDuringDelay(scheduler.createWorker()::schedule)
87+
}
88+
89+
private suspend fun TestCoroutineScope.testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) {
90+
expect(1)
91+
92+
val dispatcher = currentDispatcher() as CoroutineDispatcher
93+
val scheduler = dispatcher.asScheduler()
94+
95+
val n = 2000 * stressTestMultiplier
96+
coroutineScope {
97+
repeat(n) { i ->
98+
val a = ByteArray(1000000) //1MB
99+
val delayMillis: Long = 10
100+
val disposable = block(Runnable {
101+
runBlocking {
102+
keepMe(a)
103+
}
104+
}, delayMillis, TimeUnit.MILLISECONDS)
105+
disposable.dispose()
106+
advanceTimeBy(delayMillis)
107+
yield()
108+
}
109+
}
110+
111+
scheduler.shutdown()
112+
finish(2)
113+
}
114+
}

0 commit comments

Comments
 (0)