Skip to content

Commit 989fb29

Browse files
committed
GH-2332: Fix Container.pause() with Manual Assign.
Resolves #2332 Use manual assignments, if present, for pause. Also, in the rebalance listener, use `isPaused()` rather than `consumerPaused` to determine whether the partitions should be paused. Add logs and events for pauses in the rebal listener. Revert errant log4j commit. **cherry-pick to 2.9.x, 2.8.x** * Use `CollectionUtils.isEmpty()`
1 parent b286250 commit 989fb29

File tree

3 files changed

+246
-7
lines changed

3 files changed

+246
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
import org.springframework.transaction.support.TransactionTemplate;
121121
import org.springframework.util.Assert;
122122
import org.springframework.util.ClassUtils;
123+
import org.springframework.util.CollectionUtils;
123124
import org.springframework.util.StringUtils;
124125
import org.springframework.util.concurrent.ListenableFuture;
125126
import org.springframework.util.concurrent.ListenableFutureCallback;
@@ -1627,10 +1628,13 @@ private void doPauseConsumerIfNecessary() {
16271628
this.logger.debug(() -> "Pausing for incomplete async acks: " + this.offsetsInThisBatch);
16281629
}
16291630
if (!this.consumerPaused && (isPaused() || this.pausedForAsyncAcks)) {
1630-
this.consumer.pause(this.consumer.assignment());
1631-
this.consumerPaused = true;
1632-
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
1633-
publishConsumerPausedEvent(this.consumer.assignment());
1631+
Collection<TopicPartition> assigned = getAssignedPartitions();
1632+
if (!CollectionUtils.isEmpty(assigned)) {
1633+
this.consumer.pause(assigned);
1634+
this.consumerPaused = true;
1635+
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
1636+
publishConsumerPausedEvent(assigned);
1637+
}
16341638
}
16351639
}
16361640

@@ -3372,10 +3376,13 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
33723376
}
33733377

