Skip to content

Commit f5f0983

Browse files
committed
Restored binary compatibility for SubscriptionReceiveChannel removal
1 parent 1d6230a commit f5f0983

File tree

13 files changed

+94
-39
lines changed

13 files changed

+94
-39
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+7-3
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,8 @@ public final class kotlinx/coroutines/experimental/channels/ArrayBroadcastChanne
543543
public fun close (Ljava/lang/Throwable;)Z
544544
public final fun getCapacity ()I
545545
public fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
546-
public fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
546+
public fun openSubscription ()Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
547+
public synthetic fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
547548
}
548549

549550
public class kotlinx/coroutines/experimental/channels/ArrayChannel : kotlinx/coroutines/experimental/channels/AbstractChannel {
@@ -564,11 +565,13 @@ public class kotlinx/coroutines/experimental/channels/ArrayChannel : kotlinx/cor
564565
public abstract interface class kotlinx/coroutines/experimental/channels/BroadcastChannel : kotlinx/coroutines/experimental/channels/SendChannel {
565566
public static final field Factory Lkotlinx/coroutines/experimental/channels/BroadcastChannel$Factory;
566567
public abstract fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
567-
public abstract fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
568+
public abstract fun openSubscription ()Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
569+
public abstract synthetic fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
568570
}
569571

570572
public final class kotlinx/coroutines/experimental/channels/BroadcastChannel$DefaultImpls {
571573
public static fun open (Lkotlinx/coroutines/experimental/channels/BroadcastChannel;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
574+
public static synthetic fun openSubscription (Lkotlinx/coroutines/experimental/channels/BroadcastChannel;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
572575
}
573576

574577
public final class kotlinx/coroutines/experimental/channels/BroadcastChannel$Factory {
@@ -720,7 +723,8 @@ public final class kotlinx/coroutines/experimental/channels/ConflatedBroadcastCh
720723
public fun isFull ()Z
721724
public fun offer (Ljava/lang/Object;)Z
722725
public fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
723-
public fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
726+
public fun openSubscription ()Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
727+
public synthetic fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
724728
public fun send (Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
725729
}
726730

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt

+4-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ public final class kotlinx/coroutines/experimental/reactive/ChannelKt {
1111
public static final synthetic fun consumeEach (Lorg/reactivestreams/Publisher;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
1212
public static final fun iterator (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/experimental/channels/ChannelIterator;
1313
public static final fun open (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
14-
public static final fun openSubscription (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
15-
public static final fun openSubscription (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
14+
public static final synthetic fun openSubscription (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
15+
public static final fun openSubscription (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
16+
public static final synthetic fun openSubscription (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
17+
public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
1618
public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
1719
}
1820

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx1.txt

+4-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ public final class kotlinx/coroutines/experimental/rx1/RxChannelKt {
1313
public static final synthetic fun consumeEach (Lrx/Observable;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
1414
public static final fun iterator (Lrx/Observable;)Lkotlinx/coroutines/experimental/channels/ChannelIterator;
1515
public static final fun open (Lrx/Observable;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
16-
public static final fun openSubscription (Lrx/Observable;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
17-
public static final fun openSubscription (Lrx/Observable;I)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
16+
public static final synthetic fun openSubscription (Lrx/Observable;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
17+
public static final fun openSubscription (Lrx/Observable;I)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
18+
public static final synthetic fun openSubscription (Lrx/Observable;I)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
19+
public static synthetic fun openSubscription$default (Lrx/Observable;IILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
1820
public static synthetic fun openSubscription$default (Lrx/Observable;IILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
1921
}
2022

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt

+4-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ public final class kotlinx/coroutines/experimental/rx2/RxChannelKt {
1616
public static final fun iterator (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/experimental/channels/ChannelIterator;
1717
public static final fun open (Lio/reactivex/MaybeSource;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
1818
public static final fun open (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
19-
public static final fun openSubscription (Lio/reactivex/MaybeSource;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
20-
public static final fun openSubscription (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
19+
public static final fun openSubscription (Lio/reactivex/MaybeSource;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
20+
public static final synthetic fun openSubscription (Lio/reactivex/MaybeSource;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
21+
public static final fun openSubscription (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
22+
public static final synthetic fun openSubscription (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
2123
}
2224

2325
public final class kotlinx/coroutines/experimental/rx2/RxCompletableKt {

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.common.kt

+2
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,5 @@ internal expect annotation class JvmMultifileClass()
2727
internal expect annotation class JvmField()
2828

2929
internal expect annotation class Volatile()
30+
31+
internal expect annotation class JsName(val name: String)

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class ArrayBroadcastChannel<E>(
6868
override val isBufferAlwaysFull: Boolean get() = false
6969
override val isBufferFull: Boolean get() = size >= capacity
7070

71+
@Suppress("RETURN_TYPE_MISMATCH_ON_OVERRIDE")
7172
override fun openSubscription(): ReceiveChannel<E> =
7273
Subscriber(this).also {
7374
updateHead(addSub = it)
@@ -195,7 +196,7 @@ class ArrayBroadcastChannel<E>(
195196

196197
private class Subscriber<E>(
197198
private val broadcastChannel: ArrayBroadcastChannel<E>
198-
) : AbstractChannel<E>(), ReceiveChannel<E> {
199+
) : AbstractChannel<E>(), ReceiveChannel<E>, SubscriptionReceiveChannel<E> {
199200
private val subLock = ReentrantLock()
200201

201202
@Volatile

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

+13-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import kotlinx.coroutines.experimental.*
2020
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
2121
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
2222
import kotlinx.coroutines.experimental.internal.Closeable
23+
import kotlinx.coroutines.experimental.internalAnnotations.*
2324

2425
/**
2526
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
@@ -48,14 +49,23 @@ public interface BroadcastChannel<E> : SendChannel<E> {
4849
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this
4950
* broadcast channel.
5051
*/
52+
@Suppress("CONFLICTING_OVERLOADS")
5153
public fun openSubscription(): ReceiveChannel<E>
5254

55+
/**
56+
* @suppress **Deprecated**: Return type changed to `ReceiveChannel`, this one left here for binary compatibility.
57+
*/
58+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Return type changed to `ReceiveChannel`, this one left here for binary compatibility")
59+
@Suppress("CONFLICTING_OVERLOADS")
60+
@JsName("openSubscriptionDeprecated")
61+
public fun openSubscription(): SubscriptionReceiveChannel<E> = openSubscription() as SubscriptionReceiveChannel<E>
62+
5363
/**
5464
* @suppress **Deprecated**: Renamed to [openSubscription]
5565
*/
5666
@Deprecated(message = "Renamed to `openSubscription`",
5767
replaceWith = ReplaceWith("openSubscription()"))
58-
public fun open(): ReceiveChannel<E> = openSubscription()
68+
public fun open(): SubscriptionReceiveChannel<E> = openSubscription() as SubscriptionReceiveChannel<E>
5969
}
6070

6171
/**
@@ -80,10 +90,11 @@ public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
8090
*
8191
* Note, that invocation of [cancel] also closes subscription.
8292
*/
83-
@Deprecated("Deprecated in favour of `ReceiveChannel`")
93+
@Deprecated("Deprecated in favour of `ReceiveChannel`", replaceWith = ReplaceWith("ReceiveChannel"))
8494
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
8595
/**
8696
* Closes this subscription. This is a synonym for [cancel].
8797
*/
98+
@Deprecated("Use `cancel`", replaceWith = ReplaceWith("cancel()"))
8899
public override fun close() { cancel() }
89100
}

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
113113
override val isClosedForSend: Boolean get() = _state.value is Closed
114114
override val isFull: Boolean get() = false
115115

116-
@Suppress("UNCHECKED_CAST")
116+
@Suppress("UNCHECKED_CAST", "RETURN_TYPE_MISMATCH_ON_OVERRIDE")
117117
override fun openSubscription(): ReceiveChannel<E> {
118118
val subscriber = Subscriber<E>(this)
119119
_state.loop { state ->
@@ -247,7 +247,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
247247

248248
private class Subscriber<E>(
249249
private val broadcastChannel: ConflatedBroadcastChannel<E>
250-
) : ConflatedChannel<E>(), ReceiveChannel<E> {
250+
) : ConflatedChannel<E>(), ReceiveChannel<E>, SubscriptionReceiveChannel<E> {
251251
override fun cancel(cause: Throwable?): Boolean =
252252
close(cause).also { closed ->
253253
if (closed) broadcastChannel.closeSubscriber(this)

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.kt

+2
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,5 @@ internal actual typealias JvmField = kotlin.jvm.JvmField
2727

2828
@Suppress("ACTUAL_WITHOUT_EXPECT")
2929
internal actual typealias Volatile = kotlin.jvm.Volatile
30+
31+
internal actual annotation class JsName(actual val name: String)

js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.kt

+4
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ internal actual annotation class JvmMultifileClass
2525
internal actual annotation class JvmField
2626

2727
internal actual annotation class Volatile
28+
29+
@Suppress("ACTUAL_WITHOUT_EXPECT")
30+
internal actual typealias JsName = kotlin.js.JsName
31+

reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Channel.kt

+14-10
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,36 @@
1616

1717
package kotlinx.coroutines.experimental.reactive
1818

19-
import kotlinx.atomicfu.atomic
20-
import kotlinx.atomicfu.loop
21-
import kotlinx.coroutines.experimental.channels.LinkedListChannel
22-
import kotlinx.coroutines.experimental.channels.ReceiveChannel
23-
import org.reactivestreams.Publisher
24-
import org.reactivestreams.Subscriber
25-
import org.reactivestreams.Subscription
19+
import kotlinx.atomicfu.*
20+
import kotlinx.coroutines.experimental.channels.*
21+
import org.reactivestreams.*
2622

2723
/**
2824
* Subscribes to this [Publisher] and returns a channel to receive elements emitted by it.
2925
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher.
3026
* @param request how many items to request from publisher in advance (optional, on-demand request by default).
3127
*/
32-
@JvmOverloads // for binary compatibility
28+
@Suppress("CONFLICTING_OVERLOADS")
3329
public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T> {
3430
val channel = SubscriptionChannel<T>(request)
3531
subscribe(channel)
3632
return channel
3733
}
3834

35+
/** @suppress **Deprecated**: Left here for binary compatibility */
36+
@JvmOverloads // for binary compatibility
37+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Left here for binary compatibility")
38+
@Suppress("CONFLICTING_OVERLOADS")
39+
public fun <T> Publisher<T>.openSubscription(request: Int = 0): SubscriptionReceiveChannel<T> =
40+
openSubscription(request) as SubscriptionReceiveChannel<T>
41+
3942
/**
4043
* @suppress **Deprecated**: Renamed to [openSubscription]
4144
*/
4245
@Deprecated(message = "Renamed to `openSubscription`",
4346
replaceWith = ReplaceWith("openSubscription()"))
44-
public fun <T> Publisher<T>.open(): ReceiveChannel<T> = openSubscription()
47+
public fun <T> Publisher<T>.open(): SubscriptionReceiveChannel<T> =
48+
openSubscription() as SubscriptionReceiveChannel<T>
4549

4650
/**
4751
* Subscribes to this [Publisher] and returns an iterator to receive elements emitted by it.
@@ -74,7 +78,7 @@ public suspend fun <T> Publisher<T>.consumeEach(action: suspend (T) -> Unit) =
7478

7579
private class SubscriptionChannel<T>(
7680
private val request: Int
77-
) : LinkedListChannel<T>(), ReceiveChannel<T>, Subscriber<T> {
81+
) : LinkedListChannel<T>(), ReceiveChannel<T>, Subscriber<T>, SubscriptionReceiveChannel<T> {
7882
init {
7983
require(request >= 0) { "Invalid request size: $request" }
8084
}

reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt

+14-10
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,16 @@
1616

1717
package kotlinx.coroutines.experimental.rx1
1818

19-
import kotlinx.atomicfu.atomic
20-
import kotlinx.atomicfu.loop
21-
import kotlinx.coroutines.experimental.channels.LinkedListChannel
22-
import kotlinx.coroutines.experimental.channels.ReceiveChannel
23-
import rx.Observable
24-
import rx.Subscriber
25-
import rx.Subscription
19+
import kotlinx.atomicfu.*
20+
import kotlinx.coroutines.experimental.channels.*
21+
import rx.*
2622

2723
/**
2824
* Subscribes to this [Observable] and returns a channel to receive elements emitted by it.
2925
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this observable.
3026
* @param request how many items to request from publisher in advance (optional, on-demand request by default).
3127
*/
32-
@JvmOverloads // for binary compatibility
28+
@Suppress("CONFLICTING_OVERLOADS")
3329
public fun <T> Observable<T>.openSubscription(request: Int = 0): ReceiveChannel<T> {
3430
val channel = SubscriptionChannel<T>(request)
3531
val subscription = subscribe(channel.subscriber)
@@ -38,12 +34,20 @@ public fun <T> Observable<T>.openSubscription(request: Int = 0): ReceiveChannel<
3834
return channel
3935
}
4036

37+
/** @suppress **Deprecated**: Left here for binary compatibility */
38+
@JvmOverloads // for binary compatibility
39+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Left here for binary compatibility")
40+
@Suppress("CONFLICTING_OVERLOADS")
41+
public fun <T> Observable<T>.openSubscription(request: Int = 0): SubscriptionReceiveChannel<T> =
42+
openSubscription(request) as SubscriptionReceiveChannel<T>
43+
4144
/**
4245
* @suppress **Deprecated**: Renamed to [openSubscription]
4346
*/
4447
@Deprecated(message = "Renamed to `openSubscription`",
4548
replaceWith = ReplaceWith("openSubscription()"))
46-
public fun <T> Observable<T>.open(): ReceiveChannel<T> = openSubscription()
49+
public fun <T> Observable<T>.open(): SubscriptionReceiveChannel<T> =
50+
openSubscription() as SubscriptionReceiveChannel<T>
4751

4852
/**
4953
* Subscribes to this [Observable] and returns an iterator to receive elements emitted by it.
@@ -76,7 +80,7 @@ public suspend fun <T> Observable<T>.consumeEach(action: suspend (T) -> Unit) =
7680

7781
private class SubscriptionChannel<T>(
7882
private val request: Int
79-
) : LinkedListChannel<T>(), ReceiveChannel<T> {
83+
) : LinkedListChannel<T>(), ReceiveChannel<T>, SubscriptionReceiveChannel<T> {
8084
init {
8185
require(request >= 0) { "Invalid request size: $request" }
8286
}

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt

+22-5
Original file line numberDiff line numberDiff line change
@@ -18,42 +18,57 @@ package kotlinx.coroutines.experimental.rx2
1818

1919
import io.reactivex.*
2020
import io.reactivex.disposables.Disposable
21-
import kotlinx.coroutines.experimental.channels.LinkedListChannel
22-
import kotlinx.coroutines.experimental.channels.ReceiveChannel
21+
import kotlinx.coroutines.experimental.channels.*
2322

2423
/**
2524
* Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
2625
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
2726
*/
27+
@Suppress("CONFLICTING_OVERLOADS")
2828
public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
2929
val channel = SubscriptionChannel<T>()
3030
subscribe(channel)
3131
return channel
3232
}
3333

34+
/** @suppress **Deprecated**: Left here for binary compatibility */
35+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Left here for binary compatibility")
36+
@Suppress("CONFLICTING_OVERLOADS")
37+
public fun <T> MaybeSource<T>.openSubscription(): SubscriptionReceiveChannel<T> =
38+
openSubscription() as SubscriptionReceiveChannel<T>
39+
3440
/**
3541
* @suppress **Deprecated**: Renamed to [openSubscription]
3642
*/
3743
@Deprecated(message = "Renamed to `openSubscription`",
3844
replaceWith = ReplaceWith("openSubscription()"))
39-
public fun <T> MaybeSource<T>.open(): ReceiveChannel<T> = openSubscription()
45+
public fun <T> MaybeSource<T>.open(): SubscriptionReceiveChannel<T> =
46+
openSubscription() as SubscriptionReceiveChannel<T>
4047

4148
/**
4249
* Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
4350
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
4451
*/
52+
@Suppress("CONFLICTING_OVERLOADS")
4553
public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
4654
val channel = SubscriptionChannel<T>()
4755
subscribe(channel)
4856
return channel
4957
}
5058

59+
/** @suppress **Deprecated**: Left here for binary compatibility */
60+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Left here for binary compatibility")
61+
@Suppress("CONFLICTING_OVERLOADS")
62+
public fun <T> ObservableSource<T>.openSubscription(): SubscriptionReceiveChannel<T> =
63+
openSubscription() as SubscriptionReceiveChannel<T>
64+
5165
/**
5266
* @suppress **Deprecated**: Renamed to [openSubscription]
5367
*/
5468
@Deprecated(message = "Renamed to `openSubscription`",
5569
replaceWith = ReplaceWith("openSubscription()"))
56-
public fun <T> ObservableSource<T>.open(): ReceiveChannel<T> = openSubscription()
70+
public fun <T> ObservableSource<T>.open(): SubscriptionReceiveChannel<T> =
71+
openSubscription() as SubscriptionReceiveChannel<T>
5772

5873
/**
5974
* Subscribes to this [Observable] and returns an iterator to receive elements emitted by it.
@@ -93,7 +108,9 @@ public inline suspend fun <T> ObservableSource<T>.consumeEach(action: (T) -> Uni
93108
public suspend fun <T> ObservableSource<T>.consumeEach(action: suspend (T) -> Unit) =
94109
consumeEach { action(it) }
95110

96-
private class SubscriptionChannel<T> : LinkedListChannel<T>(), ReceiveChannel<T>, Observer<T>, MaybeObserver<T> {
111+
private class SubscriptionChannel<T> :
112+
LinkedListChannel<T>(), ReceiveChannel<T>, Observer<T>, MaybeObserver<T>, SubscriptionReceiveChannel<T>
113+
{
97114
@Volatile
98115
var subscription: Disposable? = null
99116

0 commit comments

Comments
 (0)