Skip to content

Commit 3850a01

Browse files
garyrussell and artembilan
authored and committed
GH-2419: DLPR: Protect Against Non-Compliant PF
Resolves #2419. We try to obtain the configured request timeout using the producer factory's configuration properties. Some wrappers (e.g. OpenTracing) do not implement this method, so fall back to the default instead of throwing a USOE (UnsupportedOperationException). **cherry-pick to 2.9.x, 2.8.x** * Fix `CompletableFuture` conflicts in the tests
1 parent 5c776cd commit 3850a01

File tree

3 files changed

+38
-9
lines changed

3 files changed

+38
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -703,7 +703,13 @@ private String pubFailMessage(ProducerRecord<Object, Object> outRecord, Consumer
703703
protected Duration determineSendTimeout(KafkaOperations<?, ?> template) {
704704
ProducerFactory<? extends Object, ? extends Object> producerFactory = template.getProducerFactory();
705705
if (producerFactory != null) { // NOSONAR - will only occur in mock tests
706-
Map<String, Object> props = producerFactory.getConfigurationProperties();
706+
Map<String, Object> props;
707+
try {
708+
props = producerFactory.getConfigurationProperties();
709+
}
710+
catch (UnsupportedOperationException ex) {
711+
props = Collections.emptyMap();
712+
}
707713
if (props != null) { // NOSONAR - will only occur in mock tests
708714
return KafkaUtils.determineSendTimeout(props, this.timeoutBuffer,
709715
this.waitForSendResultTimeout.toMillis());

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 30 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.BDDMockito.given;
2525
import static org.mockito.BDDMockito.then;
2626
import static org.mockito.BDDMockito.willAnswer;
27+
import static org.mockito.BDDMockito.willCallRealMethod;
2728
import static org.mockito.BDDMockito.willReturn;
2829
import static org.mockito.Mockito.atLeastOnce;
2930
import static org.mockito.Mockito.mock;
@@ -315,7 +316,7 @@ private byte[] header(boolean isKey, DeserializationException deserEx) {
315316
return baos.toByteArray();
316317
}
317318

318-
@SuppressWarnings({"unchecked", "rawtypes"})
319+
@SuppressWarnings({ "unchecked", "rawtypes" })
319320
@Test
320321
void allOriginalHeaders() {
321322
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
@@ -335,7 +336,7 @@ void allOriginalHeaders() {
335336
assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotNull();
336337
}
337338

338-
@SuppressWarnings({"unchecked", "rawtypes"})
339+
@SuppressWarnings({ "unchecked", "rawtypes" })
339340
@Test
340341
void dontAppendOriginalHeaders() {
341342
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
@@ -386,7 +387,7 @@ void dontAppendOriginalHeaders() {
386387
assertThat(exceptionHeaders.hasNext()).isFalse();
387388
}
388389

389-
@SuppressWarnings({"unchecked", "rawtypes"})
390+
@SuppressWarnings({ "unchecked", "rawtypes" })
390391
@Test
391392
void appendOriginalHeaders() {
392393
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
@@ -825,13 +826,13 @@ void addHeaderFunctionsProcessedInOrder() {
825826
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
826827
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
827828
recoverer.setHeadersFunction((rec, ex) -> {
828-
return new RecordHeaders(new RecordHeader[] { new RecordHeader("foo", "one".getBytes()) });
829+
return new RecordHeaders(new RecordHeader[]{ new RecordHeader("foo", "one".getBytes()) });
829830
});
830831
recoverer.addHeadersFunction((rec, ex) -> {
831-
return new RecordHeaders(new RecordHeader[] { new RecordHeader("bar", "two".getBytes()) });
832+
return new RecordHeaders(new RecordHeader[]{ new RecordHeader("bar", "two".getBytes()) });
832833
});
833834
recoverer.addHeadersFunction((rec, ex) -> {
834-
return new RecordHeaders(new RecordHeader[] { new RecordHeader("foo", "three".getBytes()) });
835+
return new RecordHeaders(new RecordHeader[]{ new RecordHeader("foo", "three".getBytes()) });
835836
});
836837
recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException()));
837838
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
@@ -862,12 +863,12 @@ void immutableHeaders() {
862863
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
863864
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
864865
recoverer.setHeadersFunction((rec, ex) -> {
865-
RecordHeaders headers = new RecordHeaders(new RecordHeader[] { new RecordHeader("foo", "one".getBytes()) });
866+
RecordHeaders headers = new RecordHeaders(new RecordHeader[]{ new RecordHeader("foo", "one".getBytes()) });
866867
headers.setReadOnly();
867868
return headers;
868869
});
869870
recoverer.addHeadersFunction((rec, ex) -> {
870-
return new RecordHeaders(new RecordHeader[] { new RecordHeader("bar", "two".getBytes()) });
871+
return new RecordHeaders(new RecordHeader[]{ new RecordHeader("bar", "two".getBytes()) });
871872
});
872873
recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException()));
873874
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
@@ -877,4 +878,25 @@ void immutableHeaders() {
877878
assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(12);
878879
}
879880

881+
@SuppressWarnings("unchecked")
882+
@Test
883+
void nonCompliantProducerFactory() throws Exception {
884+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
885+
ProducerFactory pf = mock(ProducerFactory.class);
886+
887+
willCallRealMethod().given(pf).getConfigurationProperties();
888+
889+
given(template.getProducerFactory()).willReturn(pf);
890+
ListenableFuture<?> future = mock(ListenableFuture.class);
891+
ArgumentCaptor<Long> timeoutCaptor = ArgumentCaptor.forClass(Long.class);
892+
given(template.send(any(ProducerRecord.class))).willReturn(future);
893+
given(future.get(timeoutCaptor.capture(), eq(TimeUnit.MILLISECONDS))).willThrow(new TimeoutException());
894+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
895+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
896+
recoverer.setFailIfSendResultIsError(true);
897+
assertThatThrownBy(() -> recoverer.accept(record, new RuntimeException()))
898+
.isExactlyInstanceOf(KafkaException.class);
899+
assertThat(timeoutCaptor.getValue()).isEqualTo(Duration.ofSeconds(125).toMillis());
900+
}
901+
880902
}

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerIntegrationTests.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@
4141

4242
import org.springframework.context.ApplicationEvent;
4343
import org.springframework.context.ApplicationEventPublisher;
44+
import org.springframework.kafka.core.ConsumerFactory;
4445
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
4546
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4647
import org.springframework.kafka.core.KafkaOperations;

0 commit comments

Comments (0)