Skip to content

Commit e37697f

Browse files
committed
spring-projectsGH-1880: SpEL Improvements for Property Overrides
Resolves spring-projects#1880 SpEL can resolve to `String`, `String[]` or `Collection<String>`.
1 parent c9d0b48 commit e37697f

File tree

3 files changed

+49
-8
lines changed

3 files changed

+49
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,10 @@
247247
* <li>{@code key value}</li>
248248
* </ul>
249249
* {@code group.id} and {@code client.id} are ignored.
250+
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
251+
* SpEL expressions must resolve to a {@link String}, a @{link String[]} or a
252+
* {@code Collection<String>} where each member of the array or collection is a
253+
* property name + value with the above formats.
250254
* @return the properties.
251255
* @since 2.2.4
252256
* @see org.apache.kafka.clients.consumer.ConsumerConfig

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -676,24 +676,46 @@ protected String noBeanFoundMessage(Object target, String listenerBeanName, Stri
676676
+ requestedBeanName + "' was found in the application context";
677677
}
678678

679+
@SuppressWarnings("unchecked")
679680
private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {
680681
if (propertyStrings.length > 0) {
681682
Properties properties = new Properties();
682683
for (String property : propertyStrings) {
683-
String value = resolveExpressionAsString(property, "property");
684-
if (value != null) {
685-
try {
686-
properties.load(new StringReader(value));
684+
Object value = resolveExpression(property);
685+
if (value instanceof String) {
686+
loadProperty(properties, property, value);
687+
}
688+
else if (value instanceof String[]) {
689+
for (String prop : (String[]) value) {
690+
loadProperty(properties, prop, prop);
687691
}
688-
catch (IOException e) {
689-
this.logger.error(e, () -> "Failed to load property " + property + ", continuing...");
692+
}
693+
else if (value instanceof Collection) {
694+
Collection<?> values = (Collection<?>) value;
695+
if (values.size() > 0 && values.iterator().next() instanceof String) {
696+
for (String prop : (Collection<String>) value) {
697+
loadProperty(properties, prop, prop);
698+
}
690699
}
691700
}
701+
else {
702+
throw new IllegalStateException("'properties' must resolve to a String, a String[] or "
703+
+ "Collection<String>");
704+
}
692705
}
693706
endpoint.setConsumerProperties(properties);
694707
}
695708
}
696709

710+
private void loadProperty(Properties properties, String property, Object value) {
711+
try {
712+
properties.load(new StringReader((String) value));
713+
}
714+
catch (IOException e) {
715+
this.logger.error(e, () -> "Failed to load property " + property + ", continuing...");
716+
}
717+
}
718+
697719
private String getEndpointId(KafkaListener kafkaListener) {
698720
if (StringUtils.hasText(kafkaListener.id())) {
699721
return resolveExpressionAsString(kafkaListener.id(), "id");

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@
144144
import org.springframework.retry.support.RetryTemplate;
145145
import org.springframework.stereotype.Component;
146146
import org.springframework.test.annotation.DirtiesContext;
147+
import org.springframework.test.context.TestPropertySource;
147148
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
148149
import org.springframework.transaction.PlatformTransactionManager;
149150
import org.springframework.transaction.annotation.EnableTransactionManagement;
@@ -177,6 +178,7 @@
177178
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
178179
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
179180
"annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41" })
181+
@TestPropertySource(properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10")
180182
public class EnableKafkaIntegrationTests {
181183

182184
private static final String DEFAULT_TEST_GROUP_ID = "testAnnot";
@@ -350,6 +352,10 @@ public void testSimple() throws Exception {
350352
.isEqualTo("fiz");
351353
assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.clientId"))
352354
.isEqualTo("clientIdViaAnnotation-0");
355+
assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.fetcher.maxPollRecords"))
356+
.isEqualTo(10);
357+
assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.fetcher.minBytes"))
358+
.isEqualTo(420000);
353359

354360
MessageListenerContainer rebalanceConcurrentContainer = registry.getListenerContainer("rebalanceListener");
355361
assertThat(rebalanceConcurrentContainer).isNotNull();
@@ -488,6 +494,10 @@ public void testJson() throws Exception {
488494
assertThat(KafkaTestUtils.getPropertyValue(buzContainer,
489495
"listenerConsumer.consumer.groupId", Optional.class).get())
490496
.isEqualTo("buz.explicitGroupId");
497+
assertThat(KafkaTestUtils.getPropertyValue(buzContainer, "listenerConsumer.consumer.fetcher.maxPollRecords"))
498+
.isEqualTo(5);
499+
assertThat(KafkaTestUtils.getPropertyValue(buzContainer, "listenerConsumer.consumer.fetcher.minBytes"))
500+
.isEqualTo(123456);
491501
}
492502

493503
@Test
@@ -1649,6 +1659,11 @@ protected Object convertToInternal(Object payload, @Nullable MessageHeaders head
16491659
};
16501660
}
16511661

1662+
@Bean
1663+
List<String> buzProps() {
1664+
return List.of("max.poll.records: 5", "fetch.min.bytes: 123456");
1665+
}
1666+
16521667
}
16531668

16541669
@Component
@@ -1845,14 +1860,14 @@ public void eventHandler(ListenerContainerNoLongerIdleEvent event) {
18451860
@TopicPartition(topic = "annotated6", partitions = "0",
18461861
partitionOffsets = @PartitionOffset(partition = "${xxx:1}", initialOffset = "${yyy:0}",
18471862
relativeToCurrent = "${zzz:true}"))
1848-
}, clientIdPrefix = "${foo.xxx:clientIdViaAnnotation}")
1863+
}, clientIdPrefix = "${foo.xxx:clientIdViaAnnotation}", properties = "#{'${spel.props}'.split(',')}")
18491864
public void listen5(ConsumerRecord<?, ?> record) {
18501865
this.capturedRecord = record;
18511866
this.latch5.countDown();
18521867
}
18531868

18541869
@KafkaListener(id = "buz", topics = "annotated10", containerFactory = "kafkaJsonListenerContainerFactory",
1855-
groupId = "buz.explicitGroupId")
1870+
groupId = "buz.explicitGroupId", properties = "#{@buzProps}")
18561871
public void listen6(Foo foo) {
18571872
this.foo = foo;
18581873
this.latch6.countDown();

0 commit comments

Comments
 (0)