
Do Not Resume Paused Partitions After a Rebalance #2222


Closed
garyrussell opened this issue Apr 7, 2022 · 13 comments · Fixed by #2229 or #2353

@garyrussell
Contributor

If individual partitions are paused (by @RetryableTopic or by users), they will be resumed by a rebalance.

This is not really a problem for @RetryableTopic, because the partition will be automatically paused again, but it is a problem for user-paused partitions.

We already handle this case when the whole container is paused; we need something similar for individually paused partitions.

```java
// Existing handling when the whole container is paused: re-pause the newly assigned partitions
if (ListenerConsumer.this.consumerPaused) {
    ListenerConsumer.this.consumer.pause(partitions);
    ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
            + "consumer paused again, so the initial poll() will never return any records");
}
```
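The per-partition counterpart of the whole-container check above amounts to intersecting the newly assigned partitions with the set of pause requests. The following is a simplified, JDK-only sketch of that idea, not the actual spring-kafka change: partitions are modeled as plain strings such as "topic-0" instead of Kafka's `TopicPartition`, and `PartitionRepause`/`partitionsToRepause` are hypothetical names. In a real container the result would presumably be handed to `consumer.pause(...)`.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Simplified, JDK-only model (hypothetical names, not spring-kafka code).
// Partitions are plain strings such as "topic-0" instead of TopicPartition.
class PartitionRepause {

    /**
     * Given the partitions just assigned by a rebalance and the partitions a
     * user (or @RetryableTopic) asked to pause, return the ones that must be
     * paused again, because the rebalance handed them back un-paused.
     */
    static Set<String> partitionsToRepause(Collection<String> assigned,
            Set<String> pauseRequested) {
        Set<String> toPause = new TreeSet<>(assigned); // sorted for stable output
        toPause.retainAll(pauseRequested);             // keep only requested pauses
        return toPause;
    }

    public static void main(String[] args) {
        Set<String> pauseRequested = new TreeSet<>(Set.of("topic-1"));
        Set<String> assigned = new TreeSet<>(Set.of("topic-0", "topic-1", "topic-2"));
        // Only the individually paused partition needs to be paused again.
        System.out.println(partitionsToRepause(assigned, pauseRequested)); // [topic-1]
    }
}
```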

@tomazfernandes
Contributor

You can assign this to me @garyrussell; I should be able to pick it up sometime this week, in time for 2.8.5.

Thanks!

@garyrussell
Contributor Author

Thanks; I believe we also need to remove any pauseRequested entries for any partitions that are not reassigned to us after a rebalance (in case we get them back on a later rebalance). We need to take cooperative rebalancing into account (where there is only a partial revoke, but we might get some of those partitions back again). I think we need to keep track of which partitions were revoked; then, in onPartitionsAssigned, re-pause any newly assigned partitions that are in pauseRequested and remove any revoked and not re-assigned partitions from pauseRequested.
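The bookkeeping proposed in this comment (which became the first fix, later revised in this thread) can be sketched as a small tracker: remember what was revoked, then on assignment re-pause requested partitions and forget requests for partitions that were revoked and not handed back. This is a hypothetical, JDK-only model with string partitions; `RevokeAwarePauseTracker` and its methods are illustrative names, not spring-kafka API. With cooperative rebalancing the revoke callback may carry only a subset of the assignment, which the model handles because it only acts on the partitions it is told about.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical JDK-only model of the proposed bookkeeping; partitions are
// plain strings rather than Kafka TopicPartition objects.
class RevokeAwarePauseTracker {

    final Set<String> pauseRequested = new HashSet<>();
    private final Set<String> revoked = new HashSet<>();

    void pause(String partition) {
        pauseRequested.add(partition);
    }

    // Partitions taken away; with cooperative rebalancing this may be only
    // part of the current assignment.
    void onPartitionsRevoked(Collection<String> partitions) {
        revoked.addAll(partitions);
    }

    // Returns the newly assigned partitions that must be paused again.
    Set<String> onPartitionsAssigned(Collection<String> partitions) {
        Set<String> toPause = new TreeSet<>(partitions);
        toPause.retainAll(pauseRequested);
        // Revoked and not handed back: forget the pause request
        // (the behaviour of the first fix).
        revoked.removeAll(partitions);
        pauseRequested.removeAll(revoked);
        revoked.clear();
        return toPause;
    }

    public static void main(String[] args) {
        RevokeAwarePauseTracker tracker = new RevokeAwarePauseTracker();
        tracker.pause("topic-0");
        tracker.pause("topic-1");
        // Eager-style rebalance: everything revoked, only topic-1 comes back.
        tracker.onPartitionsRevoked(Set.of("topic-0", "topic-1"));
        System.out.println(tracker.onPartitionsAssigned(Set.of("topic-1"))); // [topic-1]
        System.out.println(new TreeSet<>(tracker.pauseRequested));           // [topic-1]
    }
}
```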

@tomazfernandes
Contributor

Ok, sounds a bit more complicated than what I had anticipated 😄 Thanks for the pointers, when I open a PR we can discuss if that looks alright - and please let me know if you think of anything else.

Considering there are some concepts I'm not that familiar with (never heard of cooperative rebalance before, sounds interesting), I'm not sure now if I can deliver it in time for us to properly review and discuss it before the next release. But we've come this far without this, hopefully it won't be a problem if we end up skipping one more version.

Alternatively, it sounds like you already have a solution figured out; if you prefer to go ahead and implement it, I'm OK with that too.

Thanks!

@garyrussell
Contributor Author

OK; I'll take it back; was planning to do it soon anyway.

@tomazfernandes
Contributor

Sure, thanks! I'll look into these concepts anyway and look forward to your solution; hopefully the pointers you gave will help me understand it better.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Apr 11, 2022
Resolves spring-projects#2222

Re-pause any paused partitions that are re-assigned after a rebalance.
Remove any partitions that are revoked and not re-assigned from the
paused partitions collections (pause requests and actually paused).
artembilan pushed a commit that referenced this issue Apr 12, 2022
Resolves #2222

Re-pause any paused partitions that are re-assigned after a rebalance.
Remove any partitions that are revoked and not re-assigned from the
paused partitions collections (pause requests and actually paused).
garyrussell added a commit that referenced this issue Apr 12, 2022
Resolves #2222

Re-pause any paused partitions that are re-assigned after a rebalance.
Remove any partitions that are revoked and not re-assigned from the
paused partitions collections (pause requests and actually paused).
garyrussell added a commit that referenced this issue Apr 14, 2022
@stephprat

Hi, I am using 2.8.5 or 2.9.0-RC1 and still see a similar issue: after a rebalance, the pauseRequestedPartitions of the KafkaMessageListenerContainers no longer match those of the ConcurrentMessageListenerContainer.

@garyrussell
Contributor Author

Hmmm - that's definitely wrong, but I believe it is benign: the field at the concurrent container level is never used; isPartitionPauseRequested() is only ever called within the KMLC.

The field should not even exist in the parent container.

@stephprat

But I am still receiving messages on topic partition "tenant2" although a pause has been requested for that partition.
The KMLC.pauseRequestedPartitions is wrong for my case, the CMLC.pauseRequestedPartitions is right.

@garyrussell
Contributor Author

garyrussell commented Jul 13, 2022

Oh - right - I can see that could happen if there are two rebalances.

When using non-blocking retries, the framework will re-pause that partition if its back-off time has not yet elapsed.

When manually pausing individual partitions, that won't happen.

To satisfy both use cases, we must not remove the partitions from the pause requested list when a rebalance occurs.
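The difference between the two policies shows up in exactly the two-rebalance case described here: lose `topic-0`, then get it back. The sketch below is a hypothetical, JDK-only model (string partitions, illustrative names `PauseRequestPolicy`, `rebalance`, `twoRebalances`), not the spring-kafka implementation; the flag switches between the first fix (forget requests for revoked, not re-assigned partitions) and the corrected behaviour (keep them).

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical JDK-only model contrasting the two policies discussed here.
class PauseRequestPolicy {

    final Set<String> pauseRequested = new HashSet<>();
    private final boolean forgetRevoked; // true = first fix, false = corrected

    PauseRequestPolicy(boolean forgetRevoked) {
        this.forgetRevoked = forgetRevoked;
    }

    // One rebalance: 'revoked' partitions are lost, 'assigned' are (re)gained.
    // Returns the assigned partitions that end up paused.
    Set<String> rebalance(Collection<String> revoked, Collection<String> assigned) {
        if (forgetRevoked) {
            Set<String> lost = new HashSet<>(revoked);
            lost.removeAll(assigned);
            pauseRequested.removeAll(lost);
        }
        Set<String> paused = new TreeSet<>(assigned);
        paused.retainAll(pauseRequested);
        return paused;
    }

    // Lose topic-0 in one rebalance, get it back in the next one.
    static Set<String> twoRebalances(boolean forgetRevoked) {
        PauseRequestPolicy policy = new PauseRequestPolicy(forgetRevoked);
        policy.pauseRequested.add("topic-0");
        policy.rebalance(Set.of("topic-0"), Set.of());
        return policy.rebalance(Set.of(), Set.of("topic-0"));
    }

    public static void main(String[] args) {
        System.out.println("first fix: " + twoRebalances(true));  // first fix: []
        System.out.println("corrected: " + twoRebalances(false)); // corrected: [topic-0]
    }
}
```

The trade-off of keeping requests is that a pause request outlives the assignment until it is explicitly resumed, even if the partition moves to another consumer for a while.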

garyrussell reopened this Jul 13, 2022
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 13, 2022
Resolves spring-projects#2350

Do not maintain the `pausePartitionsRequested` field in the concurrent MLC,
it is not used and can cause confusion.

Also resolves spring-projects#2222

The previous fix removed revoked partitions from `pausePartitionsRequested`;
this was incorrect - consider a rebalance where we lose `topic-0` and another
rebalance where we are re-assigned `topic-0`. According to the contract, this
partition should remain paused.

**cherry-pick to 2.9.x, 2.8.x**
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 13, 2022
Resolves spring-projects#2350

Do not maintain the `pausePartitionsRequested` field in the concurrent MLC,
it is not used and can cause confusion.

Also fixes some synchronization around `CMLC.containers`.

Also resolves spring-projects#2222

The previous fix removed revoked partitions from `pausePartitionsRequested`;
this was incorrect - consider a rebalance where we lose `topic-0` and another
rebalance where we are re-assigned `topic-0`. According to the contract, this
partition should remain paused.

**cherry-pick to 2.9.x, 2.8.x**
@stephprat

Hello @garyrussell, thanks. In the meantime, can I loop over all the KMLCs and call pausePartition() with the right list of partitions to pause?

@garyrussell
Copy link
Contributor Author

Yes. If you add a ConsumerRebalanceListener, the internal rebalance listener calls it after it has re-paused the partitions (and incorrectly removed the revoked partitions from pauseRequestedPartitions), so you can re-request the pauses there; on the next rebalance, when a revoked partition is re-assigned, it will be paused again.

@garyrussell
Copy link
Contributor Author

It's probably best to do it via the ConcurrentMLC because it filters the requests to find the right KMLC for the partition.

@stephprat

I am using ConsumerAwareRebalanceListener.onPartitionsAssigned(), which provides the consumer and its assigned partitions directly.
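The workaround works because the user's rebalance listener runs after the internal one, so any pause requests it re-issues survive until the partition comes back. The sketch below models that ordering with plain JDK types; it is not spring-kafka code, and `WorkaroundModel`, `Container`, `userListener` and `pausedAfterReassignment` are hypothetical names. In spring-kafka the listener would presumably be registered through `ContainerProperties.setConsumerRebalanceListener(...)` and call `pausePartition(TopicPartition)` on the concurrent container, which routes each request to the right child container.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical JDK-only model of the workaround, not spring-kafka code.
// The container mimics the buggy internal listener (it forgets pause requests
// for revoked partitions) and then invokes the user's listener.
class WorkaroundModel {

    static class Container {
        final Set<String> pauseRequested = new HashSet<>();
        final Set<String> paused = new TreeSet<>();
        Runnable userListener = () -> { };

        void pausePartition(String partition) {
            pauseRequested.add(partition);
        }

        void rebalance(Collection<String> revoked, Collection<String> assigned) {
            paused.removeAll(revoked);
            Set<String> lost = new HashSet<>(revoked);
            lost.removeAll(assigned);
            pauseRequested.removeAll(lost); // the incorrect removal
            for (String p : assigned) {
                if (pauseRequested.contains(p)) {
                    paused.add(p);          // internal re-pause
                }
            }
            userListener.run();             // user listener runs afterwards
        }
    }

    // Lose topic-0, then get it back; report whether it ends up paused.
    static boolean pausedAfterReassignment(boolean useWorkaround) {
        Set<String> desired = Set.of("topic-0"); // the application's own record
        Container container = new Container();
        desired.forEach(container::pausePartition);
        if (useWorkaround) {
            // Re-request every pause the application still wants.
            container.userListener = () -> desired.forEach(container::pausePartition);
        }
        container.rebalance(Set.of("topic-0"), Set.of());
        container.rebalance(Set.of(), Set.of("topic-0"));
        return container.paused.contains("topic-0");
    }

    public static void main(String[] args) {
        System.out.println(pausedAfterReassignment(false)); // false
        System.out.println(pausedAfterReassignment(true));  // true
    }
}
```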

artembilan pushed a commit that referenced this issue Jul 13, 2022
Resolves #2350

Do not maintain the `pausePartitionsRequested` field in the concurrent MLC,
it is not used and can cause confusion.

Also fixes some synchronization around `CMLC.containers`.

Also resolves #2222

The previous fix removed revoked partitions from `pausePartitionsRequested`;
this was incorrect - consider a rebalance where we lose `topic-0` and another
rebalance where we are re-assigned `topic-0`. According to the contract, this
partition should remain paused.

**cherry-pick to 2.9.x, 2.8.x**