Add Consumer Flow Control Strategies #340
Conversation
Also added a single flow control strategy, named legacy, which is used by default. This change should be transparent to all API consumers, but a new method to configure flow control strategies has been added.
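To make the configuration point concrete, here is a minimal sketch against the public builder API; the `flowControlStrategy(...)` method and the `LegacyFlowControlStrategy` constructor are assumptions for illustration, not necessarily the PR's final API:

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;

public class FlowControlConfigExample {
  public static void main(String[] args) {
    Environment environment = Environment.builder().build();
    Consumer consumer =
        environment.consumerBuilder()
            .stream("my-stream")
            .offset(OffsetSpecification.first())
            // Hypothetical: without this call, the "legacy" strategy applies,
            // so existing consumers keep their current behavior.
            // .flowControlStrategy(new LegacyFlowControlStrategy(...))
            .messageHandler((context, message) ->
                System.out.println("offset: " + context.offset()))
            .build();
  }
}
```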
Force-pushed from 8af2e54 to 2fbd85f.
Unit tests for the more important listener callbacks (handleMessage, handleSubscribe and handleUnsubscribe) are in place.
src/main/java/com/rabbitmq/stream/ConsumerFlowControlStrategy.java
…umerStatisticRecorder and a flow control strategy that uses it
…y using an adapter for the callback interface
Created a max-chunks-by-subscription strategy. What's left:
Integrated tests are done and some fixes were applied. I'd like to refine the API before writing unit tests, to avoid having to rewrite them. What do you think about the current implementation? Any suggestions? Meanwhile, I'll probably work on having the object returned by
I had a look, but it's honestly hard to follow. I don't see users dealing with names like these. Can you dumb down the test you mentioned for me? I'm not sure what it is supposed to test and assert.
I concur; I'll simplify the naming and create a method in ConsumerBuilder like
It tests that the 100,000 messages produced aren't all fetched and consumed before the initial chunks are marked as consumed with the flow control strategy.
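For illustration, here is a stripped-down shape of such a test, assuming JUnit 5 and AssertJ; the publishing and consumer setup are elided, and the strategy wiring is hypothetical:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;

class FlowControlStrategyIT {

  @Test
  void slowConsumerShouldNotFetchAllMessagesUpfront() throws Exception {
    int messageCount = 100_000;
    AtomicInteger received = new AtomicInteger();
    // Publish messageCount messages, then build a consumer with the
    // max-chunks-by-subscription strategy and a handler that never marks
    // messages as consumed (setup elided here for brevity).
    Thread.sleep(5_000); // give the client time to (not) fetch everything
    // Only the initially credited chunks should have been dispatched, so the
    // handler must have seen far fewer messages than were published.
    assertThat(received.get()).isLessThan(messageCount);
  }
}
```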
OK, thanks. The (long) naming is one thing, but here are other topics we must think about:
More of a design question. I don't understand the lifecycle of the
I'll refactor and simplify, divide, and de-boilerplate it as much as possible. The builder approach is flexible, but it's too bloated and involves too much generic nesting; I'll try to come up with a decent alternative.
I'll rename the "legacy" approach to "synchronousDefault"; that's the name I was looking for and didn't realize. It's good enough for all use cases where you can consume messages inside the connection thread without blocking it for too long, which represents the majority of use cases. The benefit comes when your consumer logic may take a long time to run, so a big backlog of stream messages can build up in the interim. This caused the original issue, where we either block the connection, causing errors, or process the data asynchronously, in which case data is fetched without bound. For additional context: this is happening in batch data ingestion scenarios where a huge amount of data is ingested at fixed times (such as midnight) and processed gradually over the following minutes (sometimes an hour).
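To make the failure mode concrete, here is a minimal sketch of the asynchronous variant against the public API (the stream name and thread-pool size are arbitrary): the handler returns immediately, so nothing ever pushes back on the broker.

```java
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UnboundedBacklogExample {

  public static void main(String[] args) {
    ExecutorService workers = Executors.newFixedThreadPool(4);
    Environment environment = Environment.builder().build();
    environment.consumerBuilder()
        .stream("ingestion-stream")
        .offset(OffsetSpecification.first())
        .messageHandler((context, message) ->
            // Returns immediately: the connection thread is never blocked, so
            // with the default strategy the client keeps crediting the broker
            // while the queue behind this executor grows without bound.
            workers.submit(() -> process(message)))
        .build();
  }

  private static void process(Message message) {
    // slow business logic here (e.g. writing a batch to a database)
  }
}
```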
I'll create some kind of shortcuts in
There's one
I'll add integration tests for reconnection and other edge cases later, but in theory, the current implementation should handle it well, since it implements callbacks for those events.
Yeah, you're right: there's a bug that inhibits message consumption in reconnect scenarios. Whoops. Will fix it soon, after studying how recovery is implemented on disconnects and reconnects.
…for it. Also add shortcuts on Consumer to avoid users having to use impl classes for flow control. Also, remove the verbose builder factories by using functional interfaces.
I managed to fix the reconnection handling and remove some BuilderFactory boilerplate. I also created shortcuts on the
I don't see how it could work: a
We agree on the goal of flow control. The application cannot provide feedback to the library in the current implementation, so that's indeed a problem for some use cases. Here are the parameters of the equation I identified:
The current situation is not ideal, but there are ways to mitigate problems for most use cases. Still, I'd like to better support the use case you mentioned (batch ingestion). What I have in mind is a solution centered around a subscription and its chunks, to avoid maintaining complex state in some centralized place. I think we can provide a
There may be some details to sort out, but I think this solution would work for batch ingestion with slow consumers and still support what we have currently. Its main advantage is that it's really isolated and builds upon the
Ah, that's a problem. So the lifecycle here should be per
I defined the flow control per subscription in the created strategy to avoid complicating the implementation even more. We could have control per connection, but then we'd basically have to do the work of a scheduler to fairly divide credits between subscriptions. But we don't need this kind of flexibility. Taking a look at the code, attaching the lifecycle of each instance of
Yeah, there must be another hook. Maybe using
We give up some flexibility, but fair enough: we don't need that much flexibility. Agreed. And the coordinator already handles all of this complexity; we'd keep it all in the same place.
It's not so hard to provide an interface like this as the code currently stands; it's a quick adapter. I can do this before the bigger refactoring, so that I can then refactor the rest of the code around this interface.
So it would preserve the flexibility of asking for credits according to custom logic and in variable quantities, but using the existing
Is it OK for me to move the statistical data from
Yep, it would work and keep it simpler. I didn't delve much into
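As a minimal sketch of the variable-quantity idea above (`CreditRequester` and `AdaptiveCreditPolicy` are hypothetical names, standing in for the existing credit mechanism):

```java
final class AdaptiveCreditPolicy {

  // Stand-in for the existing credit request (in the real client this is an
  // internal call that asks the broker for more chunks on a subscription).
  interface CreditRequester {
    void credit(int credits);
  }

  private final CreditRequester requester;

  AdaptiveCreditPolicy(CreditRequester requester) {
    this.requester = requester;
  }

  // Custom logic in variable quantities: ask for two chunks at once when the
  // application has fully drained its backlog, otherwise just one.
  void onChunkFullyProcessed(long unprocessedMessages) {
    requester.credit(unprocessedMessages == 0 ? 2 : 1);
  }
}
```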
Your understanding is correct. The
I think it'd be enough to experiment with simple strategies with the
We could add a
I don't have all the implementation details in mind, but we have the information on the chunk of a given subscription (rabbitmq-stream-java-client/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java, lines 568 to 579 at 9ee4b70), so we could create the ProcessedCallback instance there, or propagate the info to create it later.
There are annoying details: for example, when we create a subscription that starts at a given offset, the library filters out the first messages of the chunk until the requested offset. This could make the
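Here is a sketch of how a per-chunk countdown could account for that filtering (hypothetical names; the key point is initializing the counter with the messages actually dispatched, not the chunk's advertised count):

```java
final class OffsetAwareChunkTracker {

  private long remaining;

  OffsetAwareChunkTracker(long chunkMessageCount, long filteredAtStart) {
    // Count only the messages that will actually reach the application
    // handler; the ones before the requested offset are filtered out by the
    // library and would otherwise keep the countdown from reaching zero.
    this.remaining = chunkMessageCount - filteredAtStart;
  }

  synchronized boolean processed() {
    return --remaining == 0; // true => safe to credit the next chunk
  }
}
```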
…e per-subscription
…nd use it in one of the integrated tests to mark messages as handled
…iptionTracker, removing subscription and stream name parameters
# Conflicts:
#	src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
#	src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
#	src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Merged and refactored, pending review and some unit tests. It seems that changing the lifecycle/cardinality of
Oh yeah, and about the strategies you mentioned (the ones that pull in chunks at a certain threshold of message consumption, without waiting for the whole chunk to be consumed): I'll work on them after some feedback.
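A rough sketch of what such a threshold strategy could look like (hypothetical, not part of the PR): credit the next chunk once a configurable fraction of the current chunk has been processed, exactly once per chunk.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

final class ThresholdCreditTracker {

  private final long threshold;
  private final AtomicLong processed = new AtomicLong();
  private final AtomicBoolean credited = new AtomicBoolean();
  private final Runnable creditNextChunk;

  ThresholdCreditTracker(long messagesInChunk, double ratio, Runnable creditNextChunk) {
    // e.g. ratio = 0.5: ask for the next chunk at 50% consumption
    this.threshold = Math.max(1, (long) (messagesInChunk * ratio));
    this.creditNextChunk = creditNextChunk;
  }

  void processed() {
    if (this.processed.incrementAndGet() >= threshold
        && credited.compareAndSet(false, true)) {
      creditNextChunk.run(); // credit exactly once per chunk
    }
  }
}
```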
src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
The PR looks simpler than the first version; I think that's a good thing, and it does not seem less functional. I'm still wondering whether we could remove some complexity in some places; I'm thinking of the statistics recorder class, for example. I'm experimenting on my side and I'll share it when I have something working (I started yesterday). I'd like to see if we can't have most of the logic at the chunk-dispatching level; that should make it easier to reason about.
Here is a rough draft of what I had in mind: #374. The main change is a "chunk context" that is created by the chunk listener and propagated to the client's message listener. This implies changes in
There's a simple test that seems to address the main concern: avoiding overwhelming the application message handler. Again, it's a WIP, but the main idea is there. Let me know what you think.
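My reading of the draft's main idea, sketched with hypothetical names rather than the actual #374 code: one context per chunk, created at dispatch time and shared by every message of that chunk.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ChunkDispatchSketch {

  // One instance per chunk, created by the chunk listener and shared by all
  // messages of the chunk; exposed to the application via the message
  // handler's context so it can report completion.
  static final class ChunkContext {
    private final AtomicLong unprocessed;
    private final Runnable creditNextChunk;

    ChunkContext(long messageCount, Runnable creditNextChunk) {
      this.unprocessed = new AtomicLong(messageCount);
      this.creditNextChunk = creditNextChunk;
    }

    void processed() {
      if (unprocessed.decrementAndGet() == 0) {
        creditNextChunk.run(); // whole chunk done: ask for the next one
      }
    }
  }

  interface MessageListener {
    void handle(ChunkContext context, long offset /* , Message message */);
  }

  // Dispatch loop: the same context instance travels with every message.
  static void dispatchChunk(long chunkOffset, int messageCount,
      MessageListener listener, Runnable creditNextChunk) {
    ChunkContext context = new ChunkContext(messageCount, creditNextChunk);
    for (int i = 0; i < messageCount; i++) {
      listener.handle(context, chunkOffset + i);
    }
  }
}
```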
I've taken a look there. The flow control strategy is responsible for creating the callbacks, so we can avoid having to lazily initialize the
The changes to the public API seem the same as in the existing approach, namely:
The
There are some more nitpicks, but it's a rough draft, so you probably already know about those. I see that there's one strategy per
Your approach ends up being a lot simpler!
Good catch, I had not thought of this one :-) This is a problem for a counter-based approach indeed.
Yes, that would be one way. I'd like to make this the responsibility of the flow strategy, but it's not easily doable with the current design. Some strategies may be idempotent by nature, so always using an
There's one strategy per consumer, but apart from final configuration settings, I expect strategies to be stateless, so instances could be shared between consumers. The strategy should get whatever it needs from the
Don't hesitate to share your thoughts in the PR. Here are the things I'm planning to work on:
Thanks for the feedback!
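For reference, here is a sketch of the "always idempotent" option from the discussion above (hypothetical names): wrapping the processed callback with a per-message guard makes double calls harmless, at the cost of exactly the per-message state mentioned in the next comment.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

final class IdempotentChunkTracker {

  private final AtomicLong remaining;
  private final Runnable creditNextChunk;

  IdempotentChunkTracker(long messagesInChunk, Runnable creditNextChunk) {
    this.remaining = new AtomicLong(messagesInChunk);
    this.creditNextChunk = creditNextChunk;
  }

  // One guard per message: the price of idempotency is exactly this extra
  // per-message state, which a purely counter-based approach avoids.
  Runnable callbackForMessage() {
    AtomicBoolean done = new AtomicBoolean();
    return () -> {
      if (done.compareAndSet(false, true) && remaining.decrementAndGet() == 0) {
        creditNextChunk.run();
      }
    };
  }
}
```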
If the strategies are stateless, we can't have idempotent strategies, because idempotency requires keeping track of message state. I don't see an easy way around this, but I agree that always using the
If we aren't going to implement balancing credits between subscriptions (and I'm assuming that's unnecessary and out of scope), we don't need the subscription ID.
All the things I had thought about while reading the PR are already contained in your list ^^
I think either ignored or filtered is fine. This is analogous to what's done inside
Replying to myself after rereading: I think not. By only being aware of chunks and not messages, we have this information shortcoming inside the flow strategy. Actually, my implementation regarding
We definitely need an automated test for this, because it's an edge case.
@henry701 Thanks for your work on this PR; I'm sorry it did not get in. It helped us better understand consumer flow control in the library, though, and that was useful to come up with #374. Thanks also for having a look at #374; your comments are useful. I'm planning to merge it soon, but don't hesitate to test it with the use case that needed parallel processing in the first place, IIRC.
Intended to solve issue #333.
Currently a draft PR. What's done so far:

- `LegacyFlowControlStrategy`, which implements the pre-existing original strategy: it has `initialCredits` and `additionalCredits` as fixed values, and sends a request for `additionalCredits` right after each chunk delivery.

What still needs to be done:

- Shortcuts on `Consumer`.
- Unit tests for the `Client` listeners created by `ConsumersCoordinator`. Those will be in `ConsumersCoordinatorTests`, naturally.