Skip to content

Commit 3858069

Browse files
committed
~fixup replacement for ReceiveChannel.asObservable
1 parent d49215a commit 3858069

File tree

3 files changed

+10
-7
lines changed

3 files changed

+10
-7
lines changed

reactive/kotlinx-coroutines-rx2/src/RxConvert.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,9 @@ public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutin
137137

138138
@Deprecated(
139139
message = "Deprecated in the favour of Flow",
140-
level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.consumeAsFlow().asObservable()") // Deprecated since 1.4.0
141-
)
140+
level = DeprecationLevel.ERROR,
141+
replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
142+
) // Deprecated since 1.4.0
142143
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
143144
for (t in this@asObservable)
144145
send(t)

reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2
66

77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
9+
import kotlinx.coroutines.flow.*
910
import org.junit.Assert
1011
import org.junit.Test
1112
import kotlin.test.*
@@ -126,7 +127,7 @@ class ConvertTest : TestBase() {
126127
delay(50)
127128
send("K")
128129
}
129-
val observable = c.asObservable(Dispatchers.Unconfined)
130+
val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
130131
checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) {
131132
assertEquals("OK", it)
132133
}
@@ -140,7 +141,7 @@ class ConvertTest : TestBase() {
140141
delay(50)
141142
throw TestException("K")
142143
}
143-
val observable = c.asObservable(Dispatchers.Unconfined)
144+
val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
144145
val single = rxSingle(Dispatchers.Unconfined) {
145146
var result = ""
146147
try {
@@ -155,4 +156,4 @@ class ConvertTest : TestBase() {
155156
assertEquals("OK", it)
156157
}
157158
}
158-
}
159+
}

reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2
66

77
import io.reactivex.*
88
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.flow.*
910
import org.junit.Test
1011
import org.junit.runner.*
1112
import org.junit.runners.*
@@ -92,7 +93,7 @@ class IntegrationTest(
9293
assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
9394
checkNumbers(n, observable)
9495
val channel = observable.openSubscription()
95-
checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
96+
checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext)))
9697
channel.cancel()
9798
}
9899

@@ -131,4 +132,4 @@ class IntegrationTest(
131132
assertEquals(n, last)
132133
}
133134

134-
}
135+
}

0 commit comments

Comments
 (0)