Skip to content

Commit f7046d7

Browse files
committed
GH-1189: Clear offsetsInThisBatch on nack()
1 parent ff35062 commit f7046d7

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2900,12 +2900,17 @@ public void nack(long sleep) {
29002900
"nack() can only be called on the consumer thread");
29012901
Assert.isTrue(sleep >= 0, "sleep cannot be negative");
29022902
ListenerConsumer.this.nackSleep = sleep;
2903-
ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
2903+
synchronized (ListenerConsumer.this) {
2904+
if (ListenerConsumer.this.offsetsInThisBatch != null) {
2905+
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
2906+
ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
2907+
}
2908+
}
29042909
}
29052910

29062911
@Override
29072912
public String toString() {
2908-
return "Acknowledgment for " + this.record;
2913+
return "Acknowledgment for " + ListenerUtils.recordToString(this.record, true);
29092914
}
29102915

29112916
}
@@ -2944,6 +2949,12 @@ public void nack(int index, long sleep) {
29442949
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
29452950
ListenerConsumer.this.nackIndex = index;
29462951
ListenerConsumer.this.nackSleep = sleep;
2952+
synchronized (ListenerConsumer.this) {
2953+
if (ListenerConsumer.this.offsetsInThisBatch != null) {
2954+
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
2955+
ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
2956+
}
2957+
}
29472958
}
29482959

29492960
@Override

0 commit comments

Comments
 (0)