Skip to content

Commit 1d6230a

Browse files
deva666elizarov
authored andcommitted
Replace SubscriptionReceiveChannel with ReceiveChannel
Fixes #283
1 parent b418074 commit 1d6230a

File tree

19 files changed

+150
-147
lines changed

19 files changed

+150
-147
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class ArrayBroadcastChannel<E>(
6868
override val isBufferAlwaysFull: Boolean get() = false
6969
override val isBufferFull: Boolean get() = size >= capacity
7070

71-
override fun openSubscription(): SubscriptionReceiveChannel<E> =
71+
override fun openSubscription(): ReceiveChannel<E> =
7272
Subscriber(this).also {
7373
updateHead(addSub = it)
7474
}
@@ -195,7 +195,7 @@ class ArrayBroadcastChannel<E>(
195195

196196
private class Subscriber<E>(
197197
private val broadcastChannel: ArrayBroadcastChannel<E>
198-
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
198+
) : AbstractChannel<E>(), ReceiveChannel<E> {
199199
private val subLock = ReentrantLock()
200200

201201
@Volatile

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import kotlinx.coroutines.experimental.internal.Closeable
2323

2424
/**
2525
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
26-
* that subscribe for the elements using [openSubscription] function and unsubscribe using [SubscriptionReceiveChannel.close]
26+
* that subscribe for the elements using [openSubscription] function and unsubscribe using [ReceiveChannel.cancel]
2727
* function.
2828
*
2929
* See `BroadcastChannel()` factory function for the description of available
@@ -45,17 +45,17 @@ public interface BroadcastChannel<E> : SendChannel<E> {
4545

4646
/**
4747
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
48-
* The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this
48+
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this
4949
* broadcast channel.
5050
*/
51-
public fun openSubscription(): SubscriptionReceiveChannel<E>
51+
public fun openSubscription(): ReceiveChannel<E>
5252

5353
/**
5454
* @suppress **Deprecated**: Renamed to [openSubscription]
5555
*/
5656
@Deprecated(message = "Renamed to `openSubscription`",
5757
replaceWith = ReplaceWith("openSubscription()"))
58-
public fun open(): SubscriptionReceiveChannel<E> = openSubscription()
58+
public fun open(): ReceiveChannel<E> = openSubscription()
5959
}
6060

6161
/**
@@ -80,6 +80,7 @@ public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
8080
*
8181
* Note, that invocation of [cancel] also closes subscription.
8282
*/
83+
@Deprecated("Deprecated in favour of `ReceiveChannel`")
8384
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
8485
/**
8586
* Closes this subscription. This is a synonym for [cancel].

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ public fun <E> Sequence<E>.asReceiveChannel(context: CoroutineContext = Unconfin
4646

4747
/**
4848
* Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
49-
* from it by always invoking [cancel][SubscriptionReceiveChannel.cancel] after the execution of the block.
49+
* from it by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
5050
*/
51-
public inline fun <E, R> BroadcastChannel<E>.consume(block: SubscriptionReceiveChannel<E>.() -> R): R {
51+
public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
5252
val channel = openSubscription()
5353
try {
5454
return channel.block()

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import kotlinx.coroutines.experimental.selects.*
3434
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
3535
*
3636
* This implementation is fully lock-free. In this implementation
37-
* [opening][openSubscription] and [closing][SubscriptionReceiveChannel.close] subscription takes O(N) time, where N is the
37+
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
3838
* number of subscribers.
3939
*/
4040
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
@@ -114,7 +114,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
114114
override val isFull: Boolean get() = false
115115

116116
@Suppress("UNCHECKED_CAST")
117-
override fun openSubscription(): SubscriptionReceiveChannel<E> {
117+
override fun openSubscription(): ReceiveChannel<E> {
118118
val subscriber = Subscriber<E>(this)
119119
_state.loop { state ->
120120
when (state) {
@@ -247,7 +247,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
247247

248248
private class Subscriber<E>(
249249
private val broadcastChannel: ConflatedBroadcastChannel<E>
250-
) : ConflatedChannel<E>(), SubscriptionReceiveChannel<E> {
250+
) : ConflatedChannel<E>(), ReceiveChannel<E> {
251251
override fun cancel(cause: Throwable?): Boolean =
252252
close(cause).also { closed ->
253253
if (closed) broadcastChannel.closeSubscriber(this)

common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class ConflatedBroadcastChannelTest : TestBase() {
3838
expect(6)
3939
assertEquals("two", sub.receive()) // suspends
4040
expect(12)
41-
sub.close()
41+
sub.cancel()
4242
expect(13)
4343
}
4444

@@ -60,7 +60,7 @@ class ConflatedBroadcastChannelTest : TestBase() {
6060
expect(17)
6161
assertNull(sub.receiveOrNull()) // suspends until closed
6262
expect(20)
63-
sub.close()
63+
sub.cancel()
6464
expect(21)
6565
}
6666

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt

+7-7
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,15 @@ class BroadcastChannelMultiReceiveStressTest(
7878
val name = "Receiver$receiverIndex"
7979
println("Launching $name")
8080
receivers += launch(ctx + CoroutineName(name)) {
81-
broadcast.openSubscription().use { sub ->
81+
val channel = broadcast.openSubscription()
8282
when (receiverIndex % 5) {
83-
0 -> doReceive(sub, receiverIndex)
84-
1 -> doReceiveOrNull(sub, receiverIndex)
85-
2 -> doIterator(sub, receiverIndex)
86-
3 -> doReceiveSelect(sub, receiverIndex)
87-
4 -> doReceiveSelectOrNull(sub, receiverIndex)
83+
0 -> doReceive(channel, receiverIndex)
84+
1 -> doReceiveOrNull(channel, receiverIndex)
85+
2 -> doIterator(channel, receiverIndex)
86+
3 -> doReceiveSelect(channel, receiverIndex)
87+
4 -> doReceiveSelectOrNull(channel, receiverIndex)
8888
}
89-
}
89+
channel.cancel()
9090
}
9191
printProgress()
9292
}

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelSubStressTest.kt

+7-7
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,13 @@ class BroadcastChannelSubStressTest(
6060
launch(context = ctx + CoroutineName("Receiver")) {
6161
var last = -1L
6262
while (isActive) {
63-
broadcast.openSubscription().use { sub ->
64-
val i = sub.receive()
65-
check(i >= last) { "Last was $last, got $i" }
66-
if (!kind.isConflated) check(i != last) { "Last was $last, got it again" }
67-
receivedTotal.incrementAndGet()
68-
last = i
69-
}
63+
val channel = broadcast.openSubscription()
64+
val i = channel.receive()
65+
check(i >= last) { "Last was $last, got $i" }
66+
if (!kind.isConflated) check(i != last) { "Last was $last, got it again" }
67+
receivedTotal.incrementAndGet()
68+
last = i
69+
channel.cancel()
7070
}
7171
}
7272
var prevSent = -1L

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelNotifyStressTest.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
9595
}
9696

9797
suspend fun waitForEvent(): Int =
98-
broadcast.openSubscription().use {
99-
it.receive()
98+
with(broadcast.openSubscription()) {
99+
val value = receive()
100+
cancel()
101+
value
100102
}
101103
}

reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Channel.kt

+8-8
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,18 @@ package kotlinx.coroutines.experimental.reactive
1919
import kotlinx.atomicfu.atomic
2020
import kotlinx.atomicfu.loop
2121
import kotlinx.coroutines.experimental.channels.LinkedListChannel
22-
import kotlinx.coroutines.experimental.channels.SubscriptionReceiveChannel
22+
import kotlinx.coroutines.experimental.channels.ReceiveChannel
2323
import org.reactivestreams.Publisher
2424
import org.reactivestreams.Subscriber
2525
import org.reactivestreams.Subscription
2626

2727
/**
2828
* Subscribes to this [Publisher] and returns a channel to receive elements emitted by it.
29-
* The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this publisher.
29+
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher.
3030
* @param request how many items to request from publisher in advance (optional, on-demand request by default).
3131
*/
3232
@JvmOverloads // for binary compatibility
33-
public fun <T> Publisher<T>.openSubscription(request: Int = 0): SubscriptionReceiveChannel<T> {
33+
public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T> {
3434
val channel = SubscriptionChannel<T>(request)
3535
subscribe(channel)
3636
return channel
@@ -41,7 +41,7 @@ public fun <T> Publisher<T>.openSubscription(request: Int = 0): SubscriptionRece
4141
*/
4242
@Deprecated(message = "Renamed to `openSubscription`",
4343
replaceWith = ReplaceWith("openSubscription()"))
44-
public fun <T> Publisher<T>.open(): SubscriptionReceiveChannel<T> = openSubscription()
44+
public fun <T> Publisher<T>.open(): ReceiveChannel<T> = openSubscription()
4545

4646
/**
4747
* Subscribes to this [Publisher] and returns an iterator to receive elements emitted by it.
@@ -60,9 +60,9 @@ public operator fun <T> Publisher<T>.iterator() = openSubscription().iterator()
6060
* Subscribes to this [Publisher] and performs the specified action for each received element.
6161
*/
6262
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) {
63-
openSubscription().use { channel ->
64-
for (x in channel) action(x)
65-
}
63+
val channel = openSubscription()
64+
for (x in channel) action(x)
65+
channel.cancel()
6666
}
6767

6868
/**
@@ -74,7 +74,7 @@ public suspend fun <T> Publisher<T>.consumeEach(action: suspend (T) -> Unit) =
7474

7575
private class SubscriptionChannel<T>(
7676
private val request: Int
77-
) : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Subscriber<T> {
77+
) : LinkedListChannel<T>(), ReceiveChannel<T>, Subscriber<T> {
7878
init {
7979
require(request >= 0) { "Invalid request size: $request" }
8080
}

reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class IntegrationTest(
104104
checkNumbers(n, pub)
105105
val channel = pub.openSubscription()
106106
checkNumbers(n, channel.asPublisher(ctx(coroutineContext)))
107-
channel.close()
107+
channel.cancel()
108108
}
109109

110110
private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {

reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherSubscriptionSelectTest.kt

+25-24
Original file line numberDiff line numberDiff line change
@@ -40,33 +40,34 @@ class PublisherSubscriptionSelectTest(val request: Int) : TestBase() {
4040
var a = 0
4141
var b = 0
4242
// open two subs
43-
source.openSubscription(request).use { channelA ->
44-
source.openSubscription(request).use { channelB ->
45-
loop@ while (true) {
46-
val done: Int = select {
47-
channelA.onReceiveOrNull {
48-
if (it != null) assertEquals(a++, it)
49-
if (it == null) 0 else 1
50-
}
51-
channelB.onReceiveOrNull {
52-
if (it != null) assertEquals(b++, it)
53-
if (it == null) 0 else 2
54-
}
55-
}
56-
when (done) {
57-
0 -> break@loop
58-
1 -> {
59-
val r = channelB.receiveOrNull()
60-
if (r != null) assertEquals(b++, r)
61-
}
62-
2 -> {
63-
val r = channelA.receiveOrNull()
64-
if (r != null) assertEquals(a++, r)
65-
}
66-
}
43+
val channelA = source.openSubscription(request)
44+
val channelB = source.openSubscription(request)
45+
loop@ while (true) {
46+
val done: Int = select {
47+
channelA.onReceiveOrNull {
48+
if (it != null) assertEquals(a++, it)
49+
if (it == null) 0 else 1
50+
}
51+
channelB.onReceiveOrNull {
52+
if (it != null) assertEquals(b++, it)
53+
if (it == null) 0 else 2
54+
}
55+
}
56+
when (done) {
57+
0 -> break@loop
58+
1 -> {
59+
val r = channelB.receiveOrNull()
60+
if (r != null) assertEquals(b++, r)
61+
}
62+
2 -> {
63+
val r = channelA.receiveOrNull()
64+
if (r != null) assertEquals(a++, r)
6765
}
6866
}
6967
}
68+
69+
channelA.cancel()
70+
channelB.cancel()
7071
// should receive one of them fully
7172
assertTrue(a == n || b == n)
7273
}

reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt

+8-8
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,18 @@ package kotlinx.coroutines.experimental.rx1
1919
import kotlinx.atomicfu.atomic
2020
import kotlinx.atomicfu.loop
2121
import kotlinx.coroutines.experimental.channels.LinkedListChannel
22-
import kotlinx.coroutines.experimental.channels.SubscriptionReceiveChannel
22+
import kotlinx.coroutines.experimental.channels.ReceiveChannel
2323
import rx.Observable
2424
import rx.Subscriber
2525
import rx.Subscription
2626

2727
/**
2828
* Subscribes to this [Observable] and returns a channel to receive elements emitted by it.
29-
* The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this observable.
29+
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this observable.
3030
* @param request how many items to request from publisher in advance (optional, on-demand request by default).
3131
*/
3232
@JvmOverloads // for binary compatibility
33-
public fun <T> Observable<T>.openSubscription(request: Int = 0): SubscriptionReceiveChannel<T> {
33+
public fun <T> Observable<T>.openSubscription(request: Int = 0): ReceiveChannel<T> {
3434
val channel = SubscriptionChannel<T>(request)
3535
val subscription = subscribe(channel.subscriber)
3636
channel.subscription = subscription
@@ -43,7 +43,7 @@ public fun <T> Observable<T>.openSubscription(request: Int = 0): SubscriptionRec
4343
*/
4444
@Deprecated(message = "Renamed to `openSubscription`",
4545
replaceWith = ReplaceWith("openSubscription()"))
46-
public fun <T> Observable<T>.open(): SubscriptionReceiveChannel<T> = openSubscription()
46+
public fun <T> Observable<T>.open(): ReceiveChannel<T> = openSubscription()
4747

4848
/**
4949
* Subscribes to this [Observable] and returns an iterator to receive elements emitted by it.
@@ -62,9 +62,9 @@ public operator fun <T> Observable<T>.iterator() = openSubscription().iterator()
6262
* Subscribes to this [Observable] and performs the specified action for each received element.
6363
*/
6464
public suspend inline fun <T> Observable<T>.consumeEach(action: (T) -> Unit) {
65-
openSubscription().use { channel ->
66-
for (x in channel) action(x)
67-
}
65+
val channel = openSubscription()
66+
for (x in channel) action(x)
67+
channel.cancel()
6868
}
6969

7070
/**
@@ -76,7 +76,7 @@ public suspend fun <T> Observable<T>.consumeEach(action: suspend (T) -> Unit) =
7676

7777
private class SubscriptionChannel<T>(
7878
private val request: Int
79-
) : LinkedListChannel<T>(), SubscriptionReceiveChannel<T> {
79+
) : LinkedListChannel<T>(), ReceiveChannel<T> {
8080
init {
8181
require(request >= 0) { "Invalid request size: $request" }
8282
}

reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class IntegrationTest(
126126
checkNumbers(n, observable)
127127
val channel = observable.openSubscription()
128128
checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
129-
channel.close()
129+
channel.cancel()
130130
}
131131

132132
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {

0 commit comments

Comments
 (0)