I'm experimenting with hot multicasting Flows and have a couple of Flow-API related questions:
When I implement the Flow interface, I have to use @InternalCoroutinesApi on the collect method. (I cannot use flow { } because the class is hot and has shared state and coordination between its consumers.) What does this annotation imply? Does it mean that one shouldn't directly implement Flow, or that the name/signature may change while the concept itself stays?
I haven't found any indication of how a custom collect/FlowConsumer should signal its producer to stop producing items. I've looked at take, which uses a private abort exception, and saw other "hacks" elsewhere in the library. Is there a way to do it, or should I just let it crash?
I wanted to test the implementation, and since both the emission and the consumption are suspending, they can't run on the same thread. I thought I would just use launch(Dispatchers.IO) { source.collect { } }; however, this started creating a lot of worker threads and caused 100% CPU usage. I had to create a single-threaded ExecutorService to keep things nice. Is this excess CPU usage a property of the IO dispatcher or suspendCoroutine, or an artifact of my implementation regarding coordinated resumption?
The annotation implies that one shouldn't directly implement Flow. As the KDoc for this method suggests, the only way to implement a flow is to extend AbstractFlow. Flow has a context preservation contract, but nothing prevents library users from ignoring it in their implementations, so we've chosen to prohibit direct implementation. Sufficiently advanced users can ignore this annotation if they know what they are doing, but for most use cases AbstractFlow should be enough and removes the need to implement Flow directly.
We care about this contract a lot because it has two very promising properties: it catches most concurrency errors in custom operators semi-deterministically (and it's much easier to make an error here because coroutines are cheap to launch!), and it simplifies reasoning about the execution context.
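As a minimal sketch of the AbstractFlow route mentioned above: the class name CountingFlow and its count parameter are hypothetical, and the flow here is a plain cold one rather than the hot multicasting case from the question. Only AbstractFlow and collectSafely come from the library. The opt-in annotations are an assumption, since the marker on AbstractFlow has differed between kotlinx.coroutines versions.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical example class: emits 1..count to every collector.
// Extending AbstractFlow (instead of implementing Flow.collect directly)
// lets the library enforce the context preservation contract.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
class CountingFlow(private val count: Int) : AbstractFlow<Int>() {
    override suspend fun collectSafely(collector: FlowCollector<Int>) {
        for (i in 1..count) {
            collector.emit(i)
        }
    }
}

fun main() = runBlocking {
    // Collect the whole flow into a list and print it.
    println(CountingFlow(3).toList()) // prints [1, 2, 3]
}
```

No @InternalCoroutinesApi opt-in is needed here, because the internal collect method is implemented by AbstractFlow itself.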
Throwing CancellationException is the way. It's not exactly the Erlang-ish "let it crash", but rather the standard mechanism of cancellation in kotlinx.coroutines and all its primitives. CancellationException (CE) is ignored at all levels of the coroutines machinery.
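A rough sketch of that approach, assuming a recent kotlinx.coroutines version in which collect accepts a lambda without an extra import. The StopCollecting class is hypothetical (the library keeps its own abort exception private), and the source is an ordinary flow { } builder used only for illustration.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical signal type; subclassing CancellationException means the
// coroutines machinery treats it as cancellation rather than as a failure.
private class StopCollecting : CancellationException("collector has seen enough")

fun main() = runBlocking {
    // An endless producer: it only stops when the collector throws.
    val source = flow {
        var i = 0
        while (true) emit(i++)
    }

    val seen = mutableListOf<Int>()
    try {
        source.collect { value ->
            seen += value
            if (seen.size == 3) throw StopCollecting()
        }
    } catch (e: StopCollecting) {
        // Expected: the exception unwinds through the producer and ends up here.
    }
    println(seen) // prints [0, 1, 2]
}
```

Catching the specific subclass, rather than CancellationException in general, avoids swallowing a genuine cancellation of the surrounding coroutine.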