1
1
/*
2
- * Copyright 2018-2021 the original author or authors.
2
+ * Copyright 2018-2022 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
55
55
import org .springframework .kafka .core .DefaultKafkaConsumerFactory ;
56
56
import org .springframework .kafka .listener .ConsumerAwareRebalanceListener ;
57
57
import org .springframework .kafka .listener .ConsumerProperties ;
58
- import org .springframework .kafka .listener .ListenerUtils ;
59
58
import org .springframework .kafka .listener .LoggingCommitCallback ;
60
59
import org .springframework .kafka .support .Acknowledgment ;
61
60
import org .springframework .kafka .support .KafkaHeaders ;
61
+ import org .springframework .kafka .support .KafkaUtils ;
62
62
import org .springframework .kafka .support .LogIfLevelEnabled ;
63
63
import org .springframework .kafka .support .TopicPartitionOffset ;
64
64
import org .springframework .kafka .support .converter .KafkaMessageHeaders ;
@@ -656,21 +656,13 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
656
656
657
657
/**
658
658
* AcknowledgmentCallbackFactory for KafkaAckInfo.
659
+ * @param consumerProperties the properties.
659
660
* @param <K> the key type.
660
661
* @param <V> the value type.
661
662
*
662
663
*/
663
- public static class KafkaAckCallbackFactory <K , V > implements AcknowledgmentCallbackFactory <KafkaAckInfo <K , V >> {
664
-
665
- private final ConsumerProperties consumerProperties ;
666
-
667
- /**
668
- * Construct an instance with the provided properties.
669
- * @param consumerProperties the properties.
670
- */
671
- public KafkaAckCallbackFactory (ConsumerProperties consumerProperties ) {
672
- this .consumerProperties = consumerProperties ;
673
- }
664
+ public record KafkaAckCallbackFactory <K , V >(ConsumerProperties consumerProperties )
665
+ implements AcknowledgmentCallbackFactory <KafkaAckInfo <K , V >> {
674
666
675
667
@ Override
676
668
public AcknowledgmentCallback createCallback (KafkaAckInfo <K , V > info ) {
@@ -737,15 +729,10 @@ public void acknowledge(Status status) {
737
729
try {
738
730
ConsumerRecord <K , V > record = this .ackInfo .getRecord ();
739
731
switch (status ) {
740
- case ACCEPT :
741
- case REJECT :
742
- commitIfPossible (record );
743
- break ;
744
- case REQUEUE :
745
- rollback (record );
746
- break ;
747
- default :
748
- break ;
732
+ case ACCEPT , REJECT -> commitIfPossible (record );
733
+ case REQUEUE -> rollback (record );
734
+ default -> {
735
+ }
749
736
}
750
737
}
751
738
catch (WakeupException e ) {
@@ -774,7 +761,8 @@ private void rollback(ConsumerRecord<K, V> record) {
774
761
})
775
762
.collect (Collectors .toList ());
776
763
if (rewound .size () > 0 ) {
777
- this .logger .warn (() -> "Rolled back " + ListenerUtils .recordToString (record , this .logOnlyMetadata )
764
+ KafkaUtils .setLogOnlyMetadata (this .logOnlyMetadata );
765
+ this .logger .warn (() -> "Rolled back " + KafkaUtils .format (record )
778
766
+ " later in-flight offsets "
779
767
+ rewound + " will also be re-fetched" );
780
768
}
@@ -783,9 +771,10 @@ private void rollback(ConsumerRecord<K, V> record) {
783
771
}
784
772
785
773
private void commitIfPossible (ConsumerRecord <K , V > record ) { // NOSONAR
774
+ KafkaUtils .setLogOnlyMetadata (this .logOnlyMetadata );
786
775
if (this .ackInfo .isRolledBack ()) {
787
776
this .logger .warn (() -> "Cannot commit offset for "
788
- + ListenerUtils . recordToString (record , this . logOnlyMetadata )
777
+ + KafkaUtils . format (record )
789
778
+ "; an earlier offset was rolled back" );
790
779
}
791
780
else {
@@ -809,16 +798,14 @@ private void commitIfPossible(ConsumerRecord<K, V> record) { // NOSONAR
809
798
ackInformation = toCommit .get (toCommit .size () - 1 );
810
799
KafkaAckInfo <K , V > ackInformationToLog = ackInformation ;
811
800
this .commitLogger .log (() -> "Committing pending offsets for "
812
- + ListenerUtils . recordToString (record , this . logOnlyMetadata )
801
+ + KafkaUtils . format (record )
813
802
+ " and all deferred to "
814
- + ListenerUtils .recordToString (ackInformationToLog .getRecord (),
815
- this .logOnlyMetadata ));
816
- candidates .removeAll (toCommit );
803
+ + KafkaUtils .format (ackInformationToLog .getRecord ()));
804
+ toCommit .forEach (candidates ::remove );
817
805
}
818
806
else {
819
807
ackInformation = this .ackInfo ;
820
- this .commitLogger .log (() -> "Committing offset for "
821
- + ListenerUtils .recordToString (record , this .logOnlyMetadata ));
808
+ this .commitLogger .log (() -> "Committing offset for " + KafkaUtils .format (record ));
822
809
}
823
810
}
824
811
else { // earlier offsets present
0 commit comments