Skip to content

Commit 243e68c

Browse files
garyrussellartembilan
authored andcommitted
GH-2178: Fix CSA.onPartitionsAssigned (Manual)
Resolves #2178 The wrong collection was used as the source for the `assignments` map, `SeekPosition.BEGINNING` and `.END` entries are removed; use the `definedPartitions` field instead. **cherry-pick to 2.8.x, 2.7.x**
1 parent b27a0d6 commit 243e68c

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2932,7 +2932,7 @@ private void initPartitionsIfNeeded() {
29322932
});
29332933
doInitialSeeks(partitions, beginnings, ends);
29342934
if (this.consumerSeekAwareListener != null) {
2935-
this.consumerSeekAwareListener.onPartitionsAssigned(partitions.keySet().stream()
2935+
this.consumerSeekAwareListener.onPartitionsAssigned(this.definedPartitions.keySet().stream()
29362936
.map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp)))
29372937
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())),
29382938
this.seekCallback);

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2784,7 +2784,21 @@ public void testInitialSeek() throws Exception {
27842784
containerProps.setGroupId("grp");
27852785
containerProps.setAckMode(AckMode.RECORD);
27862786
containerProps.setClientId("clientId");
2787-
containerProps.setMessageListener((MessageListener) r -> { });
2787+
2788+
Map<TopicPartition, Long> assigned = new HashMap<>();
2789+
class Listener extends AbstractConsumerSeekAware implements MessageListener {
2790+
2791+
@Override
2792+
public void onMessage(Object data) {
2793+
}
2794+
2795+
@Override
2796+
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
2797+
assigned.putAll(assignments);
2798+
}
2799+
2800+
}
2801+
containerProps.setMessageListener(new Listener());
27882802
containerProps.setMissingTopicsFatal(false);
27892803
KafkaMessageListenerContainer<Integer, String> container =
27902804
new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -2802,6 +2816,7 @@ public void testInitialSeek() throws Exception {
28022816
verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE);
28032817
verify(consumer).seek(new TopicPartition("foo", 6), 42L);
28042818
container.stop();
2819+
assertThat(assigned).hasSize(8);
28052820
}
28062821

28072822
@SuppressWarnings({ "unchecked", "rawtypes" })

0 commit comments

Comments
 (0)