Skip to content

Commit 8ec7ed0

Browse files
garyrussellartembilan
authored andcommitted
GH-2132: Deprecate ListenerUtils.recordToString
Resolves #2133 Apply to 2.8.x, and 2.7.x. In 3.0 they will be removed and `KafkaUtils` called directly instead.
1 parent 60fd3e8 commit 8ec7ed0

17 files changed

+91
-65
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2560,6 +2560,8 @@ See `monitorInterval`.
25602560
|[[onlyLogRecordMetadata]]<<onlyLogRecordMetadata,`onlyLogRecordMetadata`>>
25612561
|`false`
25622562
|Set to false to log the complete consumer record (in error, debug logs etc) instead of just `topic-partition@offset`.
2563+
Deprecated.
2564+
Replaced by `KafkaUtils.setConsumerRecordFormatter`.
25632565

25642566
|[[pollTimeout]]<<pollTimeout,`pollTimeout`>>
25652567
|5000
@@ -5720,3 +5722,10 @@ public KafkaJaasLoginModuleInitializer jaasConfig() throws IOException {
57205722
}
57215723
----
57225724
====
5725+
5726+
[[record-logging]]
5727+
==== Producer and Consumer Record Logging
5728+
5729+
Starting with versions 2.7.12, 2.8.4, you can determine how these records will be rendered in debug logs, etc.
5730+
5731+
See `KafkaUtils.setProducerRecordFormatter()` and `KafkaUtils.setProducerRecordFormatter()` for more information.

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -46,13 +46,15 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
4646
this.ackAfterHandle = ackAfterHandle;
4747
}
4848

49+
@SuppressWarnings("deprecation")
4950
@Override
5051
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
5152
MessageListenerContainer container) {
5253

5354
LOGGER.error(thrownException, () -> "Error occured while processing: " + ListenerUtils.recordToString(record));
5455
}
5556

57+
@SuppressWarnings("deprecation")
5658
@Override
5759
public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
5860
MessageListenerContainer container, Runnable invokeListener) {

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
2525
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
2626

27+
import org.springframework.kafka.support.KafkaUtils;
2728
import org.springframework.kafka.support.LogIfLevelEnabled;
2829
import org.springframework.kafka.support.TopicPartitionOffset;
2930
import org.springframework.lang.Nullable;
@@ -431,16 +432,20 @@ public void setCommitRetries(int commitRetries) {
431432
this.commitRetries = commitRetries;
432433
}
433434

435+
@Deprecated
434436
public boolean isOnlyLogRecordMetadata() {
435437
return this.onlyLogRecordMetadata;
436438
}
437439

438440
/**
439-
* Set to false to log {@code record.toString()} in log messages instead
440-
* of {@code topic-partition@offset}.
441+
* Set to false to log {@code record.toString()} in log messages instead of
442+
* {@code topic-partition@offset}.
441443
* @param onlyLogRecordMetadata false to log the entire record.
442444
* @since 2.2.14
445+
* @deprecated in favor of
446+
* {@link KafkaUtils#setConsumerRecordFormatter(java.util.function.Function)}.
443447
*/
448+
@Deprecated
444449
public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) {
445450
this.onlyLogRecordMetadata = onlyLogRecordMetadata;
446451
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -339,7 +339,7 @@ public void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions
339339
this.skipSameTopicFatalExceptions = skipSameTopicFatalExceptions;
340340
}
341341

342-
@SuppressWarnings("unchecked")
342+
@SuppressWarnings({ "unchecked", "deprecation" })
343343
@Override
344344
public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consumer, Exception exception) {
345345
TopicPartition tp = this.destinationResolver.apply(record, exception);
@@ -406,6 +406,7 @@ private void sendOrThrow(ProducerRecord<Object, Object> outRecord,
406406
}
407407

408408
private void maybeThrow(ConsumerRecord<?, ?> record, Exception exception) {
409+
@SuppressWarnings("deprecation")
409410
String message = String.format("No destination returned for record %s and exception %s. " +
410411
"failIfNoDestinationReturned: %s", ListenerUtils.recordToString(record), exception,
411412
this.throwIfNoDestinationReturned);
@@ -518,6 +519,7 @@ protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?,
518519
* @param inRecord the consumer record.
519520
* @since 2.2.5
520521
*/
522+
@SuppressWarnings("deprecation")
521523
protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate,
522524
ConsumerRecord<?, ?> inRecord) {
523525

@@ -559,6 +561,7 @@ private void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
559561
}
560562
}
561563

