Skip to content

Commit ac8b6da

Browse files
sdeleuzeelizarov
authored andcommitted
Update Reactor support to leverage Bismuth release train
PR #141
1 parent bc5a0c4 commit ac8b6da

File tree

4 files changed

+19
-40
lines changed

4 files changed

+19
-40
lines changed

reactive/kotlinx-coroutines-reactor/pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
<dependency>
6565
<groupId>io.projectreactor</groupId>
6666
<artifactId>reactor-bom</artifactId>
67-
<version>Aluminium-SR3</version>
67+
<version>Bismuth-RELEASE</version>
6868
<type>pom</type>
6969
<scope>import</scope>
7070
</dependency>
@@ -77,7 +77,7 @@
7777
<artifactId>reactor-core</artifactId>
7878
</dependency>
7979
<dependency>
80-
<groupId>io.projectreactor.addons</groupId>
80+
<groupId>io.projectreactor</groupId>
8181
<artifactId>reactor-test</artifactId>
8282
<scope>test</scope>
8383
</dependency>
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
package kotlinx.coroutines.experimental.reactor
22

3-
import kotlinx.coroutines.experimental.CancellableContinuation
4-
import kotlinx.coroutines.experimental.CoroutineDispatcher
5-
import kotlinx.coroutines.experimental.Delay
6-
import kotlinx.coroutines.experimental.DisposableHandle
7-
import kotlinx.coroutines.experimental.disposeOnCompletion
8-
import reactor.core.Cancellation
3+
import kotlinx.coroutines.experimental.*
4+
import reactor.core.Disposable
95
import reactor.core.scheduler.Scheduler
10-
import reactor.core.scheduler.TimedScheduler
116
import java.util.concurrent.TimeUnit
127
import kotlin.coroutines.experimental.CoroutineContext
138

@@ -16,45 +11,31 @@ import kotlin.coroutines.experimental.CoroutineContext
1611
*/
1712
fun Scheduler.asCoroutineDispatcher() = SchedulerCoroutineDispatcher(this)
1813

19-
/**
20-
* Converts an instance of [TimedScheduler] to an implementation of [CoroutineDispatcher]
21-
* and provides native [delay][Delay.delay] support.
22-
*/
23-
fun TimedScheduler.asCoroutineDispatcher() = TimedSchedulerCoroutineDispatcher(this)
24-
2514
/**
2615
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
2716
* @param scheduler a scheduler.
2817
*/
29-
open class SchedulerCoroutineDispatcher(private val scheduler: Scheduler) : CoroutineDispatcher() {
18+
open class SchedulerCoroutineDispatcher(private val scheduler: Scheduler) : CoroutineDispatcher(), Delay {
3019
override fun dispatch(context: CoroutineContext, block: Runnable) {
3120
scheduler.schedule(block)
3221
}
3322

34-
override fun toString(): String = scheduler.toString()
35-
override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler
36-
override fun hashCode(): Int = System.identityHashCode(scheduler)
37-
}
38-
39-
/**
40-
* Implements [CoroutineDispatcher] on top of an arbitrary [TimedScheduler].
41-
* @param scheduler a timed scheduler.
42-
*/
43-
open class TimedSchedulerCoroutineDispatcher(private val scheduler: TimedScheduler) : SchedulerCoroutineDispatcher(scheduler), Delay {
44-
4523
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
4624
val disposable = scheduler.schedule({
4725
with(continuation) { resumeUndispatched(Unit) }
4826
}, time, unit)
49-
5027
continuation.disposeOnCompletion(disposable.asDisposableHandle())
5128
}
5229

5330
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
5431
scheduler.schedule(block, time, unit).asDisposableHandle()
32+
33+
override fun toString(): String = scheduler.toString()
34+
override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler
35+
override fun hashCode(): Int = System.identityHashCode(scheduler)
5536
}
5637

57-
private fun Cancellation.asDisposableHandle(): DisposableHandle =
58-
object : DisposableHandle {
59-
override fun dispose() = this@asDisposableHandle.dispose()
60-
}
38+
private fun Disposable.asDisposableHandle(): DisposableHandle =
39+
object : DisposableHandle {
40+
override fun dispose() = this@asDisposableHandle.dispose()
41+
}

reactive/kotlinx-coroutines-reactor/src/test/kotlin/kotlinx/coroutines/experimental/reactor/FluxSingleTest.kt

+3-6
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@
1717
package kotlinx.coroutines.experimental.reactor
1818

1919
import kotlinx.coroutines.experimental.CommonPool
20-
import kotlinx.coroutines.experimental.reactive.awaitFirst
21-
import kotlinx.coroutines.experimental.reactive.awaitFirstOrDefault
22-
import kotlinx.coroutines.experimental.reactive.awaitLast
23-
import kotlinx.coroutines.experimental.reactive.awaitSingle
24-
import kotlinx.coroutines.experimental.reactive.consumeEach
20+
import kotlinx.coroutines.experimental.reactive.*
2521
import kotlinx.coroutines.experimental.runBlocking
2622
import org.junit.Assert.assertEquals
2723
import org.junit.Assert.fail
2824
import org.junit.Test
2925
import reactor.core.publisher.Flux
26+
import java.time.Duration.ofMillis
3027

3128
/**
3229
* Tests emitting single item with [flux].
@@ -62,7 +59,7 @@ class FluxSingleTest {
6259
@Test
6360
fun testSingleWithDelay() {
6461
val flux = flux(CommonPool) {
65-
send(Flux.just("O").delayMillis(50).awaitSingle() + "K")
62+
send(Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K")
6663
}
6764

6865
checkSingleValue(flux) {

reactive/kotlinx-coroutines-reactor/src/test/kotlin/kotlinx/coroutines/experimental/reactor/MonoTest.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.junit.Before
3232
import org.junit.Test
3333
import reactor.core.publisher.Flux
3434
import reactor.core.publisher.Mono
35+
import java.time.Duration.ofMillis
3536

3637
/**
3738
* Tests emitting single item with [mono].
@@ -87,7 +88,7 @@ class MonoTest : TestBase() {
8788
null
8889
}
8990
expect(2)
90-
mono.subscribe ({}, { throw it }, {
91+
mono.subscribe({}, { throw it }, {
9192
expect(5)
9293
})
9394
expect(3)
@@ -148,7 +149,7 @@ class MonoTest : TestBase() {
148149
@Test
149150
fun testMonoWithDelay() {
150151
val mono = mono(CommonPool) {
151-
Flux.just("O").delayMillis(50).awaitSingle() + "K"
152+
Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K"
152153
}
153154

154155
checkMonoValue(mono) {

0 commit comments

Comments
 (0)