Skip to content

Commit 5fb1562

Browse files
bky373 and sobychacko
authored and committed
GH-3589: Refactor toMessage batch method and fix logging/header issues
Remove unnecessary check. Fix checkstyle violation. Change log level from WARN to DEBUG. Refactor headers conversion method. Add natives condition for logging when no header mapper.
1 parent 4a45905 commit 5fb1562

File tree

1 file changed

+48
-53
lines changed

1 file changed

+48
-53
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 48 additions & 53 deletions
Original file line number · Diff line number · Diff line change
@@ -23,7 +23,6 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26-
import java.util.Optional;
2726

2827
import org.apache.commons.logging.LogFactory;
2928
import org.apache.kafka.clients.consumer.Consumer;
@@ -63,6 +62,7 @@
6362
* @author Biju Kunjummen
6463
* @author Sanghyeok An
6564
* @author Hope Kim
65+
* @author Borahm Lee
6666
* @since 1.1
6767
*/
6868
public class BatchMessagingMessageConverter implements BatchMessageConverter {
@@ -93,7 +93,7 @@ public BatchMessagingMessageConverter() {
9393
* @param recordConverter the converter.
9494
* @since 1.3.2
9595
*/
96-
public BatchMessagingMessageConverter(RecordMessageConverter recordConverter) {
96+
public BatchMessagingMessageConverter(@Nullable RecordMessageConverter recordConverter) {
9797
this.recordConverter = recordConverter;
9898
if (JacksonPresent.isJackson2Present()) {
9999
this.headerMapper = new DefaultKafkaHeaderMapper();
@@ -144,7 +144,8 @@ public void setRawRecordHeader(boolean rawRecordHeader) {
144144
}
145145

146146
@Override // NOSONAR
147-
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer, Type type) {
147+
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
148+
Consumer<?, ?> consumer, Type type) {
148149

149150
KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
150151
this.generateTimestamp);
@@ -165,65 +166,38 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
165166
addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures);
166167
commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
167168
timestamps);
168-
records.forEach(record -> processRecord(record, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps,
169-
convertedHeaders, natives, raws, conversionFailures, rawHeaders, type));
170-
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
171-
}
172-
173-
private void processRecord(ConsumerRecord<?, ?> record, List<Object> payloads, List<Object> keys,
174-
List<String> topics, List<Integer> partitions, List<Long> offsets,
175-
List<String> timestampTypes, List<Long> timestamps, List<Map<String, Object>> convertedHeaders,
176-
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures,
177-
Map<String, Object> rawHeaders, Type type) {
178-
payloads.add(obtainPayload(type, record, conversionFailures));
179-
keys.add(record.key());
180-
topics.add(record.topic());
181-
partitions.add(record.partition());
182-
offsets.add(record.offset());
183-
184-
if (record.timestampType() != null) {
185-
timestampTypes.add(record.timestampType().name());
186-
}
187-
timestamps.add(record.timestamp());
188169

189-
boolean logged = false;
190-
String info = null;
191-
192-
if (this.headerMapper != null && record.headers() != null) {
193-
Map<String, Object> converted = new HashMap<>();
194-
this.headerMapper.toHeaders(record.headers(), converted);
195-
convertedHeaders.add(converted);
196-
Object object = converted.get(KafkaHeaders.LISTENER_INFO);
197-
info = Optional.ofNullable(object)
198-
.filter(String.class::isInstance)
199-
.map(String.class::cast)
200-
.orElse(null);
201-
}
202-
else {
203-
if (!logged) {
204-
logHeaderWarningOnce();
205-
logged = true;
170+
String listenerInfo = null;
171+
for (ConsumerRecord<?, ?> record : records) {
172+
addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures);
173+
if (this.headerMapper != null && record.headers() != null) {
174+
Map<String, Object> converted = convertHeaders(record.headers(), convertedHeaders);
175+
Object obj = converted.get(KafkaHeaders.LISTENER_INFO);
176+
if (obj instanceof String) {
177+
listenerInfo = (String) obj;
178+
}
179+
}
180+
else {
181+
natives.add(record.headers());
182+
}
183+
if (this.rawRecordHeader) {
184+
raws.add(record);
206185
}
207-
natives.add(record.headers());
208186
}
209-
if (this.rawRecordHeader) {
210-
raws.add(record);
187+
if (this.headerMapper == null && !natives.isEmpty()) {
188+
this.logger.debug(() ->
189+
"No header mapper is available; Jackson is required for the default mapper; "
190+
+ "headers (if present) are not mapped but provided raw in "
191+
+ KafkaHeaders.NATIVE_HEADERS);
211192
}
212-
if (info != null) {
213-
rawHeaders.put(KafkaHeaders.LISTENER_INFO, info);
193+
if (listenerInfo != null) {
194+
rawHeaders.put(KafkaHeaders.LISTENER_INFO, listenerInfo);
214195
}
215-
}
216-
217-
private void logHeaderWarningOnce() {
218-
this.logger.debug(() ->
219-
"No header mapper is available; Jackson is required for the default mapper; "
220-
+ "headers (if present) are not mapped but provided raw in "
221-
+ KafkaHeaders.NATIVE_HEADERS);
196+
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
222197
}
223198

224199
private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,
225200
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {
226-
227201
if (this.headerMapper != null) {
228202
rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
229203
}
@@ -236,12 +210,33 @@ private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Ob
236210
rawHeaders.put(KafkaHeaders.CONVERSION_FAILURES, conversionFailures);
237211
}
238212

213+
private void addRecordInfo(ConsumerRecord<?, ?> record, Type type, List<Object> payloads, List<Object> keys,
214+
List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
215+
List<Long> timestamps, List<ConversionException> conversionFailures) {
216+
payloads.add(obtainPayload(type, record, conversionFailures));
217+
keys.add(record.key());
218+
topics.add(record.topic());
219+
partitions.add(record.partition());
220+
offsets.add(record.offset());
221+
timestamps.add(record.timestamp());
222+
if (record.timestampType() != null) {
223+
timestampTypes.add(record.timestampType().name());
224+
}
225+
}
226+
239227
private Object obtainPayload(Type type, ConsumerRecord<?, ?> record, List<ConversionException> conversionFailures) {
240228
return this.recordConverter == null || !containerType(type)
241229
? extractAndConvertValue(record, type)
242230
: convert(record, type, conversionFailures);
243231
}
244232

233+
private Map<String, Object> convertHeaders(Headers headers, List<Map<String, Object>> convertedHeaders) {
234+
Map<String, Object> converted = new HashMap<>();
235+
this.headerMapper.toHeaders(headers, converted);
236+
convertedHeaders.add(converted);
237+
return converted;
238+
}
239+
245240
@Override
246241
public List<ProducerRecord<?, ?>> fromMessage(Message<?> message, String defaultTopic) {
247242
throw new UnsupportedOperationException();

0 commit comments

Comments (0)