Skip to content

Commit 448106a

Browse files
authored
Provide asFlowable and asObservable by their names in binary instead … (#2285)
* Provide asFlowable and asObservable by their names in binary instead of 'from' function to prevent naming clash for Java users. * Do not provide @jvmoverloads for convenience of Java interop * Deprecate ReceiveChannel.asObservable by the way Fixes #2182
1 parent d4c55ce commit 448106a

File tree

6 files changed

+63
-40
lines changed

6 files changed

+63
-40
lines changed

reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api

+8-4
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,17 @@ public final class kotlinx/coroutines/rx2/RxCompletableKt {
3030
public final class kotlinx/coroutines/rx2/RxConvertKt {
3131
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Completable;
3232
public static final fun asFlow (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
33+
public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
34+
public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
3335
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
3436
public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
37+
public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
38+
public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
3539
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
36-
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
37-
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
38-
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
39-
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
40+
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
41+
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
42+
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
43+
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
4044
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
4145
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
4246
}

reactive/kotlinx-coroutines-rx2/src/RxConvert.kt

+25-21
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import kotlin.coroutines.*
1616

1717
/**
1818
* Converts this job to the hot reactive completable that signals
19-
* with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
19+
* with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
2020
*
2121
* Every subscriber gets the signal at the same time.
2222
* Unsubscribing from the resulting completable **does not** affect the original job in any way.
@@ -50,7 +50,7 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay
5050

5151
/**
5252
* Converts this deferred value to the hot reactive single that signals either
53-
* [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
53+
* [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
5454
*
5555
* Every subscriber gets the same completion value.
5656
* Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
@@ -65,21 +65,6 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T>
6565
this@asSingle.await()
6666
}
6767

68-
/**
69-
* Converts a stream of elements received from the channel to the hot reactive observable.
70-
*
71-
* Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
72-
* they'll receive values in round-robin way.
73-
*/
74-
@Deprecated(
75-
message = "Deprecated in the favour of Flow",
76-
level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.consumeAsFlow().asObservable()")
77-
)
78-
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
79-
for (t in this@asObservable)
80-
send(t)
81-
}
82-
8368
/**
8469
* Transforms given cold [ObservableSource] into cold [Flow].
8570
*
@@ -113,8 +98,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
11398
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
11499
* is used, so calls are performed from an arbitrary thread.
115100
*/
116-
@JvmOverloads // binary compatibility
117-
@JvmName("from")
118101
@ExperimentalCoroutinesApi
119102
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
120103
/*
@@ -148,8 +131,29 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout
148131
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
149132
* is used, so calls are performed from an arbitrary thread.
150133
*/
151-
@JvmOverloads // binary compatibility
152-
@JvmName("from")
153134
@ExperimentalCoroutinesApi
154135
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
155136
Flowable.fromPublisher(asPublisher(context))
137+
138+
@Deprecated(
139+
message = "Deprecated in the favour of Flow",
140+
level = DeprecationLevel.ERROR,
141+
replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
142+
) // Deprecated since 1.4.0
143+
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
144+
for (t in this@asObservable)
145+
send(t)
146+
}
147+
148+
@Suppress("UNUSED") // KT-42513
149+
@JvmOverloads // binary compatibility
150+
@JvmName("from")
151+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
152+
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
153+
asFlowable(context)
154+
155+
@Suppress("UNUSED") // KT-42513
156+
@JvmOverloads // binary compatibility
157+
@JvmName("from")
158+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
159+
public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)

reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2
66

77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
9+
import kotlinx.coroutines.flow.*
910
import org.junit.Assert
1011
import org.junit.Test
1112
import kotlin.test.*
@@ -126,7 +127,7 @@ class ConvertTest : TestBase() {
126127
delay(50)
127128
send("K")
128129
}
129-
val observable = c.asObservable(Dispatchers.Unconfined)
130+
val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
130131
checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) {
131132
assertEquals("OK", it)
132133
}
@@ -140,7 +141,7 @@ class ConvertTest : TestBase() {
140141
delay(50)
141142
throw TestException("K")
142143
}
143-
val observable = c.asObservable(Dispatchers.Unconfined)
144+
val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
144145
val single = rxSingle(Dispatchers.Unconfined) {
145146
var result = ""
146147
try {
@@ -155,4 +156,4 @@ class ConvertTest : TestBase() {
155156
assertEquals("OK", it)
156157
}
157158
}
158-
}
159+
}

reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2
66

77
import io.reactivex.*
88
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.flow.*
910
import org.junit.Test
1011
import org.junit.runner.*
1112
import org.junit.runners.*
@@ -92,7 +93,7 @@ class IntegrationTest(
9293
assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
9394
checkNumbers(n, observable)
9495
val channel = observable.openSubscription()
95-
checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
96+
checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext)))
9697
channel.cancel()
9798
}
9899

@@ -131,4 +132,4 @@ class IntegrationTest(
131132
assertEquals(n, last)
132133
}
133134

134-
}
135+
}

reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api

+8-4
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,16 @@ public final class kotlinx/coroutines/rx3/RxCompletableKt {
2626
public final class kotlinx/coroutines/rx3/RxConvertKt {
2727
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Completable;
2828
public static final fun asFlow (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
29+
public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
30+
public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
2931
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Maybe;
32+
public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
33+
public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
3034
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Single;
31-
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
32-
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
33-
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
34-
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
35+
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
36+
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
37+
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
38+
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
3539
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
3640
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
3741
}

reactive/kotlinx-coroutines-rx3/src/RxConvert.kt

+15-6
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import kotlin.coroutines.*
1616

1717
/**
1818
* Converts this job to the hot reactive completable that signals
19-
* with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
19+
* with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
2020
*
2121
* Every subscriber gets the signal at the same time.
2222
* Unsubscribing from the resulting completable **does not** affect the original job in any way.
@@ -50,7 +50,7 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay
5050

5151
/**
5252
* Converts this deferred value to the hot reactive single that signals either
53-
* [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
53+
* [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
5454
*
5555
* Every subscriber gets the same completion value.
5656
* Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
@@ -98,8 +98,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
9898
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
9999
* is used, so calls are performed from an arbitrary thread.
100100
*/
101-
@JvmOverloads // binary compatibility
102-
@JvmName("from")
103101
@ExperimentalCoroutinesApi
104102
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
105103
/*
@@ -133,8 +131,19 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout
133131
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
134132
* is used, so calls are performed from an arbitrary thread.
135133
*/
136-
@JvmOverloads // binary compatibility
137-
@JvmName("from")
138134
@ExperimentalCoroutinesApi
139135
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
140136
Flowable.fromPublisher(asPublisher(context))
137+
138+
@Suppress("UNUSED") // KT-42513
139+
@JvmOverloads // binary compatibility
140+
@JvmName("from")
141+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
142+
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
143+
asFlowable(context)
144+
145+
@Suppress("UNUSED") // KT-42513
146+
@JvmOverloads // binary compatibility
147+
@JvmName("from")
148+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
149+
public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)

0 commit comments

Comments
 (0)