55
55
import org .springframework .kafka .core .DefaultKafkaConsumerFactory ;
56
56
import org .springframework .kafka .listener .ConsumerAwareRebalanceListener ;
57
57
import org .springframework .kafka .listener .ConsumerProperties ;
58
+ import org .springframework .kafka .listener .ListenerUtils ;
58
59
import org .springframework .kafka .listener .LoggingCommitCallback ;
59
60
import org .springframework .kafka .support .Acknowledgment ;
60
61
import org .springframework .kafka .support .KafkaHeaders ;
@@ -667,6 +668,8 @@ public static class KafkaAckCallback<K, V> implements AcknowledgmentCallback, Ac
667
668
668
669
private final boolean isSyncCommits ;
669
670
671
+ private final boolean logOnlyMetadata ;
672
+
670
673
private volatile boolean acknowledged ;
671
674
672
675
private boolean autoAckEnabled = true ;
@@ -690,6 +693,7 @@ public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo, @Nullable ConsumerProperties
690
693
consumerProperties != null
691
694
? consumerProperties .getCommitLogLevel ()
692
695
: LogIfLevelEnabled .Level .DEBUG );
696
+ this .logOnlyMetadata = consumerProperties .isOnlyLogRecordMetadata ();
693
697
}
694
698
695
699
@ Override
@@ -739,7 +743,8 @@ private void rollback(ConsumerRecord<K, V> record) {
739
743
})
740
744
.collect (Collectors .toList ());
741
745
if (rewound .size () > 0 && this .logger .isWarnEnabled ()) {
742
- this .logger .warn ("Rolled back " + record + " later in-flight offsets "
746
+ this .logger .warn ("Rolled back " + ListenerUtils .recordToString (record , this .logOnlyMetadata )
747
+ + " later in-flight offsets "
743
748
+ rewound + " will also be re-fetched" );
744
749
}
745
750
}
@@ -749,7 +754,8 @@ private void rollback(ConsumerRecord<K, V> record) {
749
754
private void commitIfPossible (ConsumerRecord <K , V > record ) {
750
755
if (this .ackInfo .isRolledBack ()) {
751
756
if (this .logger .isWarnEnabled ()) {
752
- this .logger .warn ("Cannot commit offset for " + record
757
+ this .logger .warn ("Cannot commit offset for "
758
+ + ListenerUtils .recordToString (record , this .logOnlyMetadata )
753
759
+ "; an earlier offset was rolled back" );
754
760
}
755
761
}
@@ -773,13 +779,17 @@ private void commitIfPossible(ConsumerRecord<K, V> record) {
773
779
if (toCommit .size () > 0 ) {
774
780
ackInformation = toCommit .get (toCommit .size () - 1 );
775
781
KafkaAckInfo <K , V > ackInformationToLog = ackInformation ;
776
- this .commitLogger .log (() -> "Committing pending offsets for " + record
777
- + " and all deferred to " + ackInformationToLog .getRecord ());
782
+ this .commitLogger .log (() -> "Committing pending offsets for "
783
+ + ListenerUtils .recordToString (record , this .logOnlyMetadata )
784
+ + " and all deferred to "
785
+ + ListenerUtils .recordToString (ackInformationToLog .getRecord (),
786
+ this .logOnlyMetadata ));
778
787
candidates .removeAll (toCommit );
779
788
}
780
789
else {
781
790
ackInformation = this .ackInfo ;
782
- this .commitLogger .log (() -> "Committing offset for " + record );
791
+ this .commitLogger .log (() -> "Committing offset for "
792
+ + ListenerUtils .recordToString (record , this .logOnlyMetadata ));
783
793
}
784
794
}
785
795
else { // earlier offsets present
0 commit comments