Skip to content

Commit 50498f9

Browse files
frosiere authored and garyrussell committed
GH-2170: Support Custom OffsetAndMetadata
Resolves #2170. Adds a way to create a custom OffsetAndMetadata. Also: GH-2170: Remove useless initializations.
1 parent bf36c64 commit 50498f9

File tree

5 files changed

+196
-9
lines changed

5 files changed

+196
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.regex.Pattern;
2323

2424
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
25+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2526
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
2627

2728
import org.springframework.kafka.support.LogIfLevelEnabled;
@@ -89,6 +90,12 @@ public class ConsumerProperties {
8990
*/
9091
private OffsetCommitCallback commitCallback;
9192

93+
/**
94+
* A provider for {@link OffsetAndMetadata}; by default, the provider creates an offset and metadata with
95+
* empty metadata. The provider gives a way to customize the metadata.
96+
*/
97+
private OffsetAndMetadataProvider offsetAndMetadataProvider;
98+
9299
/**
93100
* Whether or not to call consumer.commitSync() or commitAsync() when the
94101
* container is responsible for commits. Default true.
@@ -275,6 +282,16 @@ public void setCommitCallback(OffsetCommitCallback commitCallback) {
275282
this.commitCallback = commitCallback;
276283
}
277284

285+
/**
286+
* Set the offset and metadata provider associated to a commit callback.
287+
* @param offsetAndMetadataProvider an offset and metadata provider.
288+
* @since 2.8.5
289+
* @see #setCommitCallback(OffsetCommitCallback)
290+
*/
291+
public void setOffsetAndMetadataProvider(OffsetAndMetadataProvider offsetAndMetadataProvider) {
292+
this.offsetAndMetadataProvider = offsetAndMetadataProvider;
293+
}
294+
278295
/**
279296
* Return the commit callback.
280297
* @return the callback.
@@ -284,6 +301,15 @@ public OffsetCommitCallback getCommitCallback() {
284301
return this.commitCallback;
285302
}
286303

304+
/**
305+
* Return the offset and metadata provider.
306+
* @return the offset and metadata provider.
307+
*/
308+
@Nullable
309+
public OffsetAndMetadataProvider getOffsetAndMetadataProvider() {
310+
return this.offsetAndMetadataProvider;
311+
}
312+
287313
/**
288314
* Set whether or not to call consumer.commitSync() or commitAsync() when the
289315
* container is responsible for commits. Default true.
@@ -491,6 +517,7 @@ protected final String renderProperties() {
491517
? "\n consumerRebalanceListener=" + this.consumerRebalanceListener
492518
: "")
493519
+ (this.commitCallback != null ? "\n commitCallback=" + this.commitCallback : "")
520+
+ (this.offsetAndMetadataProvider != null ? "\n offsetAndMetadataProvider=" + this.offsetAndMetadataProvider : "")
494521
+ "\n syncCommits=" + this.syncCommits
495522
+ (this.syncCommitTimeout != null ? "\n syncCommitTimeout=" + this.syncCommitTimeout : "")
496523
+ (this.kafkaConsumerProperties.size() > 0 ? "\n properties=" + this.kafkaConsumerProperties : "")

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
557557
? this.containerProperties.getCommitCallback()
558558
: new LoggingCommitCallback();
559559

560+
private final OffsetAndMetadataProvider offsetAndMetadataProvider = this.containerProperties.getOffsetAndMetadataProvider() == null
561+
? (listenerMetadata, offset) -> new OffsetAndMetadata(offset)
562+
: this.containerProperties.getOffsetAndMetadataProvider();
563+
564+
private final ConsumerAwareListenerMetadata consumerAwareListenerMetadata = new ConsumerAwareListenerMetadata();
565+
560566
private final Consumer<K, V> consumer;
561567

562568
private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();
@@ -1438,7 +1444,7 @@ private void fixTxOffsetsIfNeeded() {
14381444
return;
14391445
}
14401446
if (position > oamd.offset()) {
1441-
toFix.put(tp, new OffsetAndMetadata(position));
1447+
toFix.put(tp, createOffsetAndMetadata(position));
14421448
}
14431449
});
14441450
if (toFix.size() > 0) {
@@ -1910,7 +1916,7 @@ else if (record.offset() < offs.get(0)) {
19101916
private void ackImmediate(ConsumerRecord<K, V> record) {
19111917
Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
19121918
new TopicPartition(record.topic(), record.partition()),
1913-
new OffsetAndMetadata(record.offset() + 1));
1919+
createOffsetAndMetadata(record.offset() + 1));
19141920
this.commitLogger.log(() -> COMMITTING + commits);
19151921
if (this.producer != null) {
19161922
doSendOffsets(this.producer, commits);
@@ -1926,9 +1932,8 @@ else if (this.syncCommits) {
19261932
private void ackImmediate(ConsumerRecords<K, V> records) {
19271933
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
19281934
for (TopicPartition part : records.partitions()) {
1929-
commits.put(part,
1930-
new OffsetAndMetadata(records.records(part)
1931-
.get(records.records(part).size() - 1).offset() + 1));
1935+
commits.put(part, createOffsetAndMetadata(records.records(part)
1936+
.get(records.records(part).size() - 1).offset() + 1));
19321937
}
19331938
this.commitLogger.log(() -> COMMITTING + commits);
19341939
if (this.producer != null) {
@@ -2694,7 +2699,7 @@ public void ackCurrent(final ConsumerRecord<K, V> record) {
26942699
if (this.isRecordAck) {
26952700
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
26962701
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
2697-
new OffsetAndMetadata(record.offset() + 1));
2702+
createOffsetAndMetadata(record.offset() + 1));
26982703
if (this.producer == null) {
26992704
this.commitLogger.log(() -> COMMITTING + offsetsToCommit);
27002705
if (this.syncCommits) {
@@ -2996,7 +3001,7 @@ private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
29963001
for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
29973002
for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
29983003
commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
2999-
new OffsetAndMetadata(offset.getValue() + 1));
3004+
createOffsetAndMetadata(offset.getValue() + 1));
30003005
}
30013006
}
30023007
this.offsets.clear();
@@ -3079,6 +3084,29 @@ public String toString() {
30793084
+ "\n]";
30803085
}
30813086

3087+
private OffsetAndMetadata createOffsetAndMetadata(long offset) {
3088+
return this.offsetAndMetadataProvider.provide(this.consumerAwareListenerMetadata, offset);
3089+
}
3090+
3091+
private final class ConsumerAwareListenerMetadata implements ListenerMetadata {
3092+
3093+
@Override
3094+
public String getListenerId() {
3095+
return getBeanName();
3096+
}
3097+
3098+
@Override
3099+
public String getGroupId() {
3100+
return ListenerConsumer.this.consumerGroupId;
3101+
}
3102+
3103+
@Override
3104+
public byte[] getListenerInfo() {
3105+
return ListenerConsumer.this.listenerinfo;
3106+
}
3107+
3108+
}
3109+
30823110
private final class ConsumerAcknowledgment implements Acknowledgment {
30833111

30843112
private final ConsumerRecord<K, V> record;
@@ -3272,8 +3300,7 @@ private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partition
32723300
for (TopicPartition partition : partitions) {
32733301
try {
32743302
if (committed.get(partition) == null) { // no existing commit for this group
3275-
offsetsToCommit.put(partition,
3276-
new OffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
3303+
offsetsToCommit.put(partition, createOffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
32773304
}
32783305
}
32793306
catch (NoOffsetForPartitionException e) {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2016-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
/**
20+
* Metadata associated to a {@link org.springframework.kafka.annotation.KafkaListener}.
21+
*
22+
* @author Francois Rosiere
23+
* @since 2.8.5
24+
* @see org.springframework.kafka.annotation.KafkaListener
25+
*/
26+
public interface ListenerMetadata {
27+
28+
/**
29+
* Return the listener id.
30+
* @return the listener id.
31+
*/
32+
String getListenerId();
33+
34+
/**
35+
* Return the group id.
36+
* @return the group id.
37+
*/
38+
String getGroupId();
39+
40+
/**
41+
* Return the listener info.
42+
* @return the listener info.
43+
*/
44+
byte[] getListenerInfo();
45+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2016-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
20+
21+
/**
22+
* Provider for {@link OffsetAndMetadata}. In case of async commits of the offsets,
23+
* the provider can be used in combination with an {@link org.apache.kafka.clients.consumer.OffsetCommitCallback} to
24+
* have more granularity in the way to create an {@link OffsetAndMetadata}.
25+
*
26+
* @author Francois Rosiere
27+
* @since 2.8.5
28+
* @see org.apache.kafka.clients.consumer.OffsetCommitCallback
29+
*/
30+
public interface OffsetAndMetadataProvider {
31+
32+
/**
33+
* Provide an offset and metadata object for the given listener metadata and offset.
34+
*
35+
* @param listenerMetadata metadata associated to a listener.
36+
* @param offset an offset.
37+
* @return an offset and metadata.
38+
*/
39+
OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset);
40+
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3844,6 +3844,54 @@ public void clearThreadState(Consumer<?, ?> consumer) {
38443844
container.stop();
38453845
}
38463846

3847+
@Test
3848+
public void testOffsetAndMetadataWithoutProvider() throws InterruptedException {
3849+
testOffsetAndMetadata(null, new OffsetAndMetadata(1));
3850+
}
3851+
3852+
@Test
3853+
public void testOffsetAndMetadataWithProvider() throws InterruptedException {
3854+
testOffsetAndMetadata((listenerMetadata, offset) ->
3855+
new OffsetAndMetadata(offset, listenerMetadata.getGroupId()),
3856+
new OffsetAndMetadata(1, "grp"));
3857+
}
3858+
3859+
@SuppressWarnings("unchecked")
3860+
private void testOffsetAndMetadata(OffsetAndMetadataProvider provider, OffsetAndMetadata expectedOffsetAndMetadata) throws InterruptedException {
3861+
final ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3862+
final Consumer<Integer, String> consumer = mock(Consumer.class);
3863+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3864+
given(consumer.poll(any(Duration.class))).willAnswer(i -> new ConsumerRecords<>(
3865+
Map.of(
3866+
new TopicPartition("foo", 0),
3867+
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, 1, "foo"))
3868+
)
3869+
));
3870+
final ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> offsetsCaptor = ArgumentCaptor.forClass(Map.class);
3871+
final CountDownLatch latch = new CountDownLatch(1);
3872+
willAnswer(invocation -> {
3873+
latch.countDown();
3874+
return null;
3875+
}).given(consumer).commitAsync(offsetsCaptor.capture(), any());
3876+
final ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0));
3877+
containerProps.setGroupId("grp");
3878+
containerProps.setClientId("clientId");
3879+
containerProps.setSyncCommits(false);
3880+
containerProps.setMessageListener((MessageListener<Integer, String>) data -> {
3881+
});
3882+
containerProps.setCommitCallback((offsets, exception) -> {
3883+
});
3884+
containerProps.setOffsetAndMetadataProvider(provider);
3885+
final KafkaMessageListenerContainer<Integer, String> container =
3886+
new KafkaMessageListenerContainer<>(cf, containerProps);
3887+
container.start();
3888+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
3889+
assertThat(offsetsCaptor.getValue())
3890+
.hasSize(1)
3891+
.containsValue(expectedOffsetAndMetadata);
3892+
container.stop();
3893+
}
3894+
38473895
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
38483896
Consumer<?, ?> consumer =
38493897
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class);

0 commit comments

Comments
 (0)