Skip to content

Commit 1708bf4

Browse files
garyrussellartembilan
authored andcommitted
GH-2249: Batch Listener LISTENER_INFO Headers
Resolves #2249 * Single String parameter for batch listeners. * Polish docs for an empty batch. **cherry-pick to 2.9.x, 2.8.x**
1 parent fc544cd commit 1708bf4

File tree

4 files changed

+47
-5
lines changed

4 files changed

+47
-5
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5529,6 +5529,30 @@ When used in a `RecordInterceptor` or `RecordFilterStrategy` implementation, the
55295529

55305530
The header mappers also convert to `String` when creating `MessageHeaders` from the consumer record and never map this header on an outbound record.
55315531

5532+
For POJO batch listeners, starting with version 2.8.6, the header is copied into each member of the batch and is also available as a single `String` parameter after conversion.
5533+
5534+
====
5535+
[source, java]
5536+
----
5537+
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
5538+
info = "info for batch")
5539+
public void listen(List<Thing> list,
5540+
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
5541+
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
5542+
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
5543+
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
5544+
@Header(KafkaHeaders.LISTENER_INFO) String info) {
5545+
...
5546+
}
5547+
----
5548+
====
5549+
5550+
NOTE: If the batch listener has a filter and the filter results in an empty batch, you will need to add `required = false` to the `@Header` parameter because the info is not available for an empty batch.
5551+
5552+
If you receive `List<Message<Thing>>` the info is in the `KafkaHeaders.LISTENER_INFO` header of each `Message<?>`.
5553+
5554+
See <<batch-listeners>> for more information about consuming batches.
5555+
55325556
[[dead-letters]]
55335557
===== Publishing Dead-letter Records
55345558

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2238,6 +2238,9 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
22382238

22392239
ConsumerRecords<K, V> records = recordsArg;
22402240
List<ConsumerRecord<K, V>> recordList = recordListArg;
2241+
if (this.listenerinfo != null) {
2242+
records.iterator().forEachRemaining(rec -> listenerInfo(rec));
2243+
}
22412244
if (this.batchInterceptor != null) {
22422245
records = this.batchInterceptor.intercept(recordsArg, this.consumer);
22432246
if (records == null) {
@@ -2475,10 +2478,14 @@ private void internalHeaders(final ConsumerRecord<K, V> record) {
24752478
record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, buff));
24762479
}
24772480
if (this.listenerinfo != null) {
2478-
record.headers().add(this.infoHeader);
2481+
listenerInfo(record);
24792482
}
24802483
}
24812484

2485+
private void listenerInfo(final ConsumerRecord<K, V> record) {
2486+
record.headers().add(this.infoHeader);
2487+
}
2488+
24822489
private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecord<K, V> record) {
24832490
if (!this.autoCommit && !this.isRecordAck) {
24842491
processCommits();

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -169,8 +169,8 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
169169
commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
170170
timestamps);
171171
rawHeaders.put(KafkaHeaders.CONVERSION_FAILURES, conversionFailures);
172-
173172
boolean logged = false;
173+
String info = null;
174174
for (ConsumerRecord<?, ?> record : records) {
175175
payloads.add(obtainPayload(type, record, conversionFailures));
176176
keys.add(record.key());
@@ -185,6 +185,10 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
185185
Map<String, Object> converted = new HashMap<>();
186186
this.headerMapper.toHeaders(record.headers(), converted);
187187
convertedHeaders.add(converted);
188+
Object object = converted.get(KafkaHeaders.LISTENER_INFO);
189+
if (object instanceof String) {
190+
info = (String) object;
191+
}
188192
}
189193
else {
190194
if (!logged) {
@@ -200,6 +204,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
200204
raws.add(record);
201205
}
202206
}
207+
if (info != null) {
208+
rawHeaders.put(KafkaHeaders.LISTENER_INFO, info);
209+
}
203210
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
204211
}
205212

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,7 @@ public void testBatchWitHeaders() throws Exception {
590590
list = this.listener.offsets;
591591
assertThat(list.size()).isGreaterThan(0);
592592
assertThat(list.get(0)).isInstanceOf(Long.class);
593+
assertThat(this.listener.listenerInfo).isEqualTo("info for batch");
593594
}
594595

595596
@Test
@@ -1973,17 +1974,20 @@ public void listen10(List<String> list, @Header(KafkaHeaders.GROUP_ID) String gr
19731974
this.latch10.countDown();
19741975
}
19751976

1976-
@KafkaListener(id = "list2", topics = "annotated15", containerFactory = "batchFactory")
1977+
@KafkaListener(id = "list2", topics = "annotated15", containerFactory = "batchFactory",
1978+
info = "info for batch")
19771979
public void listen11(List<String> list,
19781980
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
19791981
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
19801982
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
1981-
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
1983+
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
1984+
@Header(KafkaHeaders.LISTENER_INFO) String info) {
19821985
this.payload = list;
19831986
this.keys = keys;
19841987
this.partitions = partitions;
19851988
this.topics = topics;
19861989
this.offsets = offsets;
1990+
this.listenerInfo = info;
19871991
this.latch11.countDown();
19881992
}
19891993

0 commit comments

Comments
 (0)