-
Notifications
You must be signed in to change notification settings - Fork 1.9k
kotlinx-coroutines-rx3: Allow specifing CoroutineStart for rxSingle, rxObservable, ... #4351
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
I don't understand a reason to replace Could you describe your migration process in more detail? Maybe with some specific code snippets: what's the original snippet, what should it look like after the migration, and what intermediate steps would the new |
Thanks for your quick reply. You're right, my example only shows a minimal (but not realistic) example of the issue. I hope that the following explanation offers a better overview. :) "Why start a coroutine at all if the value is immediately available?" Actual Use-Case My concern is, that the user of the library can provide a code As example, something like Difference in parameters fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job fun <T> rxMaybe(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
): Maybe<T & Any> |
The problem is, if the block doesn't return immediately, the timing and order of operations will also differ, and /**
* A dispatcher that executes the computations immediately in the thread that requests it.
*
* This dispatcher is similar to [Dispatchers.Unconfined] but does not attempt to avoid stack overflows.
*/
object DirectDispatcher : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
block.run()
}
} |
While
In case of a block being async, I haven't stumbled across any difference in the order of operations between Coroutines in RxJava and pure RxJava (given that both use the same Dispatcher/Scheduler/Thread). |
So does
Sure. In RxJava, if you execute some part of your code in another scheduler, all code after that will also execute there, even if you apply a scheduler to the whole pipeline: #2485 (comment) Running your Here's an option that seems to come close to what you want: rxSingle(DirectDispatcher) {
if (valueIsPresent) value
else withContext(YourDedicatedThread) {
computeValue().also { writeToCache(it) }
}
} This will ensure that |
@Gabweb, what do you think? Do you believe specifying a coroutine start strategy is somehow valuable anyway? |
@dkhalanskyjb Sorry for the late reply. We're currently evaluating a modified version of kotlinx-coroutines-rx3 to be able to provide the The implementation of our convenient library (with the modified rx3 lib) looks like this: inline fun <R : Any, T : Any> Observable<T>.switchMapAwait(
context: CoroutineContext = EmptyCoroutineContext,
crossinline mapper: suspend (T) -> R?
): Observable<R> = switchMapMaybe {
rxMaybe(context, CoroutineStart.UNDISPATCHED) { mapper(it) }
} Usage suspend fun foo(value: Int): String { ... }
source.switchMapAwait(ourCustomThread) { foo(it) }
.subscribe { ... } I would like to collect further feedback internally and come back to you next week. |
... "while preserving some of the coroutine-style behavior of confining computations to threads", right? Otherwise, |
Yes, I agree. 👍 Sadly, DirectDispatcher is not sufficient for us as we do want to specify the exact thread. I was going to make the (wrong) argument, that an alternative to using Therefore, being able to specify the |
Yes.
Taken literally, "Direct/Undispatched execution on a specific thread" is contradictory. If it's confined to a thread, it will have to go through a dispatch if it's invoked from another thread. If you are talking about undispatched execution until the first dispatch, then the option I described in #4351 (comment) should also work for the use case you've described:
|
Thanks for your detailed explanation! 💯
I guess we could harness I'm sharing the code here, in case somebody else stumbles across this thread. If you do not need a specific threading, it's probably best to use @dkhalanskyjb's suggested fun <T> rxMaybeUndispatched(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Maybe<T & Any> {
require(context[Job] === null) { "Error: context cannot contain a Job!" }
return Maybe.create { emitter ->
val job = CoroutineScope(context).launch(start = CoroutineStart.UNDISPATCHED) {
try {
val result = block()
if (result == null) {
emitter.onComplete()
} else {
emitter.onSuccess(result)
}
} catch (cancellation: CancellationException) {
throw cancellation
} catch (cause: Throwable) {
// Try to emit the error via RxJava
if (!emitter.tryOnError(cause)) {
RxJavaPlugins.onError(cause)
}
}
}
emitter.setCancellable { job.cancel() }
}
} |
Description
Coroutines (e.g. launch) offer a parameter to specify the CoroutineStart. This is a request to add the same start parameter to the rx migration libraries (
kotlinx-coroutines-rx2
andkotlinx-coroutines-rx3
).Minimal Code Example
Consider the following two code examples - one in pure RxJava and one while migrating from RxJava to Coroutines.
RxJava3
RxJava3 + Coroutines
While the code looks nearly identical, they behave differently (which itself is fine). However, the current API of e.g. rxSingle does not allow specifying the CoroutineStart parameter. Therefore, it's impossible to achieve the same start / scheduling behavior and makes migration difficult.
Side-note: Using Dispatcher.Confined resolves the timing difference. However, it will also overwrite the Dispatcher and might change the Thread, which is not desired.
The Shape of the API
Usage example:
(This should also apply to rxObservable, ...)
Prior Art
Under-the-hood, rxSingle und others are currently hard-coded to CoroutineStart.Default. To enable this feature, the parameter just needs to be exposed publicly.
The text was updated successfully, but these errors were encountered: