Skip to content

ObservableSource.asFlow #1826

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions reactive/kotlinx-coroutines-rx2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ Coroutine builders:

Integration with [Flow]:

| **Name** | **Result** | **Description**
| --------------- | -------------- | ---------------
| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
| **Name** | **Result** | **Description**
| --------------- | -------------- | ---------------
| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
| [ObservableSource.asFlow] | `Flow` | Converts the given cold ObservableSource to flow

Suspending extension functions and suspending iteration:

Expand Down Expand Up @@ -67,6 +68,7 @@ Conversion functions:
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html
[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html
[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-flow.html
[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html
[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html
[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html
Expand Down
33 changes: 33 additions & 0 deletions reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -77,6 +79,37 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
send(t)
}

/**
* Transforms given cold [ObservableSource] into cold [Flow].
*
* The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
* is applied to the resulting flow.
*
* A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
* than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
*/
@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {

val disposableRef = AtomicReference<Disposable>()

val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) { sendBlocking(t) }
override fun onError(e: Throwable) { close(e) }
}

subscribe(observer)
awaitClose { disposableRef.getAndSet(DISPOSED)?.dispose() }
}

private object DISPOSED : Disposable {
override fun isDisposed() = true
override fun dispose() = Unit
}

/**
* Converts the given flow to a cold observable.
* The original flow is cancelled when the observable subscriber is disposed.
Expand Down
185 changes: 185 additions & 0 deletions reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.rx2

import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.Observer
import io.reactivex.disposables.Disposables
import io.reactivex.subjects.PublishSubject
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

class ObservableAsFlowTest : TestBase() {
@Test
fun testCancellation() = runTest {
var onNext = 0
var onCancelled = 0
var onError = 0

val source = rxObservable(currentDispatcher()) {
coroutineContext[Job]?.invokeOnCompletion {
if (it is CancellationException) ++onCancelled
}

repeat(100) {
send(it)
}
}

source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
onEach {
++onNext
throw RuntimeException()
}
catch<Throwable> {
++onError
}
}.join()


assertEquals(1, onNext)
assertEquals(1, onError)
assertEquals(1, onCancelled)
}

@Test
fun testImmediateCollection() {
val source = PublishSubject.create<Int>()
val flow = source.asFlow()
GlobalScope.launch(Dispatchers.Unconfined) {
expect(1)
flow.collect { expect(it) }
expect(6)
}
expect(2)
source.onNext(3)
expect(4)
source.onNext(5)
source.onComplete()
finish(7)
}

@Test
fun testOnErrorCancellation() {
val source = PublishSubject.create<Int>()
val flow = source.asFlow()
val exception = RuntimeException()
GlobalScope.launch(Dispatchers.Unconfined) {
try {
expect(1)
flow.collect { expect(it) }
expectUnreached()
}
catch (e: Exception) {
assertSame(exception, e.cause)
expect(5)
}
expect(6)
}
expect(2)
source.onNext(3)
expect(4)
source.onError(exception)
finish(7)
}

@Test
fun testUnsubscribeOnCollectionException() {
val source = PublishSubject.create<Int>()
val flow = source.asFlow()
val exception = RuntimeException()
GlobalScope.launch(Dispatchers.Unconfined) {
try {
expect(1)
flow.collect {
expect(it)
if (it == 3) throw exception
}
expectUnreached()
}
catch (e: Exception) {
assertSame(exception, e.cause)
expect(4)
}
expect(5)
}
expect(2)
assertTrue(source.hasObservers())
source.onNext(3)
assertFalse(source.hasObservers())
finish(6)
}

@Test
fun testLateOnSubscribe() {
var observer: Observer<in Int>? = null
val source = ObservableSource<Int> { observer = it }
val flow = source.asFlow()
assertNull(observer)
val job = GlobalScope.launch(Dispatchers.Unconfined) {
expect(1)
flow.collect { expectUnreached() }
expectUnreached()
}
expect(2)
assertNotNull(observer)
job.cancel()
val disposable = Disposables.empty()
observer!!.onSubscribe(disposable)
assertTrue(disposable.isDisposed)
finish(3)
}

@Test
fun testBufferUnlimited() = runTest {
val source = rxObservable(currentDispatcher()) {
expect(1); send(10)
expect(2); send(11)
expect(3); send(12)
expect(4); send(13)
expect(5); send(14)
expect(6); send(15)
expect(7); send(16)
expect(8); send(17)
expect(9)
}
source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) }
finish(18)
}

@Test
fun testConflated() = runTest {
val source = Observable.range(1, 5)
val list = source.asFlow().conflate().toList()
assertEquals(listOf(1, 5), list)
}

@Test
fun testLongRange() = runTest {
val source = Observable.range(1, 10_000)
val count = source.asFlow().count()
assertEquals(10_000, count)
}

@Test
fun testProduce() = runTest {
val source = Observable.range(0, 10)
val flow = source.asFlow()
check((0..9).toList(), flow.produceIn(this))
check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
check((0..9).toList(), flow.buffer(2).produceIn(this))
check((0..9).toList(), flow.buffer(0).produceIn(this))
check(listOf(0, 9), flow.conflate().produceIn(this))
}

private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
val result = ArrayList<Int>(10)
channel.consumeEach { result.add(it) }
assertEquals(expected, result)
}
}