Skip to content

Commit 6473c33

Browse files
garyrussellartembilan
authored andcommitted
GH-2269: Improve DLPR Extensibility
Resolves #2269 - add getters for fields used in protected methods - change more methods to protected **Cherry-pick to 2.9.x, 2.8.x, 2.7.x** If it doesn't cherry-pick cleanly, I will back-port.
1 parent 968df68 commit 6473c33

File tree

1 file changed

+50
-5
lines changed

1 file changed

+50
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,18 @@ public void setFailIfSendResultIsError(boolean failIfSendResultIsError) {
310310
}
311311

312312
/**
313-
* Set the minumum time to wait for message sending. Default is the producer
313+
* If true, wait for the send result and throw an exception if it fails.
314+
* It will wait for the milliseconds specified in waitForSendResultTimeout for the result.
315+
* @return true to wait.
316+
* @since 2.7.14
317+
* @see #setWaitForSendResultTimeout(Duration)
318+
*/
319+
protected boolean isFailIfSendResultIsError() {
320+
return this.failIfSendResultIsError;
321+
}
322+
323+
/**
324+
* Set the minimum time to wait for message sending. Default is the producer
314325
* configuration {@code delivery.timeout.ms} plus the {@link #setTimeoutBuffer(long)}.
315326
* @param waitForSendResultTimeout the timeout.
316327
* @since 2.7
@@ -322,8 +333,9 @@ public void setWaitForSendResultTimeout(Duration waitForSendResultTimeout) {
322333
}
323334

324335
/**
325-
* Set the number of milliseconds to add to the producer configuration {@code delivery.timeout.ms}
326-
* property to avoid timing out before the Kafka producer. Default 5000.
336+
* Set the number of milliseconds to add to the producer configuration
337+
* {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
338+
* Default 5000.
327339
* @param buffer the buffer.
328340
* @since 2.7
329341
* @see #setWaitForSendResultTimeout(Duration)
@@ -332,6 +344,16 @@ public void setTimeoutBuffer(long buffer) {
332344
this.timeoutBuffer = buffer;
333345
}
334346

347+
/**
348+
* The number of milliseconds to add to the producer configuration
349+
* {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
350+
* @return the buffer.
351+
* @since 2.7.14
352+
*/
353+
protected long getTimeoutBuffer() {
354+
return this.timeoutBuffer;
355+
}
356+
335357
/**
336358
* Set to false to retain previous exception headers as well as headers for the
337359
* current exception. Default is true, which means only the current headers are
@@ -366,6 +388,15 @@ public void setExceptionHeadersCreator(ExceptionHeadersCreator headersCreator) {
366388
this.exceptionHeadersCreator = headersCreator;
367389
}
368390

391+
/**
392+
* True if publishing should run in a transaction.
393+
* @return true for transactional.
394+
* @since 2.7.14
395+
*/
396+
protected boolean isTransactional() {
397+
return this.transactional;
398+
}
399+
369400
/**
370401
* Clear the header inclusion bit for the header name.
371402
* @param headers the headers to clear.
@@ -629,7 +660,14 @@ protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations
629660
}
630661
}
631662

632-
private void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
663+
/**
664+
* Wait for the send future to complete.
665+
* @param kafkaTemplate the template used to send the record.
666+
* @param outRecord the record.
667+
* @param sendResult the future.
668+
* @param inRecord the original consumer record.
669+
*/
670+
protected void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
633671
ProducerRecord<Object, Object> outRecord,
634672
@Nullable ListenableFuture<SendResult<Object, Object>> sendResult, ConsumerRecord<?, ?> inRecord) {
635673

@@ -655,7 +693,14 @@ private String pubFailMessage(ProducerRecord<Object, Object> outRecord, Consumer
655693
+ outRecord.topic() + "failed for: " + ListenerUtils.recordToString(inRecord, true);
656694
}
657695

658-
private Duration determineSendTimeout(KafkaOperations<?, ?> template) {
696+
/**
697+
* Determine the send timeout based on the template's producer factory and
698+
* {@link #setWaitForSendResultTimeout(Duration)}.
699+
* @param template the template.
700+
* @return the timeout.
701+
* @since 2.7.14
702+
*/
703+
protected Duration determineSendTimeout(KafkaOperations<?, ?> template) {
659704
ProducerFactory<? extends Object, ? extends Object> producerFactory = template.getProducerFactory();
660705
if (producerFactory != null) { // NOSONAR - will only occur in mock tests
661706
Map<String, Object> props = producerFactory.getConfigurationProperties();

0 commit comments

Comments
 (0)