Skip to content

Provide asFlowable and asObservable by their names in binary instead … #2285

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@ public final class kotlinx/coroutines/rx2/RxCompletableKt {
public final class kotlinx/coroutines/rx2/RxConvertKt {
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Completable;
public static final fun asFlow (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
}
Expand Down
46 changes: 25 additions & 21 deletions reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import kotlin.coroutines.*

/**
* Converts this job to the hot reactive completable that signals
* with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
* with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
*
* Every subscriber gets the signal at the same time.
* Unsubscribing from the resulting completable **does not** affect the original job in any way.
Expand Down Expand Up @@ -50,7 +50,7 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay

/**
* Converts this deferred value to the hot reactive single that signals either
* [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
* [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
*
* Every subscriber gets the same completion value.
* Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
Expand All @@ -65,21 +65,6 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T>
[email protected]()
}

/**
* Converts a stream of elements received from the channel to the hot reactive observable.
*
* Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
* they'll receive values in round-robin way.
*/
@Deprecated(
message = "Deprecated in the favour of Flow",
level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.consumeAsFlow().asObservable()")
)
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
for (t in this@asObservable)
send(t)
}

/**
* Transforms given cold [ObservableSource] into cold [Flow].
*
Expand Down Expand Up @@ -113,8 +98,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@JvmOverloads // binary compatibility
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
Expand Down Expand Up @@ -148,8 +131,29 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@JvmOverloads // binary compatibility
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))

@Deprecated(
message = "Deprecated in the favour of Flow",
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
) // Deprecated since 1.4.0
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
for (t in this@asObservable)
send(t)
}

@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
asFlowable(context)

@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)
7 changes: 4 additions & 3 deletions reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import org.junit.Assert
import org.junit.Test
import kotlin.test.*
Expand Down Expand Up @@ -126,7 +127,7 @@ class ConvertTest : TestBase() {
delay(50)
send("K")
}
val observable = c.asObservable(Dispatchers.Unconfined)
val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) {
assertEquals("OK", it)
}
Expand All @@ -140,7 +141,7 @@ class ConvertTest : TestBase() {
delay(50)
throw TestException("K")
}
val observable = c.asObservable(Dispatchers.Unconfined)
val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
val single = rxSingle(Dispatchers.Unconfined) {
var result = ""
try {
Expand All @@ -155,4 +156,4 @@ class ConvertTest : TestBase() {
assertEquals("OK", it)
}
}
}
}
5 changes: 3 additions & 2 deletions reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2

import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
Expand Down Expand Up @@ -92,7 +93,7 @@ class IntegrationTest(
assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
checkNumbers(n, observable)
val channel = observable.openSubscription()
checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext)))
channel.cancel()
}

Expand Down Expand Up @@ -131,4 +132,4 @@ class IntegrationTest(
assertEquals(n, last)
}

}
}
12 changes: 8 additions & 4 deletions reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ public final class kotlinx/coroutines/rx3/RxCompletableKt {
public final class kotlinx/coroutines/rx3/RxConvertKt {
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Completable;
public static final fun asFlow (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Maybe;
public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Single;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
}
Expand Down
21 changes: 15 additions & 6 deletions reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import kotlin.coroutines.*

/**
* Converts this job to the hot reactive completable that signals
* with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
* with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
*
* Every subscriber gets the signal at the same time.
* Unsubscribing from the resulting completable **does not** affect the original job in any way.
Expand Down Expand Up @@ -50,7 +50,7 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay

/**
* Converts this deferred value to the hot reactive single that signals either
* [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
* [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
*
* Every subscriber gets the same completion value.
* Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
Expand Down Expand Up @@ -98,8 +98,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@JvmOverloads // binary compatibility
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
Expand Down Expand Up @@ -133,8 +131,19 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@JvmOverloads // binary compatibility
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))

@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
asFlowable(context)

@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)