Skip to content

Commit cb5c36b

Browse files
garyrussellartembilan
authored andcommitted
GH-2410: Disallow nack() with Out of Order Commits
Resolves #2410 `nack()` cannot be used with out of order commits - the contract means commit all previous offsets and redeliver remaining. - the appliation might nack multiple records. **cherry-pick to 2.9.x, 2.8.x** # Conflicts: # spring-kafka-docs/src/main/asciidoc/kafka.adoc
1 parent 91507a9 commit cb5c36b

File tree

3 files changed

+16
-12
lines changed

3 files changed

+16
-12
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,6 +1283,8 @@ NOTE: If you want to commit a partial batch, using `nack()`, When using transact
12831283

12841284
IMPORTANT: `nack()` can only be called on the consumer thread that invokes your listener.
12851285

1286+
IMPORTANT: `nack()` is not allowed when using <<ooo-commits, Out of Order Commits>>.
1287+
12861288
With a record listener, when `nack()` is called, any pending offsets are committed, the remaining records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next `poll()`.
12871289
The consumer can be paused before redelivery, by setting the `sleep` argument.
12881290
This is similar functionality to throwing an exception when the container is configured with a `DefaultErrorHandler`.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3343,14 +3343,10 @@ public void acknowledge() {
33433343
public void nack(Duration sleep) {
33443344
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
33453345
"nack() can only be called on the consumer thread");
3346+
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
3347+
"nack() is not supported with out-of-order commits (asyncAcks=true)");
33463348
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
33473349
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
3348-
synchronized (ListenerConsumer.this) {
3349-
if (ListenerConsumer.this.offsetsInThisBatch != null) {
3350-
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
3351-
ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
3352-
}
3353-
}
33543350
}
33553351

33563352
@Override
@@ -3391,16 +3387,12 @@ public void acknowledge() {
33913387
public void nack(int index, Duration sleep) {
33923388
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
33933389
"nack() can only be called on the consumer thread");
3390+
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
3391+
"nack() is not supported with out-of-order commits (asyncAcks=true)");
33943392
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
33953393
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
33963394
ListenerConsumer.this.nackIndex = index;
33973395
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
3398-
synchronized (ListenerConsumer.this) {
3399-
if (ListenerConsumer.this.offsetsInThisBatch != null) {
3400-
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
3401-
ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
3402-
}
3403-
}
34043396
int i = 0;
34053397
List<ConsumerRecord<K, V>> toAck = new LinkedList<>();
34063398
for (ConsumerRecord<K, V> record : this.records) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,16 @@ private void testInOrderAck(AckMode ackMode) throws Exception {
689689
containerProps.setAsyncAcks(true);
690690
final CountDownLatch latch = new CountDownLatch(4);
691691
final List<Acknowledgment> acks = new ArrayList<>();
692+
final AtomicReference<IllegalStateException> illegal = new AtomicReference<>();
692693
AcknowledgingMessageListener<Integer, String> messageListener = (data, ack) -> {
694+
if (latch.getCount() == 4) {
695+
try {
696+
ack.nack(Duration.ofSeconds(1));
697+
}
698+
catch (IllegalStateException ex) {
699+
illegal.set(ex);
700+
}
701+
}
693702
latch.countDown();
694703
acks.add(ack);
695704
if (latch.getCount() == 0) {
@@ -720,6 +729,7 @@ private void testInOrderAck(AckMode ackMode) throws Exception {
720729
verify(consumer).commitSync(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(4L)),
721730
Duration.ofMinutes(1));
722731
container.stop();
732+
assertThat(illegal.get()).isNotNull();
723733
}
724734

725735
@Test

0 commit comments

Comments
 (0)