Skip to content

Commit 25cd53e

Browse files
committed
spring-projectsGH-3078: Compute Seek Position from Current Offset
Fixes: spring-projects#3078 * Provide a new API method in `ConsumerSeekCallback` to seek to an offset based on the current offset. This is accomplished by a user-defined function where the user can make decision on the offset to seek to based on the current offset which is available via the function's input. * Adding tests, docs.
1 parent 265e55f commit 25cd53e

File tree

5 files changed

+223
-1
lines changed

5 files changed

+223
-1
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ The callback has the following methods:
3434
----
3535
void seek(String topic, int partition, long offset);
3636
37+
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
38+
3739
void seekToBeginning(String topic, int partition);
3840
3941
void seekToBeginning(Collection<TopicPartitions> partitions);
@@ -49,6 +51,11 @@ void seekToTimestamp(String topic, int partition, long timestamp);
4951
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
5052
----
5153

54+
The two different variants of the `seek` methods provide a way to seek to an arbitrary offset.
55+
The method that takes a `Function` as an argument to compute the offset was added in version `3.2` of the framework.
56+
This function provides access to the current offset (the current position returned by the consumer, which is the next offset to be fetched).
57+
The user can decide what offset to seek to based on the current offset in the consumer as part of the function definition.
58+
5259
`seekRelative` was added in version 2.3, to perform relative seeks.
5360

5461
* `offset` negative and `toCurrent` `false` - seek relative to the end of the partition.

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,9 @@ See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Proc
5151
=== Change @RetryableTopic SameIntervalTopicReuseStrategy default value
5252
Change `@RetryableTopic` property `SameIntervalTopicReuseStrategy` default value to `SINGLE_TOPIC`.
5353
See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topic for maxInterval Exponential Delay].
54+
55+
[[x32-seek-offset-compute-fn]]
56+
=== New API method to seek to an offset based on a user provided function
57+
`ConsumerCallback` provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument.
58+
See xref:kafka/seek.adoc#seek/[Seek API Docs] for more details.
59+

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.util.Collection;
2020
import java.util.Map;
21+
import java.util.function.Function;
2122

2223
import org.apache.kafka.common.TopicPartition;
2324

