Add consumer flow control API #333
Also, I'm available as a contributor to implement this feature, if it would be an option. Let me know.
Thanks for the analysis. We indeed need a way to give some sort of control over the credit mechanism to the client API. I'd like to avoid references to chunks, as they are an implementation detail. I had thought we could add a

WDYT?
Thanks for the response! I can't entirely agree that chunks are an implementation detail, since they are part of the stream protocol itself, but I see where you're coming from: exposing this control over the credit mechanism only at chunk granularity is not a perfect fit, abstraction-wise, for consumers that only care about messages. I think that with careful API design we can achieve both goals, those being:
Your idea of those three different strategies can be expressed as a group of strategies that mark each message as processed. Other strategies, such as chunk-based ones or even manual-callback ones, wouldn't be concerned with the individual processing status of each message. Instead of using

Other very flexible API ideas that could stem from this approach are:
Those two alone would offer a lot of "automagic" flow control to consumers, because you can specify the minimum number of messages or chunks that would be ideal to have in memory (queued or processing) at any given point in time, oriented towards high throughput. Your thoughts?
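For illustration, the "minimum messages in memory" idea could be sketched as a strategy that tracks in-flight messages and tops up credits whenever the count falls below a low watermark. All names here (`MessageCountFlowStrategy`, `CreditRequester`) are hypothetical, not the library's actual API:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the library's API: keep at least `lowWatermark`
// unprocessed messages in memory by requesting more chunk credits whenever
// the in-flight count drops below the watermark.
class MessageCountFlowStrategy {

    /** Callback used to ask the underlying client for more chunks (credits). */
    interface CreditRequester {
        void requestCredits(int credits);
    }

    private final int lowWatermark;     // refill threshold, in messages
    private final int messagesPerChunk; // rough estimate used to size credit requests
    private final CreditRequester requester;
    private final AtomicInteger inFlight = new AtomicInteger();

    MessageCountFlowStrategy(int lowWatermark, int messagesPerChunk, CreditRequester requester) {
        this.lowWatermark = lowWatermark;
        this.messagesPerChunk = messagesPerChunk;
        this.requester = requester;
    }

    /** Called when a chunk containing `messageCount` messages arrives. */
    void chunkReceived(int messageCount) {
        inFlight.addAndGet(messageCount);
    }

    /** Called when the application marks one message as processed. */
    void messageProcessed() {
        int remaining = inFlight.decrementAndGet();
        if (remaining < lowWatermark) {
            // Naive refill: estimate how many chunks are needed to get back
            // above the watermark (real code would also track credits already
            // requested but not yet delivered).
            int missing = lowWatermark - remaining;
            int credits = Math.max(1, (missing + messagesPerChunk - 1) / messagesPerChunk);
            requester.requestCredits(credits);
        }
    }

    int inFlight() {
        return inFlight.get();
    }
}
```

The per-message `messageProcessed()` hook is exactly what the message-marking strategies above would call, while chunk-oriented strategies would ignore it and react to `chunkReceived` only.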
Right, I meant "implementation detail" for the "high-level" API of this client library. The advantage of using something like

Feel free to submit a PR, I'll be happy to have a look and to discuss the design.
Ah, sorry for the mixup there.
Good point, it would be easier to implement by receiving
I will submit a PR later. Thanks!
Fixed in #374.
After changes for rabbitmq/rabbitmq-stream-java-client#333.
(cherry picked from commit c594c77)
# Conflicts:
#   deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml
#   deps/rabbitmq_stream_management/test/http_SUITE_data/pom.xml
Is your feature request related to a problem? Please describe.
In high-volume scenarios, when consuming from the start of a very large stream (many chunks), consumers that process messages asynchronously and take a long time per message will keep receiving chunks until the application runs out of memory.
Describe the solution you'd like
Expose a chunk-level flow control mechanism at the Consumer API level.
This could be implemented by exposing a ChunkListener on the Consumer builder. This listener callback would receive a Context object with information about the received chunk, much like the existing API design today.
The crucial part is that the Context would also have a callback so that the listener could control when to request additional chunks (by internally sending Credit messages using the Client).
Example (pseudocode from the top of my head, will elaborate and fix minor errors later, if applicable):
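A minimal sketch of the shape described above, with hypothetical `ChunkListener` and `Context` types (illustrative stand-ins, not the library's real API):

```java
// Hypothetical sketch of the proposal; ChunkListener and Context are
// illustrative stand-ins, not the library's real types.
class ChunkFlowApi {

    /** Metadata about a received chunk, plus a callback to request more chunks. */
    interface Context {
        long chunkOffset();
        int messageCount();
        void credit(int additionalCredits); // would send Credit frames internally
    }

    /** Invoked by the consumer machinery each time a chunk is delivered. */
    interface ChunkListener {
        void handle(Context context);
    }

    // Naive listener: immediately ask for one more chunk per chunk received,
    // which reproduces today's keep-the-pipe-full behavior.
    static ChunkListener creditOnDelivery() {
        return context -> context.credit(1);
    }
}
```

A flow-controlling listener would instead defer the `credit` call until the application has drained (part of) the chunk.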
Of course, this is just a naive implementation draft. Better flow control would have a buffer to ensure high throughput, instead of asking for credits only when the consumer is already idle.
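A buffered variant could keep a window of outstanding chunks: grant several credits up front and one more per chunk processed, so the consumer is never starved. A sketch, with hypothetical names:

```java
import java.util.function.IntConsumer;

// Hypothetical sketch: a sliding window of outstanding chunk credits.
// `prefetch` chunks are requested up front; each fully processed chunk
// tops the window back up with one more credit.
class PrefetchWindow {

    private final IntConsumer creditSender; // stand-in for the client's credit call
    private final int prefetch;
    private boolean started;

    PrefetchWindow(int prefetch, IntConsumer creditSender) {
        this.prefetch = prefetch;
        this.creditSender = creditSender;
    }

    /** Prime the window when the subscription is created. */
    void start() {
        if (!started) {
            started = true;
            creditSender.accept(prefetch);
        }
    }

    /** Call after a chunk has been fully processed by the application. */
    void chunkProcessed() {
        creditSender.accept(1);
    }
}
```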
The default chunkListener would correspond to just asking for `additionalCredits`; this way the current behavior is preserved in a backward-compatible way.

As a bonus, this explicit flow control would make integration with RxJava possible as well, which could be very useful for integrating this library with Spring, for instance.
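To illustrate the reactive-integration point: with an explicit credit callback, downstream demand (`request(n)` in Reactive Streams / RxJava) can be mapped onto credit requests. A sketch using the JDK's `java.util.concurrent.Flow` types, where `CreditSender` is a hypothetical stand-in for the client's credit mechanism:

```java
import java.util.concurrent.Flow;

// Sketch: mapping Reactive Streams demand onto stream credits. CreditSender
// is a hypothetical callback standing in for the client's credit mechanism;
// real code would also convert message-level demand into chunk credits.
class CreditBackedSubscription implements Flow.Subscription {

    interface CreditSender {
        void credit(int chunks);
    }

    private final CreditSender sender;
    private volatile boolean cancelled;

    CreditBackedSubscription(CreditSender sender) {
        this.sender = sender;
    }

    @Override
    public void request(long n) {
        if (cancelled || n <= 0) {
            return; // non-positive demand is a Reactive Streams error; ignored here
        }
        // Clamp unbounded demand (Long.MAX_VALUE) to an int-sized credit request.
        sender.credit((int) Math.min(n, Integer.MAX_VALUE));
    }

    @Override
    public void cancel() {
        cancelled = true; // stop translating demand into credits
    }

    boolean isCancelled() {
        return cancelled;
    }
}
```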
Describe alternatives you've considered
An alternative would be to consume messages synchronously on the connection thread, but this blocks some response handling and causes some RPC calls to time out, so it's not ideal.
Additional context
There are similar discussions in issue #262 and at https://stackoverflow.com/questions/71932072/add-delay-in-consuming-messages-in-rabbitmq-stream