Skip to content

Commit eef4389

Browse files
authored
GH-1827: Add ContainerGroupSequencer
Resolves #1827 Add a mechanism to automatically control a sequence of containers, starting a group of containers when the current group goes idle. * Improve test; fix race. * Fix typo and revert test logging level.
1 parent cf954d7 commit eef4389

File tree

12 files changed

+639
-14
lines changed

12 files changed

+639
-14
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2397,6 +2397,59 @@ If the function returns `null`, the handler's default `BackOff` will be used.
23972397
Starting with version 2.6.3, set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures.
23982398
By default, the exception type is not considered.
23992399

2400+
[[sequencing]]
2401+
===== Starting `@KafkaListener` s in Sequence
2402+
2403+
A common use case is to start a listener after another listener has consumed all the records in a topic.
2404+
For example, you may want to load the contents of one or more compacted topics into memory before processing records from other topics.
2405+
Starting with version 2.7.3, a new component `ContainerGroupSequencer` has been introduced.
2406+
It uses the `@KafkaListener` `containerGroup` property to group containers together and start the containers in the next group, when all the containers in the current group have gone idle.
2407+
2408+
It is best illustrated with an example.
2409+
2410+
====
2411+
[source, java]
2412+
----
2413+
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
2414+
public void listen1(String in) {
2415+
}
2416+
2417+
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
2418+
public void listen2(String in) {
2419+
}
2420+
2421+
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
2422+
public void listen3(String in) {
2423+
}
2424+
2425+
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
2426+
public void listen4(String in) {
2427+
}
2428+
2429+
@Bean
2430+
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
2431+
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
2432+
}
2433+
----
2434+
====
2435+
2436+
Here, we have 4 listeners in two groups, `g1` and `g2`.
2437+
2438+
During application context initialization, the sequencer, sets the `autoStartup` property of all the containers in the provided groups to `false`.
2439+
It also sets the `idleEventInterval` for any containers (that do not already have one set) to the supplied value (5000ms in this case).
2440+
Then, when the sequencer is started by the application context, the containers in the first group are started.
2441+
As `ListenerContainerIdleEvent` s are received, each individual child container in each container is stopped.
2442+
When all child containers in a `ConcurrentMessageListenerContainer` are stopped, the parent container is stopped.
2443+
When all containers in a group have been stopped, the containers in the next group are started.
2444+
There is no limit to the number of groups or containers in a group.
2445+
2446+
By default, the containers in the final group (`g2` above) are not stopped when they go idle.
2447+
To modify that behavior, set `stopLastGroupWhenIdle` to `true` on the sequencer.
2448+
2449+
As an aside; previously, containers in each group were added to a bean of type `Collection<MessageListenerContainer>` with the bean name being the `containerGroup`.
2450+
These collections are now deprecated in favor of beans of type `ContainerGroup` with a bean name that is the group name, suffixed with `.group`; in the example above, there would be 2 beans `g1.group` and `g2.group`.
2451+
The `Collection` beans will be removed in a future release.
2452+
24002453
[[container-props]]
24012454
==== Listener Container Properties
24022455

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,8 @@ See <<configuring-topics>> for more information.
9494

9595
It is now possible to add a `spring-messaging` `SmartMessageConverter` to the `MessagingMessageConverter`, allowing content negotiation based on the `contentType` header.
9696
See <<messaging-message-conversion>> for more information.
97+
98+
[[x27-sequencing]]
99+
==== Sequencing `@KafkaListener` s
100+
101+
See <<container-sequencing>> for more information.

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.lang.annotation.RetentionPolicy;
2424
import java.lang.annotation.Target;
2525

26+
import org.springframework.kafka.listener.ContainerGroup;
2627
import org.springframework.messaging.handler.annotation.MessageMapping;
2728