33743378
private void repauseIfNeeded(Collection<TopicPartition> partitions) {
3375-
if (ListenerConsumer.this.consumerPaused) {
3379+
if (isPaused()) {
33763380
ListenerConsumer.this.consumer.pause(partitions);
3381+
ListenerConsumer.this.consumerPaused = true;
33773382
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
33783383
+ "consumer paused again, so the initial poll() will never return any records");
3384+
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + partitions);
3385+
publishConsumerPausedEvent(partitions);
33793386
}
33803387
Collection<TopicPartition> toRepause = new LinkedList<>();
33813388
partitions.forEach(tp -> {
@@ -3385,6 +3392,8 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
33853392
});
33863393
if (!ListenerConsumer.this.consumerPaused && toRepause.size() > 0) {
33873394
ListenerConsumer.this.consumer.pause(toRepause);
3395+
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + toRepause);
3396+
publishConsumerPausedEvent(toRepause);
33883397
}
33893398
this.revoked.removeAll(toRepause);
33903399
this.revoked.forEach(tp -> resumePartition(tp));

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@ private void testInOrderAckPauseUntilAcked(AckMode ackMode, boolean batch) throw
765765
final CountDownLatch pauseLatch = new CountDownLatch(1);
766766
willAnswer(inv -> {
767767
paused.set(true);
768-
pausedParts.set(inv.getArgument(0));
768+
pausedParts.set(new HashSet<>(inv.getArgument(0)));
769769
pauseLatch.countDown();
770770
return null;
771771
}).given(consumer).pause(any());
@@ -2581,7 +2581,7 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
25812581
pauseLatch1.countDown();
25822582
pauseLatch2.countDown();
25832583
return null;
2584-
}).given(consumer).pause(records.keySet());
2584+
}).given(consumer).pause(any());
25852585
given(consumer.paused()).willReturn(pausedParts);
25862586
CountDownLatch pollWhilePausedLatch = new CountDownLatch(2);
25872587
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyMap;
22+
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.BDDMockito.willAnswer;
24+
import static org.mockito.Mockito.inOrder;
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.never;
27+
import static org.mockito.Mockito.verify;
28+
29+
import java.time.Duration;
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.Collection;
33+
import java.util.Collections;
34+
import java.util.HashSet;
35+
import java.util.LinkedHashMap;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.Optional;
39+
import java.util.concurrent.CountDownLatch;
40+
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.stream.Collectors;
43+
44+
import org.apache.kafka.clients.consumer.Consumer;
45+
import org.apache.kafka.clients.consumer.ConsumerRecord;
46+
import org.apache.kafka.clients.consumer.ConsumerRecords;
47+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
48+
import org.apache.kafka.common.TopicPartition;
49+
import org.apache.kafka.common.header.internals.RecordHeaders;
50+
import org.apache.kafka.common.record.TimestampType;
51+
import org.junit.jupiter.api.Test;
52+
import org.mockito.ArgumentCaptor;
53+
import org.mockito.InOrder;
54+
55+
import org.springframework.beans.factory.annotation.Autowired;
56+
import org.springframework.context.annotation.Bean;
57+
import org.springframework.context.annotation.Configuration;
58+
import org.springframework.kafka.annotation.EnableKafka;
59+
import org.springframework.kafka.annotation.KafkaListener;
60+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
61+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
62+
import org.springframework.kafka.core.ConsumerFactory;
63+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
64+
import org.springframework.kafka.test.utils.KafkaTestUtils;
65+
import org.springframework.test.annotation.DirtiesContext;
66+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
67+
68+
/**
69+
* @author Gary Russell
70+
* @since 2.9
71+
*
72+
*/
73+
@SpringJUnitConfig
74+
@DirtiesContext
75+
public class PauseContainerManualAssignmentTests {
76+
77+
@SuppressWarnings("rawtypes")
78+
@Autowired
79+
private Consumer consumer;
80+
81+
@Autowired
82+
private Config config;
83+
84+
@Autowired
85+
private KafkaListenerEndpointRegistry registry;
86+
87+
@SuppressWarnings("unchecked")
88+
@Test
89+
public void pausesWithManualAssignment() throws Exception {
90+
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
91+
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
92+
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
93+
this.registry.stop();
94+
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
95+
InOrder inOrder = inOrder(this.consumer);
96+
inOrder.verify(this.consumer).assign(any(Collection.class));
97+
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
98+
inOrder.verify(this.consumer).commitSync(
99+
Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)),
100+
Duration.ofSeconds(60));
101+
inOrder.verify(this.consumer).commitSync(
102+
Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(2L)),
103+
Duration.ofSeconds(60));
104+
inOrder.verify(this.consumer).commitSync(
105+
Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)),
106+
Duration.ofSeconds(60));
107+
ArgumentCaptor<Collection<TopicPartition>> pauses = ArgumentCaptor.forClass(Collection.class);
108+
inOrder.verify(this.consumer).pause(pauses.capture());
109+
assertThat(pauses.getValue().stream().collect(Collectors.toList())).contains(new TopicPartition("foo", 0),
110+
new TopicPartition("foo", 1), new TopicPartition("foo", 2));
111+
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
112+
verify(this.consumer, never()).resume(any());
113+
assertThat(this.config.count).isEqualTo(4);
114+
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux");
115+
}
116+
117+
@Configuration
118+
@EnableKafka
119+
public static class Config {
120+
121+
final List<String> contents = new ArrayList<>();
122+
123+
final CountDownLatch pollLatch = new CountDownLatch(4);
124+
125+
final CountDownLatch deliveryLatch = new CountDownLatch(4);
126+
127+
final CountDownLatch closeLatch = new CountDownLatch(1);
128+
129+
final CountDownLatch commitLatch = new CountDownLatch(3);
130+
131+
int count;
132+
133+
@KafkaListener(id = "id", groupId = "grp",
134+
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
135+
partitions = "#{'0,1,2'.split(',')}"))
136+
public void foo(String in) {
137+
this.contents.add(in);
138+
this.deliveryLatch.countDown();
139+
if (++this.count == 4 || this.count == 5) { // part 1, offset 1, first and second times
140+
throw new RuntimeException("foo");
141+
}
142+
}
143+
144+
@SuppressWarnings({ "rawtypes" })
145+
@Bean
146+
public ConsumerFactory consumerFactory(KafkaListenerEndpointRegistry registry) {
147+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
148+
final Consumer consumer = consumer(registry);
149+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
150+
.willReturn(consumer);
151+
return consumerFactory;
152+
}
153+
154+
@SuppressWarnings({ "rawtypes", "unchecked" })
155+
@Bean
156+
public Consumer consumer(KafkaListenerEndpointRegistry registry) {
157+
final Consumer consumer = mock(Consumer.class);
158+
final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
159+
final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
160+
final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
161+
Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
162+
records1.put(topicPartition0, Arrays.asList(
163+
new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
164+
new RecordHeaders(), Optional.empty()),
165+
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
166+
new RecordHeaders(), Optional.empty())));
167+
records1.put(topicPartition1, Arrays.asList(
168+
new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
169+
new RecordHeaders(), Optional.empty()),
170+
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
171+
new RecordHeaders(), Optional.empty())));
172+
records1.put(topicPartition2, Arrays.asList(
173+
new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
174+
new RecordHeaders(), Optional.empty()),
175+
new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
176+
new RecordHeaders(), Optional.empty())));
177+
final AtomicInteger which = new AtomicInteger();
178+
willAnswer(i -> {
179+
this.pollLatch.countDown();
180+
switch (which.getAndIncrement()) {
181+
case 0:
182+
return new ConsumerRecords(records1);
183+
default:
184+
try {
185+
Thread.sleep(50);
186+
}
187+
catch (InterruptedException e) {
188+
Thread.currentThread().interrupt();
189+
}
190+
return new ConsumerRecords(Collections.emptyMap());
191+
}
192+
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
193+
List<TopicPartition> paused = new ArrayList<>();
194+
willAnswer(i -> {
195+
this.commitLatch.countDown();
196+
registry.getListenerContainer("id").pause();
197+
return null;
198+
}).given(consumer).commitSync(anyMap(), any());
199+
willAnswer(i -> {
200+
this.closeLatch.countDown();
201+
return null;
202+
}).given(consumer).close();
203+
willAnswer(i -> {
204+
paused.addAll(i.getArgument(0));
205+
return null;
206+
}).given(consumer).pause(any());
207+
willAnswer(i -> {
208+
return new HashSet<>(paused);
209+
}).given(consumer).paused();
210+
willAnswer(i -> {
211+
paused.removeAll(i.getArgument(0));
212+
return null;
213+
}).given(consumer).resume(any());
214+
return consumer;
215+
}
216+
217+
@SuppressWarnings({ "rawtypes", "unchecked" })
218+
@Bean
219+
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaListenerEndpointRegistry registry) {
220+
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
221+
factory.setConsumerFactory(consumerFactory(registry));
222+
factory.getContainerProperties().setAckMode(AckMode.RECORD);
223+
DefaultErrorHandler eh = new DefaultErrorHandler();
224+
factory.setCommonErrorHandler(eh);
225+
return factory;
226+
}
227+
228+
}
229+
230+
}

0 commit comments

Comments
 (0)