Skip to content

Commit 9784838

Browse files
garyrussellwilkinsona
authored andcommitted
Add auto-configuration for spring-rabbit-stream
See gh-27480
1 parent d467de5 commit 9784838

File tree

6 files changed

+284
-11
lines changed

6 files changed

+284
-11
lines changed

spring-boot-project/spring-boot-autoconfigure/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ dependencies {
142142
optional("org.springframework.session:spring-session-hazelcast")
143143
optional("org.springframework.session:spring-session-jdbc")
144144
optional("org.springframework.amqp:spring-rabbit")
145+
optional("org.springframework.amqp:spring-rabbit-stream")
145146
optional("org.springframework.kafka:spring-kafka")
146147
optional("org.springframework.ws:spring-ws-core")
147148
optional("org.thymeleaf:thymeleaf")

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
@Configuration(proxyBeanMethods = false)
8989
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
9090
@EnableConfigurationProperties(RabbitProperties.class)
91-
@Import(RabbitAnnotationDrivenConfiguration.class)
91+
@Import({ RabbitAnnotationDrivenConfiguration.class, RabbitStreamConfiguration.class })
9292
public class RabbitAutoConfiguration {
9393

9494
@Configuration(proxyBeanMethods = false)

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java

Lines changed: 109 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public class RabbitProperties {
5151

5252
private static final int DEFAULT_PORT_SECURE = 5671;
5353

54+
private static final int DEFAULT_STREAM_PORT = 5552;
55+
5456
/**
5557
* RabbitMQ host. Ignored if an address is set.
5658
*/
@@ -137,6 +139,8 @@ public class RabbitProperties {
137139

138140
private final Template template = new Template();
139141

142+
private final Stream stream = new Stream();
143+
140144
private List<Address> parsedAddresses;
141145

142146
public String getHost() {
@@ -361,6 +365,10 @@ public Template getTemplate() {
361365
return this.template;
362366
}
363367

368+
public Stream getStream() {
369+
return this.stream;
370+
}
371+
364372
public class Ssl {
365373

366374
private static final String SUN_X509 = "SunX509";
@@ -629,7 +637,12 @@ public enum ContainerType {
629637
* Container where the listener is invoked directly on the RabbitMQ consumer
630638
* thread.
631639
*/
632-
DIRECT
640+
DIRECT,
641+
642+
/**
643+
* Container that uses the RabbitMQ Stream Client.
644+
*/
645+
STREAM
633646

634647
}
635648

@@ -644,6 +657,8 @@ public static class Listener {
644657

645658
private final DirectContainer direct = new DirectContainer();
646659

660+
private final StreamContainer stream = new StreamContainer();
661+
647662
public ContainerType getType() {
648663
return this.type;
649664
}
@@ -660,15 +675,31 @@ public DirectContainer getDirect() {
660675
return this.direct;
661676
}
662677

678+
public StreamContainer getStream() {
679+
return this.stream;
680+
}
681+
663682
}
664683

665-
public abstract static class AmqpContainer {
684+
public abstract static class BaseContainer {
666685

667686
/**
668687
* Whether to start the container automatically on startup.
669688
*/
670689
private boolean autoStartup = true;
671690

691+
public boolean isAutoStartup() {
692+
return this.autoStartup;
693+
}
694+
695+
public void setAutoStartup(boolean autoStartup) {
696+
this.autoStartup = autoStartup;
697+
}
698+
699+
}
700+
701+
public abstract static class AmqpContainer extends BaseContainer {
702+
672703
/**
673704
* Acknowledge mode of container.
674705
*/
@@ -701,14 +732,6 @@ public abstract static class AmqpContainer {
701732
*/
702733
private final ListenerRetry retry = new ListenerRetry();
703734

704-
public boolean isAutoStartup() {
705-
return this.autoStartup;
706-
}
707-
708-
public void setAutoStartup(boolean autoStartup) {
709-
this.autoStartup = autoStartup;
710-
}
711-
712735
public AcknowledgeMode getAcknowledgeMode() {
713736
return this.acknowledgeMode;
714737
}
@@ -871,6 +894,24 @@ public void setMissingQueuesFatal(boolean missingQueuesFatal) {
871894

872895
}
873896

897+
public static class StreamContainer extends BaseContainer {
898+
899+
/**
900+
* When true, the container factory will create containers that support listeners
901+
* that consume native stream messages instead of spring-amqp {@code Message}s.
902+
*/
903+
boolean nativeListener;
904+
905+
public boolean isNativeListener() {
906+
return this.nativeListener;
907+
}
908+
909+
public void setNativeListener(boolean nativeListener) {
910+
this.nativeListener = nativeListener;
911+
}
912+
913+
}
914+
874915
public static class Template {
875916

876917
private final Retry retry = new Retry();
@@ -1128,4 +1169,62 @@ private boolean determineSslEnabled(boolean sslEnabled) {
11281169

11291170
}
11301171

1172+
public static final class Stream {
1173+
1174+
/**
1175+
* Host of a RabbitMQ instance with the Stream Plugin Enabled
1176+
*/
1177+
private String host = "localhost";
1178+
1179+
/**
1180+
* Stream port of a RabbitMQ instance with the Stream Plugin Enabled
1181+
*/
1182+
private int port = DEFAULT_STREAM_PORT;
1183+
1184+
/**
1185+
* Login user to authenticate to the broker. If not set
1186+
* {@code spring.rabbitmq.username} will be used.
1187+
*/
1188+
private String username;
1189+
1190+
/**
1191+
* Login password to authenticate to the broker. If not set
1192+
* {@code spring.rabbitmq.password} will be used.
1193+
*/
1194+
private String password;
1195+
1196+
public String getHost() {
1197+
return this.host;
1198+
}
1199+
1200+
public void setHost(String host) {
1201+
this.host = host;
1202+
}
1203+
1204+
public int getPort() {
1205+
return this.port;
1206+
}
1207+
1208+
public void setPort(int port) {
1209+
this.port = port;
1210+
}
1211+
1212+
public String getUsername() {
1213+
return this.username;
1214+
}
1215+
1216+
public void setUsername(String username) {
1217+
this.username = username;
1218+
}
1219+
1220+
public String getPassword() {
1221+
return this.password;
1222+
}
1223+
1224+
public void setPassword(String password) {
1225+
this.password = password;
1226+
}
1227+
1228+
}
1229+
11311230
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2021-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.amqp;
18+
19+
import com.rabbitmq.stream.Environment;
20+
import com.rabbitmq.stream.EnvironmentBuilder;
21+
22+
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
23+
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
24+
import org.springframework.beans.factory.ObjectProvider;
25+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
26+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
27+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
28+
import org.springframework.context.annotation.Bean;
29+
import org.springframework.context.annotation.Configuration;
30+
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
31+
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
32+
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
33+
34+
/**
35+
* Configuration for Spring RabbitMQ Stream Plugin support.
36+
*
37+
* @author Gary Russell
38+
*/
39+
@Configuration(proxyBeanMethods = false)
40+
@ConditionalOnClass(StreamRabbitListenerContainerFactory.class)
41+
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "stream")
42+
class RabbitStreamConfiguration {
43+
44+
@Bean(name = "rabbitListenerContainerFactory")
45+
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
46+
StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(Environment rabbitStreamEnvironment,
47+
RabbitProperties properties, ObjectProvider<ConsumerCustomizer> consumerCustomizer,
48+
ObjectProvider<ContainerCustomizer<StreamListenerContainer>> containerCustomizer) {
49+
50+
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(
51+
rabbitStreamEnvironment);
52+
factory.setNativeListener(properties.getListener().getStream().isNativeListener());
53+
consumerCustomizer.ifUnique(factory::setConsumerCustomizer);
54+
containerCustomizer.ifUnique(factory::setContainerCustomizer);
55+
return factory;
56+
}
57+
58+
@Bean(name = "rabbitStreamEnvironment")
59+
@ConditionalOnMissingBean(name = "rabbitStreamEnvironment")
60+
Environment rabbitStreamEnvironment(RabbitProperties properties) {
61+
RabbitProperties.Stream stream = properties.getStream();
62+
String username = stream.getUsername();
63+
if (username == null) {
64+
username = properties.getUsername();
65+
}
66+
String password = stream.getPassword();
67+
if (password == null) {
68+
password = properties.getPassword();
69+
}
70+
EnvironmentBuilder builder = Environment.builder().lazyInitialization(true).host(stream.getHost())
71+
.port(stream.getPort());
72+
if (username != null) {
73+
builder.username(username);
74+
}
75+
if (password != null) {
76+
builder.password(password);
77+
}
78+
return builder.build();
79+
}
80+
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2012-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.amqp;
18+
19+
import org.junit.jupiter.api.Test;
20+
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
21+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
22+
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
23+
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
24+
import org.springframework.beans.DirectFieldAccessor;
25+
import org.springframework.boot.autoconfigure.AutoConfigurations;
26+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
30+
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
31+
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
32+
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
35+
/**
36+
* Tests for {@link RabbitStreamConfiguration}.
37+
*
38+
* @author Gary Russell
39+
*/
40+
class RabbitStreamConfigurationTests {
41+
42+
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
43+
.withConfiguration(AutoConfigurations.of(RabbitAutoConfiguration.class));
44+
45+
@Test
46+
void testContainerType() {
47+
this.contextRunner.withUserConfiguration(TestConfiguration.class)
48+
.withPropertyValues("spring.rabbitmq.listener.type:stream",
49+
"spring.rabbitmq.listener.stream.native-listener:true")
50+
.run((context) -> {
51+
RabbitListenerEndpointRegistry registry = context.getBean(RabbitListenerEndpointRegistry.class);
52+
assertThat(registry.getListenerContainer("test")).isInstanceOf(StreamListenerContainer.class);
53+
assertThat(new DirectFieldAccessor(registry.getListenerContainer("test"))
54+
.getPropertyValue("consumerCustomizer")).isNotNull();
55+
assertThat(new DirectFieldAccessor(context.getBean(StreamRabbitListenerContainerFactory.class))
56+
.getPropertyValue("nativeListener")).isEqualTo(Boolean.TRUE);
57+
assertThat(context.getBean(TestConfiguration.class).containerCustomizerCalled).isTrue();
58+
});
59+
}
60+
61+
@Configuration(proxyBeanMethods = false)
62+
@EnableRabbit
63+
static class TestConfiguration {
64+
65+
boolean containerCustomizerCalled;
66+
67+
@RabbitListener(id = "test", queues = "stream", autoStartup = "false")
68+
void listen(String in) {
69+
}
70+
71+
@Bean
72+
ConsumerCustomizer consumerCustomizer() {
73+
return (id, consumer) -> {
74+
};
75+
}
76+
77+
@Bean
78+
ContainerCustomizer<StreamListenerContainer> containerCustomizer() {
79+
return (container) -> this.containerCustomizerCalled = true;
80+
}
81+
82+
}
83+
84+
}

spring-boot-project/spring-boot-dependencies/build.gradle

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1416,6 +1416,13 @@ bom {
14161416
]
14171417
}
14181418
}
1419+
library("Rabbit Stream Client", "0.3.0") {
1420+
group("com.rabbitmq") {
1421+
modules = [
1422+
"stream-client"
1423+
]
1424+
}
1425+
}
14191426
library("Reactive Streams", "1.0.3") {
14201427
group("org.reactivestreams") {
14211428
modules = [
@@ -1644,6 +1651,7 @@ bom {
16441651
modules = [
16451652
"spring-amqp",
16461653
"spring-rabbit",
1654+
"spring-rabbit-stream",
16471655
"spring-rabbit-junit",
16481656
"spring-rabbit-test"
16491657
]

0 commit comments

Comments
 (0)