564+
@SuppressWarnings("deprecation")
562565
private String pubFailMessage(ProducerRecord<Object, Object> outRecord, ConsumerRecord<?, ?> inRecord) {
563566
return "Dead-letter publication to "
564567
+ outRecord.topic() + "failed for: " + ListenerUtils.recordToString(inRecord, true);

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -105,6 +105,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
105105
* @param records the records.
106106
* @return the String.
107107
*/
108+
@SuppressWarnings("deprecation")
108109
public static String recordsToString(ConsumerRecords<?, ?> records) {
109110
StringBuffer sb = new StringBuffer();
110111
records.spliterator().forEachRemaining(rec -> sb

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -126,6 +126,7 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
126126
* @return the {@link BiPredicate}.
127127
* @deprecated in favor of {@link #getRecoveryStrategy(List, Exception)}.
128128
*/
129+
@SuppressWarnings("deprecation")
129130
@Deprecated
130131
protected BiPredicate<ConsumerRecord<?, ?>, Exception> getSkipPredicate(List<ConsumerRecord<?, ?>> records,
131132
Exception thrownException) {
@@ -169,6 +170,7 @@ protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> record
169170
* @return the {@link RecoveryStrategy}.
170171
* @since 2.8.4
171172
*/
173+
@SuppressWarnings("deprecation")
172174
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records,
173175
@Nullable Consumer<?, ?> recoveryConsumer, Exception thrownException) {
174176
if (getClassifier().classify(thrownException)) {

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class FailedRecordTracker implements RecoveryStrategy {
6161

6262
private boolean resetStateOnExceptionChange;
6363

64+
@SuppressWarnings("deprecation")
6465
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
6566
LogAccessor logger) {
6667

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1229,6 +1229,7 @@ public boolean isLongLived() {
12291229
return true;
12301230
}
12311231

1232+
@SuppressWarnings("deprecation")
12321233
@Override // NOSONAR complexity
12331234
public void run() {
12341235
ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
@@ -1811,6 +1812,7 @@ record = this.acks.poll();
18111812
}
18121813
}
18131814

1815+
@SuppressWarnings("deprecation")
18141816
private void traceAck(ConsumerRecord<K, V> record) {
18151817
this.logger.trace(() -> "Ack: " + ListenerUtils.recordToString(record, true));
18161818
}
@@ -1885,6 +1887,7 @@ private void processAcks(ConsumerRecords<K, V> records) {
18851887
}
18861888
}
18871889

1890+
@SuppressWarnings("deprecation")
18881891
private synchronized void ackInOrder(ConsumerRecord<K, V> record) {
18891892
TopicPartition part = new TopicPartition(record.topic(), record.partition());
18901893
List<Long> offs = this.offsetsInThisBatch.get(part);
@@ -2303,7 +2306,7 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
23032306
* Invoke the listener with each record in a separate transaction.
23042307
* @param records the records.
23052308
*/
2306-
@SuppressWarnings(RAW_TYPES) // NOSONAR complexity
2309+
@SuppressWarnings("deprecation") // NOSONAR complexity
23072310
private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
23082311
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
23092312
while (iterator.hasNext()) {
@@ -2405,6 +2408,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
24052408
}
24062409
}
24072410

2411+
@SuppressWarnings("deprecation")
24082412
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
24092413
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
24102414
while (iterator.hasNext()) {
@@ -2440,6 +2444,7 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
24402444
return next;
24412445
}
24422446

2447+
@SuppressWarnings("deprecation")
24432448
@Nullable
24442449
private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg) {
24452450
internalHeaders(recordArg);
@@ -2600,14 +2605,15 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
26002605
}
26012606
}
26022607

2608+
@SuppressWarnings("deprecation")
26032609
private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
26042610
ConsumerRecord<K, V> record = recordArg;
26052611
if (this.recordInterceptor != null) {
26062612
record = this.recordInterceptor.intercept(record, this.consumer);
26072613
}
26082614
if (record == null) {
2609-
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
2610-
+ ListenerUtils.recordToString(recordArg));
2615+
this.logger.debug(() -> ("RecordInterceptor returned null, skipping: "
2616+
+ ListenerUtils.recordToString(recordArg)));
26112617
}
26122618
else {
26132619
try {
@@ -3165,6 +3171,7 @@ public void nack(long sleep) {
31653171
}
31663172

31673173
@Override
3174+
@SuppressWarnings("deprecation")
31683175
public String toString() {
31693176
return "Acknowledgment for " + ListenerUtils.recordToString(this.record, true);
31703177
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,6 +49,8 @@ public final class ListenerUtils {
4949
private ListenerUtils() {
5050
}
5151

52+
private static final ThreadLocal<Boolean> LOG_METADATA_ONLY = new ThreadLocal<>();
53+
5254
private static final int DEFAULT_SLEEP_INTERVAL = 100;
5355

5456
private static final int SMALL_SLEEP_INTERVAL = 10;
@@ -150,10 +152,12 @@ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, Clas
150152
* Set to true to only log record metadata.
151153
* @param onlyMeta true to only log record metadata.
152154
* @since 2.2.14
155+
* @deprecated in favor of {@link KafkaUtils#format(ConsumerRecord)}.
153156
* @see #recordToString(ConsumerRecord)
154157
*/
158+
@Deprecated
155159
public static void setLogOnlyMetadata(boolean onlyMeta) {
156-
KafkaUtils.setLogOnlyMetadata(onlyMeta);
160+
LOG_METADATA_ONLY.set(onlyMeta);
157161
}
158162

159163
/**
@@ -162,10 +166,12 @@ public static void setLogOnlyMetadata(boolean onlyMeta) {
162166
* @param record the record.
163167
* @return the rendered record.
164168
* @since 2.2.14
169+
* @deprecated in favor of {@link KafkaUtils#format(ConsumerRecord)}.
165170
* @see #setLogOnlyMetadata(boolean)
166171
*/
172+
@Deprecated
167173
public static String recordToString(ConsumerRecord<?, ?> record) {
168-
return KafkaUtils.recordToString(record);
174+
return recordToString(record, Boolean.TRUE.equals(LOG_METADATA_ONLY.get()));
169175
}
170176

171177
/**
@@ -175,9 +181,11 @@ public static String recordToString(ConsumerRecord<?, ?> record) {
175181
* @param meta true to log just the metadata.
176182
* @return the rendered record.
177183
* @since 2.5.4
184+
* @deprecated in favor of {@link KafkaUtils#format(ConsumerRecord)}.
178185
*/
186+
@Deprecated
179187
public static String recordToString(ConsumerRecord<?, ?> record, boolean meta) {
180-
return KafkaUtils.recordToString(record, meta);
188+
return KafkaUtils.format(record, !meta);
181189
}
182190

183191
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -92,6 +92,7 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
9292
* @param logger a {@link LogAccessor} for seek errors.
9393
* @return true if the failed record was skipped.
9494
*/
95+
@SuppressWarnings("deprecation")
9596
public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, Exception exception,
9697
boolean recoverable, RecoveryStrategy recovery, @Nullable MessageListenerContainer container,
9798
LogAccessor logger) {

spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
127127
data.forEach(record -> {
128128
Header correlation = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
129129
if (correlation == null) {
130-
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.recordToString(record)
130+
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
131131
+ " - to use request/reply semantics, the responding server must return the correlation id "
132132
+ " in the '" + KafkaHeaders.CORRELATION_ID + "' header");
133133
}

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
457457
correlationId = new CorrelationKey(correlationHeader.value());
458458
}
459459
if (correlationId == null) {
460-
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.recordToString(record)
460+
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
461461
+ " - to use request/reply semantics, the responding server must return the correlation id "
462462
+ " in the '" + this.correlationHeaderName + "' header");
463463
}
@@ -475,7 +475,7 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
475475
future.setException(exception);
476476
}
477477
if (ok) {
478-
this.logger.debug(() -> "Received: " + KafkaUtils.recordToString(record)
478+
this.logger.debug(() -> "Received: " + KafkaUtils.format(record)
479479
+ WITH_CORRELATION_ID + correlationKey);
480480
future.set(record);
481481
}
@@ -543,7 +543,7 @@ protected void logLateArrival(ConsumerRecord<K, R> record, CorrelationKey correl
543543
}
544544

545545
private String missingCorrelationLogMessage(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
546-
return "No pending reply: " + KafkaUtils.recordToString(record) + WITH_CORRELATION_ID
546+
return "No pending reply: " + KafkaUtils.format(record) + WITH_CORRELATION_ID
547547
+ correlationId + ", perhaps timed out, or using a shared reply topic";
548548
}
549549

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@ static class LoggingDltListenerHandlerMethod {
438438

439439
public static final String DEFAULT_DLT_METHOD_NAME = "logMessage";
440440

441+
@SuppressWarnings("deprecation")
441442
public void logMessage(Object message) {
442443
if (message instanceof ConsumerRecord) {
443444
LOGGER.info(() -> "Received message in dlt listener: "

0 commit comments

Comments
 (0)