Skip to content

Commit 76dd53f

Browse files
committed
Initial Auto Config for spring-rabbit-stream
- new container factory (not in existing hierarchy) - also add `ContainerCustomizer` to other factories
1 parent 289ebef commit 76dd53f

File tree

9 files changed

+351
-14
lines changed

9 files changed

+351
-14
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ allprojects {
3030
}
3131
if (version.endsWith('-SNAPSHOT')) {
3232
maven { url "https://repo.spring.io/snapshot" }
33+
maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
3334
}
3435
}
3536

spring-boot-project/spring-boot-autoconfigure/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ dependencies {
5858
optional("org.apache.commons:commons-dbcp2")
5959
optional("org.apache.httpcomponents.client5:httpclient5")
6060
optional("org.apache.kafka:kafka-streams")
61+
optional("org.apache.qpid:proton-j")
6162
optional("org.apache.solr:solr-solrj")
6263
optional("org.apache.tomcat.embed:tomcat-embed-core")
6364
optional("org.apache.tomcat.embed:tomcat-embed-el")
@@ -142,6 +143,7 @@ dependencies {
142143
optional("org.springframework.session:spring-session-hazelcast")
143144
optional("org.springframework.session:spring-session-jdbc")
144145
optional("org.springframework.amqp:spring-rabbit")
146+
optional("org.springframework.amqp:spring-rabbit-stream")
145147
optional("org.springframework.kafka:spring-kafka")
146148
optional("org.springframework.ws:spring-ws-core")
147149
optional("org.thymeleaf:thymeleaf")

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
import java.util.stream.Collectors;
2020

2121
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
22+
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
2223
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
2324
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
2425
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
2526
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
27+
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
28+
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
2629
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
2730
import org.springframework.amqp.support.converter.MessageConverter;
2831
import org.springframework.beans.factory.ObjectProvider;
@@ -48,14 +51,23 @@ class RabbitAnnotationDrivenConfiguration {
4851

4952
private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers;
5053

54+
private final ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizers;
55+
56+
private final ObjectProvider<ContainerCustomizer<DirectMessageListenerContainer>> directContainerCustomizers;
57+
5158
private final RabbitProperties properties;
5259

5360
RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
5461
ObjectProvider<MessageRecoverer> messageRecoverer,
55-
ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers, RabbitProperties properties) {
62+
ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers,
63+
ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizers,
64+
ObjectProvider<ContainerCustomizer<DirectMessageListenerContainer>> directContainerCustomizers,
65+
RabbitProperties properties) {
5666
this.messageConverter = messageConverter;
5767
this.messageRecoverer = messageRecoverer;
5868
this.retryTemplateCustomizers = retryTemplateCustomizers;
69+
this.simpleContainerCustomizers = simpleContainerCustomizers;
70+
this.directContainerCustomizers = directContainerCustomizers;
5971
this.properties = properties;
6072
}
6173

@@ -79,6 +91,7 @@ SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
7991
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
8092
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
8193
configurer.configure(factory, connectionFactory);
94+
factory.setContainerCustomizer(this.simpleContainerCustomizers.getIfUnique());
8295
return factory;
8396
}
8497

@@ -101,6 +114,7 @@ DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(
101114
DirectRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
102115
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
103116
configurer.configure(factory, connectionFactory);
117+
factory.setContainerCustomizer(this.directContainerCustomizers.getIfUnique());
104118
return factory;
105119
}
106120

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
@Configuration(proxyBeanMethods = false)
8989
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
9090
@EnableConfigurationProperties(RabbitProperties.class)
91-
@Import(RabbitAnnotationDrivenConfiguration.class)
91+
@Import({ RabbitAnnotationDrivenConfiguration.class, RabbitStreamAutoConfiguration.class })
9292
public class RabbitAutoConfiguration {
9393

9494
@Configuration(proxyBeanMethods = false)

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java

Lines changed: 91 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public class RabbitProperties {
5151

5252
private static final int DEFAULT_PORT_SECURE = 5671;
5353

54+
private static final int DEFAULT_STREAM_PORT = 5552;
55+
5456
/**
5557
* RabbitMQ host. Ignored if an address is set.
5658
*/
@@ -137,6 +139,8 @@ public class RabbitProperties {
137139

138140
private final Template template = new Template();
139141

142+
private final Stream stream = new Stream();
143+
140144
private List<Address> parsedAddresses;
141145

142146
public String getHost() {
@@ -361,6 +365,10 @@ public Template getTemplate() {
361365
return this.template;
362366
}
363367

368+
public Stream getStream() {
369+
return this.stream;
370+
}
371+
364372
public class Ssl {
365373

366374
private static final String SUN_X509 = "SunX509";
@@ -629,7 +637,12 @@ public enum ContainerType {
629637
* Container where the listener is invoked directly on the RabbitMQ consumer
630638
* thread.
631639
*/
632-
DIRECT
640+
DIRECT,
641+
642+
/**
643+
* Container that uses the RabbitMQ Stream Client.
644+
*/
645+
STREAM
633646

634647
}
635648

@@ -644,6 +657,8 @@ public static class Listener {
644657

645658
private final DirectContainer direct = new DirectContainer();
646659

660+
private final StreamContainer stream = new StreamContainer();
661+
647662
public ContainerType getType() {
648663
return this.type;
649664
}
@@ -660,15 +675,31 @@ public DirectContainer getDirect() {
660675
return this.direct;
661676
}
662677

678+
public StreamContainer getStream() {
679+
return this.stream;
680+
}
681+
663682
}
664683

665-
public abstract static class AmqpContainer {
684+
public abstract static class BaseContainer {
666685

667686
/**
668687
* Whether to start the container automatically on startup.
669688
*/
670689
private boolean autoStartup = true;
671690

691+
public boolean isAutoStartup() {
692+
return this.autoStartup;
693+
}
694+
695+
public void setAutoStartup(boolean autoStartup) {
696+
this.autoStartup = autoStartup;
697+
}
698+
699+
}
700+
701+
public abstract static class AmqpContainer extends BaseContainer {
702+
672703
/**
673704
* Acknowledge mode of container.
674705
*/
@@ -701,14 +732,6 @@ public abstract static class AmqpContainer {
701732
*/
702733
private final ListenerRetry retry = new ListenerRetry();
703734

704-
public boolean isAutoStartup() {
705-
return this.autoStartup;
706-
}
707-
708-
public void setAutoStartup(boolean autoStartup) {
709-
this.autoStartup = autoStartup;
710-
}
711-
712735
public AcknowledgeMode getAcknowledgeMode() {
713736
return this.acknowledgeMode;
714737
}
@@ -871,6 +894,20 @@ public void setMissingQueuesFatal(boolean missingQueuesFatal) {
871894

872895
}
873896

897+
public static class StreamContainer extends BaseContainer {
898+
899+
boolean nativeListener;
900+
901+
public boolean isNativeListener() {
902+
return this.nativeListener;
903+
}
904+
905+
public void setNativeListener(boolean nativeListener) {
906+
this.nativeListener = nativeListener;
907+
}
908+
909+
}
910+
874911
public static class Template {
875912

876913
private final Retry retry = new Retry();
@@ -1128,4 +1165,48 @@ private boolean determineSslEnabled(boolean sslEnabled) {
11281165

11291166
}
11301167

1168+
public static final class Stream {
1169+
1170+
private String host = "localhost";
1171+
1172+
private int port = DEFAULT_STREAM_PORT;
1173+
1174+
private String userName;
1175+
1176+
private String password;
1177+
1178+
public String getHost() {
1179+
return this.host;
1180+
}
1181+
1182+
public void setHost(String host) {
1183+
this.host = host;
1184+
}
1185+
1186+
public int getPort() {
1187+
return this.port;
1188+
}
1189+
1190+
public void setPort(int port) {
1191+
this.port = port;
1192+
}
1193+
1194+
public String getUserName() {
1195+
return this.userName;
1196+
}
1197+
1198+
public void setUserName(String userName) {
1199+
this.userName = userName;
1200+
}
1201+
1202+
public String getPassword() {
1203+
return this.password;
1204+
}
1205+
1206+
public void setPassword(String password) {
1207+
this.password = password;
1208+
}
1209+
1210+
}
1211+
11311212
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2021-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.amqp;
18+
19+
import com.rabbitmq.stream.Environment;
20+
import com.rabbitmq.stream.EnvironmentBuilder;
21+
22+
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
23+
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
24+
import org.springframework.beans.factory.ObjectProvider;
25+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
26+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
27+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
28+
import org.springframework.context.annotation.Bean;
29+
import org.springframework.context.annotation.Configuration;
30+
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
31+
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
32+
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
33+
34+
/**
35+
* Configuration for Spring RabbitMQ Stream Plugin support.
36+
*
37+
* @author Gary Russell
38+
* @since 2.6
39+
*/
40+
@Configuration(proxyBeanMethods = false)
41+
@ConditionalOnClass(EnableRabbit.class)
42+
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "stream")
43+
public class RabbitStreamAutoConfiguration {
44+
45+
@Bean(name = "rabbitListenerContainerFactory")
46+
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
47+
StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(Environment rabbitStreamEnvironment,
48+
RabbitProperties properties, ObjectProvider<ConsumerCustomizer> consumerCustomizers,
49+
ObjectProvider<ContainerCustomizer<StreamListenerContainer>> containerCustomizers) {
50+
51+
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(
52+
rabbitStreamEnvironment);
53+
factory.setNativeListener(properties.getListener().getStream().isNativeListener());
54+
if (consumerCustomizers.getIfUnique() != null) {
55+
factory.setConsumerCustomizer(consumerCustomizers.getIfUnique());
56+
}
57+
if (containerCustomizers.getIfUnique() != null) {
58+
factory.setContainerCustomizer(containerCustomizers.getIfUnique());
59+
}
60+
return factory;
61+
}
62+
63+
@Bean(name = "rabbitStreamEnvironment")
64+
@ConditionalOnMissingBean(name = "rabbitStreamEnvironment")
65+
Environment rabbitStreamEnvironment(RabbitProperties properties) {
66+
RabbitProperties.Stream stream = properties.getStream();
67+
String username = stream.getUserName();
68+
if (username == null) {
69+
username = properties.getUsername();
70+
}
71+
String password = stream.getPassword();
72+
if (password == null) {
73+
password = properties.getPassword();
74+
}
75+
EnvironmentBuilder builder = Environment.builder().lazyInitialization(true).host(stream.getHost())
76+
.port(stream.getPort());
77+
if (username != null) {
78+
builder.username(username);
79+
}
80+
if (password != null) {
81+
builder.password(password);
82+
}
83+
return builder.build();
84+
}
85+
86+
}

0 commit comments

Comments
 (0)