Skip to content
This repository was archived by the owner on Mar 30, 2023. It is now read-only.

Commit ed94b1a

Browse files
garyrussellartembilan
authored andcommitted
KMSource - fix RebalanceListener (incremental)
Support cooperative rebalancing - incremental assignment/revocation - re-pause if a rebalance occurs while paused **cheerry-pick to 3.2.x**
1 parent bf6382b commit ed94b1a

File tree

2 files changed

+66
-9
lines changed

2 files changed

+66
-9
lines changed

src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Collections;
2424
import java.util.HashMap;
2525
import java.util.Iterator;
26+
import java.util.LinkedHashSet;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.Set;
@@ -120,6 +121,8 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl
120121

121122
private final ConsumerProperties consumerProperties;
122123

124+
private final Collection<TopicPartition> assignedPartitions = new LinkedHashSet<>();
125+
123126
private Duration pollTimeout;
124127

125128
private RecordMessageConverter messageConverter = new MessagingMessageConverter();
@@ -136,8 +139,6 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl
136139

137140
private volatile Consumer<K, V> consumer;
138141

139-
private volatile Collection<TopicPartition> assignedPartitions = new ArrayList<>();
140-
141142
private volatile boolean pausing;
142143

143144
private volatile boolean paused;
@@ -233,6 +234,24 @@ public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory,
233234
this.commitTimeout = consumerProperties.getSyncCommitTimeout();
234235
}
235236

237+
/**
238+
* Return the currently assigned partitions.
239+
* @return the partitions.
240+
* @since 3.2.2
241+
*/
242+
public Collection<TopicPartition> getAssignedPartitions() {
243+
return Collections.unmodifiableCollection(this.assignedPartitions);
244+
}
245+
246+
/**
247+
* Return true if the source is currently paused.
248+
* @return true if paused.
249+
* @since 3.2.2
250+
*/
251+
public boolean isPaused() {
252+
return this.paused;
253+
}
254+
236255
@Override
237256
protected void onInit() {
238257
if (!StringUtils.hasText(this.consumerProperties.getClientId())) {
@@ -478,7 +497,7 @@ protected void createConsumer() {
478497

479498
@Override
480499
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
481-
KafkaMessageSource.this.assignedPartitions.clear();
500+
KafkaMessageSource.this.assignedPartitions.removeAll(partitions);
482501
if (KafkaMessageSource.this.logger.isInfoEnabled()) {
483502
KafkaMessageSource.this.logger
484503
.info("Partitions revoked: " + partitions);
@@ -495,9 +514,27 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
495514

496515
}
497516

517+
@Override
518+
public void onPartitionsLost(Collection<TopicPartition> partitions) {
519+
if (providedRebalanceListener != null) {
520+
if (isConsumerAware) {
521+
((ConsumerAwareRebalanceListener) providedRebalanceListener).onPartitionsLost(partitions);
522+
}
523+
else {
524+
providedRebalanceListener.onPartitionsLost(partitions);
525+
}
526+
}
527+
onPartitionsRevoked(partitions);
528+
}
529+
498530
@Override
499531
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
500-
KafkaMessageSource.this.assignedPartitions = new ArrayList<>(partitions);
532+
KafkaMessageSource.this.assignedPartitions.addAll(partitions);
533+
if (KafkaMessageSource.this.paused) {
534+
KafkaMessageSource.this.consumer.pause(KafkaMessageSource.this.assignedPartitions);
535+
KafkaMessageSource.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
536+
+ "consumer paused again, so the initial poll() will never return any records");
537+
}
501538
if (KafkaMessageSource.this.logger.isInfoEnabled()) {
502539
KafkaMessageSource.this.logger
503540
.info("Partitions assigned: " + partitions);
@@ -524,7 +561,7 @@ else if (this.consumerProperties.getTopicPartitions() != null) {
524561
.map(TopicPartitionOffset::getTopicPartition)
525562
.collect(Collectors.toList());
526563
this.consumer.assign(topicPartitionsToAssign);
527-
this.assignedPartitions = new ArrayList<>(topicPartitionsToAssign);
564+
this.assignedPartitions.addAll(topicPartitionsToAssign);
528565

529566
TopicPartitionOffset[] partitions = this.consumerProperties.getTopicPartitions();
530567

src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@
3535

3636
import java.time.Duration;
3737
import java.time.temporal.ChronoUnit;
38+
import java.util.ArrayList;
3839
import java.util.Arrays;
3940
import java.util.Collection;
4041
import java.util.Collections;
4142
import java.util.HashSet;
4243
import java.util.LinkedHashMap;
44+
import java.util.LinkedHashSet;
4345
import java.util.List;
4446
import java.util.Map;
4547
import java.util.Set;
@@ -157,8 +159,10 @@ public void onPartitionsAssigned(Consumer<?, ?> cons, Collection<TopicPartition>
157159
@Test
158160
void testRebalanceListener() {
159161
Consumer consumer = mock(Consumer.class);
160-
TopicPartition topicPartition = new TopicPartition("foo", 0);
161-
List<TopicPartition> assigned = Collections.singletonList(topicPartition);
162+
TopicPartition topicPartition1 = new TopicPartition("foo", 0);
163+
List<TopicPartition> assigned1 = new ArrayList<>(Collections.singletonList(topicPartition1));
164+
TopicPartition topicPartition2 = new TopicPartition("foo", 1);
165+
List<TopicPartition> assigned2 = new ArrayList<>(Collections.singletonList(topicPartition2));
162166
AtomicReference<ConsumerRebalanceListener> listener = new AtomicReference<>();
163167
willAnswer(i -> {
164168
listener.set(i.getArgument(1));
@@ -190,11 +194,27 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
190194

191195
source.receive();
192196

193-
listener.get().onPartitionsAssigned(assigned);
197+
listener.get().onPartitionsAssigned(assigned1);
194198
assertThat(partitionsAssignedCalled.get()).isTrue();
199+
assertThat(new ArrayList<>(source.getAssignedPartitions())).isEqualTo(assigned1);
200+
listener.get().onPartitionsAssigned(assigned2);
201+
List<TopicPartition> temp = new ArrayList<>(assigned1);
202+
temp.addAll(assigned2);
203+
assertThat(new ArrayList<>(source.getAssignedPartitions())).isEqualTo(temp);
195204

196-
listener.get().onPartitionsRevoked(assigned);
205+
listener.get().onPartitionsRevoked(assigned1);
197206
assertThat(partitionsRevokedCalled.get()).isTrue();
207+
assertThat(new ArrayList<>(source.getAssignedPartitions())).isEqualTo(assigned2);
208+
209+
source.pause();
210+
assertThat(source.isPaused()).isFalse();
211+
InOrder inOrder = inOrder(consumer);
212+
source.receive();
213+
assertThat(source.isPaused()).isTrue();
214+
inOrder.verify(consumer).pause(new LinkedHashSet<>(assigned2));
215+
inOrder.verify(consumer).poll(any());
216+
listener.get().onPartitionsAssigned(assigned1);
217+
inOrder.verify(consumer).pause(new LinkedHashSet<>(temp));
198218
}
199219

200220
@Test

0 commit comments

Comments
 (0)