
Add Consumer Flow Control Strategies #340


Closed
henry701 wants to merge 15 commits

Conversation

henry701

Intended to solve issue #333.

Currently a draft PR. What's done so far:

  • Creation of strategy interfaces for flow control, and the appropriate calls into them.
  • Creation of a LegacyFlowControlStrategy, which implements the pre-existing strategy: initialCredits and additionalCredits are fixed values, and a request for additionalCredits is sent right after each chunk delivery (see the sketch after this list).
  • Adjustments so that some pre-existing tests also work on Windows.
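
A minimal sketch of that legacy behavior, assuming a hypothetical credit callback (the interface and names here are illustrative, not the PR's actual classes):

    import java.util.function.IntConsumer;

    interface ConsumerFlowControlStrategy {
      int initialCredits();      // credits requested when subscribing
      void afterChunkDelivery(); // called once per delivered chunk
    }

    final class LegacyFlowControlStrategy implements ConsumerFlowControlStrategy {

      private final int initialCredits;
      private final int additionalCredits;
      private final IntConsumer creditRequester; // would wrap the Client credit call

      LegacyFlowControlStrategy(
          int initialCredits, int additionalCredits, IntConsumer creditRequester) {
        this.initialCredits = initialCredits;
        this.additionalCredits = additionalCredits;
        this.creditRequester = creditRequester;
      }

      @Override
      public int initialCredits() {
        return initialCredits;
      }

      @Override
      public void afterChunkDelivery() {
        // the pre-existing behavior: ask for more credits right after each chunk
        creditRequester.accept(additionalCredits);
      }
    }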

What still needs to be done:

  • Some default strategies and some helper methods to easily call and set them when building a Consumer.
  • Unit tests to guarantee the interface callbacks are called appropriately by the Client listeners created by ConsumersCoordinator. Those will be in ConsumersCoordinatorTests, naturally.

henry701 added 2 commits May 27, 2023 22:50
Also added a single default flow control strategy, named legacy, which is set by default. This change should be transparent to all API consumers, but a new method to configure flow control strategies has been added.
henry701 force-pushed the consumer-flow-control branch from 8af2e54 to 2fbd85f on May 28, 2023 01:50
@henry701 (Author) commented May 28, 2023

Unit tests for the most important listener callbacks (handleMessage, handleSubscribe, and handleUnsubscribe) are in place.
Will add configurable strategies with methods to mark messages as completed, and tests for them, later on.

henry701 added 2 commits June 10, 2023 00:28
…umerStatisticRecorder and a flow control strategy that uses it
…y using an adapter for the callback interface
@henry701 (Author)

Created a max-chunks-by-subscription strategy. What's left:

  • Unit Tests for all the new classes
  • Integration Test specific to flow control to test both the legacy strategy and the new strategy
  • Expose the interface on Consumer to allow actual usage of marking messages as processed

@henry701 (Author)

Integration tests are done and some fixes were applied. Check out StreamConsumerTest#consumeWithAsyncConsumerFlowControl for a usage example. I'm still unhappy with the verbose syntax, but it's a start.

I'd like to refine the API before writing unit tests, to avoid having to rewrite them. What do you think about the current implementation? Any suggestions?

Meanwhile, after some time I'll probably work on having the object returned by flowControlStrategyBuilder#messageHandlingListener() implement a builder() method as well, so we can fluently go back to the flow control builder and avoid some of the intermediate variables I had to create in the test.

@acogoluegnes (Contributor)

I had a look but it's honestly hard to follow. I don't see users dealing with names like MaximumInflightChunksPerSubscriptionConsumerFlowControlStrategyBuilderFactory.Builder (strategy, factory, and builder twice in the same name!). I know from experience with AMQP client libraries that users may misuse publish confirms and even acknowledgment.

Can you dumb down the test you mentioned for me? I'm not sure what it is supposed to test and assert.

@henry701 (Author)

I had a look but it's honestly hard to follow. I don't see users dealing with names like MaximumInflightChunksPerSubscriptionConsumerFlowControlStrategyBuilderFactory.Builder (strategy, factory, and builder twice in the same name!). I know from experience with AMQP client libraries that users may misuse publish confirms and even acknowledgment.

I concur; I'll simplify the naming and create a method in ConsumerBuilder like asyncFlowControlStrategy(int prefetchLevel) that delegates to the implementation class, and document it, along with other improvements (such as not saving the Message itself in the statistics map, which could lead to a nasty memory leak...).

Can you dumb down the test you mentioned for me? I'm not sure what it is supposed to test and assert.

It tests that the 100000 messages produced aren't all fetched and consumed before the initial chunks are marked as consumed with the flow control strategy.
It also tests calling markHandled both from inside the message handler callback and from outside it, to deal with both sync and async scenarios. I had to change ConsumersCoordinator to call the flow control strategy before the message handler to correctly handle this.

@acogoluegnes (Contributor) commented Jun 16, 2023

OK, thanks.

The (long) naming is one thing, but here are other topics we must think about:

  • That's a lot of code we'll have to maintain over the years
  • The code is complex, it's hard to get your head around it and to get the big picture. We know from experience on the broker side that flow control is a complicated topic and a complex approach makes it very difficult to reason about.
  • I don't see the immediate benefits of this approach. The "legacy" approach is simplistic but good enough for many use cases. I had maybe less flexible solutions in mind, but they looked simpler (at least in my head :-) )

More of a design question. I don't understand the lifecycle of the ConsumerFlowControlStrategy instances. I would have thought there's one instance for each consumer instance, is that the case? I see ConsumerStatisticRecorder maintains a lot of state and keeping this kind of state consistent is very difficult when connections come and go and consumers have to recover (a consumer instance sees its connection/client changing in case of connection recovery, so it changes its subscription ID).

@henry701 (Author) commented Jun 16, 2023

[...] Other topics we must think about:

  • That's a lot of code we'll have to maintain over the years

  • The code is complex, it's hard to get your head around it and to get the big picture. We know from experience on the broker side that flow control is a complicated topic and a complex approach makes it very difficult to reason about.

I'll refactor, simplify, divide, and de-boilerplate it as much as possible. The builder approach is flexible, but it's too bloated and involves too much generic nesting; I'll try to come up with a decent alternative.

  • I don't see the immediate benefits of this approach. The "legacy" approach is simplistic but good enough for many use cases. [...]

I'll rename the "legacy" approach to a "synchronousDefault" approach; that's the name I was looking for and hadn't realized. It's good enough for all use cases where you can consume messages inside the connection thread without blocking it for too long, yes, which covers the majority of use cases.

The benefit shows when your consumer logic may take a long time to run and a big backlog of stream messages can build up in the interim. This caused the original issue: we either block the connection, causing errors, or process the data asynchronously, in which case data is fetched without bound.

For additional context... This is happening in batch data ingestion scenarios where the ingestion of a huge amount of data happens at fixed times (such as midnight) and is processed gradually in the following minutes (sometimes an hour).
Using Streams in this scenario would crash the application because it can't limit itself without proper flow control.

[...] I had maybe less flexible solutions in mind, but they looked simpler (at least in my head :-) )

I'll create some shortcuts in Consumer and rename the flowControlStrategy method to customFlowControlStrategy to simplify usage, in addition to other refactorings.

More of a design question. I don't understand the lifecycle of the ConsumerFlowControlStrategy instances. I would have thought there's one instance for each consumer instance, is that the case?

There's one ConsumerFlowControlStrategy for each Client instance created by ConsumersCoordinator. Since the inner class that creates each Client is ClientSubscriptionsManager and it's called when adding new consumers, I believe that's the case, yes.

I see ConsumerStatisticRecorder maintains a lot of state and keeping this kind of state consistent is very difficult when connections come and go and consumers have to recover (a consumer instance sees its connection/client changing in case of connection recovery, so it changes its subscription ID).

I'll add integration tests for reconnection and other edge cases later, but in theory, the current implementation should handle it well since it implements callbacks for those events (handleSubscribe and handleUnsubscribe) that update the state accordingly.

@henry701 (Author)

I see ConsumerStatisticRecorder maintains a lot of state and keeping this kind of state consistent is very difficult when connections come and go and consumers have to recover (a consumer instance sees its connection/client changing in case of connection recovery, so it changes its subscription ID).

Yeah, you're right: there's a bug that inhibits message consumption in reconnect scenarios. Whoops. Will fix it soon, after studying how recovery is implemented on disconnects and reconnects.

…for it. Also add shortcuts on Consumer to avoid users having to use impl classes for flow control. Also, remove the verbose builder factories by using functional interfaces.
@henry701 (Author)

I managed to fix the reconnection handling and remove some BuilderFactory boilerplate. Also created shortcuts on the Consumer API and renamed the strategies. Check it out later!

@acogoluegnes (Contributor)

There's one ConsumerFlowControlStrategy for each Client instance created by ConsumersCoordinator. Since the inner class that creates each Client is ClientSubscriptionsManager and it's called when adding new consumers, I believe that's the case, yes.

I don't see how it could work: a Client (connection) can handle several consumers, each with its own flow control strategy.

I'll add integration tests for reconnection and other edge cases later, but in theory, the current implementation should handle it well since it implements callbacks for those events (handleSubscribe and handleUnsubscribe) that update the state accordingly.

handleUnsubscribe may not be called if the connection goes away, so state depending on the subscription should be updated accordingly.

We agree on the goal of flow control. The application cannot provide feedback to the library in the current implementation, so that's indeed a problem for some use cases. Here are the parameters of the equation I identified:

  • the initial credits for a subscription (currently 1, not enough for some cases, so this is something the application should be able to control)
  • when to provide 1 credit for a given subscription. The current implementation does this as soon as a new chunk arrives. This is indeed simplistic. There could be a flag to tell the library to do it after we are done dispatching the chunk messages. This would throttle the flow, but that does not solve the problem of blocking the dispatching thread for other consumers on the connection in case of long processing time.
  • how to dispatch messages. The current implementation uses a dedicated thread (not the IO thread) for each connection. It is still possible to allocate only 1 consumer for each connection to prevent slow consumers from hogging the connection dispatching thread. It's not optimal but can be enough for many use cases.

The current situation is not ideal, but there are ways to mitigate problems for most use cases. Still, I'd like to better support the use case you mentioned (batch ingestion). What I have in mind is a solution centered around a subscription and its chunks, to avoid maintaining a complex state in some centralized place. I think we can provide a MessageHandler.Context#processed() method. The application would call it when it's done with a message. What the method does depends on the flow control strategy the application chose:

  • It could maintain a counter and provide a credit once the application processed half of the messages of the chunk (or whatever ratio of the chunk)
  • It could provide a credit once the first message is processed (what we have currently, give or take) or when the last message is processed (so at the end of the chunk)
  • It could provide a credit once we are at some location in the chunk (in the middle, whatever the count of processed messages).

There may be some details to sort out, but I think this solution would work for batch ingestion with slow consumers and still support what we have currently. Its main advantage is that it's really isolated and builds upon the ConsumersCoordinator, which handles the subscription lifecycle. WDYT?

@henry701 (Author) commented Jun 20, 2023

I don't see how it could work: a Client (connection) can handle several consumers, each with its own flow control strategy.

Ah, that's a problem. So the lifecycle here should be per Environment instead, because there's one ConsumersCoordinator per Environment, right? And the ConsumersCoordinator in turn constructs as many Client instances as needed. Is this it?

I defined the flow control per subscription on the created strategy to avoid complicating the implementation even more. We could have a control per connection, but then we'd have to do the work of a scheduler, basically, to fairly divide credits between subscriptions. But we don't need this kind of flexibility.

Taking a look at the code, attaching the lifecycle of each ConsumerFlowControlStrategy instance to the SubscriptionTracker would seem to solve this particular issue. Then we can keep using a separate strategy per Consumer instance: even if Consumer instances share Client instances, clashing subscriptions are avoided thanks to the coordinator.

handleUnsubscribe may not be called if the connection goes away, so state depending on the subscription should be updated accordingly.

Yeah, there must be another hook. Maybe using SubscriptionTracker for this per-subscription status (duh) would suffice, because then the state would go away together with the object. I should have thought of this earlier. Thanks!

[...] What I have in mind is a solution centered around a subscription and its chunks, to avoid maintaining a complex state in some centralized place.

We give up some flexibility, but fair enough: we don't need that much flexibility. Agreed. And the coordinator already handles all of this complexity, so we'd keep it all in the same place.

I think we can provide a MessageHandler.Context#processed() method. The application would call it when it's done with a message.

It's not so hard to provide an interface like this as the code currently stands; it's a quick adapter. I can do this first, so that the rest of the refactoring happens around this interface.

  • It could maintain a counter and provide a credit once the application processed half of the messages of the chunk (or whatever ratio of the chunk)
  • It could provide a credit once the first message is processed (what we have currently, give or take) or when the last message is processed (so at the end of the chunk)
  • It could provide a credit once we are at some location in the chunk (in the middle, whatever the count of processed messages).

So it would preserve the flexibility of asking for credits according to custom logic and in variable quantities, but using the existing ConsumersCoordinator code and its inner classes instead of a separate set of components such as the statistics tracker? Yeah, I can see it working.

Is it OK for me to move the statistical data from ConsumerStatisticRecorder to the SubscriptionTracker to achieve this? It seems like the way forward, but I want to confirm that this approach sounds OK.

There may be some details to sort out, but I think this solution would work for batch ingestion with slow consumers and still support what we have currently. Its main advantage is that it's really isolated and builds upon the ConsumersCoordinator, which handles the subscription lifecycle. WDYT?

Yep, it would work and keep it simpler. I didn't delve much into ConsumersCoordinator but it seems the correct place to implement that. I didn't want to bloat it with more logic, but I can move the data closer to it and simplify the ConsumerFlowControlStrategy in turn, which sounds like a good tradeoff.

@acogoluegnes (Contributor)

Your understanding is correct. The SubscriptionTracker is indeed the place to use for some state on flow control. I don't think you need all the statistics the ConsumerStatisticRecorder maintains. And we should avoid adding too much logic to the ConsumersCoordinator; we should delegate as much as possible to avoid bloating the class.

I think it'd be enough to experiment with simple strategies with the MessageHandler.Context#processed() solution to see if it's worth pursuing. The library would control the implementation of the processed() method; the application's job would just be to call it accordingly.

We could add a ProcessedCallback property to the current Context implementation (all the implementation details are hidden from the application). There's an instance of this context for each message of a chunk, but the ProcessedCallback instance could be the same for each message of a given chunk. Imagine a strategy where we want to provide one credit when half of the chunk messages are processed: the ProcessedCallback just maintains a counter and calls credit when the counter reaches the expected count (the call to credit would be made idempotent with an AtomicBoolean). We would have pretty much the same mechanism with a strategy where we want to credit when the message in the middle of the chunk is processed (here it's the location in the chunk that matters, not the count).
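
A rough sketch of that half-chunk strategy under the same assumptions (ProcessedCallback here is an illustrative name, and the credit hook is represented as a plain Runnable):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    // one instance shared by every message Context of a given chunk
    final class HalfChunkProcessedCallback {

      private final long threshold;          // half of the chunk's message count
      private final Runnable creditOneChunk; // would wrap client.credit(subscriptionId, 1)
      private final AtomicLong processedCount = new AtomicLong(0);
      private final AtomicBoolean credited = new AtomicBoolean(false);

      HalfChunkProcessedCallback(long chunkMessageCount, Runnable creditOneChunk) {
        this.threshold = Math.max(1, chunkMessageCount / 2);
        this.creditOneChunk = creditOneChunk;
      }

      void processed() {
        // the AtomicBoolean makes the credit call idempotent, as described above
        if (processedCount.incrementAndGet() >= threshold
            && credited.compareAndSet(false, true)) {
          creditOneChunk.run();
        }
      }
    }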

I don't have all the implementation details in mind, but we have the information on the chunk of a given subscription:

    ChunkListener chunkListener =
        (client, subscriptionId, offset, messageCount, dataSize) -> {
          SubscriptionTracker subscriptionTracker =
              subscriptionTrackers.get(subscriptionId & 0xFF);
          if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
            client.credit(subscriptionId, subscriptionTracker.additionalCredits);
          } else {
            LOGGER.debug(
                "Could not find stream subscription {} or subscription closing, not providing credits",
                subscriptionId & 0xFF);
          }
        };

So we could use this call to create the appropriate ProcessedCallback instance, or propagate the info to create it later.

There are annoying details, like when we create a subscription that starts at a given offset: the library filters out the first messages of the chunk until the requested offset. This could make the ProcessedCallback instance creation a bit awkward, but nothing blocking, I think.

henry701 added 6 commits June 21, 2023 20:06
…nd use it in one of the integrated tests to mark messages as handled
…iptionTracker, removing subscription and stream name parameters
# Conflicts:
#	src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
#	src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
#	src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
@henry701 (Author) commented Jul 1, 2023

Merged and refactored, pending review and some unit tests. It seems that changing the lifecycle/cardinality of ConsumerFlowControlStrategy and the associated classes and interfaces to one per SubscriptionTracker solved the issues.

@henry701 (Author) commented Jul 1, 2023

Oh yeah, and about the strategies you mentioned (the ones that pull in chunks at a certain threshold of message consumption, without waiting for the whole chunk to be consumed): I'll work on them after some feedback.

@acogoluegnes (Contributor)

The PR looks simpler than the first version, which I think is a good thing. And it does not seem less functional. I'm still wondering whether we could remove some complexity in places; I'm thinking of the statistics recorder class, for example.

I'm experimenting on my side and I'll share it when I have something working (I started yesterday). I'd like to see if we can have most of the logic at the chunk dispatching level; that should make it easier to reason about.

@acogoluegnes (Contributor)

Here is a rough draft of what I had in mind: #374.

The main change is a "chunk context" that is created by the chunk listener and propagated to the client's message listener. This implies changes in Client but none in the public API.

There's a simple test that seems to address the main concern: avoiding overwhelming the application message handler.

Again, it's a WIP, but the main idea is there. Let me know what you think.

@henry701 (Author) commented Jul 5, 2023

Here is a rough draft of what I had in mind: #374.

The main change is a "chunk context" that is created by the chunk listener and propagated to the client's message listener. This implies changes in Client but none in the public API.

There's a simple test that seems to address the main concern: avoiding overwhelming the application message handler.

Again, it's a WIP, but the main idea is there. Let me know what you think.

I've taken a look there. The flow control strategy is responsible for creating the callbacks, so we can avoid having to lazily initialize the Client for credits. Nice.

The changes to the public API seem the same as in the existing approach, namely:

  • Expose a way to set the strategy to be used on the Consumer interface
  • Expose a way to mark messages as processed (processed/markHandled in MessageHandler.Context)

The ConsumerStatisticRecorder is avoided entirely. The simple callback-with-counter approach works when the client uses the API correctly, but it apparently counts the same message as processed again if processed() is called more than once, which could be an issue.
Maybe a simple AtomicBoolean wrapper, with the appropriate atomic checks when creating the MessageHandlerContext, could make the method idempotent without maintaining central state the way ConsumerStatisticRecorder does.
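
A minimal sketch of that wrapper, representing the strategy's real callback as a plain Runnable (the names are illustrative):

    import java.util.concurrent.atomic.AtomicBoolean;

    // one instance per message context; makes processed() safe to call repeatedly
    final class IdempotentProcessedCallback {

      private final Runnable delegate; // the flow strategy's actual processed logic
      private final AtomicBoolean called = new AtomicBoolean(false);

      IdempotentProcessedCallback(Runnable delegate) {
        this.delegate = delegate;
      }

      void processed() {
        // the delegate runs exactly once, however many times this is called
        if (called.compareAndSet(false, true)) {
          delegate.run();
        }
      }
    }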

There are some more nitpicks but it's a rough draft so you probably already know about those.

I see that there's one strategy per Consumer. It acts as both the strategy and the builder, since the start method returns the callback, so it can handle sub-states using lambdas, and it receives the subscription ID from the new ConsumerFlowStrategy.Context, so there's no need to change SubscriptionTracker. Is that right?

Your approach ends up being a lot simpler!

@acogoluegnes (Contributor)

The simple callback-with-counter approach works when the client uses the API correctly, but it apparently counts the same message as processed again if processed() is called more than once, which could be an issue.

Good catch, I had not thought of this one :-) This is a problem for a counter-based approach indeed.

Maybe a simple AtomicBoolean wrapper, with the appropriate atomic checks when creating the MessageHandlerContext, could make the method idempotent without maintaining central state the way ConsumerStatisticRecorder does.

Yes, that would be one way. I'd like to make this the responsibility of the flow strategy, but it's not easily doable with the current design. Some strategies may be idempotent by nature, so always using an AtomicBoolean would be a waste (that's the case of the "default" strategy, which provides a credit on start).

I see that there's one strategy per Consumer. It acts as both the strategy and the builder, since the start method returns the callback, so it can handle sub-states using lambdas, and it receives the subscription ID from the new ConsumerFlowStrategy.Context, so there's no need to change SubscriptionTracker. Is that right?

There's one strategy per consumer, but apart from final configuration settings, I expect strategies to be stateless, so instances could be shared between consumers. The strategy should get whatever it needs from the ConsumerFlowStrategy.Context; currently that's just the number of messages in the chunk and the credits(int) method, which encapsulates the Client#credit call. There's no need for the subscription ID for now, but it could be added later, even if I'm not sure how a strategy could use it (it's transient, it can change during the life of a consumer instance).
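
For reference, pieced together from this thread, the shape under discussion might look roughly like this (an approximation, not the final API of #374):

    import java.util.function.LongConsumer;

    interface ConsumerFlowStrategy {

      interface Context {
        long messageCount();       // number of messages in the arriving chunk
        void credits(int credits); // encapsulates the credit request to the Client
      }

      // called when a chunk arrives; the returned callback is invoked
      // as the application marks each message of the chunk as processed
      LongConsumer start(Context context);
    }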

There are some more nitpicks but it's a rough draft so you probably already know about those.

Don't hesitate to share your thoughts in the PR. Here are the things I'm planning to work on:

  • Find a proper place for the strategy implementations, maybe in a dedicated package. They are in the interface definition now because it makes things easier.
  • A LongConsumer callback is OK for the prototype, but it's limited if we need more information. Or we can keep it simple and accept that the interface will evolve in the future.
  • The chunk context is an Object and it's cast later on. That's the key internal change in the PR. The right way would be to use generics, by using the same type parameter for the ChunkListener, the MessageListener, and the Client (!). That's a significant change in the Client API, but it's considered an internal class, so that's OK.
  • There's a new need for a MessageFilteredListener callback in Client (see TODO in the PR). This is required when the offset specification of a subscription is a given offset. Let's say a subscription is created with an offset specification at offset 1000. The chunk that contains the message at offset 1000 can start at offset 900, so the client library "filters out" the first messages of the chunk so that the application sees only messages from 1000, not from 900. For such cases the library would call the new listener, which would in turn call "processed" for the message. Maybe "ignored" is a better term than "filtered" here.
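
To illustrate that last item, a sketch of dispatch-time filtering where skipped messages are still reported to the flow strategy (all names here are hypothetical, not the PR's API):

    import java.util.function.LongConsumer;

    final class ChunkDispatchSketch {

      interface MessageListener {
        void handle(long offset);
      }

      // Messages before startOffset are filtered out of the application's view,
      // but still reported to the flow strategy so per-chunk counters can complete.
      static void dispatch(
          long chunkStart,
          long messageCount,
          long startOffset,
          MessageListener applicationHandler,
          LongConsumer processedCallback) {
        for (long offset = chunkStart; offset < chunkStart + messageCount; offset++) {
          if (offset < startOffset) {
            processedCallback.accept(offset); // the "ignored"/"filtered" case
          } else {
            applicationHandler.handle(offset);
          }
        }
      }
    }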

Thanks for the feedback!

@henry701 (Author) commented Jul 6, 2023

I'd like to make this the responsibility of the flow strategy, but it's not easily doable with the current design. Some strategies may be idempotent by nature, so always using an AtomicBoolean would be a waste (that's the case of the "default" strategy, which provides a credit on start).

There's one strategy per consumer, but apart from final configuration settings, I expect strategies to be stateless, so instances could be shared between consumers. The strategy should get whatever it needs from the ConsumerFlowStrategy.Context; currently that's just the number of messages in the chunk and the credits(int) method, which encapsulates the Client#credit call.

If the strategies are stateless, we can't have idempotent strategies, because idempotency requires keeping track of message state. I don't see an easy way around this, but I agree that always using the AtomicBoolean would be a waste of resources for the default/sync strategy.

There's no need for the subscription ID for now, but it could be added later, even if I'm not sure how a strategy could use it (it's transient, it can change during the life of a consumer instance).

If we won't implement balancing credits between subscriptions (and I'm assuming that's unnecessary and out of scope), we don't need the subscription ID.

Don't hesitate to share your thoughts in the PR. Here are the things I'm planning to work on:

All the things I had thought about while reading the PR are already contained in your list ^^

  • There's a new need for a MessageFilteredListener callback in Client (see TODO in the PR). This is required when the offset specification of a subscription is a given offset. Let's say a subscription is created with an offset specification at offset 1000. The chunk that contains the message at offset 1000 can start at offset 900, so the client library "filters out" the first messages of the chunk so that the application sees only messages from 1000, not from 900. For such cases the library would call the new listener, which would in turn call "processed" for the message. Maybe "ignored" is a better term than "filtered" here.

I think either ignored or filtered is fine. This is analogous to what's done inside ConsumerStatisticRecorder according to the stored OffsetSpecification of the subscription, correct? Yeah, this is needed to avoid cases where we stop asking for credits because there are messages that are perpetually marked as unprocessed, or to avoid asking for more credits than what was configured when marking those "duplicate" messages as handled.

@henry701 (Author) commented Jul 6, 2023

This is analogous to what's done inside ConsumerStatisticRecorder according to the stored OffsetSpecification of the subscription, correct?

Replying to myself after rereading: I think not. Since the flow strategy is only aware of chunks, not individual messages, it lacks this information.

Actually, my implementation regarding ChunkStatistics also misses this. If messages are filtered away, the counter will never equal the message count of the chunk. It's fairly easy to fix if it comes to it, though.

We definitely need an automated test for this because this is an edge case.

@acogoluegnes (Contributor)

@henry701 Thanks for your work on this PR; I'm sorry it did not get in. It helped us better understand consumer flow control in the library, though, and that was useful to come up with #374.

Thanks also for having a look at #374; your comments are useful. I'm planning to merge it soon, but don't hesitate to test it with the use case that needed parallel processing in the first place, IIRC.
