Skip to content

Remove SubscriptionReceiveChannel #311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class ArrayBroadcastChannel<E>(
override val isBufferAlwaysFull: Boolean get() = false
override val isBufferFull: Boolean get() = size >= capacity

override fun openSubscription(): SubscriptionReceiveChannel<E> =
override fun openSubscription(): ReceiveChannel<E> =
Subscriber(this).also {
updateHead(addSub = it)
}
Expand Down Expand Up @@ -197,7 +197,7 @@ class ArrayBroadcastChannel<E>(

private class Subscriber<E>(
private val broadcastChannel: ArrayBroadcastChannel<E>
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
) : AbstractChannel<E>(), ReceiveChannel<E> {
private val subLock = ReentrantLock()

@Volatile @JvmField
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.io.Closeable

/**
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
* that subscribe for the elements using [openSubscription] function and unsubscribe using [SubscriptionReceiveChannel.close]
* that subscribe for the elements using [openSubscription] function and unsubscribe using [ReceiveChannel.cancel]
* function.
*
* See `BroadcastChannel()` factory function for the description of available
Expand All @@ -44,17 +44,17 @@ public interface BroadcastChannel<E> : SendChannel<E> {

/**
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
* The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this
* broadcast channel.
*/
public fun openSubscription(): SubscriptionReceiveChannel<E>
public fun openSubscription(): ReceiveChannel<E>

/**
* @suppress **Deprecated**: Renamed to [openSubscription]
*/
@Deprecated(message = "Renamed to `openSubscription`",
replaceWith = ReplaceWith("openSubscription()"))
public fun open(): SubscriptionReceiveChannel<E> = openSubscription()
public fun open(): ReceiveChannel<E> = openSubscription()
}

/**
Expand All @@ -79,6 +79,7 @@ public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
*
* Note, that invocation of [cancel] also closes subscription.
*/
@Deprecated("Deprecated in favour of `ReceiveChannel`")
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
/**
* Closes this subscription. This is a synonym for [cancel].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ public fun <E> Sequence<E>.asReceiveChannel(context: CoroutineContext = Unconfin

/**
* Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
* from it by always invoking [cancel][SubscriptionReceiveChannel.cancel] after the execution of the block.
* from it by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
*/
public inline fun <E, R> BroadcastChannel<E>.consume(block: SubscriptionReceiveChannel<E>.() -> R): R {
public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
val channel = openSubscription()
try {
return channel.block()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import kotlinx.coroutines.experimental.selects.SelectInstance
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
*
* This implementation is fully lock-free. In this implementation
* [opening][openSubscription] and [closing][SubscriptionReceiveChannel.close] subscription takes O(N) time, where N is the
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
* number of subscribers.
*/
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
Expand Down Expand Up @@ -115,7 +115,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
override val isFull: Boolean get() = false

@Suppress("UNCHECKED_CAST")
override fun openSubscription(): SubscriptionReceiveChannel<E> {
override fun openSubscription(): ReceiveChannel<E> {
val subscriber = Subscriber<E>(this)
_state.loop { state ->
when (state) {
Expand Down Expand Up @@ -248,7 +248,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {

private class Subscriber<E>(
private val broadcastChannel: ConflatedBroadcastChannel<E>
) : ConflatedChannel<E>(), SubscriptionReceiveChannel<E> {
) : ConflatedChannel<E>(), ReceiveChannel<E> {
override fun cancel(cause: Throwable?): Boolean =
close(cause).also { closed ->
if (closed) broadcastChannel.closeSubscriber(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class ArrayBroadcastChannelTest : TestBase() {
sub.consumeEach {
check(it == ++expected)
if (it == 2) {
sub.close()
sub.cancel()
}
}
check(expected == 2)
Expand All @@ -174,7 +174,7 @@ class ArrayBroadcastChannelTest : TestBase() {
val channel = BroadcastChannel<Int>(1)
val sub = channel.openSubscription()
assertFalse(sub.isClosedForReceive)
sub.close()
sub.cancel()
assertTrue(sub.isClosedForReceive)
sub.receive()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ class BroadcastChannelMultiReceiveStressTest(
val name = "Receiver$receiverIndex"
println("Launching $name")
receivers += launch(ctx + CoroutineName(name)) {
broadcast.openSubscription().use { sub ->
val channel = broadcast.openSubscription()
when (receiverIndex % 5) {
0 -> doReceive(sub, receiverIndex)
1 -> doReceiveOrNull(sub, receiverIndex)
2 -> doIterator(sub, receiverIndex)
3 -> doReceiveSelect(sub, receiverIndex)
4 -> doReceiveSelectOrNull(sub, receiverIndex)
0 -> doReceive(channel, receiverIndex)
1 -> doReceiveOrNull(channel, receiverIndex)
2 -> doIterator(channel, receiverIndex)
3 -> doReceiveSelect(channel, receiverIndex)
4 -> doReceiveSelectOrNull(channel, receiverIndex)
}
}
channel.cancel()
}
printProgress()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ class BroadcastChannelSubStressTest(
launch(context = ctx + CoroutineName("Receiver")) {
var last = -1L
while (isActive) {
broadcast.openSubscription().use { sub ->
val i = sub.receive()
check(i >= last) { "Last was $last, got $i" }
if (!kind.isConflated) check(i != last) { "Last was $last, got it again" }
receivedTotal.incrementAndGet()
last = i
}
val channel = broadcast.openSubscription()
val i = channel.receive()
check(i >= last) { "Last was $last, got $i" }
if (!kind.isConflated) check(i != last) { "Last was $last, got it again" }
receivedTotal.incrementAndGet()
last = i
channel.cancel()
}
}
var prevSent = -1L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
}

suspend fun waitForEvent(): Int =
broadcast.openSubscription().use {
it.receive()
with(broadcast.openSubscription()) {
val value = receive()
cancel()
value
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ConflatedBroadcastChannelTest : TestBase() {
expect(6)
assertThat(sub.receive(), IsEqual("two")) // suspends
expect(12)
sub.close()
sub.cancel()
expect(13)
}
expect(4)
Expand All @@ -59,7 +59,7 @@ class ConflatedBroadcastChannelTest : TestBase() {
expect(17)
assertThat(sub.receiveOrNull(), IsNull()) // suspends until closed
expect(20)
sub.close()
sub.cancel()
expect(21)
}
expect(10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ package kotlinx.coroutines.experimental.reactive
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.loop
import kotlinx.coroutines.experimental.channels.LinkedListChannel
import kotlinx.coroutines.experimental.channels.SubscriptionReceiveChannel
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

/**
* Subscribes to this [Publisher] and returns a channel to receive elements emitted by it.
* The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this publisher.
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher.
* @param request how many items to request from publisher in advance (optional, on-demand request by default).
*/
@JvmOverloads // for binary compatibility
public fun <T> Publisher<T>.openSubscription(request: Int = 0): SubscriptionReceiveChannel<T> {
public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>(request)
subscribe(channel)
return channel
Expand All @@ -41,7 +41,7 @@ public fun <T> Publisher<T>.openSubscription(request: Int = 0): SubscriptionRece
*/
@Deprecated(message = "Renamed to `openSubscription`",
replaceWith = ReplaceWith("openSubscription()"))
public fun <T> Publisher<T>.open(): SubscriptionReceiveChannel<T> = openSubscription()
public fun <T> Publisher<T>.open(): ReceiveChannel<T> = openSubscription()

/**
* Subscribes to this [Publisher] and returns an iterator to receive elements emitted by it.
Expand All @@ -60,9 +60,9 @@ public operator fun <T> Publisher<T>.iterator() = openSubscription().iterator()
* Subscribes to this [Publisher] and performs the specified action for each received element.
*/
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) {
openSubscription().use { channel ->
for (x in channel) action(x)
}
val channel = openSubscription()
for (x in channel) action(x)
channel.cancel()
}

/**
Expand All @@ -74,7 +74,7 @@ public suspend fun <T> Publisher<T>.consumeEach(action: suspend (T) -> Unit) =

private class SubscriptionChannel<T>(
private val request: Int
) : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Subscriber<T> {
) : LinkedListChannel<T>(), ReceiveChannel<T>, Subscriber<T> {
init {
require(request >= 0) { "Invalid request size: $request" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class IntegrationTest(
checkNumbers(n, pub)
val channel = pub.openSubscription()
checkNumbers(n, channel.asPublisher(ctx(coroutineContext)))
channel.close()
channel.cancel()
}

private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,33 +40,34 @@ class PublisherSubscriptionSelectTest(val request: Int) : TestBase() {
var a = 0
var b = 0
// open two subs
source.openSubscription(request).use { channelA ->
source.openSubscription(request).use { channelB ->
loop@ while (true) {
val done: Int = select {
channelA.onReceiveOrNull {
if (it != null) assertEquals(a++, it)
if (it == null) 0 else 1
}
channelB.onReceiveOrNull {
if (it != null) assertEquals(b++, it)
if (it == null) 0 else 2
}
}
when (done) {
0 -> break@loop
1 -> {
val r = channelB.receiveOrNull()
if (r != null) assertEquals(b++, r)
}
2 -> {
val r = channelA.receiveOrNull()
if (r != null) assertEquals(a++, r)
}
}
val channelA = source.openSubscription(request)
val channelB = source.openSubscription(request)
loop@ while (true) {
val done: Int = select {
channelA.onReceiveOrNull {
if (it != null) assertEquals(a++, it)
if (it == null) 0 else 1
}
channelB.onReceiveOrNull {
if (it != null) assertEquals(b++, it)
if (it == null) 0 else 2
}
}
when (done) {
0 -> break@loop
1 -> {
val r = channelB.receiveOrNull()
if (r != null) assertEquals(b++, r)
}
2 -> {
val r = channelA.receiveOrNull()
if (r != null) assertEquals(a++, r)
}
}
}

channelA.cancel()
channelB.cancel()
// should receive one of them fully
assertTrue(a == n || b == n)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ package kotlinx.coroutines.experimental.rx1
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.loop
import kotlinx.coroutines.experimental.channels.LinkedListChannel
import kotlinx.coroutines.experimental.channels.SubscriptionReceiveChannel
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import rx.Observable
import rx.Subscriber
import rx.Subscription

/**
* Subscribes to this [Observable] and returns a channel to receive elements emitted by it.
* The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this observable.
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this observable.
* @param request how many items to request from publisher in advance (optional, on-demand request by default).
*/
@JvmOverloads // for binary compatibility
public fun <T> Observable<T>.openSubscription(request: Int = 0): SubscriptionReceiveChannel<T> {
public fun <T> Observable<T>.openSubscription(request: Int = 0): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>(request)
val subscription = subscribe(channel.subscriber)
channel.subscription = subscription
Expand All @@ -43,7 +43,7 @@ public fun <T> Observable<T>.openSubscription(request: Int = 0): SubscriptionRec
*/
@Deprecated(message = "Renamed to `openSubscription`",
replaceWith = ReplaceWith("openSubscription()"))
public fun <T> Observable<T>.open(): SubscriptionReceiveChannel<T> = openSubscription()
public fun <T> Observable<T>.open(): ReceiveChannel<T> = openSubscription()

/**
* Subscribes to this [Observable] and returns an iterator to receive elements emitted by it.
Expand All @@ -62,9 +62,9 @@ public operator fun <T> Observable<T>.iterator() = openSubscription().iterator()
* Subscribes to this [Observable] and performs the specified action for each received element.
*/
public suspend inline fun <T> Observable<T>.consumeEach(action: (T) -> Unit) {
openSubscription().use { channel ->
for (x in channel) action(x)
}
val channel = openSubscription()
for (x in channel) action(x)
channel.cancel()
}

/**
Expand All @@ -76,7 +76,7 @@ public suspend fun <T> Observable<T>.consumeEach(action: suspend (T) -> Unit) =

private class SubscriptionChannel<T>(
private val request: Int
) : LinkedListChannel<T>(), SubscriptionReceiveChannel<T> {
) : LinkedListChannel<T>(), ReceiveChannel<T> {
init {
require(request >= 0) { "Invalid request size: $request" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class IntegrationTest(
checkNumbers(n, observable)
val channel = observable.openSubscription()
checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
channel.close()
channel.cancel()
}

private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
Expand Down
Loading