PublisherAsFlow ignores the CoroutineContext passed in from flowOn? #1765
Comments
I don't think it's possible for the flow dispatcher to be used to affect the thread that an Rx stream is subscribed on, at least not as a reasonable default. If you really need to, you could write an operator that wraps |
I can use Rx schedulers to switch threads. But if I get a flow from an API that converts it from a Flowable internally, and just want to make it flowOn a specific dispatcher, it won't act as expected. If calling flowOn on a PublisherAsFlow instance to switch threads is forbidden, just throw an IllegalStateException to make it clear. |
How would the library detect that? The implementation of
That said, this isn't really a problem with the reactive integration – the same thing comes up if you call
I see your point though, it just seems like a really hard problem to solve in a general way. I would expect good libraries to allow you to configure the schedulers being used if it's hiding the Rx types from you. A simple implementation of an operator that sets the subscription scheduler based on the context might look like this:
@UseExperimental(ExperimentalCoroutinesApi::class)
fun <T : Any> Flowable<T>.asFlowWithScheduler(): Flow<T> = flow {
    val flowable = this@asFlowWithScheduler
    val upstream = (coroutineContext[ContinuationInterceptor] as? CoroutineDispatcher)
        // RxJava analog to Unconfined is to just not specify a scheduler.
        ?.takeUnless { it == Dispatchers.Unconfined }
        ?.let { dispatcher -> Schedulers.from(dispatcher.asExecutor()) }
        ?.let { scheduler -> flowable.subscribeOn(scheduler) }
        ?: flowable
    emitAll(upstream.asFlow())
} |
Rx stuff is called in the upstream of I just wonder why But it just ignores the context and uses
private class PublisherAsFlow<T : Any>(
    private val publisher: Publisher<T>,
    capacity: Int
) : ChannelFlow<T>(EmptyCoroutineContext, capacity) {
    override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
        PublisherAsFlow(publisher, capacity)
    ...
}
When we call |
The solution seems to be simply adding a parameter:
private class PublisherAsFlow<T : Any>(
    context: CoroutineContext,
    private val publisher: Publisher<T>,
    capacity: Int
) : ChannelFlow<T>(context, capacity) {
    override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
        PublisherAsFlow(context, publisher, capacity)
    ...
}
Maybe I could give it a try later on. |
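To make the mechanism in the two snippets above concrete, here is a small standalone model that uses only the Kotlin standard library. It is a sketch, not the library's code: `FusibleModel`, `DroppingModel`, `ForwardingModel`, `Tag`, and `tagOf` are hypothetical names, and the `flowOn` here is assumed to do nothing more than merge contexts and delegate to `create`, which is a simplification of what the real `ChannelFlow` does.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical context element standing in for a dispatcher.
class Tag(val name: String) : AbstractCoroutineContextElement(Tag) {
    companion object Key : CoroutineContext.Key<Tag>
}

// Simplified stand-in for ChannelFlow (assumption): flowOn merges the
// contexts and asks the subclass to build a copy via create().
abstract class FusibleModel(val context: CoroutineContext, val capacity: Int) {
    protected abstract fun create(context: CoroutineContext, capacity: Int): FusibleModel

    fun flowOn(extra: CoroutineContext): FusibleModel = create(context + extra, capacity)
}

// Mirrors the first quoted snippet: create() drops its context argument,
// so the copy always ends up with EmptyCoroutineContext.
class DroppingModel(capacity: Int) : FusibleModel(EmptyCoroutineContext, capacity) {
    override fun create(context: CoroutineContext, capacity: Int): FusibleModel =
        DroppingModel(capacity)
}

// Mirrors the proposed change: the context is stored and forwarded.
class ForwardingModel(context: CoroutineContext, capacity: Int) : FusibleModel(context, capacity) {
    override fun create(context: CoroutineContext, capacity: Int): FusibleModel =
        ForwardingModel(context, capacity)
}

// Reads back the tag a model carries, or null if it carries none.
fun tagOf(model: FusibleModel): String? = model.context[Tag]?.name

fun main() {
    println(tagOf(DroppingModel(0).flowOn(Tag("io"))))                          // prints "null"
    println(tagOf(ForwardingModel(EmptyCoroutineContext, 0).flowOn(Tag("io")))) // prints "io"
}
```

The model only shows why a `create` that discards its `context` argument turns `flowOn` into a no-op. It says nothing about how the subscription itself would have to honor the context, which is a separate concern.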
Good catch. The fix would not be so trivial, though. It is way more complicated than that. As a work-around, you can use the following trick to convert
|
* When using asFlow().flowOn(...), the context is now properly tracked and taken into account both for the execution context of the reactive subscription and for injection into the Reactor context.
* The Publisher.asFlow slow-path implementation is simplified. It no longer uses a specialized openSubscription, but always uses the same flow request logic.
Fixes #1765
kotlin version: 1.3.61
kotlinx.coroutines version: 1.3.3
Hi, I found that PublisherAsFlow ignores the CoroutineContext when created. It always uses an EmptyCoroutineContext instead. So if I call flowOn on it, nothing happens.
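One way to observe the reported behavior is to check which thread the Rx source is subscribed on. The sketch below assumes RxJava 2 (`io.reactivex.Flowable`) and the kotlinx-coroutines-reactive module are on the classpath, and `subscriptionThread` is a hypothetical helper name. If `flowOn` were ignored as described, the two printed thread names would be expected to match. If the context is honored, the second would be expected to name a dispatcher worker thread.

```kotlin
import io.reactivex.Flowable
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking

// Returns the name of the thread on which the Flowable's callable ran.
fun subscriptionThread(): String = runBlocking {
    // With no subscribeOn applied, fromCallable runs its callable on
    // whichever thread subscribes to the Flowable.
    val source = Flowable.fromCallable { Thread.currentThread().name }
    source.asFlow().flowOn(Dispatchers.IO).first()
}

fun main() {
    println("caller:     ${Thread.currentThread().name}")
    println("subscriber: ${subscriptionThread()}")
}
```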