Skip to content

Commit c69791e

Browse files
szpakgaryrussell
authored andcommitted
Add Acknowledgment.nack() variants accepting Duration
To prevent confusion what unit should be used for "long sleep" arguments. As the old and new methods in the Acknowledgment interface have default implementations, the change itself is backward compatible. The old methods are marked as deprecated and intended to be removed in the future.
1 parent 43bc67a commit c69791e

File tree

7 files changed

+45
-11
lines changed

7 files changed

+45
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3340,11 +3340,11 @@ public void acknowledge() {
33403340
}
33413341

33423342
@Override
3343-
public void nack(long sleepMillis) {
3343+
public void nack(Duration sleep) {
33443344
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
33453345
"nack() can only be called on the consumer thread");
3346-
Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
3347-
ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
3346+
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
3347+
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
33483348
synchronized (ListenerConsumer.this) {
33493349
if (ListenerConsumer.this.offsetsInThisBatch != null) {
33503350
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
@@ -3388,13 +3388,13 @@ public void acknowledge() {
33883388
}
33893389

33903390
@Override
3391-
public void nack(int index, long sleepMillis) {
3391+
public void nack(int index, Duration sleep) {
33923392
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
33933393
"nack() can only be called on the consumer thread");
3394-
Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
3394+
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
33953395
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
33963396
ListenerConsumer.this.nackIndex = index;
3397-
ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
3397+
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
33983398
synchronized (ListenerConsumer.this) {
33993399
if (ListenerConsumer.this.offsetsInThisBatch != null) {
34003400
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());

spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.support;
1818

19+
import java.time.Duration;
20+
1921
/**
2022
* Handle for acknowledging the processing of a
2123
* {@link org.apache.kafka.clients.consumer.ConsumerRecord}. Recipients can store the
@@ -42,8 +44,23 @@ public interface Acknowledgment {
4244
* @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
4345
* of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
4446
* @since 2.3
47+
* @deprecated in favor of {@link #nack(Duration)}
4548
*/
49+
@Deprecated
4650
default void nack(long sleepMillis) {
51+
nack(Duration.ofMillis(sleepMillis));
52+
}
53+
54+
/**
55+
* Negatively acknowledge the current record - discard remaining records from the poll
56+
* and re-seek all partitions so that this record will be redelivered after the sleep
57+
* duration. Must be called on the consumer thread.
58+
* <p>
59+
* @param sleep the duration to sleep; the actual sleep time will be larger of this value
60+
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
61+
* @since 2.8.7
62+
*/
63+
default void nack(Duration sleep) {
4764
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
4865
}
4966

@@ -57,8 +74,25 @@ default void nack(long sleepMillis) {
5774
* @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
5875
* of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
5976
* @since 2.3
77+
* @deprecated in favor of {@link #nack(int, Duration)}
6078
*/
79+
@Deprecated
6180
default void nack(int index, long sleepMillis) {
81+
nack(index, Duration.ofMillis(sleepMillis));
82+
}
83+
84+
/**
85+
* Negatively acknowledge the record at an index in a batch - commit the offset(s) of
86+
* records before the index and re-seek the partitions so that the record at the index
87+
* and subsequent records will be redelivered after the sleep duration.
88+
* Must be called on the consumer thread.
89+
* <p>
90+
* @param index the index of the failed record in the batch.
91+
* @param sleep the duration to sleep; the actual sleep time will be larger of this value
92+
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
93+
* @since 2.8.7
94+
*/
95+
default void nack(int index, Duration sleep) {
6296
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
6397
}
6498

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public void foo(List<String> in, Acknowledgment ack) {
146146
this.replayTime = System.currentTimeMillis() - this.replayTime;
147147
this.deliveryLatch.countDown();
148148
if (this.fail.getAndSet(false)) {
149-
ack.nack(3, 50);
149+
ack.nack(3, Duration.ofMillis(50));
150150
}
151151
else {
152152
ack.acknowledge();

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTxTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void foo(List<String> in, Acknowledgment ack) {
150150
this.replayTime = System.currentTimeMillis() - this.replayTime;
151151
this.deliveryLatch.countDown();
152152
if (++this.count == 1) { // part 1, offset 1, first time
153-
ack.nack(3, 50);
153+
ack.nack(3, Duration.ofMillis(50));
154154
}
155155
else {
156156
ack.acknowledge();

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void foo(String in, Acknowledgment ack) {
134134
}
135135
this.deliveryLatch.countDown();
136136
if (++this.count == 4) { // part 1, offset 1, first time
137-
ack.nack(50);
137+
ack.nack(Duration.ofMillis(50));
138138
}
139139
else {
140140
ack.acknowledge();

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void foo(String in, Acknowledgment ack) {
142142
}
143143
this.deliveryLatch.countDown();
144144
if (++this.count == 4) { // part 1, offset 1, first time
145-
ack.nack(50);
145+
ack.nack(Duration.ofMillis(50));
146146
}
147147
else {
148148
ack.acknowledge();

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordZeroSleepTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void foo(String in, Acknowledgment ack) {
136136
++this.count;
137137
if (this.contents.size() == 1 || this.count == 5 || this.count == 8) {
138138
// first, last record or part 1, offset 1, first time
139-
ack.nack(0);
139+
ack.nack(Duration.ofMillis(0));
140140
}
141141
else {
142142
ack.acknowledge();

0 commit comments

Comments
 (0)