Skip to content

Commit 2d89cbf

Browse files
author
Rechee
committed
running blocks more directly inside DispatcherScheduler. also fixing stress tests
1 parent 8eb0e37 commit 2d89cbf

File tree

2 files changed

+45
-43
lines changed

2 files changed

+45
-43
lines changed

reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt

+17-15
Original file line numberDiff line numberDiff line change
@@ -35,38 +35,35 @@ public fun CoroutineDispatcher.asScheduler(): Scheduler =
3535

3636
private class DispatcherScheduler(internal val dispatcher: CoroutineDispatcher) : Scheduler() {
3737

38-
val job = SupervisorJob()
39-
private val scope = CoroutineScope(job)
38+
private val job = SupervisorJob()
39+
private val scope = CoroutineScope(job + dispatcher)
4040

41-
override fun scheduleDirect(run: Runnable): Disposable {
41+
override fun scheduleDirect(block: Runnable): Disposable {
4242
if (scope.isActive) {
4343
return scope.launch {
44-
dispatchBlock(run)
44+
block.runWithRx()
4545
}.asDisposable()
4646
}
4747

4848
return Disposables.disposed()
4949
}
5050

51-
override fun scheduleDirect(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
51+
override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
5252
if (delay <= 0) {
53-
return scheduleDirect(run)
53+
return scheduleDirect(block)
5454
}
5555

5656
if (scope.isActive) {
5757
return scope.launch {
5858
delay(unit.toMillis(delay))
59-
dispatchBlock(run)
59+
block.runWithRx()
6060
}.asDisposable()
6161
}
6262

6363
return Disposables.disposed()
6464
}
6565

66-
private fun dispatchBlock(block: Runnable) {
67-
val decoratedRun = RxJavaPlugins.onSchedule(block)
68-
dispatcher.dispatch(EmptyCoroutineContext, decoratedRun)
69-
}
66+
private fun Runnable.runWithRx() = RxJavaPlugins.onSchedule(this).run()
7067

7168
override fun createWorker(): Worker =
7269
DispatcherWorker(dispatcher, job)
@@ -86,7 +83,7 @@ private class DispatcherScheduler(internal val dispatcher: CoroutineDispatcher)
8683
override fun schedule(block: Runnable): Disposable {
8784
startProcessingQueue()
8885
if (workerScope.isActive) {
89-
val job = workerScope.launch(Dispatchers.Unconfined, CoroutineStart.LAZY) {
86+
val job = workerScope.launch(start = CoroutineStart.LAZY) {
9087
block.run()
9188
}
9289
blockChannel.offer(job)
@@ -169,7 +166,12 @@ public class SchedulerCoroutineDispatcher(
169166
override fun hashCode(): Int = System.identityHashCode(scheduler)
170167
}
171168

172-
private fun Job.asDisposable(): Disposable = object : Disposable {
173-
override fun isDisposed(): Boolean = !isActive
174-
override fun dispose() = cancel()
169+
private class JobDisposable(private val job: Job) : Disposable {
170+
override fun isDisposed(): Boolean = !job.isActive
171+
172+
override fun dispose() {
173+
job.cancel()
174+
}
175175
}
176+
177+
public fun Job.asDisposable(): Disposable = JobDisposable(this)

reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt

+28-28
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@ package kotlinx.coroutines.rx2
77
import kotlinx.coroutines.*
88
import org.junit.*
99
import java.util.concurrent.*
10-
import kotlin.coroutines.*
1110

1211
class SchedulerStressTest : TestBase() {
1312
@Before
1413
fun setup() {
1514
ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
1615
}
1716

17+
/**
18+
* Test that we don't get an OOM if we schedule many jobs at once. It's expected that if you don't dispose that you'd
19+
* see a OOM error.
20+
*/
1821
@Test
1922
fun testScheduleDirectDisposed(): Unit = runTest {
2023
expect(1)
@@ -29,20 +32,14 @@ class SchedulerStressTest : TestBase() {
2932
val n = 2000 * stressTestMultiplier
3033
coroutineScope {
3134
repeat(n) { i ->
32-
launch {
33-
val a = ByteArray(1000000) //1MB
34-
suspendCancellableCoroutine<Unit> {
35-
val disposable = scheduler.scheduleDirect {
36-
runBlocking {
37-
keepMe(a)
38-
it.resume(Unit)
39-
}
40-
}
41-
expect(i + 2)
42-
disposable.dispose()
43-
it.resume(Unit)
35+
val a = ByteArray(1000000) //1MB
36+
val disposable = scheduler.scheduleDirect {
37+
runBlocking {
38+
keepMe(a)
4439
}
4540
}
41+
disposable.dispose()
42+
expect(i + 2)
4643
yield()
4744
}
4845
}
@@ -51,34 +48,37 @@ class SchedulerStressTest : TestBase() {
5148
finish(n + 2)
5249
}
5350

54-
@Ignore
51+
/**
52+
* Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd
53+
* see a OOM error.
54+
*/
5555
@Test
5656
fun testScheduleDirectDisposedDuringDelay(): Unit = runTest {
5757
expect(1)
5858

59-
fun keepMe(a: ByteArray) {
60-
// does nothing, makes sure the variable is kept in state-machine
59+
suspend fun keepMe(a: ByteArray) {
60+
delay(10)
6161
}
6262

6363
val dispatcher = currentDispatcher() as CoroutineDispatcher
6464
val scheduler = dispatcher.asScheduler()
6565

6666
val n = 2000 * stressTestMultiplier
6767
coroutineScope {
68-
repeat(n) {
69-
launch {
70-
val a = ByteArray(1000000) //1MB
71-
val disposable = scheduler.scheduleDirect({
72-
expectUnreached()
73-
}, 50, TimeUnit.MILLISECONDS)
74-
disposable.dispose()
75-
check(disposable.isDisposed)
76-
delay(60)
77-
keepMe(a)
78-
}
68+
repeat(n) { i ->
69+
val a = ByteArray(1000000) //1MB
70+
val disposable = scheduler.scheduleDirect({
71+
runBlocking {
72+
keepMe(a)
73+
}
74+
}, 10, TimeUnit.MILLISECONDS)
75+
disposable.dispose()
76+
expect(i + 2)
77+
yield()
7978
}
8079
}
8180

82-
finish(2)
81+
scheduler.shutdown()
82+
finish(n + 2)
8383
}
8484
}

0 commit comments

Comments
 (0)