Skip to content

Commit 5c4efe6

Browse files
recheejpablobaxter
authored andcommitted
Provide a way to use CoroutineDispatcher as an Rx's Scheduler.
Fixes Kotlin#968 Fixes Kotlin#548
1 parent da5e7e0 commit 5c4efe6

File tree

8 files changed

+1379
-18
lines changed

8 files changed

+1379
-18
lines changed

reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api

+3-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ public final class kotlinx/coroutines/rx2/RxObservableKt {
6969
}
7070

7171
public final class kotlinx/coroutines/rx2/RxSchedulerKt {
72-
public static final fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/rx2/SchedulerCoroutineDispatcher;
72+
public static final fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/CoroutineDispatcher;
73+
public static final synthetic fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/rx2/SchedulerCoroutineDispatcher;
74+
public static final fun asScheduler (Lkotlinx/coroutines/CoroutineDispatcher;)Lio/reactivex/Scheduler;
7375
}
7476

7577
public final class kotlinx/coroutines/rx2/RxSingleKt {

reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt

+134-5
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,143 @@
44

55
package kotlinx.coroutines.rx2
66

7-
import io.reactivex.Scheduler
7+
import io.reactivex.*
8+
import io.reactivex.disposables.*
9+
import io.reactivex.plugins.*
10+
import kotlinx.atomicfu.*
811
import kotlinx.coroutines.*
9-
import java.util.concurrent.TimeUnit
10-
import kotlin.coroutines.CoroutineContext
12+
import kotlinx.coroutines.channels.*
13+
import java.util.concurrent.*
14+
import kotlin.coroutines.*
1115

1216
/**
1317
* Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
1418
* and provides native support of [delay] and [withTimeout].
1519
*/
16-
public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)
20+
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher =
21+
if (this is DispatcherScheduler) {
22+
dispatcher
23+
} else {
24+
SchedulerCoroutineDispatcher(this)
25+
}
26+
27+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
28+
@JvmName("asCoroutineDispatcher")
29+
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher =
30+
SchedulerCoroutineDispatcher(this)
31+
32+
/**
33+
* Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler].
34+
*/
35+
public fun CoroutineDispatcher.asScheduler(): Scheduler =
36+
if (this is SchedulerCoroutineDispatcher) {
37+
scheduler
38+
} else {
39+
DispatcherScheduler(this)
40+
}
41+
42+
private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {
43+
44+
private val schedulerJob = SupervisorJob()
45+
46+
/**
47+
* The scope for everything happening in this [DispatcherScheduler].
48+
*
49+
* Running tasks, too, get launched under this scope, because [shutdown] should cancel the running tasks as well.
50+
*/
51+
private val scope = CoroutineScope(schedulerJob + dispatcher)
52+
53+
/**
54+
* The counter of created workers, for their pretty-printing.
55+
*/
56+
private val workerCounter = atomic(1L)
57+
58+
override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
59+
scope.scheduleTask(block, unit.toMillis(delay)) { task ->
60+
Runnable { scope.launch { task() } }
61+
}
62+
63+
override fun createWorker(): Worker = DispatcherWorker(workerCounter.getAndIncrement(), dispatcher, schedulerJob)
64+
65+
override fun shutdown() {
66+
schedulerJob.cancel()
67+
}
68+
69+
private class DispatcherWorker(
70+
private val counter: Long,
71+
private val dispatcher: CoroutineDispatcher,
72+
parentJob: Job
73+
) : Worker() {
74+
75+
private val workerJob = SupervisorJob(parentJob)
76+
private val workerScope = CoroutineScope(workerJob + dispatcher)
77+
private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)
78+
79+
init {
80+
workerScope.launch {
81+
blockChannel.consumeEach {
82+
it()
83+
}
84+
}
85+
}
86+
87+
override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
88+
workerScope.scheduleTask(block, unit.toMillis(delay)) { task ->
89+
Runnable { blockChannel.trySend(task) }
90+
}
91+
92+
override fun isDisposed(): Boolean = !workerScope.isActive
93+
94+
override fun dispose() {
95+
blockChannel.close()
96+
workerJob.cancel()
97+
}
98+
99+
override fun toString(): String = "$dispatcher (worker $counter, ${if (isDisposed) "disposed" else "active"})"
100+
}
101+
102+
override fun toString(): String = dispatcher.toString()
103+
}
104+
105+
private typealias Task = suspend () -> Unit
106+
107+
/**
108+
* Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis]
109+
* milliseconds.
110+
*/
111+
private fun CoroutineScope.scheduleTask(
112+
block: Runnable,
113+
delayMillis: Long,
114+
adaptForScheduling: (Task) -> Runnable
115+
): Disposable {
116+
val ctx = coroutineContext
117+
var handle: DisposableHandle? = null
118+
val disposable = Disposables.fromRunnable {
119+
// null if delay <= 0
120+
handle?.dispose()
121+
}
122+
val decoratedBlock = RxJavaPlugins.onSchedule(block)
123+
suspend fun task() {
124+
if (disposable.isDisposed) return
125+
try {
126+
runInterruptible {
127+
decoratedBlock.run()
128+
}
129+
} catch (e: Throwable) {
130+
handleUndeliverableException(e, ctx)
131+
}
132+
}
133+
134+
val toSchedule = adaptForScheduling(::task)
135+
if (!isActive) return Disposables.disposed()
136+
if (delayMillis <= 0) {
137+
toSchedule.run()
138+
} else {
139+
@Suppress("INVISIBLE_MEMBER")
140+
ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it }
141+
}
142+
return disposable
143+
}
17144

