Skip to content

Commit 90ce358

Browse files
committed
spring-projectsGH-1898: Fix OOM in LoggingProducerListener
Resolves spring-projects#1898 (comment) Large `byte[]` payloads could cause an OOM. **cherry-pick to 2.7.x, 2.6.x, 2.5.x**
1 parent b79994f commit 90ce358

File tree

2 files changed

+73
-5
lines changed

2 files changed

+73
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/LoggingProducerListener.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2020 the original author or authors.
2+
* Copyright 2015-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,7 +40,7 @@ public class LoggingProducerListener<K, V> implements ProducerListener<K, V> {
4040
*/
4141
public static final int DEFAULT_MAX_CONTENT_LOGGED = 100;
4242

43-
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(LoggingProducerListener.class));
43+
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR
4444

4545
private boolean includeContents = true;
4646

@@ -68,15 +68,15 @@ public void setMaxContentLogged(int maxContentLogged) {
6868

6969
@Override
7070
public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) {
71-
LOGGER.error(exception, () -> {
71+
this.logger.error(exception, () -> {
7272
StringBuffer logOutput = new StringBuffer();
7373
logOutput.append("Exception thrown when sending a message");
7474
if (this.includeContents) {
7575
logOutput.append(" with key='")
76-
.append(toDisplayString(ObjectUtils.nullSafeToString(record.key()), this.maxContentLogged))
76+
.append(keyOrValue(record.key()))
7777
.append("'")
7878
.append(" and payload='")
79-
.append(toDisplayString(ObjectUtils.nullSafeToString(record.value()), this.maxContentLogged))
79+
.append(keyOrValue(record.value()))
8080
.append("'");
8181
}
8282
logOutput.append(" to topic ").append(record.topic());
@@ -90,6 +90,15 @@ public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata record
9090
});
9191
}
9292

93+
private String keyOrValue(Object keyOrValue) {
94+
if (keyOrValue instanceof byte[]) {
95+
return "byte[" + ((byte[]) keyOrValue).length + "]";
96+
}
97+
else {
98+
return toDisplayString(ObjectUtils.nullSafeToString(keyOrValue), this.maxContentLogged);
99+
}
100+
}
101+
93102
private String toDisplayString(String original, int maxCharacters) {
94103
if (original.length() <= maxCharacters) {
95104
return original;
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.BDDMockito.willAnswer;
22+
import static org.mockito.Mockito.spy;
23+
24+
import java.util.concurrent.atomic.AtomicReference;
25+
import java.util.function.Supplier;
26+
27+
import org.apache.kafka.clients.producer.ProducerRecord;
28+
import org.junit.jupiter.api.Test;
29+
30+
import org.springframework.beans.DirectFieldAccessor;
31+
import org.springframework.core.log.LogAccessor;
32+
import org.springframework.kafka.test.utils.KafkaTestUtils;
33+
34+
/**
35+
* @author Gary Russell
36+
* @since 2.5.16
37+
*
38+
*/
39+
public class LoggingProducerListenerTests {
40+
41+
@SuppressWarnings({ "unchecked", "rawtypes" })
42+
@Test
43+
void noBytesInLog() {
44+
LoggingProducerListener pl = new LoggingProducerListener();
45+
LogAccessor logger = (LogAccessor) spy(KafkaTestUtils.getPropertyValue(pl, "logger"));
46+
new DirectFieldAccessor(pl).setPropertyValue("logger", logger);
47+
AtomicReference<String> string = new AtomicReference<>();
48+
willAnswer(inv -> {
49+
Supplier<String> stringer = inv.getArgument(1);
50+
string.set(stringer.get());
51+
return null;
52+
}).given(logger).error(any(), any(Supplier.class));
53+
pl.onError(new ProducerRecord("foo", 0, new byte[3], new byte[1111]), null,
54+
new RuntimeException());
55+
assertThat(string.get()).contains("byte[3]");
56+
assertThat(string.get()).contains("byte[1111]");
57+
}
58+
59+
}

0 commit comments

Comments
 (0)