Skip to content

Commit 9ea5cec

Browse files
garyrussellartembilan
authored andcommitted
GH-1876: Seek To Timestamp with Manual Assignment
Resolves #1876 Support initial seek to timestamp when manually assigning partitions. **cherry-pick to 2.7.x**
1 parent e4a26dc commit 9ea5cec

File tree

3 files changed

+37
-10
lines changed

3 files changed

+37
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2504,7 +2504,11 @@ else if (position.equals(SeekPosition.TIMESTAMP)) {
25042504
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer
25052505
.offsetsForTimes(
25062506
Collections.singletonMap(offset.getTopicPartition(), offset.getOffset()));
2507-
offsetsForTimes.forEach((tp, ot) -> this.consumer.seek(tp, ot.offset()));
2507+
offsetsForTimes.forEach((tp, ot) -> {
2508+
if (ot != null) {
2509+
this.consumer.seek(tp, ot.offset());
2510+
}
2511+
});
25082512
}
25092513
else {
25102514
this.consumer.seekToEnd(Collections.singletonList(offset.getTopicPartition()));
@@ -2568,6 +2572,18 @@ private void initPartitionsIfNeeded() {
25682572
.map(Entry::getKey)
25692573
.collect(Collectors.toSet());
25702574
ends.forEach(partitions::remove);
2575+
Map<TopicPartition, Long> times = partitions.entrySet().stream()
2576+
.filter(e -> SeekPosition.TIMESTAMP.equals(e.getValue().seekPosition))
2577+
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().offset));
2578+
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer.offsetsForTimes(times);
2579+
offsetsForTimes.forEach((tp, off) -> {
2580+
if (off == null) {
2581+
ends.add(tp);
2582+
}
2583+
else {
2584+
partitions.put(tp, new OffsetMetadata(off.offset(), false, SeekPosition.TIMESTAMP));
2585+
}
2586+
});
25712587
if (beginnings.size() > 0) {
25722588
this.consumer.seekToBeginning(beginnings);
25732589
}
@@ -3119,11 +3135,11 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti
31193135

31203136
private static final class OffsetMetadata {
31213137

3122-
private final Long offset;
3138+
final Long offset; // NOSONAR
31233139

3124-
private final boolean relativeToCurrent;
3140+
final boolean relativeToCurrent; // NOSONAR
31253141

3126-
private final SeekPosition seekPosition;
3142+
final SeekPosition seekPosition; // NOSONAR
31273143

31283144
OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) {
31293145
this.offset = offset;

spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public enum SeekPosition {
6262
END,
6363

6464
/**
65-
* Seek to the time stamp.
65+
* Seek to the time stamp; if no records exist with a timestamp greater than or
66+
* equal to the timestamp seek to the end.
6667
*/
6768
TIMESTAMP
6869

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.apache.kafka.clients.consumer.ConsumerRecords;
7070
import org.apache.kafka.clients.consumer.KafkaConsumer;
7171
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
72+
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
7273
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
7374
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
7475
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -137,7 +138,7 @@
137138
KafkaMessageListenerContainerTests.topic17, KafkaMessageListenerContainerTests.topic18,
138139
KafkaMessageListenerContainerTests.topic19, KafkaMessageListenerContainerTests.topic20,
139140
KafkaMessageListenerContainerTests.topic21, KafkaMessageListenerContainerTests.topic22,
140-
KafkaMessageListenerContainerTests.topic23 })
141+
KafkaMessageListenerContainerTests.topic23, KafkaMessageListenerContainerTests.topic24 })
141142
public class KafkaMessageListenerContainerTests {
142143

143144
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
@@ -190,6 +191,8 @@ public class KafkaMessageListenerContainerTests {
190191

191192
public static final String topic24 = "testTopic24";
192193

194+
public static final String topic25 = "testTopic24";
195+
193196
private static EmbeddedKafkaBroker embeddedKafka;
194197

195198
@BeforeAll
@@ -2542,15 +2545,20 @@ public void testInitialSeek() throws Exception {
25422545
Thread.sleep(50);
25432546
return emptyRecords;
25442547
});
2545-
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
2548+
2549+
Map<TopicPartition, OffsetAndTimestamp> offsets = new HashMap<>();
2550+
offsets.put(new TopicPartition("foo", 6), new OffsetAndTimestamp(42L, 1234L));
2551+
offsets.put(new TopicPartition("foo", 7), null);
2552+
given(consumer.offsetsForTimes(any())).willReturn(offsets);
2553+
ContainerProperties containerProps = new ContainerProperties(
25462554
new TopicPartitionOffset("foo", 0, SeekPosition.BEGINNING),
25472555
new TopicPartitionOffset("foo", 1, SeekPosition.END),
25482556
new TopicPartitionOffset("foo", 2, 0L),
25492557
new TopicPartitionOffset("foo", 3, Long.MAX_VALUE),
25502558
new TopicPartitionOffset("foo", 4, SeekPosition.BEGINNING),
25512559
new TopicPartitionOffset("foo", 5, SeekPosition.END),
2552-
};
2553-
ContainerProperties containerProps = new ContainerProperties(topicPartition);
2560+
new TopicPartitionOffset("foo", 6, 1234L, SeekPosition.TIMESTAMP),
2561+
new TopicPartitionOffset("foo", 7, 1234L, SeekPosition.TIMESTAMP));
25542562
containerProps.setGroupId("grp");
25552563
containerProps.setAckMode(AckMode.RECORD);
25562564
containerProps.setClientId("clientId");
@@ -2566,9 +2574,11 @@ public void testInitialSeek() throws Exception {
25662574
.isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 4))));
25672575
verify(consumer).seekToEnd(captor.capture());
25682576
assertThat(captor.getValue())
2569-
.isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 1), new TopicPartition("foo", 5))));
2577+
.isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 1), new TopicPartition("foo", 5),
2578+
new TopicPartition("foo", 7))));
25702579
verify(consumer).seek(new TopicPartition("foo", 2), 0L);
25712580
verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE);
2581+
verify(consumer).seek(new TopicPartition("foo", 6), 42L);
25722582
container.stop();
25732583
}
25742584

0 commit comments

Comments
 (0)