2829
/**
@@ -148,11 +149,14 @@
148149
TopicPartition[] topicPartitions() default {};
149150

150151
/**
151-
* If provided, the listener container for this listener will be added to a bean
152-
* with this value as its name, of type {@code Collection<MessageListenerContainer>}.
153-
* This allows, for example, iteration over the collection to start/stop a subset
154-
* of containers.
155-
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
152+
* If provided, the listener container for this listener will be added to a bean with
153+
* this value as its name, of type {@code Collection<MessageListenerContainer>}. This
154+
* allows, for example, iteration over the collection to start/stop a subset of
155+
* containers. The {@code Collection} beans are deprecated as of version 2.7.3 and
156+
* will be removed in 2.8. Instead, a bean with name {@code containerGroup + ".group"}
157+
* and type {@link ContainerGroup} should be used instead.
158+
* <p>
159+
* SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
156160
* @return the bean name for the group.
157161
*/
158162
String containerGroup() default "";

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
8282
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
8383
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
84+
import org.springframework.kafka.listener.ContainerGroupSequencer;
8485
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
8586
import org.springframework.kafka.retrytopic.RetryTopicBootstrapper;
8687
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
@@ -305,6 +306,9 @@ public void afterSingletonsInstantiated() {
305306

306307
// Actually register all listeners
307308
this.registrar.afterPropertiesSet();
309+
Map<String, ContainerGroupSequencer> sequencers =
310+
this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
311+
sequencers.values().forEach(seq -> seq.initialize());
308312
}
309313

