Skip to content

Commit 5bd08c0

Browse files
garyrussell authored and artembilan committed
GH-2419: DLPR: Protect Against Non-Compliant PF
Resolves #2419. We try to obtain the configured request timeout using the producer factory's configuration properties. Some wrappers (e.g. open tracing) do not implement this method, so fall back to the default instead of throwing a USOE. **cherry-pick to 2.9.x, 2.8.x** * Fix `CompletableFuture` conflicts in the tests
1 parent f6cbf11 commit 5bd08c0

File tree

3 files changed

+38
-9
lines changed

3 files changed

+38
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -703,7 +703,13 @@ private String pubFailMessage(ProducerRecord<Object, Object> outRecord, Consumer
703703
protected Duration determineSendTimeout(KafkaOperations<?, ?> template) {
704704
ProducerFactory<? extends Object, ? extends Object> producerFactory = template.getProducerFactory();
705705
if (producerFactory != null) { // NOSONAR - will only occur in mock tests
706-
Map<String, Object> props = producerFactory.getConfigurationProperties();
706+
Map<String, Object> props;
707+
try {
708+
props = producerFactory.getConfigurationProperties();
709+
}
710+
catch (UnsupportedOperationException ex) {
711+
props = Collections.emptyMap();
712+
}
707713
if (props != null) { // NOSONAR - will only occur in mock tests
708714
return KafkaUtils.determineSendTimeout(props, this.timeoutBuffer,
709715
this.waitForSendResultTimeout.toMillis());

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 30 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.BDDMockito.given;
2525
import static org.mockito.BDDMockito.then;
2626
import static org.mockito.BDDMockito.willAnswer;
27+
import static org.mockito.BDDMockito.willCallRealMethod;
2728
import static org.mockito.BDDMockito.willReturn;
2829
import static org.mockito.Mockito.atLeastOnce;
2930
import static org.mockito.Mockito.mock;
@@ -314,7 +315,7 @@ private byte[] header(boolean isKey, DeserializationException deserEx) {
314315
return baos.toByteArray();
315316
}
316317

317-
@SuppressWarnings({"unchecked", "rawtypes"})
318+
@SuppressWarnings({ "unchecked", "rawtypes" })
318319
@Test
319320
void allOriginalHeaders() {
320321
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
@@ -334,7 +335,7 @@ void allOriginalHeaders() {
334335
assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotNull();
335336
}
336337

337-
@SuppressWarnings({"unchecked", "rawtypes"})
338+
@SuppressWarnings({ "unchecked", "rawtypes" })
338339
@Test
339340
void dontAppendOriginalHeaders() {
340341
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
@@ -385,7 +386,7 @@ void dontAppendOriginalHeaders() {
385386
assertThat(exceptionHeaders.hasNext()).isFalse();
386387
}
387388

388-
@SuppressWarnings({"unchecked", "rawtypes"})
389+
@SuppressWarnings({ "unchecked", "rawtypes" })
389390
@Test
390391
void appendOriginalHeaders() {
391392
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
@@ -824,13 +825,13 @@ void addHeaderFunctionsProcessedInOrder() {
824825
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
825826
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
826827
recoverer.setHeadersFunction((rec, ex) -> {
827-
return new RecordHeaders(new RecordHeader[] { new RecordHeader("foo", "one".getBytes()) });
828+
return new RecordHeaders(new RecordHeader[]{ new RecordHeader("foo", "one".getBytes()) });
828829
});
829830
recoverer.addHeadersFunction((rec, ex) -> {
830-
return new RecordHeaders(new RecordHeader[] { new RecordHeader("bar", "two".getBytes()) });
831+
return new RecordHeaders(new RecordHeader[]{ new RecordHeader("bar", "two".getBytes()) });
831832
});
832833
recoverer.addHeadersFunction((rec, ex) -> {
833-
return new RecordHeaders(new RecordHeader[] { new RecordHeader("foo", "three".getBytes()) });
834+
return new RecordHeaders(new RecordHeader[]{ new RecordHeader("foo", "three".getBytes()) });
834835
});
835836
recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException()));
836837
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
@@ -861,12 +862,12 @@ void immutableHeaders() {
861862
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
862863
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
863864
recoverer.setHeadersFunction((rec, ex) -> {
864-
RecordHeaders headers = new RecordHeaders(new RecordHeader[] { new RecordHeader("foo", "one".getBytes()) });
865+
RecordHeaders headers = new RecordHeaders(new RecordHeader[]{ new RecordHeader("foo", "one".getBytes()) });
865866
headers.setReadOnly();
866867
return headers;
867868
});
868869
recoverer.addHeadersFunction((rec, ex) -> {
869-
return new RecordHeaders(new RecordHeader[] { new RecordHeader("bar", "two".getBytes()) });
870+
return new RecordHeaders(new RecordHeader[]{ new RecordHeader("bar", "two".getBytes()) });
870871
});
871872
recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException()));
872873
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
@@ -876,4 +877,25 @@ void immutableHeaders() {
876877
assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(12);
877878
}
878879

880+
@SuppressWarnings("unchecked")
881+
@Test
882+
void nonCompliantProducerFactory() throws Exception {
883+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
884+
ProducerFactory pf = mock(ProducerFactory.class);
885+
886+
willCallRealMethod().given(pf).getConfigurationProperties();
887+
888+
given(template.getProducerFactory()).willReturn(pf);
889+
ListenableFuture<?> future = mock(ListenableFuture.class);
890+
ArgumentCaptor<Long> timeoutCaptor = ArgumentCaptor.forClass(Long.class);
891+
given(template.send(any(ProducerRecord.class))).willReturn(future);
892+
given(future.get(timeoutCaptor.capture(), eq(TimeUnit.MILLISECONDS))).willThrow(new TimeoutException());
893+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
894+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
895+
recoverer.setFailIfSendResultIsError(true);
896+
assertThatThrownBy(() -> recoverer.accept(record, new RuntimeException()))
897+
.isExactlyInstanceOf(KafkaException.class);
898+
assertThat(timeoutCaptor.getValue()).isEqualTo(Duration.ofSeconds(125).toMillis());
899+
}
900+
879901
}

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerIntegrationTests.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@
4141

4242
import org.springframework.context.ApplicationEvent;
4343
import org.springframework.context.ApplicationEventPublisher;
44+
import org.springframework.kafka.core.ConsumerFactory;
4445
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
4546
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4647
import org.springframework.kafka.core.KafkaOperations;

0 commit comments

Comments (0)