Skip to content

Commit 170690f

Browse files
committed
Flow.asObservable and Flow.asFlowable converters in rx2 module
1 parent 2a815e8 commit 170690f

File tree

8 files changed

+247
-5
lines changed

8 files changed

+247
-5
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public final class kotlinx/coroutines/rx2/RxConvertKt {
2828
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
2929
public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
3030
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
31+
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
32+
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
3133
}
3234

3335
public final class kotlinx/coroutines/rx2/RxFlowableKt {

gradle.properties

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ dokka_version=0.9.16-rdev-2-mpp-hacks
1212
bintray_version=1.8.4-jetbrains-5
1313
byte_buddy_version=1.9.3
1414
reactor_vesion=3.2.5.RELEASE
15+
reactive_streams_version=1.0.2
1516
artifactory_plugin_version=4.7.3
1617

1718
# JS

reactive/kotlinx-coroutines-reactive/build.gradle

-2
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
ext.reactive_streams_version = '1.0.2'
6-
75
dependencies {
86
compile "org.reactivestreams:reactive-streams:$reactive_streams_version"
97
testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"

reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@ import java.util.concurrent.atomic.*
1111
import kotlin.coroutines.*
1212

1313
/**
14-
* Transforms the given flow to a spec-compliant [Publisher]
14+
* Transforms the given flow to a spec-compliant [Publisher].
1515
*/
1616
@JvmName("from")
1717
@FlowPreview
1818
public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
1919

2020
/**
2121
* Adapter that transforms [Flow] into TCK-complaint [Publisher].
22-
* Any calls to [cancel] cancel the original flow.
22+
* [cancel] invocation cancels the original flow.
2323
*/
2424
@Suppress("PublisherImplementation")
2525
private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {

reactive/kotlinx-coroutines-rx2/build.gradle

+22-1
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,32 @@
44

55
dependencies {
66
compile project(':kotlinx-coroutines-reactive')
7+
testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output
8+
testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
79
compile 'io.reactivex.rxjava2:rxjava:2.1.9'
810
}
911

1012
tasks.withType(dokka.getClass()) {
1113
externalDocumentationLink {
1214
url = new URL('http://reactivex.io/RxJava/javadoc/')
1315
}
14-
}
16+
}
17+
18+
task testNG(type: Test) {
19+
useTestNG()
20+
reports.html.destination = file("$buildDir/reports/testng")
21+
include '**/*ReactiveStreamTckTest.*'
22+
// Skip testNG when tests are filtered with --tests, otherwise it simply fails
23+
onlyIf {
24+
filter.includePatterns.isEmpty()
25+
}
26+
doFirst {
27+
// Classic gradle, nothing works without doFirst
28+
println "TestNG tests: ($includes)"
29+
}
30+
}
31+
32+
test {
33+
dependsOn(testNG)
34+
reports.html.destination = file("$buildDir/reports/junit")
35+
}

reactive/kotlinx-coroutines-rx2/src/RxConvert.kt

+40
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ package kotlinx.coroutines.rx2
77
import io.reactivex.*
88
import kotlinx.coroutines.*
99
import kotlinx.coroutines.channels.*
10+
import kotlinx.coroutines.flow.*
11+
import kotlinx.coroutines.reactive.flow.*
12+
import org.reactivestreams.*
1013
import kotlin.coroutines.*
1114

1215
/**
@@ -76,3 +79,40 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
7679
for (t in this@asObservable)
7780
send(t)
7881
}
82+
83+
/**
84+
* Converts the given flow to a cold observable.
85+
* The original flow is cancelled if the observable subscriber was disposed.
86+
*/
87+
@FlowPreview
88+
@JvmName("from")
89+
public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create { emitter ->
90+
/*
91+
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
92+
* asObservable is already invoked from unconfined
93+
*/
94+
val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
95+
try {
96+
collect { value -> emitter.onNext(value) }
97+
emitter.onComplete()
98+
} catch (e: Throwable) {
99+
// 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
100+
if (e !is CancellationException) emitter.onError(e)
101+
else emitter.onComplete()
102+
103+
}
104+
}
105+
emitter.setCancellable(RxCancellable(job))
106+
}
107+
108+
/**
109+
* Converts the given flow to a cold observable.
110+
* The original flow is cancelled if the flowable subscriber was disposed.
111+
*/
112+
@FlowPreview
113+
@JvmName("from")
114+
public fun <T: Any> Flow<T>.asFlowable(): Flowable<T> = FlowAsFlowable(asPublisher())
115+
116+
private class FlowAsFlowable<T: Any>(private val publisher: Publisher<T>) : Flowable<T>() {
117+
override fun subscribeActual(s: Subscriber<in T>?) = publisher.subscribe(s)
118+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.rx2
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import org.hamcrest.core.*
10+
import org.junit.*
11+
import org.junit.Assert.*
12+
13+
class FlowAsObservableTest : TestBase() {
14+
@Test
15+
fun testBasicSuccess() = runTest {
16+
expect(1)
17+
val observable = flow {
18+
expect(3)
19+
emit("OK")
20+
}.asObservable()
21+
22+
expect(2)
23+
observable.subscribe { value ->
24+
expect(4)
25+
assertEquals("OK", value)
26+
}
27+
28+
finish(5)
29+
}
30+
31+
@Test
32+
fun testBasicFailure() = runTest {
33+
expect(1)
34+
val observable = flow<Int> {
35+
expect(3)
36+
throw RuntimeException("OK")
37+
}.asObservable()
38+
39+
expect(2)
40+
observable.subscribe({ expectUnreached() }, { error ->
41+
expect(4)
42+
assertThat(error, IsInstanceOf(RuntimeException::class.java))
43+
assertEquals("OK", error.message)
44+
})
45+
finish(5)
46+
}
47+
48+
@Test
49+
fun testBasicUnsubscribe() = runTest {
50+
expect(1)
51+
val observable = flow<Int> {
52+
expect(3)
53+
hang {
54+
expect(4)
55+
}
56+
}.asObservable()
57+
58+
expect(2)
59+
val sub = observable.subscribe({ expectUnreached() }, { expectUnreached() })
60+
sub.dispose() // will cancel coroutine
61+
finish(5)
62+
}
63+
64+
@Test
65+
fun testNotifyOnceOnCancellation() = runTest {
66+
val observable =
67+
flow {
68+
expect(3)
69+
emit("OK")
70+
hang {
71+
expect(7)
72+
}
73+
}.asObservable()
74+
.doOnNext {
75+
expect(4)
76+
assertEquals("OK", it)
77+
}
78+
.doOnDispose {
79+
expect(6) // notified once!
80+
}
81+
82+
expect(1)
83+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
84+
expect(2)
85+
observable.consumeEach {
86+
expect(5)
87+
assertEquals("OK", it)
88+
}
89+
}
90+
91+
yield()
92+
job.cancelAndJoin()
93+
finish(8)
94+
}
95+
96+
@Test
97+
fun testFailingConsumer() = runTest {
98+
expect(1)
99+
val observable = flow {
100+
expect(2)
101+
emit("OK")
102+
hang {
103+
expect(4)
104+
}
105+
106+
}.asObservable()
107+
108+
try {
109+
observable.consumeEach {
110+
expect(3)
111+
throw TestException()
112+
}
113+
} catch (e: TestException) {
114+
finish(5)
115+
}
116+
}
117+
118+
@Test
119+
fun testNonAtomicStart() = runTest {
120+
withContext(Dispatchers.Unconfined) {
121+
val observable = flow<Int> {
122+
expect(1)
123+
}.asObservable()
124+
125+
val disposable = observable.subscribe({ expectUnreached() }, { expectUnreached() }, { expectUnreached() })
126+
disposable.dispose()
127+
}
128+
finish(2)
129+
}
130+
131+
@Test
132+
fun testFlowCancelledFromWithin() = runTest {
133+
val observable = flow {
134+
expect(1)
135+
emit(1)
136+
kotlin.coroutines.coroutineContext.cancel()
137+
kotlin.coroutines.coroutineContext.ensureActive()
138+
expectUnreached()
139+
}.asObservable()
140+
141+
observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
142+
}
143+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.rx2
6+
7+
import io.reactivex.*
8+
import kotlinx.coroutines.flow.*
9+
import org.junit.*
10+
import org.reactivestreams.*
11+
import org.reactivestreams.tck.*
12+
13+
class IterableFlowAsFlowableTckTest : PublisherVerification<Long>(TestEnvironment()) {
14+
15+
private fun generate(num: Long): Array<Long> {
16+
return Array(if (num >= Integer.MAX_VALUE) 1000000 else num.toInt()) { it.toLong() }
17+
}
18+
19+
override fun createPublisher(elements: Long): Flowable<Long> {
20+
return generate(elements).asIterable().asFlow().asFlowable()
21+
}
22+
23+
override fun createFailedPublisher(): Publisher<Long>? = null
24+
25+
@Ignore
26+
override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
27+
}
28+
29+
@Ignore
30+
override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
31+
}
32+
33+
@Ignore
34+
override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
35+
//
36+
}
37+
}

0 commit comments

Comments
 (0)