Custom Flow.collect method implementation and collector cancellation #1380

Closed
akarnokd opened this issue Jul 25, 2019 · 2 comments
Labels
docs KDoc and API reference flow question

Comments

@akarnokd

I'm experimenting with hot multicasting Flows and have a couple of Flow-API related questions:

  1. When I implement the Flow interface, I have to use @InternalCoroutinesApi on the collect method. (I cannot use flow { } because the class is hot and has shared state and coordination between its consumers.) What does this annotation imply? That one shouldn't directly implement Flow, or that the name/signature may change while the concept itself stays?

  2. I haven't found any indication of how a custom collect/FlowConsumer should signal to its producer to stop producing items. I've looked at take, which uses a private abort exception, and saw other "hacks" elsewhere in the library. Is there a way to do it, or should I just let it crash?

  3. I wanted to test the implementation, and since both the emission and the consumption are suspending, they can't run on the same thread. I thought I'd just use launch(Dispatchers.IO) { source.collect { } }; however, this started creating a lot of worker threads and caused 100% CPU usage. I had to create a single-threaded ExecutorService to keep things nice. Is this excess CPU usage a property of the IO dispatcher, of suspendCoroutine, or an artifact of my implementation regarding coordinated resumption?
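
The single-threaded workaround from question 3 might look roughly like the sketch below. It is not the code from the original experiment: `collectOnSingleThread` is a hypothetical helper name, and the returned set of thread names exists only to make the single-thread behaviour observable. It assumes kotlinx.coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors

// Hypothetical helper: collects `source` on a dedicated single-threaded
// dispatcher instead of Dispatchers.IO and returns the names of the
// threads that ran the collector body.
fun <T> collectOnSingleThread(source: Flow<T>): Set<String> {
    // Wrap a single-threaded ExecutorService as a coroutine dispatcher.
    val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    val threads = mutableSetOf<String>()
    try {
        runBlocking {
            launch(dispatcher) {
                // The collector body runs on the dispatcher's only thread.
                source.collect { threads += Thread.currentThread().name }
            }.join()
        }
    } finally {
        dispatcher.close() // also shuts down the underlying executor
    }
    return threads
}
```

Calling `collectOnSingleThread(flowOf(1, 2, 3))` should report exactly one thread name, since every resumption is funnelled through the same executor thread.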

@qwwdfsad qwwdfsad added the flow label Jul 25, 2019
@qwwdfsad
Collaborator

qwwdfsad commented Jul 25, 2019

Hi,

  1. The annotation implies that one shouldn't directly implement Flow. As the KDoc for this method suggests, the only supported way to implement Flow is to extend AbstractFlow.
    Flow has a context preservation contract, but nothing prevents library users from ignoring it in their implementations, so we've chosen to prohibit direct implementation. Sufficiently advanced users can ignore this annotation if they know what they are doing, but for most use cases AbstractFlow should be enough, and it removes the need to implement Flow directly.
    We care about this contract a lot because it has two very promising properties: it catches most concurrency errors in custom operators semi-deterministically (and it's much easier to make an error here because coroutines are cheap to launch!), and it simplifies reasoning about the context of execution.

  2. Throwing CancellationException is the way. It's not exactly the Erlang-ish "let it crash", but rather the standard mechanism of cancellation in kotlinx.coroutines and all its primitives. CancellationException (CE) is ignored at all levels of the coroutines machinery.

  3. Unfortunately, this is a problem in Dispatchers.IO: Android Thread handling issue #1286. I will fix it in 1.3.1.
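
Points 1 and 2 can be sketched together as follows. This is only an illustration under stated assumptions, not library code: `CountingFlow`, `StopCollecting` and `collectFirst` are hypothetical names, and the identity check on the exception instance imitates the private abort exception that the question mentions for take. Depending on the kotlinx.coroutines version, AbstractFlow is marked with FlowPreview or ExperimentalCoroutinesApi, so the sketch opts in to both.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical producer that extends AbstractFlow instead of implementing
// Flow.collect directly, so the library wraps the collector and checks the
// context preservation contract.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
class CountingFlow(private val limit: Int) : AbstractFlow<Int>() {
    @Volatile
    var produced = 0 // how many items were handed to emit so far
        private set

    override suspend fun collectSafely(collector: FlowCollector<Int>) {
        for (i in 1..limit) {
            produced++
            collector.emit(i) // a CancellationException from downstream exits this loop
        }
    }
}

// Hypothetical private signal; it subclasses CancellationException so the
// coroutines machinery treats it as cancellation rather than as a failure.
private class StopCollecting : CancellationException("collector is done")

// Collects at most `n` items, then tells the producer to stop by throwing.
suspend fun <T> Flow<T>.collectFirst(n: Int): List<T> {
    val out = ArrayList<T>()
    if (n <= 0) return out
    val stop = StopCollecting()
    try {
        collect { value ->
            out += value
            if (out.size >= n) throw stop
        }
    } catch (e: StopCollecting) {
        if (e !== stop) throw e // swallow only our own signal
    }
    return out
}
```

With `val source = CountingFlow(1_000_000)`, calling `runBlocking { source.collectFirst(3) }` returns `[1, 2, 3]` and leaves `source.produced` at 3, because the exception thrown by the collector propagates out of `emit` and ends the producer's loop.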

@qwwdfsad qwwdfsad added question docs KDoc and API reference labels Jul 25, 2019
@akarnokd
Author

Thanks. The issue can be closed unless you plan to do something in relation to it.
