Skip to content

Commit ea3dd76

Browse files
szpakgaryrussell
authored andcommitted
Precise unit for sleep duration and wake time
Prior to, it was required to dig a few levels into to realize they are in millis.
1 parent 3832124 commit ea3dd76

File tree

2 files changed

+28
-35
lines changed

2 files changed

+28
-35
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -734,9 +734,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
734734

735735
private long lastAlertAt = this.lastReceive;
736736

737-
private long nackSleep = -1;
737+
private long nackSleepDurationMillis = -1;
738738

739-
private long nackWake;
739+
private long nackWakeTimeMillis;
740740

741741
private int nackIndex;
742742

@@ -1637,9 +1637,9 @@ private void doPauseConsumerIfNecessary() {
16371637
}
16381638

16391639
private void resumeConsumerIfNeccessary() {
1640-
if (this.nackWake > 0) {
1641-
if (System.currentTimeMillis() > this.nackWake) {
1642-
this.nackWake = 0;
1640+
if (this.nackWakeTimeMillis > 0) {
1641+
if (System.currentTimeMillis() > this.nackWakeTimeMillis) {
1642+
this.nackWakeTimeMillis = 0;
16431643
this.consumer.resume(this.pausedForNack);
16441644
this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
16451645
this.pausedForNack.clear();
@@ -2237,7 +2237,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
22372237

22382238
invokeBatchOnMessageWithRecordsOrList(records, recordList);
22392239
List<ConsumerRecord<?, ?>> toSeek = null;
2240-
if (this.nackSleep >= 0) {
2240+
if (this.nackSleepDurationMillis >= 0) {
22412241
int index = 0;
22422242
toSeek = new ArrayList<>();
22432243
for (ConsumerRecord<K, V> record : records) {
@@ -2247,7 +2247,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
22472247
}
22482248
}
22492249
if (this.producer != null || (!this.isAnyManualAck && !this.autoCommit)) {
2250-
if (this.nackSleep < 0) {
2250+
if (this.nackSleepDurationMillis < 0) {
22512251
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {
22522252
this.acks.put(record);
22532253
}
@@ -2391,7 +2391,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
23912391
if (this.commonRecordInterceptor != null) {
23922392
this.commonRecordInterceptor.afterRecord(record, this.consumer);
23932393
}
2394-
if (this.nackSleep >= 0) {
2394+
if (this.nackSleepDurationMillis >= 0) {
23952395
handleNack(records, record);
23962396
break;
23972397
}
@@ -2475,7 +2475,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
24752475
if (this.commonRecordInterceptor != null) {
24762476
this.commonRecordInterceptor.afterRecord(record, this.consumer);
24772477
}
2478-
if (this.nackSleep >= 0) {
2478+
if (this.nackSleepDurationMillis >= 0) {
24792479
handleNack(records, record);
24802480
break;
24812481
}
@@ -2551,8 +2551,8 @@ private boolean recordsEqual(ConsumerRecord<K, V> rec1, ConsumerRecord<K, V> rec
25512551
}
25522552

25532553
private void pauseForNackSleep() {
2554-
if (this.nackSleep > 0) {
2555-
this.nackWake = System.currentTimeMillis() + this.nackSleep;
2554+
if (this.nackSleepDurationMillis > 0) {
2555+
this.nackWakeTimeMillis = System.currentTimeMillis() + this.nackSleepDurationMillis;
25562556
Set<TopicPartition> alreadyPaused = this.consumer.paused();
25572557
Collection<TopicPartition> assigned = getAssignedPartitions();
25582558
if (assigned != null) {
@@ -2572,7 +2572,7 @@ private void pauseForNackSleep() {
25722572
this.consumer.resume(nowPaused);
25732573
}
25742574
}
2575-
this.nackSleep = -1;
2575+
this.nackSleepDurationMillis = -1;
25762576
}
25772577

25782578
/**
@@ -2667,7 +2667,7 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
26672667
checkDeser(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
26682668
}
26692669
doInvokeOnMessage(record);
2670-
if (this.nackSleep < 0 && !this.isManualImmediateAck) {
2670+
if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {
26712671
ackCurrent(record);
26722672
}
26732673
}
@@ -3240,11 +3240,11 @@ public void acknowledge() {
32403240
}
32413241

32423242
@Override
3243-
public void nack(long sleep) {
3243+
public void nack(long sleepMillis) {
32443244
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
32453245
"nack() can only be called on the consumer thread");
3246-
Assert.isTrue(sleep >= 0, "sleep cannot be negative");
3247-
ListenerConsumer.this.nackSleep = sleep;
3246+
Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
3247+
ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
32483248
synchronized (ListenerConsumer.this) {
32493249
if (ListenerConsumer.this.offsetsInThisBatch != null) {
32503250
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
@@ -3288,13 +3288,13 @@ public void acknowledge() {
32883288
}
32893289

32903290
@Override
3291-
public void nack(int index, long sleep) {
3291+
public void nack(int index, long sleepMillis) {
32923292
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
32933293
"nack() can only be called on the consumer thread");
3294-
Assert.isTrue(sleep >= 0, "sleep cannot be negative");
3294+
Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
32953295
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
32963296
ListenerConsumer.this.nackIndex = index;
3297-
ListenerConsumer.this.nackSleep = sleep;
3297+
ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
32983298
synchronized (ListenerConsumer.this) {
32993299
if (ListenerConsumer.this.offsetsInThisBatch != null) {
33003300
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());

spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,35 +37,28 @@ public interface Acknowledgment {
3737
/**
3838
* Negatively acknowledge the current record - discard remaining records from the poll
3939
* and re-seek all partitions so that this record will be redelivered after the sleep
40-
* time. Must be called on the consumer thread.
40+
* time (in milliseconds). Must be called on the consumer thread.
4141
* <p>
42-
* <b>When using group management,
43-
* {@code sleep + time spent processing the previous messages from the poll} must be
44-
* less than the consumer {@code max.poll.interval.ms} property, to avoid a
45-
* rebalance.</b>
46-
* @param sleep the time to sleep; the actual sleep time will be larger of this value
47-
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
42+
* @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
43+
* of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
4844
* @since 2.3
4945
*/
50-
default void nack(long sleep) {
46+
default void nack(long sleepMillis) {
5147
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
5248
}
5349

5450
/**
5551
* Negatively acknowledge the record at an index in a batch - commit the offset(s) of
5652
* records before the index and re-seek the partitions so that the record at the index
57-
* and subsequent records will be redelivered after the sleep time. Must be called on
58-
* the consumer thread.
53+
* and subsequent records will be redelivered after the sleep time (in milliseconds).
54+
* Must be called on the consumer thread.
5955
* <p>
60-
* <b>When using group management,
61-
* {@code sleep + time spent processing the records before the index} must be less
62-
* than the consumer {@code max.poll.interval.ms} property, to avoid a rebalance.</b>
6356
* @param index the index of the failed record in the batch.
64-
* @param sleep the time to sleep; the actual sleep time will be larger of this value
65-
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
57+
* @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
58+
* of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
6659
* @since 2.3
6760
*/
68-
default void nack(int index, long sleep) {
61+
default void nack(int index, long sleepMillis) {
6962
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
7063
}
7164

0 commit comments

Comments
 (0)