Skip to content

Commit 65105b6

Browse files
authored
GH-3108: Unsafe access to consumer in seek API
Fixes: #3108 * When the seek API that uses the user-provided function to compute the offset to seek, there is a concurrency issue in which the Kafka consumer is used unsafely. See this for details: #3078 (comment) * Fix Checkstyle violation
1 parent 8355162 commit 65105b6

File tree

4 files changed

+74
-4
lines changed

4 files changed

+74
-4
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,9 @@ See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topi
5757
`ConsumerCallback` provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument.
5858
See xref:kafka/seek.adoc#seek[Seek API Docs] for more details.
5959

60+
[[x32-topic-partition-offset-constructor]]
61+
=== New constructor in TopicPartitionOffset that accepts a function to compute the offset to seek to
62+
`TopicPartitionOffset` has a new constructor that takes a user-provided function to compute the offset to seek to.
63+
When this constructor is used, the framework calls the function with the input argument of the current consumer offset position.
64+
See xref:kafka/seek.adoc#seek[Seek API Docs] for more details.
65+

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3005,11 +3005,15 @@ private void processSeeks() {
30053005
SeekPosition position = offset.getPosition();
30063006
TopicPartition topicPartition = offset.getTopicPartition();
30073007
Long whereTo = offset.getOffset();
3008+
Function<Long, Long> offsetComputeFunction = offset.getOffsetComputeFunction();
30083009
if (position == null) {
30093010
if (offset.isRelativeToCurrent()) {
30103011
whereTo += this.consumer.position(topicPartition);
30113012
whereTo = Math.max(whereTo, 0);
30123013
}
3014+
else if (offsetComputeFunction != null) {
3015+
whereTo = offsetComputeFunction.apply(this.consumer.position(topicPartition));
3016+
}
30133017
this.consumer.seek(topicPartition, whereTo);
30143018
}
30153019
else if (SeekPosition.TIMESTAMP.equals(position)) {
@@ -3262,8 +3266,7 @@ public void seek(String topic, int partition, long offset) {
32623266

32633267
@Override
32643268
public void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction) {
3265-
this.seeks.add(new TopicPartitionOffset(topic, partition, offsetComputeFunction.apply(
3266-
this.consumer.position(new TopicPartition(topic, partition)))));
3269+
this.seeks.add(new TopicPartitionOffset(topic, partition, offsetComputeFunction));
32673270
}
32683271

32693272
@Override

spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support;
1818

1919
import java.util.Objects;
20+
import java.util.function.Function;
2021

2122
import org.apache.kafka.common.TopicPartition;
2223

@@ -41,6 +42,7 @@
4142
*
4243
* @author Artem Bilan
4344
* @author Gary Russell
45+
* @author Soby Chacko
4446
*
4547
* @since 2.3
4648
*/
@@ -77,6 +79,8 @@ public enum SeekPosition {
7779

7880
private boolean relativeToCurrent;
7981

82+
private Function<Long, Long> offsetComputeFunction;
83+
8084
/**
8185
* Construct an instance with no initial offset management.
8286
* @param topic the topic.
@@ -98,6 +102,19 @@ public TopicPartitionOffset(String topic, int partition, Long offset) {
98102
this(topic, partition, offset, false);
99103
}
100104

105+
/**
106+
* Construct an instance with the provided function to compute the offset.
107+
* @param topic the topic.
108+
* @param partition the partition.
109+
* @param offsetComputeFunction function to compute the offset.
110+
* @since 3.2.0
111+
*/
112+
public TopicPartitionOffset(String topic, int partition, Function<Long, Long> offsetComputeFunction) {
113+
this.topicPartition = new TopicPartition(topic, partition);
114+
this.offsetComputeFunction = offsetComputeFunction;
115+
this.position = null;
116+
}
117+
101118
/**
102119
* Construct an instance with the provided initial offset.
103120
* @param topic the topic.
@@ -198,6 +215,10 @@ public SeekPosition getPosition() {
198215
return this.position;
199216
}
200217

218+
public Function<Long, Long> getOffsetComputeFunction() {
219+
return this.offsetComputeFunction;
220+
}
221+
201222
@Override
202223
public boolean equals(Object o) {
203224
if (this == o) {

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.concurrent.atomic.AtomicInteger;
5151
import java.util.concurrent.atomic.AtomicReference;
5252
import java.util.stream.Collectors;
53+
import java.util.stream.IntStream;
5354

5455
import org.aopalliance.intercept.MethodInterceptor;
5556
import org.apache.commons.logging.Log;
@@ -189,7 +190,7 @@
189190
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
190191
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
191192
"annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41", "annotated42",
192-
"annotated43", "annotated43reply"})
193+
"annotated43", "annotated43reply", "seekToComputeFn"})
193194
@TestPropertySource(properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10")
194195
public class EnableKafkaIntegrationTests {
195196

@@ -248,6 +249,9 @@ public class EnableKafkaIntegrationTests {
248249
@Autowired
249250
private SeekToLastOnIdleListener seekOnIdleListener;
250251

252+
@Autowired
253+
private SeekToOffsetFromComputeFunction seekToOffsetFromComputeFunction;
254+
251255
@Autowired
252256
private MeterRegistry meterRegistry;
253257

@@ -1069,6 +1073,13 @@ void classLevelTwoInstancesSameClass() {
10691073
assertThat(this.registry.getListenerContainer("multiTwoTwo")).isNotNull();
10701074
}
10711075

1076+
@Test
1077+
void seekToOffsetComputedFromFunction() throws InterruptedException {
1078+
IntStream.range(0, 10).forEach(i -> template.send("seekToComputeFn", 0, i, "my-data"));
1079+
assertThat(this.seekToOffsetFromComputeFunction.latch1.await(10, TimeUnit.SECONDS)).isTrue();
1080+
assertThat(this.seekToOffsetFromComputeFunction.latch2.await(10, TimeUnit.SECONDS)).isTrue();
1081+
}
1082+
10721083
@Configuration
10731084
@EnableKafka
10741085
@EnableTransactionManagement(proxyTargetClass = true)
@@ -1425,6 +1436,10 @@ public SeekToLastOnIdleListener seekOnIdle() {
14251436
return new SeekToLastOnIdleListener();
14261437
}
14271438

1439+
@Bean
1440+
public SeekToOffsetFromComputeFunction seekToOffsetFromComputeFunction() {
1441+
return new SeekToOffsetFromComputeFunction();
1442+
}
14281443
@Bean
14291444
public IfaceListener<String> ifaceListener() {
14301445
return new IfaceListenerImpl();
@@ -2333,6 +2348,31 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
23332348
}
23342349
}
23352350

2351+
public static class SeekToOffsetFromComputeFunction extends AbstractConsumerSeekAware {
2352+
2353+
CountDownLatch latch1 = new CountDownLatch(10);
2354+
CountDownLatch latch2 = new CountDownLatch(1);
2355+
2356+
@KafkaListener(id = "seekToComputeFn", topics = "seekToComputeFn")
2357+
public void listen(String in) throws InterruptedException {
2358+
if (latch2.getCount() > 0) { // if latch2 is zero, the test condition is met
2359+
if (latch1.getCount() == 0) { // Seek happened on the consumer
2360+
latch2.countDown();
2361+
}
2362+
if (latch1.getCount() > 0) {
2363+
latch1.countDown();
2364+
if (latch1.getCount() == 0) {
2365+
ConsumerSeekCallback seekToComputeFn = getSeekCallbackFor(
2366+
new org.apache.kafka.common.TopicPartition("seekToComputeFn", 0));
2367+
assertThat(seekToComputeFn).isNotNull();
2368+
seekToComputeFn.
2369+
seek("seekToComputeFn", 0, current -> 0L);
2370+
}
2371+
}
2372+
}
2373+
}
2374+
}
2375+
23362376
public static class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
23372377

23382378
final CountDownLatch latch1 = new CountDownLatch(10);

0 commit comments

Comments
 (0)