kotlinx-coroutines-rx3: Allow specifying CoroutineStart for rxSingle, rxObservable, ... #4351

Closed
Gabweb opened this issue Feb 12, 2025 · 11 comments

Comments

@Gabweb

Gabweb commented Feb 12, 2025

Description

Coroutine builders (e.g. launch) offer a parameter to specify the CoroutineStart. This is a request to add the same start parameter to the Rx integration libraries (kotlinx-coroutines-rx2 and kotlinx-coroutines-rx3).

Minimal Code Example

Consider the following two code examples - one in pure RxJava and one while migrating from RxJava to Coroutines.

RxJava3

println("Start")
Single.just("Rx").subscribe { value -> println(value)  }
println("End")

// Prints Start, Rx, End 

RxJava3 + Coroutines

println("Start")
rxSingle { "Coroutine" }.subscribe { value -> println(value)  }
println("End")

// Prints Start, End, Coroutine 

While the two snippets look nearly identical, they behave differently (which in itself is fine). However, the current API of rxSingle and its siblings does not allow specifying the CoroutineStart parameter. Therefore, it's impossible to achieve the same start/scheduling behavior, which makes migration difficult.

Side note: using Dispatchers.Unconfined resolves the timing difference. However, it also replaces the dispatcher and might change the thread, which is not desired.
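Because rxSingle doesn't expose start, the same difference can be reproduced with launch, which does. A minimal sketch, assuming kotlinx-coroutines-core on the classpath; executionOrder is a hypothetical helper, and runBlocking stands in for the calling thread so the ordering is deterministic (the rxSingle example above runs on Dispatchers.Default instead).

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Records the order of the caller's steps relative to the coroutine body.
// runBlocking provides a single-threaded event loop, so the result is deterministic.
fun executionOrder(context: CoroutineContext, start: CoroutineStart): List<String> = runBlocking {
    val events = mutableListOf<String>()
    events += "Start"
    // DEFAULT queues the body on the event loop; UNDISPATCHED (or an Unconfined
    // dispatcher) runs it right away in the caller until its first suspension.
    val job = launch(context, start) { events += "Coroutine" }
    events += "End"
    job.join()
    events
}
```

Called with CoroutineStart.DEFAULT, executionOrder returns [Start, End, Coroutine]; with CoroutineStart.UNDISPATCHED, or with Dispatchers.Unconfined as the context, it returns [Start, Coroutine, End].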

The Shape of the API

fun <T : Any> rxSingle(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Single<T>

Usage example:

println("Start")
rxSingle(start = CoroutineStart.UNDISPATCHED) { "Coroutine" }.subscribe { value -> println(value)  }
println("End")

// Prints Start, Coroutine, End 

(This should also apply to rxObservable, ...)

Prior Art

Under the hood, rxSingle and the other builders are currently hard-coded to CoroutineStart.DEFAULT. To enable this feature, the parameter just needs to be exposed publicly.

@dkhalanskyjb
Collaborator

I don't understand a reason to replace Single.just(value) with rxSingle { value }. Why start a coroutine at all if the value is immediately available? And if the value is not available, then the proposed solution won't help anyway, as the thread will switch to Dispatchers.Default as soon as there is even a single suspension in the code block, so it's still impossible to achieve the same scheduling behavior.

Could you describe your migration process in more detail? Maybe with some specific code snippets: what's the original snippet, what should it look like after the migration, and what intermediate steps would the new start parameter allow that are impossible today?

@Gabweb
Author

Gabweb commented Feb 14, 2025

Thanks for your quick reply. You're right, my example only shows a minimal (but not realistic) example of the issue. I hope that the following explanation offers a better overview. :)

"Why start a coroutine at all if the value is immediately available?"
I'm thinking about a "conditional suspension", e.g. in a network library with caching. If the request is already in the cache, the function can return immediately - otherwise it suspends to perform the network request.
Let's assume that both cases (cache hit and network request) are implemented in a single suspend function for convenience. If the request is in the cache (no suspension) and such a coroutine is launched with start = CoroutineStart.UNDISPATCHED, then the execution stays on the current thread.
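A minimal sketch of such a conditional suspension, assuming kotlinx-coroutines-core. CachedLookup and finishedSynchronously are hypothetical names, and delay(10) stands in for the network request. The helper checks whether the launched coroutine has already completed by the time launch returns.

```kotlin
import kotlinx.coroutines.*

// Hypothetical cache-backed lookup: a hit returns without suspending,
// a miss suspends to simulate a network request.
class CachedLookup {
    private val cache = mutableMapOf<String, String>()

    suspend fun get(key: String): String {
        cache[key]?.let { return it }            // cache hit: no suspension point reached
        val value = withContext(Dispatchers.Default) {
            delay(10)                            // stand-in for the network request
            "value-for-$key"
        }
        cache[key] = value                       // back on the caller's dispatcher here
        return value
    }
}

// Reports whether the lookup had finished before launch(...) returned to the caller.
fun finishedSynchronously(lookup: CachedLookup, key: String, start: CoroutineStart): Boolean =
    runBlocking {
        val job = launch(start = start) { lookup.get(key) }
        val done = job.isCompleted               // checked immediately after launch returns
        job.join()
        done
    }
```

For a fresh key the coroutine is still suspended when launch returns. Once the key is cached, UNDISPATCHED lets it complete inline, while DEFAULT still defers it to the next dispatch.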

Actual Use-Case
We service a large Android app code-base, which heavily depends on RxJava. We would like to offer our teams a migration-path towards Coroutines.
For example, an easy first step would be to replace RxJava's Maybe<T> with suspend () -> T?. The migrated Maybe<T> might have been used in a switchMapMaybe. Therefore, I'm developing a convenience library and would like to provide an extension function for RxJava's switchMap that accepts coroutines instead: switchMapAwait(block: suspend CoroutineScope.() -> T?). A trivial implementation of this function is switchMapMaybe { rxMaybe { block() } }, which works quite well.

My concern is that the user of the library can provide a code block with a "conditional suspension" (as outlined at the beginning). If the provided block returns immediately, the timing and order of operations differ from the same code written in pure RxJava (which is what the original example tries to demonstrate). We're concerned that this changed behavior, which isn't obvious to developers, will cause issues.
Having the option to specify the CoroutineStart in rxMaybe would allow us to implement an extension operator that has the same timing behavior as RxJava's original switchMap operator (at least in the case without suspension).

For example, something like switchMapMaybe { Maybe.just(true) } is synchronous in RxJava, but with the switchMapAwait implementation from above, switchMapAwait { true } is always asynchronous, as if it had suspended.
(Note: I'm just trying to highlight the case in which no suspension is needed or in which the Maybe would emit immediately. In real code, instead of true the user would provide a code block with a conditional suspension. And in the RxJava case, instead of Maybe.just(true), some other Maybe that might or might not emit immediately would be used.)
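A sketch of the trivial switchMapAwait next to its pure RxJava counterpart, assuming RxJava 3 and kotlinx-coroutines-rx3; pureRx and viaCoroutines are hypothetical helpers. With synchronous Maybe sources, switchMapMaybe delivers every inner result. With rxMaybe, each inner coroutine is dispatched to Dispatchers.Default, so an inner Maybe whose coroutine has not run yet is disposed as soon as the next upstream item arrives.

```kotlin
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable
import kotlinx.coroutines.rx3.rxMaybe
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// The "trivial" switchMapAwait: every upstream item starts an rxMaybe, which always
// uses CoroutineStart.DEFAULT, so even a non-suspending mapper is dispatched.
fun <T : Any, R : Any> Observable<T>.switchMapAwait(
    context: CoroutineContext = EmptyCoroutineContext,
    mapper: suspend (T) -> R?,
): Observable<R> = switchMapMaybe<R> { item -> rxMaybe<R>(context) { mapper(item) } }

// Pure RxJava reference: Maybe.just/Maybe.empty emit synchronously inside switchMapMaybe.
fun pureRx(): List<String> =
    Observable.just(1, 2, 3)
        .switchMapMaybe { n -> if (n % 2 == 1) Maybe.just("odd-$n") else Maybe.empty() }
        .toList()
        .blockingGet()

// Coroutine-based version with a mapper that never suspends.
fun viaCoroutines(): List<String> =
    Observable.just(1, 2, 3)
        .switchMapAwait { n -> if (n % 2 == 1) "odd-$n" else null }
        .toList()
        .blockingGet()
```

pureRx() returns [odd-1, odd-3]. viaCoroutines() often returns only [odd-3], because item 2 replaces the first inner Maybe before its coroutine gets dispatched; since this is a race, [odd-1, odd-3] is also possible.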

Difference in parameters
Besides my use case, I was quite surprised that launch/async and rxMaybe differ in the start parameter but otherwise have almost identical signatures. From the outside (and without knowing the design decisions), this feels inconsistent.

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job
fun <T> rxMaybe(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Maybe<T & Any>

@dkhalanskyjb
Collaborator

If the provided block returns immediately, the timing and order of operations differ to the same code written in pure RxJava.

The problem is, if the block doesn't return immediately, the timing and order of operations will also differ, and UNDISPATCHED won't help with that. The closest you can come to RxJava in kotlinx.coroutines is Dispatchers.Unconfined (which gets you 80% of the way there, though the remaining 20% may be a dealbreaker: #3458), or, to get (if my understanding is correct) 100% of the way there,

import kotlinx.coroutines.CoroutineDispatcher
import kotlin.coroutines.CoroutineContext

/**
 * A dispatcher that executes the computations immediately in the thread that requests it.
 *
 * This dispatcher is similar to [Dispatchers.Unconfined] but does not attempt to avoid stack overflows.
 */
object DirectDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        block.run()
    }
}
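A small sketch of how DirectDispatcher behaves, assuming kotlinx-coroutines-core; the object is repeated so the block compiles on its own, and threadsUnderDirectDispatcher is a hypothetical helper. The coroutine starts inline in the caller, and after a withContext(Dispatchers.Default) block it keeps running on whichever Default worker resumed it, which is the RxJava-like "execution context leak" discussed in this thread.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

// Same idea as above: every dispatched block runs inline on the current thread.
object DirectDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        block.run()
    }
}

// Records which thread runs each part of the coroutine body.
fun threadsUnderDirectDispatcher(): List<String> = runBlocking {
    val names = mutableListOf<String>()
    names += Thread.currentThread().name            // the caller
    launch(DirectDispatcher) {
        names += Thread.currentThread().name        // starts inline in the caller
        withContext(Dispatchers.Default) {
            delay(20)                               // some work on another thread
        }
        names += Thread.currentThread().name        // resumed inline on the Default worker
    }.join()
    names
}
```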

@Gabweb
Author

Gabweb commented Feb 18, 2025

While Dispatchers.Unconfined or DirectDispatcher are great solutions by themselves, they prevent us from using a specific thread. Usually, we run sub-parts of our Android app in a single-threaded context to prevent deadlocks and race conditions.
Using UNDISPATCHED seems to be the only option to execute a coroutine in the current Thread until the first suspension-point and only then "switch" to the specified dispatcher/scheduler/thread.

The problem is, if the block doesn't return immediately, the timing and order of operations will also differ [...]

In case of a block being async, I haven't stumbled across any difference in the order of operations between Coroutines in RxJava and pure RxJava (given that both use the same Dispatcher/Scheduler/Thread).
Therefore, I'm not sure what you mean by that. Could you elaborate?

@dkhalanskyjb
Collaborator

they prevent us from using a specific thread

So does CoroutineStart.UNDISPATCHED. Before the first suspension, the code will run in the caller thread, as you've noticed.
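A sketch confirming this, assuming kotlinx-coroutines-core; `dedicated` is a hypothetical single-threaded dispatcher standing in for the app's dedicated thread, and threadsWithUndispatched is a hypothetical helper. Even though the coroutine is launched on `dedicated`, UNDISPATCHED runs everything before the first suspension on the caller's thread.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical dedicated single thread (daemon, so it does not keep the JVM alive).
val dedicated = Executors.newSingleThreadExecutor { r -> Thread(r, "dedicated").apply { isDaemon = true } }
    .asCoroutineDispatcher()

fun threadsWithUndispatched(): Pair<String, String> = runBlocking {
    var beforeSuspension = ""
    var afterSuspension = ""
    launch(dedicated, start = CoroutineStart.UNDISPATCHED) {
        beforeSuspension = Thread.currentThread().name   // still the caller's thread
        yield()                                          // first suspension: dispatched to `dedicated`
        afterSuspension = Thread.currentThread().name
    }.join()
    beforeSuspension to afterSuspension
}
```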

Therefore, I'm not sure what you mean by that. Could you elaborate?

Sure. In RxJava, if you execute some part of your code in another scheduler, all code after that will also execute there, even if you apply a scheduler to the whole pipeline: #2485 (comment) Running your rxSingle in a DirectDispatcher faithfully emulates that behavior. If you do need to have a consistent thread, then you can't emulate RxJava without the timing and order of operations differing, that's mutually exclusive.

Here's an option that seems to come close to what you want:

rxSingle(DirectDispatcher) {
  if (valueIsPresent) value
  else withContext(YourDedicatedThread) {
    computeValue().also { writeToCache(it) }
  }
}

This will ensure that computeValue() will diligently return to YourDedicatedThread even if there are some thread switches internally there, while preserving the RxJava-style leaks of execution context outside the critical block.
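A runnable version of the snippet above, assuming RxJava 3 and kotlinx-coroutines-rx3. dedicatedThread, cache, computeValue, and lookup are hypothetical stand-ins for YourDedicatedThread, the cache, and the real computation; computeValue appends the thread name so the confinement is visible.

```kotlin
import io.reactivex.rxjava3.core.Single
import kotlinx.coroutines.*
import kotlinx.coroutines.rx3.rxSingle
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext

object DirectDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        block.run()
    }
}

// Hypothetical stand-in for YourDedicatedThread (daemon, so it does not keep the JVM alive).
val dedicatedThread = Executors.newSingleThreadExecutor { r -> Thread(r, "dedicated").apply { isDaemon = true } }
    .asCoroutineDispatcher()
val cache = ConcurrentHashMap<String, String>()

// Hypothetical expensive computation; records the thread it finished on.
suspend fun computeValue(key: String): String {
    delay(10)
    return "computed-$key@${Thread.currentThread().name}"
}

fun lookup(key: String): Single<String> = rxSingle(DirectDispatcher) {
    // Cache hit: returns inline, so the Single emits during subscribe().
    // Cache miss: the critical part is confined to dedicatedThread.
    cache[key] ?: withContext(dedicatedThread) {
        computeValue(key).also { cache[key] = it }
    }
}
```

The first lookup("a") suspends and computes the value on `dedicated`; a later subscription to lookup("a") hits the cache and delivers the value synchronously inside subscribe().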

@dkhalanskyjb
Collaborator

@Gabweb, what do you think? Do you believe specifying a coroutine start strategy is somehow valuable anyway?

@Gabweb
Author

Gabweb commented Apr 1, 2025

@dkhalanskyjb Sorry for the late reply. We're currently evaluating a modified version of kotlinx-coroutines-rx3 that lets us pass CoroutineStart.UNDISPATCHED. We agree that this does not have exactly the same behavior as RxJava. However, it seems to come closest to what we would expect from RxJava.

The implementation of our convenience library (with the modified rx3 lib) looks like this:

inline fun <R : Any, T : Any> Observable<T>.switchMapAwait(
    context: CoroutineContext = EmptyCoroutineContext,
    crossinline mapper: suspend (T) -> R?
): Observable<R> = switchMapMaybe {
    rxMaybe(context, CoroutineStart.UNDISPATCHED) { mapper(it) }
}

Usage

suspend fun foo(value: Int): String { ... }

source.switchMapAwait(ourCustomThread) { foo(it) }
   .subscribe { ... }

I would like to collect further feedback internally and come back to you next week.

@dkhalanskyjb
Collaborator

this seems to come closest to what we would expect from RxJava

... "while preserving some of the coroutine-style behavior of confining computations to threads", right? Otherwise, DirectDispatcher should provide exactly the behavior of RxJava.

@Gabweb
Author

Gabweb commented Apr 1, 2025

... "while preserving some of the coroutine-style behavior of confining computations to threads", right? Otherwise, DirectDispatcher should provide exactly the behavior of RxJava.

Yes, I agree. 👍

Sadly, DirectDispatcher is not sufficient for us as we do want to specify the exact thread.

I was going to make the (wrong) argument that an alternative to using CoroutineStart.UNDISPATCHED on rxMaybe would be to combine RxJava's observeOn(customThread) with rxMaybe(DirectDispatcher). However, this does not seem to guarantee that the downstream pipeline is executed on customThread. Instead, I can observe that kotlinx.coroutines.DefaultExecutor might leak into the pipeline.
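A sketch of where such a leak can come from, assuming kotlinx-coroutines-core; threadAfterDelay is a hypothetical helper. DirectDispatcher does not implement Delay, so delay() is scheduled on the library's fallback timer, and the coroutine is then resumed inline on that timer's thread rather than on the caller's. The observation above names kotlinx.coroutines.DefaultExecutor; since that thread is an internal detail, the sketch only checks that the resuming thread differs from the caller.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

object DirectDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        block.run()
    }
}

fun threadAfterDelay(): Pair<String, String> = runBlocking {
    var before = ""
    var after = ""
    launch(DirectDispatcher) {
        before = Thread.currentThread().name   // caller's thread
        delay(10)                              // timer lives outside DirectDispatcher
        after = Thread.currentThread().name    // whichever thread fired the timer
    }.join()
    before to after
}
```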

Therefore, being able to specify the CoroutineStart on the rxMaybe seems to be the only option, if you do want Direct/Undispatched execution on a specific thread. Do you agree?
We still wonder why launch allows specifying CoroutineStart and rxMaybe doesn't. Is there any specific reason?

@dkhalanskyjb
Collaborator

We still wonder, why a launch allows specifying CoroutineStart and rxMaybe doesn't. Is there any specific reason?

Yes.

launch is a general-purpose way of starting some asynchronous computation, with a wide variety of applications. With CoroutineStart.ATOMIC, this computation can be a cleanup procedure (for example, launch(Dispatchers.IO, start = CoroutineStart.ATOMIC) { file.close() }), and we can be sure it will run even if cancelled. With CoroutineStart.UNDISPATCHED, we can ensure that some preliminary initialization work can be executed even before the first dispatch (for example, launch(start = CoroutineStart.UNDISPATCHED) { registerSubscriber(); yield(); readElements() }; produceElements()), without having to introduce complex interactions between coroutines.
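A sketch of both launch patterns, assuming kotlinx-coroutines-core; the helper names are hypothetical, and a MutableSharedFlow with no replay plays the role of registerSubscriber()/produceElements(). Depending on the library version, CoroutineStart.ATOMIC may carry an opt-in annotation and produce a compiler warning.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList

// ATOMIC: the body starts even if the job is cancelled before it was dispatched.
fun bodyRanAfterImmediateCancel(start: CoroutineStart): Boolean = runBlocking {
    var ran = false
    val job = launch(start = start) { ran = true }
    job.cancel()        // cancel before the event loop had a chance to run the body
    job.join()
    ran
}

// UNDISPATCHED: the collector subscribes before the producer starts emitting.
fun collectedWith(start: CoroutineStart): List<Int> = runBlocking {
    val events = MutableSharedFlow<Int>()   // no replay: values emitted without subscribers are dropped
    val received = mutableListOf<Int>()
    val collector = launch(start = start) { events.take(3).toList(received) }
    for (i in 1..3) events.emit(i)          // produceElements()
    withTimeoutOrNull(200) { collector.join() }
    collector.cancel()
    received
}
```

bodyRanAfterImmediateCancel returns false for DEFAULT and true for ATOMIC. collectedWith(UNDISPATCHED) returns [1, 2, 3], while with DEFAULT the collector only subscribes after the values were emitted and dropped, so it returns an empty list once the timeout expires.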

rxMaybe is not that. It's a way of running some coroutines code in a way that RxJava can consume. Its purpose is clear, and it's not nearly as universally applicable as launch is. I am worried that introducing a CoroutineStart parameter may lead people down the wrong path when using rxMaybe for its intended purpose (for example, they could reach for UNDISPATCHED and believe that they are getting idiomatic RxJava threading behavior, even though they actually aren't). If there are no actual benefits to introducing this parameter, then because of the potential for confusion, it's strictly better not to have it.

Therefore, being able to specify the CoroutineStart on the rxMaybe seems to be the only option, if you do want Direct/Undispatched execution on a specific thread. Do you agree?

Taken literally, "Direct/Undispatched execution on a specific thread" is contradictory. If it's confined to a thread, it will have to go through a dispatch if it's invoked from another thread. If you are talking about undispatched execution until the first dispatch, then the option I described in #4351 (comment) should also work for the use case you've described:

I'm thinking about a "conditional suspension", e.g. in a network library with caching. If the request is already in the cache, the function can return immediately - otherwise it suspends to perform the network request.

@Gabweb
Author

Gabweb commented Apr 4, 2025

Thanks for your detailed explanation! 💯
I understand the reasoning behind this decision. Additionally, our use-case seems very specific. Therefore, I'm going to close this issue.

launch is a general-purpose way of starting some asynchronous computation

I guess we could harness launch instead of rxMaybe to build our custom behavior (customThread + undispatched) without needing a modified version of kotlinx-coroutines-rx3.

I'm sharing the code here, in case somebody else stumbles across this thread. If you do not need a specific threading, it's probably best to use @dkhalanskyjb's suggested DirectDispatcher to solve this issue. Compare with #4351 (comment)

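import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.plugins.RxJavaPlugins
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext

fun <T> rxMaybeUndispatched(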
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Maybe<T & Any> {
    require(context[Job] === null) { "Error: context cannot contain a Job!" }
    return Maybe.create { emitter ->
        val job = CoroutineScope(context).launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                val result = block()
                if (result == null) {
                    emitter.onComplete()
                } else {
                    emitter.onSuccess(result)
                }
            } catch (cancellation: CancellationException) {
                throw cancellation
            } catch (cause: Throwable) {
                // Try to emit the error via RxJava
                if (!emitter.tryOnError(cause)) {
                    RxJavaPlugins.onError(cause)
                }
            }
        }
        emitter.setCancellable { job.cancel() }
    }
}
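A short usage sketch for rxMaybeUndispatched, assuming RxJava 3 and kotlinx-coroutines-core. The function is repeated (with the emission logic condensed) so the block compiles on its own, and `dedicated` is a hypothetical single-threaded dispatcher.

```kotlin
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.plugins.RxJavaPlugins
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext

fun <T> rxMaybeUndispatched(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Maybe<T & Any> {
    require(context[Job] === null) { "Error: context cannot contain a Job!" }
    return Maybe.create { emitter ->
        // Runs in the subscribing thread until the first suspension.
        val job = CoroutineScope(context).launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                val result = block()
                if (result == null) emitter.onComplete() else emitter.onSuccess(result)
            } catch (cancellation: CancellationException) {
                throw cancellation
            } catch (cause: Throwable) {
                // Report via RxJava; fall back to the global handler if already disposed.
                if (!emitter.tryOnError(cause)) RxJavaPlugins.onError(cause)
            }
        }
        emitter.setCancellable { job.cancel() }
    }
}

// Hypothetical dedicated single thread (daemon, so it does not keep the JVM alive).
val dedicated = Executors.newSingleThreadExecutor { r -> Thread(r, "dedicated").apply { isDaemon = true } }
    .asCoroutineDispatcher()
```

A block that returns without suspending emits during subscribe() on the caller's thread, a block that suspends finishes on `dedicated`, and a null result completes the Maybe empty.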

@Gabweb Gabweb closed this as completed Apr 4, 2025
2 participants