Skip to content

Commit 02c6383

Browse files
authored
Fix bug in ConsumerSeekAware.seekRelative
Seek relative to the beginning of the partition. **Auto-cherry-pick to `3.1.x`**
1 parent f25b46f commit 02c6383

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3087,36 +3087,36 @@ private void processSeeks() {
30873087
traceSeek(offset);
30883088
try {
30893089
SeekPosition position = offset.getPosition();
3090+
TopicPartition topicPartition = offset.getTopicPartition();
30903091
Long whereTo = offset.getOffset();
30913092
if (position == null) {
30923093
if (offset.isRelativeToCurrent()) {
3093-
whereTo += this.consumer.position(offset.getTopicPartition());
3094+
whereTo += this.consumer.position(topicPartition);
30943095
whereTo = Math.max(whereTo, 0);
30953096
}
3096-
this.consumer.seek(offset.getTopicPartition(), whereTo);
3097+
this.consumer.seek(topicPartition, whereTo);
30973098
}
3098-
else if (position.equals(SeekPosition.BEGINNING)) {
3099-
this.consumer.seekToBeginning(Collections.singletonList(offset.getTopicPartition()));
3100-
if (whereTo != null) {
3101-
this.consumer.seek(offset.getTopicPartition(), whereTo);
3102-
}
3103-
}
3104-
else if (position.equals(SeekPosition.TIMESTAMP)) {
3099+
else if (SeekPosition.TIMESTAMP.equals(position)) {
31053100
// possible late addition since the grouped processing above
31063101
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer
31073102
.offsetsForTimes(
3108-
Collections.singletonMap(offset.getTopicPartition(), offset.getOffset()));
3103+
Collections.singletonMap(topicPartition, offset.getOffset()));
31093104
offsetsForTimes.forEach((tp, ot) -> {
31103105
if (ot != null) {
31113106
this.consumer.seek(tp, ot.offset());
31123107
}
31133108
});
31143109
}
31153110
else {
3116-
this.consumer.seekToEnd(Collections.singletonList(offset.getTopicPartition()));
3111+
if (SeekPosition.BEGINNING.equals(position)) {
3112+
this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
3113+
}
3114+
else {
3115+
this.consumer.seekToEnd(Collections.singletonList(topicPartition));
3116+
}
31173117
if (whereTo != null) {
3118-
whereTo += this.consumer.position(offset.getTopicPartition());
3119-
this.consumer.seek(offset.getTopicPartition(), whereTo);
3118+
whereTo += this.consumer.position(topicPartition);
3119+
this.consumer.seek(topicPartition, whereTo);
31203120
}
31213121
}
31223122
}
@@ -3350,7 +3350,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
33503350
@Override
33513351
public void seekRelative(String topic, int partition, long offset, boolean toCurrent) {
33523352
if (toCurrent) {
3353-
this.seeks.add(new TopicPartitionOffset(topic, partition, offset, toCurrent));
3353+
this.seeks.add(new TopicPartitionOffset(topic, partition, offset, true));
33543354
}
33553355
else if (offset >= 0) {
33563356
this.seeks.add(new TopicPartitionOffset(topic, partition, offset, SeekPosition.BEGINNING));

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -78,6 +78,8 @@
7878

7979
/**
8080
* @author Gary Russell
81+
* @author Wang Zhiyang
82+
*
8183
* @since 2.2.4
8284
*
8385
*/
@@ -343,7 +345,7 @@ void testAsyncRelativeSeeks() throws InterruptedException {
343345
verify(consumer).seekToEnd(Collections.singletonList(tp2));
344346
verify(consumer).seek(tp2, 70L); // position - 30 (seekToEnd ignored by mock)
345347
verify(consumer).seekToBeginning(Collections.singletonList(tp3));
346-
verify(consumer).seek(tp3, 30L);
348+
verify(consumer).seek(tp3, 130L); // position + 30 (seekToBeginning ignored by mock)
347349
container.stop();
348350
}
349351

0 commit comments

Comments
 (0)