Skip to content

Commit a957b39

Browse files
authored
GH-2249: Batch Listener LISTENER_INFO Headers
Resolves #2249 * Single String parameter for batch listeners. * Polish docs for an empty batch. **cherry-pick to 2.9.x, 2.8.x**
1 parent 03b0009 commit a957b39

File tree

4 files changed

+47
-5
lines changed

4 files changed

+47
-5
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5567,6 +5567,30 @@ When used in a `RecordInterceptor` or `RecordFilterStrategy` implementation, the
55675567

55685568
The header mappers also convert to `String` when creating `MessageHeaders` from the consumer record and never map this header on an outbound record.
55695569

5570+
For POJO batch listeners, starting with version 2.8.6, the header is copied into each member of the batch and is also available as a single `String` parameter after conversion.
5571+
5572+
====
5573+
[source, java]
5574+
----
5575+
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
5576+
info = "info for batch")
5577+
public void listen(List<Thing> list,
5578+
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
5579+
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
5580+
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
5581+
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
5582+
@Header(KafkaHeaders.LISTENER_INFO) String info) {
5583+
...
5584+
}
5585+
----
5586+
====
5587+
5588+
NOTE: If the batch listener has a filter and the filter results in an empty batch, you will need to add `required = false` to the `@Header` parameter because the info is not available for an empty batch.
5589+
5590+
If you receive `List<Message<Thing>>` the info is in the `KafkaHeaders.LISTENER_INFO` header of each `Message<?>`.
5591+
5592+
See <<batch-listeners>> for more information about consuming batches.
5593+
55705594
[[dead-letters]]
55715595
===== Publishing Dead-letter Records
55725596

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2239,6 +2239,9 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
22392239

22402240
ConsumerRecords<K, V> records = recordsArg;
22412241
List<ConsumerRecord<K, V>> recordList = recordListArg;
2242+
if (this.listenerinfo != null) {
2243+
records.iterator().forEachRemaining(rec -> listenerInfo(rec));
2244+
}
22422245
if (this.batchInterceptor != null) {
22432246
records = this.batchInterceptor.intercept(recordsArg, this.consumer);
22442247
if (records == null) {
@@ -2475,10 +2478,14 @@ private void internalHeaders(final ConsumerRecord<K, V> record) {
24752478
record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, buff));
24762479
}
24772480
if (this.listenerinfo != null) {
2478-
record.headers().add(this.infoHeader);
2481+
listenerInfo(record);
24792482
}
24802483
}
24812484

2485+
private void listenerInfo(final ConsumerRecord<K, V> record) {
2486+
record.headers().add(this.infoHeader);
2487+
}
2488+
24822489
private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecord<K, V> record) {
24832490
if (!this.autoCommit && !this.isRecordAck) {
24842491
processCommits();

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -169,8 +169,8 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
169169
commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
170170
timestamps);
171171
rawHeaders.put(KafkaHeaders.CONVERSION_FAILURES, conversionFailures);
172-
173172
boolean logged = false;
173+
String info = null;
174174
for (ConsumerRecord<?, ?> record : records) {
175175
payloads.add(obtainPayload(type, record, conversionFailures));
176176
keys.add(record.key());
@@ -185,6 +185,10 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
185185
Map<String, Object> converted = new HashMap<>();
186186
this.headerMapper.toHeaders(record.headers(), converted);
187187
convertedHeaders.add(converted);
188+
Object object = converted.get(KafkaHeaders.LISTENER_INFO);
189+
if (object instanceof String) {
190+
info = (String) object;
191+
}
188192
}
189193
else {
190194
if (!logged) {
@@ -200,6 +204,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
200204
raws.add(record);
201205
}
202206
}
207+
if (info != null) {
208+
rawHeaders.put(KafkaHeaders.LISTENER_INFO, info);
209+
}
203210
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
204211
}
205212

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,7 @@ public void testBatchWitHeaders() throws Exception {
584584
list = this.listener.offsets;
585585
assertThat(list.size()).isGreaterThan(0);
586586
assertThat(list.get(0)).isInstanceOf(Long.class);
587+
assertThat(this.listener.listenerInfo).isEqualTo("info for batch");
587588
}
588589

589590
@Test
@@ -1980,17 +1981,20 @@ public void listen10(List<String> list, @Header(KafkaHeaders.GROUP_ID) String gr
19801981
this.latch10.countDown();
19811982
}
19821983

1983-
@KafkaListener(id = "list2", topics = "annotated15", containerFactory = "batchFactory")
1984+
@KafkaListener(id = "list2", topics = "annotated15", containerFactory = "batchFactory",
1985+
info = "info for batch")
19841986
public void listen11(List<String> list,
19851987
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
19861988
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
19871989
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
1988-
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
1990+
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
1991+
@Header(KafkaHeaders.LISTENER_INFO) String info) {
19891992
this.payload = list;
19901993
this.keys = keys;
19911994
this.partitions = partitions;
19921995
this.topics = topics;
19931996
this.offsets = offsets;
1997+
this.listenerInfo = info;
19941998
this.latch11.countDown();
19951999
}
19962000

0 commit comments

Comments
 (0)