Skip to content

Commit bd3d43f

Browse files
garyrussellartembilan
authored andcommitted
Partial Revert "GH-2132: Producer Record Log Fo..."
Resolves #2132 This reverts commit 297f52d. Just the consumer record part.
1 parent fccd680 commit bd3d43f

File tree

4 files changed

+23
-51
lines changed

4 files changed

+23
-51
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.kafka.common.header.internals.RecordHeaders;
3030

3131
import org.springframework.core.log.LogAccessor;
32-
import org.springframework.kafka.support.KafkaUtils;
3332
import org.springframework.kafka.support.serializer.DeserializationException;
3433
import org.springframework.lang.Nullable;
3534
import org.springframework.util.Assert;
@@ -48,6 +47,8 @@ public final class ListenerUtils {
4847
private ListenerUtils() {
4948
}
5049

50+
private static final ThreadLocal<Boolean> LOG_METADATA_ONLY = new ThreadLocal<>();
51+
5152
private static final int DEFAULT_SLEEP_INTERVAL = 100;
5253

5354
private static final int SMALL_SLEEP_INTERVAL = 10;
@@ -120,7 +121,7 @@ public static DeserializationException getExceptionFromHeader(final ConsumerReco
120121
* @see #recordToString(ConsumerRecord)
121122
*/
122123
public static void setLogOnlyMetadata(boolean onlyMeta) {
123-
KafkaUtils.setLogOnlyMetadata(onlyMeta);
124+
LOG_METADATA_ONLY.set(onlyMeta);
124125
}
125126

126127
/**
@@ -132,7 +133,12 @@ public static void setLogOnlyMetadata(boolean onlyMeta) {
132133
* @see #setLogOnlyMetadata(boolean)
133134
*/
134135
public static String recordToString(ConsumerRecord<?, ?> record) {
135-
return KafkaUtils.recordToString(record);
136+
if (Boolean.TRUE.equals(LOG_METADATA_ONLY.get())) {
137+
return record.topic() + "-" + record.partition() + "@" + record.offset();
138+
}
139+
else {
140+
return record.toString();
141+
}
136142
}
137143

138144
/**
@@ -144,7 +150,12 @@ public static String recordToString(ConsumerRecord<?, ?> record) {
144150
* @since 2.5.4
145151
*/
146152
public static String recordToString(ConsumerRecord<?, ?> record, boolean meta) {
147-
return KafkaUtils.recordToString(record, meta);
153+
if (meta) {
154+
return record.topic() + "-" + record.partition() + "@" + record.offset();
155+
}
156+
else {
157+
return record.toString();
158+
}
148159
}
149160

150161
/**

spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
127127
data.forEach(record -> {
128128
Header correlation = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
129129
if (correlation == null) {
130-
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.recordToString(record)
130+
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
131131
+ " - to use request/reply semantics, the responding server must return the correlation id "
132132
+ " in the '" + KafkaHeaders.CORRELATION_ID + "' header");
133133
}

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
457457
correlationId = new CorrelationKey(correlationHeader.value());
458458
}
459459
if (correlationId == null) {
460-
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.recordToString(record)
460+
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
461461
+ " - to use request/reply semantics, the responding server must return the correlation id "
462462
+ " in the '" + this.correlationHeaderName + "' header");
463463
}
@@ -475,7 +475,7 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
475475
future.setException(exception);
476476
}
477477
if (ok) {
478-
this.logger.debug(() -> "Received: " + KafkaUtils.recordToString(record)
478+
this.logger.debug(() -> "Received: " + KafkaUtils.format(record)
479479
+ WITH_CORRELATION_ID + correlationKey);
480480
future.set(record);
481481
}
@@ -543,7 +543,7 @@ protected void logLateArrival(ConsumerRecord<K, R> record, CorrelationKey correl
543543
}
544544

545545
private String missingCorrelationLogMessage(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
546-
return "No pending reply: " + KafkaUtils.recordToString(record) + WITH_CORRELATION_ID
546+
return "No pending reply: " + KafkaUtils.format(record) + WITH_CORRELATION_ID
547547
+ correlationId + ", perhaps timed out, or using a shared reply topic";
548548
}
549549

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java

Lines changed: 4 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
*/
4343
public final class KafkaUtils {
4444

45-
private static final ThreadLocal<Boolean> LOG_METADATA_ONLY = new ThreadLocal<>();
46-
4745
private static Function<ProducerRecord<?, ?>, String> prFormatter = rec -> rec.toString();
4846

4947
private static Function<ConsumerRecord<?, ?>, String> crFormatter =
@@ -145,49 +143,10 @@ else if (dt instanceof String) {
145143
min));
146144
}
147145

148-
/**
149-
* Set to true to only log record metadata.
150-
* @param onlyMeta true to only log record metadata.
151-
* @since 2.7.12
152-
* @see #recordToString(ConsumerRecord)
153-
*/
154-
public static void setLogOnlyMetadata(boolean onlyMeta) {
155-
LOG_METADATA_ONLY.set(onlyMeta);
156-
}
157-
158-
/**
159-
* Return the {@link ConsumerRecord} as a String; either {@code toString()} or
160-
* {@code topic-partition@offset}.
161-
* @param record the record.
162-
* @return the rendered record.
163-
* @since 2.7.12
164-
* @see #setLogOnlyMetadata(boolean)
165-
*/
166-
public static String recordToString(ConsumerRecord<?, ?> record) {
167-
return recordToString(record, Boolean.TRUE.equals(LOG_METADATA_ONLY.get()));
168-
}
169-
170-
/**
171-
* Return the {@link ConsumerRecord} as a String; either {@code toString()} or
172-
* {@code topic-partition@offset}.
173-
* @param record the record.
174-
* @param meta true to log just the metadata.
175-
* @return the rendered record.
176-
* @since 2.7.12
177-
*/
178-
public static String recordToString(ConsumerRecord<?, ?> record, boolean meta) {
179-
if (meta) {
180-
return crFormatter.apply(record);
181-
}
182-
else {
183-
return record.toString();
184-
}
185-
}
186-
187146
/**
188147
* Set a formatter for logging {@link ConsumerRecord}s.
189148
* @param formatter a function to format the record as a String
190-
* @since 2.7.11
149+
* @since 2.7.12
191150
*/
192151
public static void setConsumerRecordFormatter(Function<ConsumerRecord<?, ?>, String> formatter) {
193152
Assert.notNull(formatter, "'formatter' cannot be null");
@@ -197,7 +156,7 @@ public static void setConsumerRecordFormatter(Function<ConsumerRecord<?, ?>, Str
197156
/**
198157
* Set a formatter for logging {@link ProducerRecord}s.
199158
* @param formatter a function to format the record as a String
200-
* @since 2.7.11
159+
* @since 2.7.12
201160
*/
202161
public static void setProducerRecordFormatter(Function<ProducerRecord<?, ?>, String> formatter) {
203162
Assert.notNull(formatter, "'formatter' cannot be null");
@@ -209,6 +168,7 @@ public static void setProducerRecordFormatter(Function<ProducerRecord<?, ?>, Str
209168
* {@code topic-partition@offset}.
210169
* @param record the record to format.
211170
* @return the formatted String.
171+
* @since 2.7.12
212172
*/
213173
public static String format(ConsumerRecord<?, ?> record) {
214174
return crFormatter.apply(record);
@@ -219,6 +179,7 @@ public static String format(ConsumerRecord<?, ?> record) {
219179
* {@link ProducerRecord}{@link #toString()}.
220180
* @param record the record to format.
221181
* @return the formatted String.
182+
* @since 2.7.12
222183
*/
223184
public static String format(ProducerRecord<?, ?> record) {
224185
return prFormatter.apply(record);

0 commit comments

Comments
 (0)