Skip to content

Commit 10905dc

Browse files
authored
GH-2214: Fix Manual Nack with Zero Sleep
Resolves #2214 Regression; manual nack with zero sleep went into a tight loop since 2.8.4. Add a copy of the current test, with a zero sleep; also verify that nacking the first and last records work. **cherry-pick to 2.9.x, 2.8.x** * Remove unnecessary interceptor from new test.
1 parent f53d41b commit 10905dc

File tree

2 files changed

+254
-1
lines changed

2 files changed

+254
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2464,7 +2464,6 @@ private boolean recordsEqual(ConsumerRecord<K, V> rec1, ConsumerRecord<K, V> rec
24642464
private void pauseForNackSleep() {
24652465
if (this.nackSleep > 0) {
24662466
this.nackWake = System.currentTimeMillis() + this.nackSleep;
2467-
this.nackSleep = -1;
24682467
Set<TopicPartition> alreadyPaused = this.consumer.paused();
24692468
Collection<TopicPartition> assigned = getAssignedPartitions();
24702469
if (assigned != null) {
@@ -2484,6 +2483,7 @@ private void pauseForNackSleep() {
24842483
this.consumer.resume(nowPaused);
24852484
}
24862485
}
2486+
this.nackSleep = -1;
24872487
}
24882488

24892489
/**
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyMap;
22+
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.BDDMockito.willAnswer;
24+
import static org.mockito.Mockito.inOrder;
25+
import static org.mockito.Mockito.mock;
26+
27+
import java.time.Duration;
28+
import java.util.ArrayList;
29+
import java.util.Arrays;
30+
import java.util.Collection;
31+
import java.util.Collections;
32+
import java.util.HashMap;
33+
import java.util.LinkedHashMap;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.Optional;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicBoolean;
40+
import java.util.concurrent.atomic.AtomicInteger;
41+
42+
import org.apache.kafka.clients.consumer.Consumer;
43+
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
44+
import org.apache.kafka.clients.consumer.ConsumerRecord;
45+
import org.apache.kafka.clients.consumer.ConsumerRecords;
46+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
47+
import org.apache.kafka.common.TopicPartition;
48+
import org.apache.kafka.common.header.internals.RecordHeaders;
49+
import org.apache.kafka.common.record.TimestampType;
50+
import org.junit.jupiter.api.Test;
51+
import org.mockito.InOrder;
52+
53+
import org.springframework.beans.factory.annotation.Autowired;
54+
import org.springframework.context.annotation.Bean;
55+
import org.springframework.context.annotation.Configuration;
56+
import org.springframework.kafka.annotation.EnableKafka;
57+
import org.springframework.kafka.annotation.KafkaListener;
58+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
59+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
60+
import org.springframework.kafka.core.ConsumerFactory;
61+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
62+
import org.springframework.kafka.support.Acknowledgment;
63+
import org.springframework.kafka.test.utils.KafkaTestUtils;
64+
import org.springframework.test.annotation.DirtiesContext;
65+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
66+
67+
/**
68+
* @author Gary Russell
69+
* @since 2.8.5
70+
*
71+
*/
72+
@SpringJUnitConfig
73+
@DirtiesContext
74+
public class ManualNackRecordZeroSleepTests {
75+
76+
@SuppressWarnings("rawtypes")
77+
@Autowired
78+
private Consumer consumer;
79+
@Autowired
80+
private Config config;
81+
82+
@Autowired
83+
private KafkaListenerEndpointRegistry registry;
84+
85+
@SuppressWarnings({ "unchecked" })
86+
@Test
87+
public void zeroSleepNackFirstLastAndMiddleRecords() throws Exception {
88+
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
89+
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
90+
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
91+
this.registry.stop();
92+
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
93+
InOrder inOrder = inOrder(this.consumer);
94+
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
95+
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
96+
HashMap<TopicPartition, OffsetAndMetadata> commit1 = new HashMap<>();
97+
commit1.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
98+
commit1.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
99+
HashMap<TopicPartition, OffsetAndMetadata> commit2 = new HashMap<>();
100+
commit2.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
101+
commit2.put(new TopicPartition("foo", 2), new OffsetAndMetadata(1L));
102+
HashMap<TopicPartition, OffsetAndMetadata> commit3 = new HashMap<>();
103+
commit3.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
104+
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 0L);
105+
inOrder.verify(this.consumer).commitSync(commit1, Duration.ofSeconds(60));
106+
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 1L);
107+
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L);
108+
inOrder.verify(this.consumer).commitSync(commit2, Duration.ofSeconds(60));
109+
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 1L);
110+
inOrder.verify(this.consumer).commitSync(commit3, Duration.ofSeconds(60));
111+
assertThat(this.config.count).isEqualTo(9);
112+
assertThat(this.config.contents.toArray()).isEqualTo(new String[]
113+
{ "foo", "foo", "bar", "baz", "qux", "qux", "fiz", "buz", "buz"});
114+
}
115+
116+
@Configuration
117+
@EnableKafka
118+
public static class Config {
119+
120+
final List<String> contents = new ArrayList<>();
121+
122+
final CountDownLatch pollLatch = new CountDownLatch(5);
123+
124+
final CountDownLatch deliveryLatch = new CountDownLatch(9);
125+
126+
final CountDownLatch closeLatch = new CountDownLatch(1);
127+
128+
final CountDownLatch commitLatch = new CountDownLatch(4);
129+
130+
volatile int count;
131+
132+
@KafkaListener(topics = "foo", groupId = "grp")
133+
public void foo(String in, Acknowledgment ack) {
134+
this.contents.add(in);
135+
this.deliveryLatch.countDown();
136+
++this.count;
137+
if (this.contents.size() == 1 || this.count == 5 || this.count == 8) {
138+
// first, last record or part 1, offset 1, first time
139+
ack.nack(0);
140+
}
141+
else {
142+
ack.acknowledge();
143+
}
144+
}
145+
146+
@SuppressWarnings({ "rawtypes" })
147+
@Bean
148+
public ConsumerFactory consumerFactory() {
149+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
150+
final Consumer consumer = consumer();
151+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
152+
.willReturn(consumer);
153+
return consumerFactory;
154+
}
155+
156+
@SuppressWarnings({ "rawtypes", "unchecked" })
157+
@Bean
158+
public Consumer consumer() {
159+
final Consumer consumer = mock(Consumer.class);
160+
final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
161+
final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
162+
final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
163+
willAnswer(i -> {
164+
((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned(
165+
Collections.singletonList(topicPartition1));
166+
return null;
167+
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
168+
Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
169+
records1.put(topicPartition0, Arrays.asList(
170+
new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
171+
new RecordHeaders(), Optional.empty()),
172+
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
173+
new RecordHeaders(), Optional.empty())));
174+
records1.put(topicPartition1, Arrays.asList(
175+
new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
176+
new RecordHeaders(), Optional.empty()),
177+
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
178+
new RecordHeaders(), Optional.empty())));
179+
records1.put(topicPartition2, Arrays.asList(
180+
new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
181+
new RecordHeaders(), Optional.empty()),
182+
new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
183+
new RecordHeaders(), Optional.empty())));
184+
Map<TopicPartition, List<ConsumerRecord>> records2 = new LinkedHashMap<>(records1);
185+
records2.remove(topicPartition0);
186+
records2.put(topicPartition1, Arrays.asList(
187+
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
188+
new RecordHeaders(), Optional.empty())));
189+
Map<TopicPartition, List<ConsumerRecord>> records3 = new LinkedHashMap<>();
190+
records3.put(topicPartition2, Arrays.asList(
191+
new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
192+
new RecordHeaders(), Optional.empty())));
193+
final AtomicInteger which = new AtomicInteger();
194+
final AtomicBoolean paused = new AtomicBoolean();
195+
willAnswer(i -> {
196+
if (paused.get()) {
197+
Thread.sleep(10);
198+
return ConsumerRecords.empty();
199+
}
200+
this.pollLatch.countDown();
201+
switch (which.getAndIncrement()) {
202+
case 0:
203+
case 1:
204+
return new ConsumerRecords(records1);
205+
case 2:
206+
return new ConsumerRecords(records2);
207+
case 3:
208+
return new ConsumerRecords(records3);
209+
default:
210+
try {
211+
Thread.sleep(1000);
212+
}
213+
catch (InterruptedException e) {
214+
Thread.currentThread().interrupt();
215+
}
216+
return new ConsumerRecords(Collections.emptyMap());
217+
}
218+
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
219+
willAnswer(i -> {
220+
return Collections.emptySet();
221+
}).given(consumer).paused();
222+
willAnswer(i -> {
223+
paused.set(true);
224+
return null;
225+
}).given(consumer).pause(any());
226+
willAnswer(i -> {
227+
paused.set(false);
228+
return null;
229+
}).given(consumer).resume(any());
230+
willAnswer(i -> {
231+
this.commitLatch.countDown();
232+
return null;
233+
}).given(consumer).commitSync(anyMap(), any());
234+
willAnswer(i -> {
235+
this.closeLatch.countDown();
236+
return null;
237+
}).given(consumer).close();
238+
return consumer;
239+
}
240+
241+
@SuppressWarnings({ "rawtypes", "unchecked" })
242+
@Bean
243+
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
244+
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
245+
factory.setConsumerFactory(consumerFactory());
246+
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
247+
factory.getContainerProperties().setMissingTopicsFatal(false);
248+
return factory;
249+
}
250+
251+
}
252+
253+
}

0 commit comments

Comments
 (0)