-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathRxScheduler.kt
181 lines (152 loc) · 5.76 KB
/
RxScheduler.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.rx3
import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.plugins.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.util.concurrent.*
import kotlin.coroutines.*
/**
* Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
* and provides native support of [delay] and [withTimeout].
*/
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher =
if (this is DispatcherScheduler) {
dispatcher
} else {
SchedulerCoroutineDispatcher(this)
}
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
@JvmName("asCoroutineDispatcher")
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher =
SchedulerCoroutineDispatcher(this)
/**
* Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler].
*/
public fun CoroutineDispatcher.asScheduler(): Scheduler =
if (this is SchedulerCoroutineDispatcher) {
scheduler
} else {
DispatcherScheduler(this)
}
private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {
private val schedulerJob = SupervisorJob()
/**
* The scope for everything happening in this [DispatcherScheduler].
*
* Running tasks, too, get launched under this scope, because [shutdown] should cancel the running tasks as well.
*/
private val scope = CoroutineScope(schedulerJob + dispatcher)
/**
* The counter of created workers, for their pretty-printing.
*/
private val workerCounter = atomic(1L)
override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
scope.scheduleTask(block, unit.toMillis(delay)) { task ->
Runnable { scope.launch { task() } }
}
override fun createWorker(): Worker = DispatcherWorker(workerCounter.getAndIncrement(), dispatcher, schedulerJob)
override fun shutdown() {
schedulerJob.cancel()
}
private class DispatcherWorker(
private val counter: Long,
private val dispatcher: CoroutineDispatcher,
parentJob: Job
) : Worker() {
private val workerJob = SupervisorJob(parentJob)
private val workerScope = CoroutineScope(workerJob + dispatcher)
private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)
init {
workerScope.launch {
blockChannel.consumeEach {
it()
}
}
}
override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
workerScope.scheduleTask(block, unit.toMillis(delay)) { task ->
Runnable { blockChannel.trySend(task) }
}
override fun isDisposed(): Boolean = !workerScope.isActive
override fun dispose() {
blockChannel.close()
workerJob.cancel()
}
override fun toString(): String = "$dispatcher (worker $counter, ${if (isDisposed) "disposed" else "active"})"
}
override fun toString(): String = dispatcher.toString()
}
private typealias Task = suspend () -> Unit
/**
* Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis]
* milliseconds.
*/
private fun CoroutineScope.scheduleTask(
block: Runnable,
delayMillis: Long,
adaptForScheduling: (Task) -> Runnable
): Disposable {
val ctx = coroutineContext
var handle: DisposableHandle? = null
val disposable = Disposable.fromRunnable {
// null if delay <= 0
handle?.dispose()
}
val decoratedBlock = RxJavaPlugins.onSchedule(block)
suspend fun task() {
if (disposable.isDisposed) return
try {
runInterruptible {
decoratedBlock.run()
}
} catch (e: Throwable) {
handleUndeliverableException(e, ctx)
}
}
val toSchedule = adaptForScheduling(::task)
if (!isActive) return Disposable.disposed()
if (delayMillis <= 0) {
toSchedule.run()
} else {
@Suppress("INVISIBLE_MEMBER")
ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it }
}
return disposable
}
/**
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
*/
public class SchedulerCoroutineDispatcher(
/**
* Underlying scheduler of current [CoroutineDispatcher].
*/
public val scheduler: Scheduler
) : CoroutineDispatcher(), Delay {
/** @suppress */
override fun dispatch(context: CoroutineContext, block: Runnable) {
scheduler.scheduleDirect(block)
}
/** @suppress */
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val disposable = scheduler.scheduleDirect({
with(continuation) { resumeUndispatched(Unit) }
}, timeMillis, TimeUnit.MILLISECONDS)
continuation.disposeOnCancellation(disposable)
}
/** @suppress */
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS)
return DisposableHandle { disposable.dispose() }
}
/** @suppress */
override fun toString(): String = scheduler.toString()
/** @suppress */
override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler
/** @suppress */
override fun hashCode(): Int = System.identityHashCode(scheduler)
}