Skip to content

Commit fdde40e

Browse files
committed
Merge pull request #27480 from garyrussell
* gh-27480: Polish "Add auto-configuration for spring-rabbit-stream" Add auto-configuration for spring-rabbit-stream Closes gh-27480
2 parents d467de5 + 7a0fe0f commit fdde40e

File tree

6 files changed

+399
-11
lines changed

6 files changed

+399
-11
lines changed

spring-boot-project/spring-boot-autoconfigure/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ dependencies {
142142
optional("org.springframework.session:spring-session-hazelcast")
143143
optional("org.springframework.session:spring-session-jdbc")
144144
optional("org.springframework.amqp:spring-rabbit")
145+
optional("org.springframework.amqp:spring-rabbit-stream")
145146
optional("org.springframework.kafka:spring-kafka")
146147
optional("org.springframework.ws:spring-ws-core")
147148
optional("org.thymeleaf:thymeleaf")

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
@Configuration(proxyBeanMethods = false)
8989
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
9090
@EnableConfigurationProperties(RabbitProperties.class)
91-
@Import(RabbitAnnotationDrivenConfiguration.class)
91+
@Import({ RabbitAnnotationDrivenConfiguration.class, RabbitStreamConfiguration.class })
9292
public class RabbitAutoConfiguration {
9393

9494
@Configuration(proxyBeanMethods = false)

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java

Lines changed: 109 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public class RabbitProperties {
5151

5252
private static final int DEFAULT_PORT_SECURE = 5671;
5353

54+
private static final int DEFAULT_STREAM_PORT = 5552;
55+
5456
/**
5557
* RabbitMQ host. Ignored if an address is set.
5658
*/
@@ -137,6 +139,8 @@ public class RabbitProperties {
137139

138140
private final Template template = new Template();
139141

142+
private final Stream stream = new Stream();
143+
140144
private List<Address> parsedAddresses;
141145

142146
public String getHost() {
@@ -361,6 +365,10 @@ public Template getTemplate() {
361365
return this.template;
362366
}
363367

368+
public Stream getStream() {
369+
return this.stream;
370+
}
371+
364372
public class Ssl {
365373

366374
private static final String SUN_X509 = "SunX509";
@@ -629,7 +637,12 @@ public enum ContainerType {
629637
* Container where the listener is invoked directly on the RabbitMQ consumer
630638
* thread.
631639
*/
632-
DIRECT
640+
DIRECT,
641+
642+
/**
643+
* Container that uses the RabbitMQ Stream Client.
644+
*/
645+
STREAM
633646

634647
}
635648

@@ -644,6 +657,8 @@ public static class Listener {
644657

645658
private final DirectContainer direct = new DirectContainer();
646659

660+
private final StreamContainer stream = new StreamContainer();
661+
647662
public ContainerType getType() {
648663
return this.type;
649664
}
@@ -660,15 +675,31 @@ public DirectContainer getDirect() {
660675
return this.direct;
661676
}
662677

678+
public StreamContainer getStream() {
679+
return this.stream;
680+
}
681+
663682
}
664683

665-
public abstract static class AmqpContainer {
684+
public abstract static class BaseContainer {
666685

667686
/**
668687
* Whether to start the container automatically on startup.
669688
*/
670689
private boolean autoStartup = true;
671690

691+
public boolean isAutoStartup() {
692+
return this.autoStartup;
693+
}
694+
695+
public void setAutoStartup(boolean autoStartup) {
696+
this.autoStartup = autoStartup;
697+
}
698+
699+
}
700+
701+
public abstract static class AmqpContainer extends BaseContainer {
702+
672703
/**
673704
* Acknowledge mode of container.
674705
*/
@@ -701,14 +732,6 @@ public abstract static class AmqpContainer {
701732
*/
702733
private final ListenerRetry retry = new ListenerRetry();
703734

704-
public boolean isAutoStartup() {
705-
return this.autoStartup;
706-
}
707-
708-
public void setAutoStartup(boolean autoStartup) {
709-
this.autoStartup = autoStartup;
710-
}
711-
712735
public AcknowledgeMode getAcknowledgeMode() {
713736
return this.acknowledgeMode;
714737
}
@@ -871,6 +894,24 @@ public void setMissingQueuesFatal(boolean missingQueuesFatal) {
871894

872895
}
873896

897+
public static class StreamContainer extends BaseContainer {
898+
899+
/**
900+
* Whether the container will support listeners that consume native stream
901+
* messages instead of Spring AMQP messages.
902+
*/
903+
boolean nativeListener;
904+
905+
public boolean isNativeListener() {
906+
return this.nativeListener;
907+
}
908+
909+
public void setNativeListener(boolean nativeListener) {
910+
this.nativeListener = nativeListener;
911+
}
912+
913+
}
914+
874915
public static class Template {
875916

876917
private final Retry retry = new Retry();
@@ -1128,4 +1169,62 @@ private boolean determineSslEnabled(boolean sslEnabled) {
11281169

11291170
}
11301171

1172+
public static final class Stream {
1173+
1174+
/**
1175+
* Host of a RabbitMQ instance with the Stream plugin enabled.
1176+
*/
1177+
private String host = "localhost";
1178+
1179+
/**
1180+
* Stream port of a RabbitMQ instance with the Stream plugin enabled.
1181+
*/
1182+
private int port = DEFAULT_STREAM_PORT;
1183+
1184+
/**
1185+
* Login user to authenticate to the broker. When not set,
1186+
* spring.rabbitmq.username is used.
1187+
*/
1188+
private String username;
1189+
1190+
/**
1191+
* Login password to authenticate to the broker. When not set
1192+
* spring.rabbitmq.password is used.
1193+
*/
1194+
private String password;
1195+
1196+
public String getHost() {
1197+
return this.host;
1198+
}
1199+
1200+
public void setHost(String host) {
1201+
this.host = host;
1202+
}
1203+
1204+
public int getPort() {
1205+
return this.port;
1206+
}
1207+
1208+
public void setPort(int port) {
1209+
this.port = port;
1210+
}
1211+
1212+
public String getUsername() {
1213+
return this.username;
1214+
}
1215+
1216+
public void setUsername(String username) {
1217+
this.username = username;
1218+
}
1219+
1220+
public String getPassword() {
1221+
return this.password;
1222+
}
1223+
1224+
public void setPassword(String password) {
1225+
this.password = password;
1226+
}
1227+
1228+
}
1229+
11311230
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2021-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.amqp;
18+
19+
import java.util.function.Function;
20+
import java.util.function.Supplier;
21+
22+
import com.rabbitmq.stream.Environment;
23+
import com.rabbitmq.stream.EnvironmentBuilder;
24+
25+
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
26+
import org.springframework.beans.factory.ObjectProvider;
27+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
28+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
29+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
30+
import org.springframework.boot.context.properties.PropertyMapper;
31+
import org.springframework.context.annotation.Bean;
32+
import org.springframework.context.annotation.Configuration;
33+
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
34+
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
35+
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
36+
37+
/**
38+
* Configuration for Spring RabbitMQ Stream plugin support.
39+
*
40+
* @author Gary Russell
41+
*/
42+
@Configuration(proxyBeanMethods = false)
43+
@ConditionalOnClass(StreamRabbitListenerContainerFactory.class)
44+
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "stream")
45+
class RabbitStreamConfiguration {
46+
47+
@Bean(name = "rabbitListenerContainerFactory")
48+
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
49+
StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(Environment rabbitStreamEnvironment,
50+
RabbitProperties properties, ObjectProvider<ConsumerCustomizer> consumerCustomizer,
51+
ObjectProvider<ContainerCustomizer<StreamListenerContainer>> containerCustomizer) {
52+
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(
53+
rabbitStreamEnvironment);
54+
factory.setNativeListener(properties.getListener().getStream().isNativeListener());
55+
consumerCustomizer.ifUnique(factory::setConsumerCustomizer);
56+
containerCustomizer.ifUnique(factory::setContainerCustomizer);
57+
return factory;
58+
}
59+
60+
@Bean(name = "rabbitStreamEnvironment")
61+
@ConditionalOnMissingBean(name = "rabbitStreamEnvironment")
62+
Environment rabbitStreamEnvironment(RabbitProperties properties) {
63+
return configure(Environment.builder(), properties).build();
64+
}
65+
66+
static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) {
67+
builder.lazyInitialization(true);
68+
RabbitProperties.Stream stream = properties.getStream();
69+
PropertyMapper mapper = PropertyMapper.get();
70+
mapper.from(stream.getHost()).to(builder::host);
71+
mapper.from(stream.getPort()).to(builder::port);
72+
mapper.from(stream.getUsername()).as(withFallback(properties::getUsername)).whenNonNull().to(builder::username);
73+
mapper.from(stream.getPassword()).as(withFallback(properties::getPassword)).whenNonNull().to(builder::password);
74+
return builder;
75+
}
76+
77+
private static Function<String, String> withFallback(Supplier<String> fallback) {
78+
return (value) -> (value != null) ? value : fallback.get();
79+
}
80+
81+
}

0 commit comments

Comments
 (0)