diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 9fe9e75574..698d3ac895 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -839,17 +839,17 @@ private Callback buildCallback(final ProducerRecord producerRecord, final } } catch (Exception e) { - KafkaTemplate.this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback"); + this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback"); } try { if (exception == null) { successTimer(sample, producerRecord); observation.stop(); future.complete(new SendResult<>(producerRecord, metadata)); - if (KafkaTemplate.this.producerListener != null) { - KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata); + if (this.producerListener != null) { + this.producerListener.onSuccess(producerRecord, metadata); } - KafkaTemplate.this.logger.trace(() -> "Sent ok: " + KafkaUtils.format(producerRecord) + this.logger.trace(() -> "Sent ok: " + KafkaUtils.format(producerRecord) + ", metadata: " + metadata); } else { @@ -858,17 +858,14 @@ private Callback buildCallback(final ProducerRecord producerRecord, final observation.stop(); future.completeExceptionally( new KafkaProducerException(producerRecord, "Failed to send", exception)); - if (KafkaTemplate.this.producerListener != null) { - KafkaTemplate.this.producerListener.onError(producerRecord, metadata, exception); + if (this.producerListener != null) { + this.producerListener.onError(producerRecord, metadata, exception); } - KafkaTemplate.this.logger.debug(exception, () -> "Failed to send: " - + KafkaUtils.format(producerRecord)); + this.logger.debug(exception, () -> "Failed to send: " + KafkaUtils.format(producerRecord)); } } finally { - if (!KafkaTemplate.this.transactional) { - closeProducer(producer, false); - } + closeProducer(producer, this.transactional); } }; } @@ -985,7 +982,6 @@ public void destroy() { } } - @SuppressWarnings("serial") private static final class SkipAbortException extends RuntimeException { SkipAbortException(Throwable cause) {