18145
/**
19146
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
@@ -45,8 +172,10 @@ public class SchedulerCoroutineDispatcher(
45172

46173
/** @suppress */
47174
override fun toString(): String = scheduler.toString()
175+
48176
/** @suppress */
49177
override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler
178+
50179
/** @suppress */
51180
override fun hashCode(): Int = System.identityHashCode(scheduler)
52-
}
181+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.rx2
6+
7+
import kotlinx.coroutines.*
8+
import org.junit.*
9+
import java.util.concurrent.*
10+
11+
class SchedulerStressTest : TestBase() {
12+
@Before
13+
fun setup() {
14+
ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
15+
}
16+
17+
/**
18+
* Test that we don't get an OOM if we schedule many jobs at once.
19+
* It's expected that if you don't dispose you'd see an OOM error.
20+
*/
21+
@Test
22+
fun testSchedulerDisposed(): Unit = runTest {
23+
val dispatcher = currentDispatcher() as CoroutineDispatcher
24+
val scheduler = dispatcher.asScheduler()
25+
testRunnableDisposed(scheduler::scheduleDirect)
26+
}
27+
28+
@Test
29+
fun testSchedulerWorkerDisposed(): Unit = runTest {
30+
val dispatcher = currentDispatcher() as CoroutineDispatcher
31+
val scheduler = dispatcher.asScheduler()
32+
val worker = scheduler.createWorker()
33+
testRunnableDisposed(worker::schedule)
34+
}
35+
36+
private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) {
37+
val n = 2000 * stressTestMultiplier
38+
repeat(n) {
39+
val a = ByteArray(1000000) //1MB
40+
val disposable = block(Runnable {
41+
keepMe(a)
42+
expectUnreached()
43+
})
44+
disposable.dispose()
45+
yield() // allow the scheduled task to observe that it was disposed
46+
}
47+
}
48+
49+
/**
50+
* Test function that holds a reference. Used for testing OOM situations
51+
*/
52+
private fun keepMe(a: ByteArray) {
53+
Thread.sleep(a.size / (a.size + 1) + 10L)
54+
}
55+
56+
/**
57+
* Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd
58+
* see a OOM error.
59+
*/
60+
@Test
61+
fun testSchedulerDisposedDuringDelay(): Unit = runTest {
62+
val dispatcher = currentDispatcher() as CoroutineDispatcher
63+
val scheduler = dispatcher.asScheduler()
64+
testRunnableDisposedDuringDelay(scheduler::scheduleDirect)
65+
}
66+
67+
@Test
68+
fun testSchedulerWorkerDisposedDuringDelay(): Unit = runTest {
69+
val dispatcher = currentDispatcher() as CoroutineDispatcher
70+
val scheduler = dispatcher.asScheduler()
71+
val worker = scheduler.createWorker()
72+
testRunnableDisposedDuringDelay(worker::schedule)
73+
}
74+
75+
private fun testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) {
76+
val n = 2000 * stressTestMultiplier
77+
repeat(n) {
78+
val a = ByteArray(1000000) //1MB
79+
val delayMillis: Long = 10
80+
val disposable = block(Runnable {
81+
keepMe(a)
82+
expectUnreached()
83+
}, delayMillis, TimeUnit.MILLISECONDS)
84+
disposable.dispose()
85+
}
86+
}
87+
}

0 commit comments

Comments
 (0)