Skip to content

Commit 1395c4e

Browse files
committed
adapt examples for use with ReceiveChannel
1 parent fc5048c commit 1395c4e

File tree

5 files changed

+15
-16
lines changed

5 files changed

+15
-16
lines changed

reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class IntegrationTest(
106106
checkNumbers(n, pub)
107107
val channel = pub.openSubscription()
108108
checkNumbers(n, channel.asPublisher(ctx(coroutineContext)))
109-
channel.close()
109+
channel.cancel()
110110
}
111111

112112
private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {

reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ class IntegrationTest(
128128
checkNumbers(n, observable)
129129
val channel = observable.openSubscription()
130130
checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
131-
channel.close()
131+
channel.cancel()
132132
}
133133

134134
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {

reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,10 @@ fun main(args: Array<String>) = runBlocking<Unit> {
2626
.doOnSubscribe { println("OnSubscribe") } // provide some insight
2727
.doFinally { println("Finally") } // ... into what's going on
2828
var cnt = 0
29-
source.openSubscription().use { channel -> // open channel to the source
30-
for (x in channel) { // iterate over the channel to receive elements from it
31-
println(x)
32-
if (++cnt >= 3) break // break when 3 elements are printed
33-
}
34-
// `use` will close the channel when this block of code is complete
29+
val channel = source.openSubscription() // open channel to the source
30+
for (x in channel) { // iterate over the channel to receive elements from it
31+
println(x)
32+
if (++cnt >= 3) break // break when 3 elements are printed
3533
}
34+
channel.cancel() // `cancel` closes the channel
3635
}

reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt

+7-7
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ import kotlin.coroutines.experimental.CoroutineContext
2424
import kotlinx.coroutines.experimental.selects.whileSelect
2525

2626
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
27-
this@takeUntil.openSubscription().use { thisChannel -> // explicitly open channel to Publisher<T>
28-
other.openSubscription().use { otherChannel -> // explicitly open channel to Publisher<U>
29-
whileSelect {
30-
otherChannel.onReceive { false } // bail out on any received element from `other`
31-
thisChannel.onReceive { send(it); true } // resend element from this channel and continue
32-
}
33-
}
27+
val thisChannel = this@takeUntil.openSubscription() // explicitly open channel to Publisher<T>
28+
val otherChannel = other.openSubscription() // explicitly open channel to Publisher<U>
29+
whileSelect {
30+
otherChannel.onReceive { false } // bail out on any received element from `other`
31+
thisChannel.onReceive { send(it); true } // resend element from this channel and continue
3432
}
33+
thisChannel.cancel()
34+
otherChannel.cancel()
3535
}
3636

3737
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {

reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class IntegrationTest(
108108
checkNumbers(n, observable)
109109
val channel = observable.openSubscription()
110110
checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
111-
channel.close()
111+
channel.cancel()
112112
}
113113

114114
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {

0 commit comments

Comments
 (0)