Skip to content

Commit cc1d11d

Browse files
committed
spring-projects GH-2170: Add a way to create a custom OffsetAndMetadata
1 parent 1d13671 commit cc1d11d

File tree

6 files changed

+192
-9
lines changed

6 files changed

+192
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.kafka.clients.admin.DescribeTopicsResult;
3535
import org.apache.kafka.clients.consumer.ConsumerConfig;
3636
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
37+
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
3738
import org.apache.kafka.common.TopicPartition;
3839

3940
import org.springframework.beans.BeanUtils;
@@ -167,6 +168,10 @@ protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V>
167168
if (this.containerProperties.getConsumerRebalanceListener() == null) {
168169
this.containerProperties.setConsumerRebalanceListener(createSimpleLoggingConsumerRebalanceListener());
169170
}
171+
final OffsetCommitCallback commitCallback = containerProperties.getCommitCallback();
172+
if (commitCallback != null) {
173+
this.containerProperties.setCommitCallback(commitCallback, containerProperties.getOffsetAndMetadataProvider());
174+
}
170175
}
171176

172177
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.regex.Pattern;
2323

2424
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
25+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2526
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
2627

2728
import org.springframework.kafka.support.LogIfLevelEnabled;
@@ -89,6 +90,11 @@ public class ConsumerProperties {
8990
*/
9091
private OffsetCommitCallback commitCallback;
9192

93+
/**
94+
* A provider for {@link OffsetAndMetadata}. The provider allows customization of metadata.
95+
*/
96+
private OffsetAndMetadataProvider offsetAndMetadataProvider = (listenerMetadata, offset) -> new OffsetAndMetadata(offset);
97+
9298
/**
9399
* Whether or not to call consumer.commitSync() or commitAsync() when the
94100
* container is responsible for commits. Default true.
@@ -275,6 +281,20 @@ public void setCommitCallback(OffsetCommitCallback commitCallback) {
275281
this.commitCallback = commitCallback;
276282
}
277283

284+
/**
285+
* Set the commit callback and a metadata provider; by default a simple logging callback is used to log
286+
* success at DEBUG level and failures at ERROR level.
287+
* Used when {@link #setSyncCommits(boolean) syncCommits} is false.
288+
* @param commitCallback the callback.
289+
* @param offsetAndMetadataProvider an offset and metadata provider.
290+
* @since 2.8.5
291+
* @see #setSyncCommits(boolean)
292+
*/
293+
public void setCommitCallback(OffsetCommitCallback commitCallback, OffsetAndMetadataProvider offsetAndMetadataProvider) {
294+
this.commitCallback = commitCallback;
295+
this.offsetAndMetadataProvider = offsetAndMetadataProvider;
296+
}
297+
278298
/**
279299
* Return the commit callback.
280300
* @return the callback.
@@ -284,6 +304,15 @@ public OffsetCommitCallback getCommitCallback() {
284304
return this.commitCallback;
285305
}
286306

307+
/**
308+
* Return the offset and metadata provider.
309+
* @return the offset and metadata provider.
310+
*/
311+
@Nullable
312+
public OffsetAndMetadataProvider getOffsetAndMetadataProvider() {
313+
return this.offsetAndMetadataProvider;
314+
}
315+
287316
/**
288317
* Set whether or not to call consumer.commitSync() or commitAsync() when the
289318
* container is responsible for commits. Default true.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1438,7 +1438,7 @@ private void fixTxOffsetsIfNeeded() {
14381438
return;
14391439
}
14401440
if (position > oamd.offset()) {
1441-
toFix.put(tp, new OffsetAndMetadata(position));
1441+
toFix.put(tp, createOffsetAndMetadata(position));
14421442
}
14431443
});
14441444
if (toFix.size() > 0) {
@@ -1910,7 +1910,7 @@ else if (record.offset() < offs.get(0)) {
19101910
private void ackImmediate(ConsumerRecord<K, V> record) {
19111911
Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
19121912
new TopicPartition(record.topic(), record.partition()),
1913-
new OffsetAndMetadata(record.offset() + 1));
1913+
createOffsetAndMetadata(record.offset() + 1));
19141914
this.commitLogger.log(() -> COMMITTING + commits);
19151915
if (this.producer != null) {
19161916
doSendOffsets(this.producer, commits);
@@ -1926,9 +1926,8 @@ else if (this.syncCommits) {
19261926
private void ackImmediate(ConsumerRecords<K, V> records) {
19271927
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
19281928
for (TopicPartition part : records.partitions()) {
1929-
commits.put(part,
1930-
new OffsetAndMetadata(records.records(part)
1931-
.get(records.records(part).size() - 1).offset() + 1));
1929+
commits.put(part, createOffsetAndMetadata(records.records(part)
1930+
.get(records.records(part).size() - 1).offset() + 1));
19321931
}
19331932
this.commitLogger.log(() -> COMMITTING + commits);
19341933
if (this.producer != null) {
@@ -2694,7 +2693,7 @@ public void ackCurrent(final ConsumerRecord<K, V> record) {
26942693
if (this.isRecordAck) {
26952694
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
26962695
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
2697-
new OffsetAndMetadata(record.offset() + 1));
2696+
createOffsetAndMetadata(record.offset() + 1));
26982697
if (this.producer == null) {
26992698
this.commitLogger.log(() -> COMMITTING + offsetsToCommit);
27002699
if (this.syncCommits) {
@@ -2996,7 +2995,7 @@ private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
29962995
for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
29972996
for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
29982997
commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
2999-
new OffsetAndMetadata(offset.getValue() + 1));
2998+
createOffsetAndMetadata(offset.getValue() + 1));
30002999
}
30013000
}
30023001
this.offsets.clear();
@@ -3079,6 +3078,26 @@ public String toString() {
30793078
+ "\n]";
30803079
}
30813080

3081+
private OffsetAndMetadata createOffsetAndMetadata(long offset) {
3082+
final OffsetAndMetadataProvider metadataProvider = this.containerProperties.getOffsetAndMetadataProvider();
3083+
return metadataProvider == null
3084+
? new OffsetAndMetadata(offset)
3085+
: metadataProvider.provide(new ConsumerAwareListenerMetadata(), offset);
3086+
}
3087+
3088+
private final class ConsumerAwareListenerMetadata implements ListenerMetadata {
3089+
3090+
@Override
3091+
public String getListenerId() {
3092+
return getBeanName();
3093+
}
3094+
3095+
@Override
3096+
public String getGroupId() {
3097+
return ListenerConsumer.this.consumerGroupId;
3098+
}
3099+
}
3100+
30823101
private final class ConsumerAcknowledgment implements Acknowledgment {
30833102

30843103
private final ConsumerRecord<K, V> record;
@@ -3272,8 +3291,7 @@ private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partition
32723291
for (TopicPartition partition : partitions) {
32733292
try {
32743293
if (committed.get(partition) == null) { // no existing commit for this group
3275-
offsetsToCommit.put(partition,
3276-
new OffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
3294+
offsetsToCommit.put(partition, createOffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
32773295
}
32783296
}
32793297
catch (NoOffsetForPartitionException e) {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2016-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
/**
 * Metadata associated with a {@link org.springframework.kafka.annotation.KafkaListener}.
 *
 * @author Francois Rosiere
 * @since 2.8.5
 * @see org.springframework.kafka.annotation.KafkaListener
 */
public interface ListenerMetadata {

	/**
	 * Return the id of the listener.
	 * @return the listener id.
	 */
	String getListenerId();

	/**
	 * Return the consumer group id of the listener.
	 * @return the group id.
	 */
	String getGroupId();

}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2016-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
20+
21+
/**
22+
* Provider for {@link OffsetAndMetadata}. In case of async commits of the offsets,
23+
* the provider can be used in combination with an {@link org.apache.kafka.clients.consumer.OffsetCommitCallback} to
24+
* have more granularity in the way to create an {@link OffsetAndMetadata}.
25+
*
26+
* @author Francois Rosiere
27+
* @since 2.8.5
28+
* @see org.apache.kafka.clients.consumer.OffsetCommitCallback
29+
*/
30+
public interface OffsetAndMetadataProvider {
31+
32+
/**
33+
* Provide an offset and metadata object for the given listener metadata and offset.
34+
*
35+
* @param listenerMetadata metadata associated to a listener.
36+
* @param offset an offset.
37+
* @return an offset and metadata.
38+
*/
39+
OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset);
40+
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,16 @@
2626
import static org.mockito.BDDMockito.given;
2727
import static org.mockito.BDDMockito.willAnswer;
2828
import static org.mockito.BDDMockito.willThrow;
29+
import static org.mockito.Mockito.doAnswer;
30+
import static org.mockito.Mockito.doCallRealMethod;
31+
import static org.mockito.Mockito.doNothing;
2932
import static org.mockito.Mockito.inOrder;
3033
import static org.mockito.Mockito.mock;
3134
import static org.mockito.Mockito.never;
3235
import static org.mockito.Mockito.spy;
3336
import static org.mockito.Mockito.times;
3437
import static org.mockito.Mockito.verify;
38+
import static org.mockito.Mockito.when;
3539

3640
import java.time.Duration;
3741
import java.util.ArrayList;
@@ -3844,6 +3848,54 @@ public void clearThreadState(Consumer<?, ?> consumer) {
38443848
container.stop();
38453849
}
38463850

3851+
@Test
3852+
public void testOffsetAndMetadataWithoutProvider() throws InterruptedException {
3853+
testOffsetAndMetadata(null, new OffsetAndMetadata(1));
3854+
}
3855+
3856+
@Test
3857+
public void testOffsetAndMetadataWithProvider() throws InterruptedException {
3858+
testOffsetAndMetadata((listenerMetadata, offset) ->
3859+
new OffsetAndMetadata(offset, listenerMetadata.getGroupId()),
3860+
new OffsetAndMetadata(1, "grp"));
3861+
}
3862+
3863+
@SuppressWarnings("unchecked")
3864+
private void testOffsetAndMetadata(OffsetAndMetadataProvider provider,
3865+
OffsetAndMetadata expectedOffsetAndMetadata) throws InterruptedException {
3866+
final ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3867+
final Consumer<Integer, String> consumer = mock(Consumer.class);
3868+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3869+
given(consumer.poll(any(Duration.class))).willAnswer(i -> new ConsumerRecords<>(
3870+
Map.of(
3871+
new TopicPartition("foo", 0),
3872+
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, 1, "foo"))
3873+
)
3874+
));
3875+
final ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> offsetsCaptor = ArgumentCaptor.forClass(Map.class);
3876+
final CountDownLatch latch = new CountDownLatch(1);
3877+
doAnswer(invocation -> {
3878+
latch.countDown();
3879+
return null;
3880+
}).when(consumer).commitAsync(offsetsCaptor.capture(), any());
3881+
final ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0));
3882+
containerProps.setGroupId("grp");
3883+
containerProps.setClientId("clientId");
3884+
containerProps.setSyncCommits(false);
3885+
containerProps.setMessageListener((MessageListener<Integer, String>) data -> {
3886+
});
3887+
containerProps.setCommitCallback((offsets, exception) -> {
3888+
}, provider);
3889+
final KafkaMessageListenerContainer<Integer, String> container =
3890+
new KafkaMessageListenerContainer<>(cf, containerProps);
3891+
container.start();
3892+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
3893+
assertThat(offsetsCaptor.getValue())
3894+
.hasSize(1)
3895+
.containsValue(expectedOffsetAndMetadata);
3896+
container.stop();
3897+
}
3898+
38473899
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
38483900
Consumer<?, ?> consumer =
38493901
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class);

0 commit comments

Comments
 (0)