Skip to content

Commit 968df68

Browse files
frosieregaryrussell
authored andcommitted
GH-2252: Keep offset metadata in case of batch reprocessing (#2253)
* GH-2252: Keep offset metadata in case of batch reprocessing * GH-2252: Applied code review * GH-2252: Added missing static keyword
1 parent c107857 commit 968df68

File tree

12 files changed

+133
-36
lines changed

12 files changed

+133
-36
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2474,6 +2474,10 @@ Useful when the consumer code cannot determine that an `ErrorHandlingDeserialize
24742474
|`null`
24752475
|When present and `syncCommits` is `false` a callback invoked after the commit completes.
24762476

2477+
|[[offsetAndMetadataProvider]]<<offsetAndMetadataProvider,`offsetAndMetadataProvider`>>
2478+
|`null`
2479+
|A provider for `OffsetAndMetadata`; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata.
2480+
24772481
|[[commitLogLevel]]<<commitLogLevel,`commitLogLevel`>>
24782482
|DEBUG
24792483
|The logging level for logs pertaining to committing offsets.

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -375,13 +375,8 @@ public String getListenerId() {
375375
return this.beanName; // the container factory sets the bean name to the id attribute
376376
}
377377

378-
/**
379-
* Get arbitrary static information that will be added to the
380-
* {@link KafkaHeaders#LISTENER_INFO} header of all records.
381-
* @return the info.
382-
* @since 2.8.4
383-
*/
384378
@Nullable
379+
@Override
385380
public byte[] getListenerInfo() {
386381
return this.listenerInfo != null ? Arrays.copyOf(this.listenerInfo, this.listenerInfo.length) : null;
387382
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* @param <V> the value type.
4545
*
4646
* @author Gary Russell
47+
* @author Francois Rosiere
4748
*
4849
* @since 1.3.5
4950
*
@@ -140,12 +141,12 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
140141
if (EOSMode.V1.equals(eosMode.getMode())) {
141142
this.kafkaTemplate.sendOffsetsToTransaction(
142143
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
143-
new OffsetAndMetadata(skipped.offset() + 1)));
144+
createOffsetAndMetadata(container, skipped.offset() + 1)));
144145
}
145146
else {
146147
this.kafkaTemplate.sendOffsetsToTransaction(
147148
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
148-
new OffsetAndMetadata(skipped.offset() + 1)), consumer.groupMetadata());
149+
createOffsetAndMetadata(container, skipped.offset() + 1)), consumer.groupMetadata());
149150
}
150151
}
151152

@@ -172,4 +173,10 @@ public void clearThreadState() {
172173
this.lastIntervals.remove();
173174
}
174175

176+
private static OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset) {
177+
if (container == null) {
178+
return new OffsetAndMetadata(offset);
179+
}
180+
return ListenerUtils.createOffsetAndMetadata(container, offset);
181+
}
175182
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2017-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.springframework.lang.Nullable;
20+
import org.springframework.util.Assert;
21+
22+
/**
23+
* Default implementation for {@link ListenerMetadata}.
24+
* @author Francois Rosiere
25+
* @since 2.8.6
26+
*/
27+
class DefaultListenerMetadata implements ListenerMetadata {
28+
29+
private final MessageListenerContainer container;
30+
31+
DefaultListenerMetadata(MessageListenerContainer container) {
32+
Assert.notNull(container, "'container' must not be null");
33+
this.container = container;
34+
}
35+
36+
@Override
37+
@Nullable
38+
public String getListenerId() {
39+
return this.container.getListenerId();
40+
}
41+
42+
@Override
43+
@Nullable
44+
public String getGroupId() {
45+
return this.container.getGroupId();
46+
}
47+
48+
@Override
49+
@Nullable
50+
public byte[] getListenerInfo() {
51+
return this.container.getListenerInfo();
52+
}
53+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
* fallback handler.
4848
*
4949
* @author Gary Russell
50+
* @author Francois Rosiere
5051
* @since 2.8
5152
*
5253
*/
@@ -138,7 +139,7 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
138139
}
139140
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
140141
toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
141-
(key, val) -> new OffsetAndMetadata(rec.offset() + 1)));
142+
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
142143
if (offsets.size() > 0) {
143144
commit(consumer, container, offsets);
144145
}
@@ -149,7 +150,7 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
149150
ConsumerRecord<?, ?> recovered = remaining.get(0);
150151
commit(consumer, container,
151152
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
152-
new OffsetAndMetadata(recovered.offset() + 1)));
153+
ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
153154
if (remaining.size() > 1) {
154155
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
155156
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@
146146
* @author Tom van den Berge
147147
* @author Lukasz Kaminski
148148
* @author Tomaz Fernandes
149+
* @author Francois Rosiere
149150
*/
150151
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
151152
extends AbstractMessageListenerContainer<K, V> {
@@ -564,7 +565,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
564565
? (listenerMetadata, offset) -> new OffsetAndMetadata(offset)
565566
: this.containerProperties.getOffsetAndMetadataProvider();
566567

567-
private final ConsumerAwareListenerMetadata consumerAwareListenerMetadata = new ConsumerAwareListenerMetadata();
568+
private final ListenerMetadata listenerMetadata = new DefaultListenerMetadata(KafkaMessageListenerContainer.this);
568569

569570
private final Consumer<K, V> consumer;
570571

@@ -3217,26 +3218,7 @@ private String zombieFenceTxIdSuffix(String topic, int partition) {
32173218
}
32183219

32193220
private OffsetAndMetadata createOffsetAndMetadata(long offset) {
3220-
return this.offsetAndMetadataProvider.provide(this.consumerAwareListenerMetadata, offset);
3221-
}
3222-
3223-
private final class ConsumerAwareListenerMetadata implements ListenerMetadata {
3224-
3225-
@Override
3226-
public String getListenerId() {
3227-
return getBeanName();
3228-
}
3229-
3230-
@Override
3231-
public String getGroupId() {
3232-
return ListenerConsumer.this.consumerGroupId;
3233-
}
3234-
3235-
@Override
3236-
public byte[] getListenerInfo() {
3237-
return ListenerConsumer.this.listenerinfo;
3238-
}
3239-
3221+
return this.offsetAndMetadataProvider.provide(this.listenerMetadata, offset);
32403222
}
32413223

32423224
private final class ConsumerAcknowledgment implements Acknowledgment {

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424

2525
import org.apache.kafka.clients.consumer.ConsumerRecord;
26+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2627
import org.apache.kafka.common.Metric;
2728
import org.apache.kafka.common.MetricName;
2829
import org.apache.kafka.common.header.Header;
@@ -41,6 +42,7 @@
4142
* Listener utilities.
4243
*
4344
* @author Gary Russell
45+
* @author Francois Rosiere
4446
* @since 2.0
4547
*
4648
*/
@@ -287,5 +289,21 @@ public static void stoppableSleep(MessageListenerContainer container, long inter
287289
while (System.currentTimeMillis() < timeout);
288290
}
289291

292+
/**
293+
* Create a new {@link OffsetAndMetadata} using the given container and offset.
294+
* @param container a container.
295+
* @param offset an offset.
296+
* @return an offset and metadata.
297+
* @since 2.8.6
298+
*/
299+
public static OffsetAndMetadata createOffsetAndMetadata(MessageListenerContainer container,
300+
long offset) {
301+
final OffsetAndMetadataProvider metadataProvider = container.getContainerProperties()
302+
.getOffsetAndMetadataProvider();
303+
if (metadataProvider != null) {
304+
return metadataProvider.provide(new DefaultListenerMetadata(container), offset);
305+
}
306+
return new OffsetAndMetadata(offset);
307+
}
290308
}
291309

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.springframework.beans.factory.DisposableBean;
2727
import org.springframework.context.SmartLifecycle;
28+
import org.springframework.kafka.support.KafkaHeaders;
2829
import org.springframework.lang.Nullable;
2930

3031
/**
@@ -35,6 +36,7 @@
3536
* @author Gary Russell
3637
* @author Vladimir Tsanev
3738
* @author Tomaz Fernandes
39+
* @author Francois Rosiere
3840
*/
3941
public interface MessageListenerContainer extends SmartLifecycle, DisposableBean {
4042

@@ -194,6 +196,17 @@ default String getListenerId() {
194196
throw new UnsupportedOperationException("This container does not support retrieving the listener id");
195197
}
196198

199+
/**
200+
* Get arbitrary static information that will be added to the
201+
* {@link KafkaHeaders#LISTENER_INFO} header of all records.
202+
* @return the info.
203+
* @since 2.8.6
204+
*/
205+
@Nullable
206+
default byte[] getListenerInfo() {
207+
throw new UnsupportedOperationException("This container does not support retrieving the listener info");
208+
}
209+
197210
/**
198211
* If this container has child containers, return true if at least one child is running. If there are not
199212
* child containers, returns {@link #isRunning()}.

spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2020

2121
/**
22-
* Provider for {@link OffsetAndMetadata}. In case of async commits of the offsets,
23-
* the provider can be used in combination with an {@link org.apache.kafka.clients.consumer.OffsetCommitCallback} to
24-
* have more granularity in the way to create an {@link OffsetAndMetadata}.
22+
* Provider for {@link OffsetAndMetadata}. The provider can be used to have more granularity when creating an
23+
* {@link OffsetAndMetadata}. The provider is used for both sync and async commits of the offsets.
2524
*
2625
* @author Francois Rosiere
2726
* @since 2.8.5

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
* Seek utilities.
4444
*
4545
* @author Gary Russell
46+
* @author Francois Rosiere
4647
* @since 2.2
4748
*
4849
*/
@@ -211,7 +212,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List<Consu
211212
ConsumerRecord<?, ?> record = records.get(0);
212213
Map<TopicPartition, OffsetAndMetadata> offsetToCommit = Collections.singletonMap(
213214
new TopicPartition(record.topic(), record.partition()),
214-
new OffsetAndMetadata(record.offset() + 1));
215+
ListenerUtils.createOffsetAndMetadata(container, record.offset() + 1));
215216
if (container.getContainerProperties().isSyncCommits()) {
216217
consumer.commitSync(offsetToCommit, container.getContainerProperties().getSyncCommitTimeout());
217218
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
/**
5151
* @author Gary Russell
52+
* @author Francois Rosiere
5253
* @since 2.3.1
5354
*
5455
*/
@@ -76,6 +77,7 @@ void testClassifier() {
7677
Consumer<String, String> consumer = mock(Consumer.class);
7778
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
7879
MessageListenerContainer container = mock(MessageListenerContainer.class);
80+
given(container.getContainerProperties()).willReturn(new ContainerProperties("foo"));
7981
processor.process(records, consumer, container, illegalState, true, EOSMode.V1);
8082
processor.process(records, consumer, container,
8183
new DeserializationException("intended", null, false, illegalState), true, EOSMode.V1);

spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,10 +20,12 @@
2020
import static org.mockito.BDDMockito.given;
2121
import static org.mockito.Mockito.mock;
2222

23+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2324
import org.junit.jupiter.api.Test;
2425

2526
/**
2627
* @author Gary Russell
28+
* @author Francois Rosiere
2729
* @since 2.7.1
2830
*
2931
*/
@@ -38,4 +40,24 @@ void stoppableSleep() throws InterruptedException {
3840
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500);
3941
}
4042

43+
@Test
44+
void testCreationOfOffsetAndMetadataWithoutProvider() {
45+
final MessageListenerContainer container = mock(MessageListenerContainer.class);
46+
given(container.getContainerProperties()).willReturn(new ContainerProperties("foo"));
47+
final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L);
48+
assertThat(offsetAndMetadata.offset()).isEqualTo(1);
49+
assertThat(offsetAndMetadata.metadata()).isEmpty();
50+
}
51+
52+
@Test
53+
void testCreationOfOffsetAndMetadataWithProvider() {
54+
final MessageListenerContainer container = mock(MessageListenerContainer.class);
55+
final ContainerProperties properties = new ContainerProperties("foo");
56+
properties.setOffsetAndMetadataProvider((listenerMetadata, offset) -> new OffsetAndMetadata(offset, "my-metadata"));
57+
given(container.getContainerProperties()).willReturn(properties);
58+
final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L);
59+
assertThat(offsetAndMetadata.offset()).isEqualTo(1);
60+
assertThat(offsetAndMetadata.metadata()).isEqualTo("my-metadata");
61+
}
4162
}
63+

0 commit comments

Comments
 (0)