Skip to content

Commit 43ed676

Browse files
garyrussellartembilan
authored andcommitted
GH-2178: Fix CSA.onPartitionsAssigned (Manual)
Resolves #2178 The wrong collection was used as the source for the `assignments` map, `SeekPosition.BEGINNING` and `.END` entries are removed; use the `definedPartitions` field instead. **cherry-pick to 2.8.x, 2.7.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
1 parent 51ff88a commit 43ed676

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2668,7 +2668,7 @@ else if (metadata.relativeToCurrent) {
26682668
}
26692669
}
26702670
if (this.consumerSeekAwareListener != null) {
2671-
this.consumerSeekAwareListener.onPartitionsAssigned(partitions.keySet().stream()
2671+
this.consumerSeekAwareListener.onPartitionsAssigned(this.definedPartitions.keySet().stream()
26722672
.map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp)))
26732673
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())),
26742674
this.seekCallback);

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2562,7 +2562,21 @@ public void testInitialSeek() throws Exception {
25622562
containerProps.setGroupId("grp");
25632563
containerProps.setAckMode(AckMode.RECORD);
25642564
containerProps.setClientId("clientId");
2565-
containerProps.setMessageListener((MessageListener) r -> { });
2565+
2566+
Map<TopicPartition, Long> assigned = new HashMap<>();
2567+
class Listener extends AbstractConsumerSeekAware implements MessageListener {
2568+
2569+
@Override
2570+
public void onMessage(Object data) {
2571+
}
2572+
2573+
@Override
2574+
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
2575+
assigned.putAll(assignments);
2576+
}
2577+
2578+
}
2579+
containerProps.setMessageListener(new Listener());
25662580
containerProps.setMissingTopicsFatal(false);
25672581
KafkaMessageListenerContainer<Integer, String> container =
25682582
new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -2580,6 +2594,7 @@ public void testInitialSeek() throws Exception {
25802594
verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE);
25812595
verify(consumer).seek(new TopicPartition("foo", 6), 42L);
25822596
container.stop();
2597+
assertThat(assigned).hasSize(8);
25832598
}
25842599

25852600
@SuppressWarnings({ "unchecked", "rawtypes" })

0 commit comments

Comments
 (0)