@@ -27,6 +28,7 @@
2728
* seek operation.
2829
*
2930
* @author Gary Russell
31+
* @author Soby Chacko
3032
* @since 1.1
3133
*
3234
*/
@@ -105,6 +107,23 @@ interface ConsumerSeekCallback {
105107
*/
106108
void seek(String topic, int partition, long offset);
107109

110+
/**
111+
* Perform a seek operation based on the given function to compute the offset to seek to.
112+
* The function provides the user with access to the current offset in the consumer which
113+
* is the current position, i.e, the next offset to be fetched.
114+
* When called from {@link ConsumerSeekAware#onPartitionsAssigned(Map, ConsumerSeekCallback)}
115+
* or from {@link ConsumerSeekAware#onIdleContainer(Map, ConsumerSeekCallback)}
116+
* perform the seek immediately on the consumer. When called from elsewhere,
117+
* queue the seek operation to the consumer. The queued seek will occur after any
118+
* pending offset commits. The consumer must be currently assigned the specified
119+
* partition.
120+
* @param topic the topic.
121+
* @param partition the partition.
122+
* @param offsetComputeFunction function to compute the absolute offset to seek to.
123+
* @since 3.2.0
124+
*/
125+
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
126+
108127
/**
109128
* Perform a seek to beginning operation. When called from
110129
* {@link ConsumerSeekAware#onPartitionsAssigned(Map, ConsumerSeekCallback)} or

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3323,6 +3323,12 @@ public void seek(String topic, int partition, long offset) {
33233323
this.seeks.add(new TopicPartitionOffset(topic, partition, offset));
33243324
}
33253325

3326+
@Override
3327+
public void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction) {
3328+
this.seeks.add(new TopicPartitionOffset(topic, partition, offsetComputeFunction.apply(
3329+
this.consumer.position(new TopicPartition(topic, partition)))));
3330+
}
3331+
33263332
@Override
33273333
public void seekToBeginning(String topic, int partition) {
33283334
this.seeks.add(new TopicPartitionOffset(topic, partition, SeekPosition.BEGINNING));
@@ -3767,6 +3773,13 @@ public void seek(String topic, int partition, long offset) {
37673773
ListenerConsumer.this.consumer.seek(new TopicPartition(topic, partition), offset);
37683774
}
37693775

3776+
@Override
3777+
public void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction) {
3778+
ListenerConsumer.this.consumer.seek(new TopicPartition(topic, partition),
3779+
offsetComputeFunction.apply(
3780+
ListenerConsumer.this.consumer.position(new TopicPartition(topic, partition))));
3781+
}
3782+
37703783
@Override
37713784
public void seekToBeginning(String topic, int partition) {
37723785
ListenerConsumer.this.consumer.seekToBeginning(

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,106 @@ void testSyncRelativeSeeks() throws InterruptedException {
291291
container.stop();
292292
}
293293

294+
@SuppressWarnings({ "rawtypes", "unchecked" })
295+
@Test
296+
void seekOffsetFromComputeFnOnInitAssignmentAndIdleContainer() throws InterruptedException {
297+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
298+
final Consumer consumer = mock(Consumer.class);
299+
TestMessageListener3 listener = new TestMessageListener3();
300+
ConsumerRecords empty = new ConsumerRecords<>(Collections.emptyMap());
301+
willAnswer(invocation -> {
302+
Thread.sleep(10);
303+
return empty;
304+
}).given(consumer).poll(any());
305+
TopicPartition tp0 = new TopicPartition("test-topic", 0);
306+
TopicPartition tp1 = new TopicPartition("test-topic", 1);
307+
TopicPartition tp2 = new TopicPartition("test-topic", 2);
308+
TopicPartition tp3 = new TopicPartition("test-topic", 3);
309+
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
310+
willAnswer(invocation -> {
311+
((ConsumerRebalanceListener) invocation.getArgument(1))
312+
.onPartitionsAssigned(assignments);
313+
return null;
314+
}).given(consumer).subscribe(any(Collection.class), any());
315+
given(consumer.position(any())).willReturn(30L); // current offset position is always 30
316+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
317+
.willReturn(consumer);
318+
ContainerProperties containerProperties = new ContainerProperties("test-topic");
319+
containerProperties.setGroupId("grp");
320+
containerProperties.setMessageListener(listener);
321+
containerProperties.setIdleEventInterval(10L);
322+
containerProperties.setMissingTopicsFatal(false);
323+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory,
324+
containerProperties);
325+
container.start();
326+
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
327+
verify(consumer).seek(tp0, 20L);
328+
verify(consumer).seek(tp1, 21L);
329+
verify(consumer).seek(tp2, 22L);
330+
verify(consumer).seek(tp3, 23L);
331+
332+
verify(consumer).seek(tp0, 30L);
333+
verify(consumer).seek(tp1, 30L);
334+
verify(consumer).seek(tp2, 30L);
335+
verify(consumer).seek(tp3, 30L);
336+
container.stop();
337+
}
338+
339+
@SuppressWarnings({ "rawtypes", "unchecked" })
340+
@Test
341+
void seekOffsetFromComputeFnFromActiveListener() throws InterruptedException {
342+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
343+
final Consumer consumer = mock(Consumer.class);
344+
TestMessageListener4 listener = new TestMessageListener4();
345+
CountDownLatch latch = new CountDownLatch(2);
346+
TopicPartition tp0 = new TopicPartition("test-topic", 0);
347+
TopicPartition tp1 = new TopicPartition("test-topic", 1);
348+
TopicPartition tp2 = new TopicPartition("test-topic", 2);
349+
TopicPartition tp3 = new TopicPartition("test-topic", 3);
350+
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
351+
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
352+
recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("test-topic", 0, 0, null, "test-data")));
353+
recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("test-topic", 1, 0, null, "test-data")));
354+
recordMap.put(tp2, Collections.singletonList(new ConsumerRecord("test-topic", 2, 0, null, "test-data")));
355+
recordMap.put(tp3, Collections.singletonList(new ConsumerRecord("test-topic", 3, 0, null, "test-data")));
356+
ConsumerRecords records = new ConsumerRecords<>(recordMap);
357+
willAnswer(invocation -> {
358+
Thread.sleep(10);
359+
if (listener.latch.getCount() <= 0) {
360+
latch.countDown();
361+
}
362+
return records;
363+
}).given(consumer).poll(any());
364+
willAnswer(invocation -> {
365+
((ConsumerRebalanceListener) invocation.getArgument(1))
366+
.onPartitionsAssigned(assignments);
367+
return null;
368+
}).given(consumer).subscribe(any(Collection.class), any());
369+
given(consumer.position(tp0)).willReturn(30L); // current offset 30, target 20 (see hard-coded in onMessage)
370+
given(consumer.position(tp1)).willReturn(10L); // current 10, target 21
371+
given(consumer.position(tp2)).willReturn(22L); // current 22, target 22
372+
given(consumer.position(tp3)).willReturn(22L); // current 22, target 23
373+
given(consumer.beginningOffsets(any())).willReturn(assignments.stream()
374+
.collect(Collectors.toMap(tp -> tp, tp -> 0L)));
375+
given(consumer.endOffsets(any())).willReturn(assignments.stream()
376+
.collect(Collectors.toMap(tp -> tp, tp -> 100L)));
377+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
378+
.willReturn(consumer);
379+
ContainerProperties containerProperties = new ContainerProperties("test-topic");
380+
containerProperties.setGroupId("grp");
381+
containerProperties.setMessageListener(listener);
382+
containerProperties.setMissingTopicsFatal(false);
383+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory,
384+
containerProperties);
385+
container.start();
386+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
387+
verify(consumer).seek(tp0, 20L);
388+
verify(consumer).seek(tp1, 10L);
389+
verify(consumer).seek(tp2, 22L);
390+
verify(consumer).seek(tp3, 22L);
391+
container.stop();
392+
}
393+
294394
@SuppressWarnings({ "rawtypes", "unchecked" })
295395
@Test
296396
@DisplayName("Seek from activeListener")
@@ -1282,4 +1382,81 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC
12821382

12831383
}
12841384

1385+
public static class TestMessageListener3 implements MessageListener<String, String>, ConsumerSeekAware {
1386+
1387+
private static final ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();
1388+
1389+
CountDownLatch latch = new CountDownLatch(2);
1390+
1391+
@Override
1392+
public void onMessage(ConsumerRecord<String, String> data) {
1393+
1394+
}
1395+
1396+
@Override
1397+
public void registerSeekCallback(ConsumerSeekCallback callback) {
1398+
callbacks.set(callback);
1399+
}
1400+
1401+
@Override
1402+
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
1403+
if (latch.getCount() > 0) {
1404+
int absoluteTarget1 = 20;
1405+
int absoluteTarget2 = 21;
1406+
int absoluteTarget3 = 22;
1407+
int absoluteTarget4 = 23;
1408+
callback.seek("test-topic", 0, current -> current > absoluteTarget1 ? absoluteTarget1 : current);
1409+
callback.seek("test-topic", 1, current -> current > absoluteTarget2 ? absoluteTarget2 : current);
1410+
callback.seek("test-topic", 2, current -> current > absoluteTarget3 ? absoluteTarget3 : current);
1411+
callback.seek("test-topic", 3, current -> current > absoluteTarget4 ? absoluteTarget4 : current);
1412+
}
1413+
this.latch.countDown();
1414+
}
1415+
1416+
1417+
@Override
1418+
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
1419+
if (latch.getCount() > 0) {
1420+
int absoluteTarget = 31;
1421+
callback.seek("test-topic", 0, current -> current > absoluteTarget ? absoluteTarget : current);
1422+
callback.seek("test-topic", 1, current -> current > absoluteTarget ? absoluteTarget : current);
1423+
callback.seek("test-topic", 2, current -> current > absoluteTarget ? absoluteTarget : current);
1424+
callback.seek("test-topic", 3, current -> current > absoluteTarget ? absoluteTarget : current);
1425+
}
1426+
this.latch.countDown();
1427+
}
1428+
1429+
}
1430+
1431+
public static class TestMessageListener4 implements MessageListener<String, String>, ConsumerSeekAware {
1432+
1433+
private static final ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();
1434+
1435+
CountDownLatch latch = new CountDownLatch(1);
1436+
1437+
@Override
1438+
public void onMessage(ConsumerRecord<String, String> data) {
1439+
ConsumerSeekCallback callback = callbacks.get();
1440+
if (latch.getCount() > 0) {
1441+
1442+
int absoluteTarget1 = 20;
1443+
int absoluteTarget2 = 21;
1444+
int absoluteTarget3 = 22;
1445+
int absoluteTarget4 = 23;
1446+
1447+
callback.seek("test-topic", 0, current -> current > absoluteTarget1 ? absoluteTarget1 : current);
1448+
callback.seek("test-topic", 1, current -> current > absoluteTarget2 ? absoluteTarget2 : current);
1449+
callback.seek("test-topic", 2, current -> current > absoluteTarget3 ? absoluteTarget3 : current);
1450+
callback.seek("test-topic", 3, current -> current > absoluteTarget4 ? absoluteTarget4 : current);
1451+
}
1452+
this.latch.countDown();
1453+
}
1454+
1455+
@Override
1456+
public void registerSeekCallback(ConsumerSeekCallback callback) {
1457+
callbacks.set(callback);
1458+
}
1459+
1460+
}
1461+
12851462
}

0 commit comments

Comments
 (0)