310314
private void buildEnhancer() {

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.springframework.context.event.ContextRefreshedEvent;
4040
import org.springframework.core.log.LogAccessor;
4141
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
42+
import org.springframework.kafka.listener.ContainerGroup;
4243
import org.springframework.kafka.listener.ListenerContainerRegistry;
4344
import org.springframework.kafka.listener.MessageListenerContainer;
4445
import org.springframework.lang.Nullable;
@@ -181,16 +182,22 @@ public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListe
181182
MessageListenerContainer container = createListenerContainer(endpoint, factory);
182183
this.listenerContainers.put(id, container);
183184
ConfigurableApplicationContext appContext = this.applicationContext;
184-
if (StringUtils.hasText(endpoint.getGroup()) && appContext != null) {
185+
String groupName = endpoint.getGroup();
186+
if (StringUtils.hasText(groupName) && appContext != null) {
185187
List<MessageListenerContainer> containerGroup;
186-
if (appContext.containsBean(endpoint.getGroup())) { // NOSONAR - hasText
187-
containerGroup = appContext.getBean(endpoint.getGroup(), List.class); // NOSONAR - hasText
188+
ContainerGroup group;
189+
if (appContext.containsBean(groupName)) { // NOSONAR - hasText
190+
containerGroup = appContext.getBean(groupName, List.class); // NOSONAR - hasText
191+
group = appContext.getBean(groupName + ".group", ContainerGroup.class);
188192
}
189193
else {
190194
containerGroup = new ArrayList<MessageListenerContainer>();
191-
appContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup); // NOSONAR - hasText
195+
appContext.getBeanFactory().registerSingleton(groupName, containerGroup); // NOSONAR - hasText
196+
group = new ContainerGroup(groupName);
197+
appContext.getBeanFactory().registerSingleton(groupName + ".group", group);
192198
}
193199
containerGroup.add(container);
200+
group.addContainers(container);
194201
}
195202
if (startImmediately) {
196203
startIfNecessary(container);

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,19 @@ public boolean isContainerPaused() {
154154
}
155155
}
156156

157+
@Override
158+
public boolean isChildRunning() {
159+
if (!isRunning()) {
160+
return false;
161+
}
162+
for (MessageListenerContainer container : this.containers) {
163+
if (container.isRunning()) {
164+
return true;
165+
}
166+
}
167+
return false;
168+
}
169+
157170
@Override
158171
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
159172
synchronized (this.lifecycleMonitor) {
@@ -265,7 +278,11 @@ private TopicPartitionOffset[] partitionSubset(ContainerProperties containerProp
265278
protected void doStop(final Runnable callback) {
266279
final AtomicInteger count = new AtomicInteger();
267280
if (isRunning()) {
281+
boolean childRunning = isChildRunning();
268282
setRunning(false);
283+
if (!childRunning) {
284+
callback.run();
285+
}
269286
for (KafkaMessageListenerContainer<K, V> container : this.containers) {
270287
if (container.isRunning()) {
271288
count.incrementAndGet();
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.Collection;
20+
import java.util.LinkedHashSet;
21+
import java.util.List;
22+
import java.util.stream.Collectors;
23+
24+
import org.apache.commons.logging.LogFactory;
25+
26+
import org.springframework.context.Lifecycle;
27+
import org.springframework.core.log.LogAccessor;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* A group of listener containers.
32+
*
33+
* @author Gary Russell
34+
* @since 2.7.3
35+
*
36+
*/
37+
public class ContainerGroup implements Lifecycle {
38+
39+
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ContainerGroup.class));
40+
41+
private final String name;
42+
43+
private final Collection<MessageListenerContainer> containers = new LinkedHashSet<>();
44+
45+
private boolean running;
46+
47+
/**
48+
* Construct an instance with the provided name.
49+
* @param name the group name.
50+
*/
51+
public ContainerGroup(String name) {
52+
this.name = name;
53+
}
54+
55+
/**
56+
* Construct an instance with the provided name and containers.
57+
* @param name the group name.
58+
* @param containers the containers.
59+
*/
60+
public ContainerGroup(String name, List<MessageListenerContainer> containers) {
61+
this.name = name;
62+
this.containers.addAll(containers);
63+
}
64+
65+
/**
66+
* Construct an instance with the provided name and containers.
67+
* @param name the group name.
68+
* @param containers the containers.
69+
*/
70+
public ContainerGroup(String name, MessageListenerContainer...containers) {
71+
this.name = name;
72+
for (MessageListenerContainer container : containers) {
73+
this.containers.add(container);
74+
}
75+
}
76+
77+
/**
78+
* Return the group name.
79+
* @return the name.
80+
*/
81+
public String getName() {
82+
return this.name;
83+
}
84+
85+
/**
86+
* Return the listener ids of the containers in this group.
87+
* @return the listener ids.
88+
*/
89+
public Collection<String> getListenerIds() {
90+
return this.containers.stream()
91+
.map(container -> container.getListenerId())
92+
.map(id -> {
93+
Assert.state(id != null, "Containers must have listener ids to be used here");
94+
return id;
95+
})
96+
.collect(Collectors.toList());
97+
}
98+
99+
/**
100+
* Return true if the provided container is in this group.
101+
* @param container the container.
102+
* @return true if it is in this group.
103+
*/
104+
public boolean contains(MessageListenerContainer container) {
105+
return this.containers.contains(container);
106+
}
107+
108+
/**
109+
* Return true if all containers in this group are stopped.
110+
* @return true if all are stopped.
111+
*/
112+
public boolean allStopped() {
113+
return this.containers.stream()
114+
.allMatch(container -> !container.isRunning());
115+
}
116+
117+
/**
118+
* Add one or more containers to the group.
119+
* @param theContainers the container(s).
120+
*/
121+
public void addContainers(MessageListenerContainer... theContainers) {
122+
for (MessageListenerContainer container : theContainers) {
123+
this.containers.add(container);
124+
}
125+
}
126+
127+
/**
128+
* Remove a container from the group.
129+
* @param container the container.
130+
* @return true if the container was removed.
131+
*/
132+
public boolean removeContainer(MessageListenerContainer container) {
133+
return this.containers.remove(container);
134+
}
135+
136+
@Override
137+
public synchronized void start() {
138+
if (!this.running) {
139+
this.containers.forEach(container -> {
140+
LOGGER.debug(() -> "Starting: " + container);
141+
container.start();
142+
});
143+
this.running = true;
144+
}
145+
}
146+
147+
@Override
148+
public synchronized void stop() {
149+
if (this.running) {
150+
this.containers.forEach(container -> container.stop());
151+
this.running = false;
152+
}
153+
}
154+
155+
@Override
156+
public synchronized boolean isRunning() {
157+
return this.running;
158+
}
159+
160+
@Override
161+
public String toString() {
162+
return "ContainerGroup [name=" + this.name + ", containers=" + this.containers + "]";
163+
}
164+
165+
}

0 commit comments

Comments
 (0)