Skip to content

Commit c7ee2f4

Browse files
bky373 authored and sobychacko committed
GH-3589: Refactor toMessage batch method and fix logging/header issues
- Remove unnecessary check
- Fix checkstyle violation
- Change log level from WARN to DEBUG
- Refactor headers conversion method
- Add natives condition for logging when no header mapper
1 parent 7b56e33 commit c7ee2f4

File tree

1 file changed

+39
-28
lines changed

1 file changed

+39
-28
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* @author Gary Russell
6161
* @author Dariusz Szablinski
6262
* @author Biju Kunjummen
63+
* @author Borahm Lee
6364
* @since 1.1
6465
*/
6566
public class BatchMessagingMessageConverter implements BatchMessageConverter {
@@ -89,7 +90,7 @@ public BatchMessagingMessageConverter() {
8990
* @param recordConverter the converter.
9091
* @since 1.3.2
9192
*/
92-
public BatchMessagingMessageConverter(RecordMessageConverter recordConverter) {
93+
public BatchMessagingMessageConverter(@Nullable RecordMessageConverter recordConverter) {
9394
this.recordConverter = recordConverter;
9495
if (JacksonPresent.isJackson2Present()) {
9596
this.headerMapper = new DefaultKafkaHeaderMapper();
@@ -157,53 +158,42 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
157158
List<Headers> natives = new ArrayList<>();
158159
List<ConsumerRecord<?, ?>> raws = new ArrayList<>();
159160
List<ConversionException> conversionFailures = new ArrayList<>();
161+
160162
addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures);
161163
commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
162164
timestamps);
163-
boolean logged = false;
164-
String info = null;
165+
166+
String listenerInfo = null;
165167
for (ConsumerRecord<?, ?> record : records) {
166-
payloads.add(obtainPayload(type, record, conversionFailures));
167-
keys.add(record.key());
168-
topics.add(record.topic());
169-
partitions.add(record.partition());
170-
offsets.add(record.offset());
171-
if (record.timestampType() != null) {
172-
timestampTypes.add(record.timestampType().name());
173-
}
174-
timestamps.add(record.timestamp());
168+
addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures);
175169
if (this.headerMapper != null && record.headers() != null) {
176-
Map<String, Object> converted = new HashMap<>();
177-
this.headerMapper.toHeaders(record.headers(), converted);
178-
convertedHeaders.add(converted);
179-
Object object = converted.get(KafkaHeaders.LISTENER_INFO);
180-
if (object instanceof String) {
181-
info = (String) object;
170+
Map<String, Object> converted = convertHeaders(record.headers(), convertedHeaders);
171+
Object obj = converted.get(KafkaHeaders.LISTENER_INFO);
172+
if (obj instanceof String) {
173+
listenerInfo = (String) obj;
182174
}
183175
}
184176
else {
185-
if (!logged) {
186-
this.logger.debug(() ->
187-
"No header mapper is available; Jackson is required for the default mapper; "
188-
+ "headers (if present) are not mapped but provided raw in "
189-
+ KafkaHeaders.NATIVE_HEADERS);
190-
logged = true;
191-
}
192177
natives.add(record.headers());
193178
}
194179
if (this.rawRecordHeader) {
195180
raws.add(record);
196181
}
197182
}
198-
if (info != null) {
199-
rawHeaders.put(KafkaHeaders.LISTENER_INFO, info);
183+
if (this.headerMapper == null && !natives.isEmpty()) {
184+
this.logger.debug(() ->
185+
"No header mapper is available; Jackson is required for the default mapper; "
186+
+ "headers (if present) are not mapped but provided raw in "
187+
+ KafkaHeaders.NATIVE_HEADERS);
188+
}
189+
if (listenerInfo != null) {
190+
rawHeaders.put(KafkaHeaders.LISTENER_INFO, listenerInfo);
200191
}
201192
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
202193
}
203194

204195
private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,
205196
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {
206-
207197
if (this.headerMapper != null) {
208198
rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
209199
}
@@ -216,12 +206,33 @@ private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Ob
216206
rawHeaders.put(KafkaHeaders.CONVERSION_FAILURES, conversionFailures);
217207
}
218208

209+
private void addRecordInfo(ConsumerRecord<?, ?> record, Type type, List<Object> payloads, List<Object> keys,
210+
List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
211+
List<Long> timestamps, List<ConversionException> conversionFailures) {
212+
payloads.add(obtainPayload(type, record, conversionFailures));
213+
keys.add(record.key());
214+
topics.add(record.topic());
215+
partitions.add(record.partition());
216+
offsets.add(record.offset());
217+
timestamps.add(record.timestamp());
218+
if (record.timestampType() != null) {
219+
timestampTypes.add(record.timestampType().name());
220+
}
221+
}
222+
219223
private Object obtainPayload(Type type, ConsumerRecord<?, ?> record, List<ConversionException> conversionFailures) {
220224
return this.recordConverter == null || !containerType(type)
221225
? extractAndConvertValue(record, type)
222226
: convert(record, type, conversionFailures);
223227
}
224228

229+
private Map<String, Object> convertHeaders(Headers headers, List<Map<String, Object>> convertedHeaders) {
230+
Map<String, Object> converted = new HashMap<>();
231+
this.headerMapper.toHeaders(headers, converted);
232+
convertedHeaders.add(converted);
233+
return converted;
234+
}
235+
225236
@Override
226237
public List<ProducerRecord<?, ?>> fromMessage(Message<?> message, String defaultTopic) {
227238
throw new UnsupportedOperationException();

0 commit comments

Comments
 (0)