Skip to content

Commit d8e7299

Browse files
authored
GH-3135 : Add ListenerContainerRegistry.getListenerContainersMatchingId(Predicate)
Fixes: #3135 * Implement new `getListenerContainersMatching()` method * Add tests * Add @DisplayName for a test * Add documentation * Cover more getter documentation * Doc : add doc for `BiPredicate` matcher * Feature : `getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> idAndContainerMatcher)` * Doc : Remove duplication in whats'new adoc
1 parent 2284547 commit d8e7299

File tree

5 files changed

+262
-3
lines changed

5 files changed

+262
-3
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/kafkalistener-lifecycle.adoc

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,49 @@ IMPORTANT: Endpoints registered after the application context has been refreshed
4141
An example of late registration is a bean with a `@KafkaListener` in prototype scope where an instance is created after the context is initialized.
4242
Starting with version 2.8.7, you can set the registry's `alwaysStartAfterRefresh` property to `false` and then the container's `autoStartup` property will define whether or not the container is started.
4343

44+
[[retrieving-message-listener-containers]]
45+
46+
== Retrieving MessageListenerContainers from KafkaListenerEndpointRegistry
47+
48+
The `KafkaListenerEndpointRegistry` provides methods for retrieving `MessageListenerContainer` instances to accommodate a range of management scenarios:
49+
50+
**All Containers**: For operations that cover all listener containers, use `getListenerContainers()` to retrieve a comprehensive collection.
51+
52+
[source, java]
53+
----
54+
Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();
55+
----
56+
57+
**Specific Container by ID**: To manage an individual container, `getListenerContainer(String id)` enables retrieval by its id.
58+
59+
[source, java]
60+
----
61+
MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");
62+
----
63+
64+
**Dynamic Container Filtering**: Introduced in version 3.2, two overloaded `getListenerContainersMatching` methods enable refined selection of containers.
65+
One method takes a `Predicate<String>` for ID-based filtering as a parameter, while the other takes a `BiPredicate<String, MessageListenerContainer>`
66+
for more advanced criteria that may include container properties or state as a parameter.
67+
68+
[source, java]
69+
----
70+
// Prefix matching (Predicate<String>)
71+
Collection<MessageListenerContainer> filteredContainers =
72+
registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));
73+
74+
// Regex matching (Predicate<String>)
75+
Collection<MessageListenerContainer> regexFilteredContainers =
76+
registry.getListenerContainersMatching(myPattern::matches);
77+
78+
// Pre-built Set of IDs (Predicate<String>)
79+
Collection<MessageListenerContainer> setFilteredContainers =
80+
registry.getListenerContainersMatching(myIdSet::contains);
81+
82+
// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
83+
Collection<MessageListenerContainer> advancedFilteredContainers =
84+
registry.getListenerContainersMatching(
85+
(id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
86+
);
87+
----
88+
89+
Utilize these methods to efficiently manage and query `MessageListenerContainer` instances within your application.

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,12 @@ See xref:kafka/seek.adoc#seek[Seek API Docs] for more details.
8787
For Spring Boot applications which define an application name, this name is now used
8888
as a default prefix for auto-generated client IDs for certain client types.
8989
See xref:kafka/connecting.adoc#default-client-id-prefixes[Default client ID prefixes] for more details.
90+
91+
[[get-listener-containers-matching]]
92+
== Enhanced Retrieval of MessageListenerContainers
93+
94+
`ListenerContainerRegistry` provides two new API's dynamically find and filter `MessageListenerContainer` instances.
95+
`getListenerContainersMatching(Predicate<String> idMatcher)` to filter by ID and the other is
96+
`getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)` to filter by ID and container properties.
97+
98+
See xref:kafka/receiving-messages/kafkalistener-lifecycle.adoc#retrieving-message-listener-containers[`@KafkaListener` Lifecycle Management's API Docs] for more information.

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.atomic.AtomicInteger;
2727
import java.util.concurrent.locks.ReentrantLock;
28+
import java.util.function.BiPredicate;
29+
import java.util.function.Predicate;
2830

2931
import org.apache.commons.logging.LogFactory;
3032

@@ -68,6 +70,7 @@
6870
* @author Gary Russell
6971
* @author Asi Bross
7072
* @author Wang Zhiyang
73+
* @author Joo Hyuk Kim
7174
*
7275
* @see KafkaListenerEndpoint
7376
* @see MessageListenerContainer
@@ -116,6 +119,47 @@ public MessageListenerContainer getListenerContainer(String id) {
116119
return this.listenerContainers.get(id);
117120
}
118121

122+
/**
123+
* Return all {@link MessageListenerContainer} instances with id matching the predicate or
124+
* empty {@link Collection} if no such container exists.
125+
* @param idMatcher the predicate to match container id with
126+
* @return the containers or empty {@link Collection} if no container with that id exists
127+
* @since 3.2
128+
* @see #getListenerContainerIds()
129+
* @see #getListenerContainer(String)
130+
*/
131+
@Override
132+
public Collection<MessageListenerContainer> getListenerContainersMatching(Predicate<String> idMatcher) {
133+
Assert.notNull(idMatcher, "'idMatcher' cannot be null");
134+
return this.listenerContainers.entrySet()
135+
.stream()
136+
.filter(entry -> idMatcher.test(entry.getKey()))
137+
.map(Map.Entry::getValue)
138+
.toList();
139+
}
140+
141+
/**
142+
* Return all {@link MessageListenerContainer} instances that satisfy the given bi-predicate.
143+
* The {@code BiPredicate<String, MessageListenerContainer>} takes the container id and the container itself as arguments.
144+
* This allows for more sophisticated filtering, including properties or state of the container itself.
145+
* @param idAndContainerMatcher the bi-predicate to match the container id and the container
146+
* @return the containers that match the bi-predicate criteria or an empty {@link Collection} if no matching containers exist
147+
* @since 3.2
148+
* @see #getListenerContainerIds()
149+
* @see #getListenerContainersMatching(Predicate)
150+
*/
151+
@Override
152+
public Collection<MessageListenerContainer> getListenerContainersMatching(
153+
BiPredicate<String, MessageListenerContainer> idAndContainerMatcher
154+
) {
155+
Assert.notNull(idAndContainerMatcher, "'idAndContainerMatcher' cannot be null");
156+
return this.listenerContainers.entrySet()
157+
.stream()
158+
.filter(entry -> idAndContainerMatcher.test(entry.getKey(), entry.getValue()))
159+
.map(Map.Entry::getValue)
160+
.toList();
161+
}
162+
119163
@Override
120164
@Nullable
121165
public MessageListenerContainer getUnregisteredListenerContainer(String id) {

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerRegistry.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,13 +18,16 @@
1818

1919
import java.util.Collection;
2020
import java.util.Set;
21+
import java.util.function.BiPredicate;
22+
import java.util.function.Predicate;
2123

2224
import org.springframework.lang.Nullable;
2325

2426
/**
2527
* A registry for listener containers.
2628
*
2729
* @author Gary Russell
30+
* @author Joo Hyuk Kim
2831
* @since 2.7
2932
*
3033
*/
@@ -41,6 +44,30 @@ public interface ListenerContainerRegistry {
4144
@Nullable
4245
MessageListenerContainer getListenerContainer(String id);
4346

47+
/**
48+
* Return all {@link MessageListenerContainer} instances with id matching the predicate or
49+
* empty {@link Collection} if no such container exists.
50+
* @param idMatcher the predicate to match the container id with
51+
* @return the containers or empty {@link Collection} if no container with that id exists
52+
* @since 3.2
53+
* @see #getListenerContainerIds()
54+
* @see #getListenerContainer(String)
55+
*/
56+
Collection<MessageListenerContainer> getListenerContainersMatching(Predicate<String> idMatcher);
57+
58+
/**
59+
* Return all {@link MessageListenerContainer} instances that satisfy the given bi-predicate.
60+
* The {@code BiPredicate<String, MessageListenerContainer>} takes the container id and the container itself as arguments.
61+
* This allows for more sophisticated filtering, including properties or state of the container itself.
62+
* @param idAndContainerMatcher the bi-predicate to match the container id and the container
63+
* @return the containers that match the bi-predicate criteria or an empty {@link Collection} if no matching containers exist
64+
* @since 3.2
65+
* @see #getListenerContainerIds()
66+
* @see #getListenerContainersMatching(Predicate)
67+
*/
68+
Collection<MessageListenerContainer> getListenerContainersMatching(
69+
BiPredicate<String, MessageListenerContainer> idAndContainerMatcher);
70+
4471
/**
4572
* Return the {@link MessageListenerContainer} with the specified id or {@code null}
4673
* if no such container exists. Returns containers that are not registered with the

spring-kafka/src/test/java/org/springframework/kafka/config/KafkaListenerEndpointRegistryTests.java

Lines changed: 135 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,17 +17,30 @@
1717
package org.springframework.kafka.config;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
2022
import static org.mockito.BDDMockito.given;
2123
import static org.mockito.Mockito.mock;
2224

25+
import java.util.ArrayList;
26+
import java.util.Collection;
27+
import java.util.List;
28+
import java.util.function.BiPredicate;
29+
import java.util.function.Predicate;
30+
import java.util.stream.Stream;
31+
32+
import org.junit.jupiter.api.DisplayName;
2333
import org.junit.jupiter.api.Test;
34+
import org.junit.jupiter.params.ParameterizedTest;
35+
import org.junit.jupiter.params.provider.Arguments;
36+
import org.junit.jupiter.params.provider.MethodSource;
2437

2538
import org.springframework.kafka.listener.MessageListenerContainer;
2639

2740
/**
2841
* @author Gary Russell
42+
* @author Joo Hyuk Kim
2943
* @since 2.8.9
30-
*
3144
*/
3245
public class KafkaListenerEndpointRegistryTests {
3346

@@ -47,4 +60,124 @@ void unregister() {
4760
assertThat(unregistered).isSameAs(container);
4861
}
4962

63+
@DisplayName("getListenerContainersMatching throws on null predicate")
64+
@Test
65+
void getListenerContainersMatchingThrowsOnNullPredicate() {
66+
// Given
67+
KafkaListenerEndpointRegistry registry = new KafkaListenerEndpointRegistry();
68+
// When & Then
69+
assertThatIllegalArgumentException()
70+
.isThrownBy(() -> registry.getListenerContainersMatching((Predicate<String>) null))
71+
.withMessage("'idMatcher' cannot be null");
72+
}
73+
74+
75+
@DisplayName("getListenerContainersMatching with BiPredicate throws on null biPredicate")
76+
@Test
77+
void getListenerContainersMatchingBiPredicateThrowsOnNullBiPredicate() {
78+
// Given
79+
KafkaListenerEndpointRegistry registry = new KafkaListenerEndpointRegistry();
80+
// When & Then
81+
assertThatIllegalArgumentException()
82+
.isThrownBy(() -> registry.getListenerContainersMatching((BiPredicate<String, MessageListenerContainer>) null))
83+
.withMessage("'idAndContainerMatcher' cannot be null");
84+
}
85+
86+
@DisplayName("getListenerContainersMatching should return unmodifiable list")
87+
@Test
88+
void testGetListenerContainersMatchingReturnsUnmodifiableList() {
89+
// Given
90+
KafkaListenerEndpointRegistry registry = new KafkaListenerEndpointRegistry();
91+
registerListenerWithId(registry, "foo");
92+
// When
93+
Collection<MessageListenerContainer> listeners = registry.getListenerContainersMatching(s -> true);
94+
// Then
95+
assertThatExceptionOfType(UnsupportedOperationException.class)
96+
.isThrownBy(() -> listeners.add(mock(MessageListenerContainer.class)));
97+
}
98+
99+
@ParameterizedTest(name = "getListenerContainersMatching({0}, {1}) = {2}")
100+
@MethodSource("paramsForGetListenerContainersMatching")
101+
void getListenerContainersMatching(List<String> names, Predicate<String> idMatcher, int expectedCount) {
102+
// Given
103+
KafkaListenerEndpointRegistry registry = new KafkaListenerEndpointRegistry();
104+
registerWithListenerIds(registry, names);
105+
// When
106+
Collection<MessageListenerContainer> listeners = registry.getListenerContainersMatching(idMatcher);
107+
// Then
108+
assertThat(listeners).hasSize(expectedCount);
109+
}
110+
111+
/**
112+
* Provides parameters for the getListenerContainersMatching test.
113+
* Each set of parameters includes a list of names, a predicate, and the expected count of matching containers.
114+
*/
115+
private static Stream<Arguments> paramsForGetListenerContainersMatching() {
116+
List<String> names = List.of("foo", "bar", "baz");
117+
return Stream.of(
118+
// Case : Two names start with "b"
119+
Arguments.of(names, (Predicate<String>) id -> id.startsWith("b"), 2),
120+
// Case : One name starts with "f"
121+
Arguments.of(names, (Predicate<String>) id -> id.startsWith("f"), 1),
122+
// Case : Empty list
123+
Arguments.of(new ArrayList<>(), (Predicate<String>) id -> id.startsWith("b"), 0),
124+
// Case : All names match as the predicate always returns true
125+
Arguments.of(names, (Predicate<String>) id -> true, 3),
126+
// Case : No names match as the predicate always returns false
127+
Arguments.of(names, (Predicate<String>) id -> false, 0)
128+
);
129+
}
130+
131+
@ParameterizedTest(name = "getListenerContainersMatching with BiPredicate for {0}, expecting {2} matches")
132+
@MethodSource("paramsForGetListenerContainersMatchingBiPredicate")
133+
void getListenerContainersMatchingBiPredicate(List<String> names, BiPredicate<String, MessageListenerContainer> idAndContainerMatcher, int expectedCount) {
134+
// Given
135+
KafkaListenerEndpointRegistry registry = new KafkaListenerEndpointRegistry();
136+
registerWithListenerIds(registry, names);
137+
// When
138+
Collection<MessageListenerContainer> listeners = registry.getListenerContainersMatching(idAndContainerMatcher);
139+
// Then
140+
assertThat(listeners).hasSize(expectedCount);
141+
}
142+
143+
/**
144+
* Provides parameters for the getListenerContainersMatchingBiPredicate test.
145+
* Each set of parameters includes a list of names, a bi-predicate, and the expected count of matching containers.
146+
*/
147+
private static Stream<Arguments> paramsForGetListenerContainersMatchingBiPredicate() {
148+
List<String> names = List.of("foo", "bar", "baz");
149+
return Stream.of(
150+
// Case : Filters for names starting with "b" and containers that are "running"
151+
Arguments.of(names,
152+
(BiPredicate<String, MessageListenerContainer>) (id, container) -> id.startsWith("b") && container.isRunning(), 2),
153+
// Case : Filters for names starting with "f" and containers that are "running"
154+
Arguments.of(names,
155+
(BiPredicate<String, MessageListenerContainer>) (id, container) -> id.startsWith("f") && container.isRunning(), 1),
156+
// Case : Filters in an empty list of names
157+
Arguments.of(new ArrayList<>(),
158+
(BiPredicate<String, MessageListenerContainer>) (id, container) -> id.startsWith("b") && container.isRunning(), 0),
159+
// Case : Filters where all containers are considered "running"
160+
Arguments.of(names,
161+
(BiPredicate<String, MessageListenerContainer>) (id, container) -> container.isRunning(), 3),
162+
// Case : Filters where no containers are considered "running"
163+
Arguments.of(names,
164+
(BiPredicate<String, MessageListenerContainer>) (id, container) -> !container.isRunning(), 0)
165+
);
166+
}
167+
168+
private static void registerWithListenerIds(KafkaListenerEndpointRegistry registry, List<String> names) {
169+
names.forEach(name -> registerListenerWithId(registry, name));
170+
}
171+
172+
private static void registerListenerWithId(KafkaListenerEndpointRegistry registry, String id) {
173+
KafkaListenerEndpoint endpoint = mock(KafkaListenerEndpoint.class);
174+
@SuppressWarnings("unchecked")
175+
KafkaListenerContainerFactory<MessageListenerContainer> factory = mock(KafkaListenerContainerFactory.class);
176+
given(endpoint.getId()).willReturn(id);
177+
MessageListenerContainer container = mock(MessageListenerContainer.class);
178+
given(container.isRunning()).willReturn(true);
179+
given(factory.createListenerContainer(endpoint)).willReturn(container);
180+
registry.registerListenerContainer(endpoint, factory);
181+
}
182+
50183
}

0 commit comments

Comments
 (0)