@@ -839,17 +839,17 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
839
839
}
840
840
}
841
841
catch (Exception e ) {
842
- KafkaTemplate . this .logger .warn (e , () -> "Error executing interceptor onAcknowledgement callback" );
842
+ this .logger .warn (e , () -> "Error executing interceptor onAcknowledgement callback" );
843
843
}
844
844
try {
845
845
if (exception == null ) {
846
846
successTimer (sample , producerRecord );
847
847
observation .stop ();
848
848
future .complete (new SendResult <>(producerRecord , metadata ));
849
- if (KafkaTemplate . this .producerListener != null ) {
850
- KafkaTemplate . this .producerListener .onSuccess (producerRecord , metadata );
849
+ if (this .producerListener != null ) {
850
+ this .producerListener .onSuccess (producerRecord , metadata );
851
851
}
852
- KafkaTemplate . this .logger .trace (() -> "Sent ok: " + KafkaUtils .format (producerRecord )
852
+ this .logger .trace (() -> "Sent ok: " + KafkaUtils .format (producerRecord )
853
853
+ ", metadata: " + metadata );
854
854
}
855
855
else {
@@ -858,17 +858,14 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
858
858
observation .stop ();
859
859
future .completeExceptionally (
860
860
new KafkaProducerException (producerRecord , "Failed to send" , exception ));
861
- if (KafkaTemplate . this .producerListener != null ) {
862
- KafkaTemplate . this .producerListener .onError (producerRecord , metadata , exception );
861
+ if (this .producerListener != null ) {
862
+ this .producerListener .onError (producerRecord , metadata , exception );
863
863
}
864
- KafkaTemplate .this .logger .debug (exception , () -> "Failed to send: "
865
- + KafkaUtils .format (producerRecord ));
864
+ this .logger .debug (exception , () -> "Failed to send: " + KafkaUtils .format (producerRecord ));
866
865
}
867
866
}
868
867
finally {
869
- if (!KafkaTemplate .this .transactional ) {
870
- closeProducer (producer , false );
871
- }
868
+ closeProducer (producer , this .transactional );
872
869
}
873
870
};
874
871
}
@@ -985,7 +982,6 @@ public void destroy() {
985
982
}
986
983
}
987
984
988
- @ SuppressWarnings ("serial" )
989
985
private static final class SkipAbortException extends RuntimeException {
990
986
991
987
SkipAbortException (Throwable cause ) {
0 commit comments