Skip to content

Commit a1f4e26

Browse files
Wzy19930507spring-builds
authored andcommitted
Fix bug in ConsumerSeekAware.seekRelative
Seek relative to the beginning of the partition. (cherry picked from commit 02c6383)
1 parent 6cc3eee commit a1f4e26

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3090,36 +3090,36 @@ private void processSeeks() {
30903090
traceSeek(offset);
30913091
try {
30923092
SeekPosition position = offset.getPosition();
3093+
TopicPartition topicPartition = offset.getTopicPartition();
30933094
Long whereTo = offset.getOffset();
30943095
if (position == null) {
30953096
if (offset.isRelativeToCurrent()) {
3096-
whereTo += this.consumer.position(offset.getTopicPartition());
3097+
whereTo += this.consumer.position(topicPartition);
30973098
whereTo = Math.max(whereTo, 0);
30983099
}
3099-
this.consumer.seek(offset.getTopicPartition(), whereTo);
3100+
this.consumer.seek(topicPartition, whereTo);
31003101
}
3101-
else if (position.equals(SeekPosition.BEGINNING)) {
3102-
this.consumer.seekToBeginning(Collections.singletonList(offset.getTopicPartition()));
3103-
if (whereTo != null) {
3104-
this.consumer.seek(offset.getTopicPartition(), whereTo);
3105-
}
3106-
}
3107-
else if (position.equals(SeekPosition.TIMESTAMP)) {
3102+
else if (SeekPosition.TIMESTAMP.equals(position)) {
31083103
// possible late addition since the grouped processing above
31093104
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer
31103105
.offsetsForTimes(
3111-
Collections.singletonMap(offset.getTopicPartition(), offset.getOffset()));
3106+
Collections.singletonMap(topicPartition, offset.getOffset()));
31123107
offsetsForTimes.forEach((tp, ot) -> {
31133108
if (ot != null) {
31143109
this.consumer.seek(tp, ot.offset());
31153110
}
31163111
});
31173112
}
31183113
else {
3119-
this.consumer.seekToEnd(Collections.singletonList(offset.getTopicPartition()));
3114+
if (SeekPosition.BEGINNING.equals(position)) {
3115+
this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
3116+
}
3117+
else {
3118+
this.consumer.seekToEnd(Collections.singletonList(topicPartition));
3119+
}
31203120
if (whereTo != null) {
3121-
whereTo += this.consumer.position(offset.getTopicPartition());
3122-
this.consumer.seek(offset.getTopicPartition(), whereTo);
3121+
whereTo += this.consumer.position(topicPartition);
3122+
this.consumer.seek(topicPartition, whereTo);
31233123
}
31243124
}
31253125
}
@@ -3353,7 +3353,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
33533353
@Override
33543354
public void seekRelative(String topic, int partition, long offset, boolean toCurrent) {
33553355
if (toCurrent) {
3356-
this.seeks.add(new TopicPartitionOffset(topic, partition, offset, toCurrent));
3356+
this.seeks.add(new TopicPartitionOffset(topic, partition, offset, true));
33573357
}
33583358
else if (offset >= 0) {
33593359
this.seeks.add(new TopicPartitionOffset(topic, partition, offset, SeekPosition.BEGINNING));

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -78,6 +78,8 @@
7878

7979
/**
8080
* @author Gary Russell
81+
* @author Wang Zhiyang
82+
*
8183
* @since 2.2.4
8284
*
8385
*/
@@ -343,7 +345,7 @@ void testAsyncRelativeSeeks() throws InterruptedException {
343345
verify(consumer).seekToEnd(Collections.singletonList(tp2));
344346
verify(consumer).seek(tp2, 70L); // position - 30 (seekToEnd ignored by mock)
345347
verify(consumer).seekToBeginning(Collections.singletonList(tp3));
346-
verify(consumer).seek(tp3, 30L);
348+
verify(consumer).seek(tp3, 130L); // position + 30 (seekToBeginning ignored by mock)
347349
container.stop();
348350
}
349351

0 commit comments

Comments
 (0)