Skip to content

Commit ed23236

Browse files
committed
Improve Java DSL for Rabbit Streams
* Expose simple properties for `streamName` and `superStream` on the `RabbitStreamInboundChannelAdapterSpec` * Add `superStream(String superStream, String name, int consumers)` option * Add `outboundStreamAdapter(Environment environment, String streamName)` factory for simple use-cases * Add `RabbitStreamTests` integration test to cover Rabbit Streams support and demonstrate respective Java DSL * Mention the change in the docs
1 parent fa178c3 commit ed23236

File tree

6 files changed

+262
-31
lines changed

6 files changed

+262
-31
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStream.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,13 +19,16 @@
1919
import com.rabbitmq.stream.Codec;
2020
import com.rabbitmq.stream.Environment;
2121

22+
import org.springframework.lang.Nullable;
2223
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
2324
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
2425

2526
/**
2627
* Factory class for RabbitMQ components.
2728
*
2829
* @author Gary Russell
30+
* @author Artem Bilan
31+
*
2932
* @since 6.0
3033
*
3134
*/
@@ -37,8 +40,8 @@ private RabbitStream() {
3740
/**
3841
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
3942
* with the provided {@link StreamListenerContainer}.
40-
* Note: only endpoint options are available from spec.
41-
* The {@code listenerContainer} options should be specified
43+
* The {@code streamName} or {@code superStream} must be provided after creation of this spec;
44+
* or the {@code listenerContainer} options should be specified
4245
* on the provided {@link StreamListenerContainer} using
4346
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
4447
* @param listenerContainer the listenerContainer.
@@ -51,32 +54,43 @@ public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(StreamListene
5154
/**
5255
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
5356
* with the provided {@link Environment}.
54-
* Note: only endpoint options are available from spec.
55-
* The {@code listenerContainer} options should be specified
57+
* The {@code streamName} or {@code superStream} must be provided after creation of this spec;
58+
* or the {@code listenerContainer} options should be specified
5659
* on the provided {@link StreamListenerContainer} using
5760
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
5861
* @param environment the environment.
5962
* @return the RabbitInboundChannelAdapterSLCSpec.
6063
*/
6164
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment) {
62-
return new RabbitStreamInboundChannelAdapterSpec(environment, null);
65+
return inboundAdapter(environment, null);
6366
}
6467

6568
/**
6669
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
6770
* with the provided {@link Environment}.
68-
* Note: only endpoint options are available from spec.
69-
* The {@code listenerContainer} options should be specified
71+
* The {@code streamName} or {@code superStream} must be provided after creation of this spec;
72+
* or the {@code listenerContainer} options should be specified
7073
* on the provided {@link StreamListenerContainer} using
7174
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
7275
* @param environment the environment.
7376
* @param codec the codec.
7477
* @return the RabbitInboundChannelAdapterSLCSpec.
7578
*/
76-
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment, Codec codec) {
79+
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment, @Nullable Codec codec) {
7780
return new RabbitStreamInboundChannelAdapterSpec(environment, codec);
7881
}
7982

83+
/**
84+
* Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter).
85+
* @param environment the environment.
86+
* @param streamName the name of stream to produce.
87+
* @return the RabbitStreamMessageHandlerSpec.
88+
* @since 6.1
89+
*/
90+
public static RabbitStreamMessageHandlerSpec outboundStreamAdapter(Environment environment, String streamName) {
91+
return outboundStreamAdapter(new RabbitStreamTemplate(environment, streamName));
92+
}
93+
8094
/**
8195
* Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter).
8296
* @param template the amqpTemplate.

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamInboundChannelAdapterSpec.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,6 +44,42 @@ protected RabbitStreamInboundChannelAdapterSpec(Environment environment, @Nullab
4444
super(new RabbitStreamMessageListenerContainerSpec(environment, codec));
4545
}
4646

47+
/**
48+
* Configure a name for Rabbit stream to consume from.
49+
* @param streamName the name of Rabbit stream.
50+
* @return the spec
51+
* @since 6.1
52+
*/
53+
public RabbitStreamInboundChannelAdapterSpec streamName(String streamName) {
54+
this.listenerContainerSpec.queueName(streamName);
55+
return this;
56+
}
57+
58+
/**
59+
* Configure a name for Rabbit super stream to consume from.
60+
* @param superStream the name of Rabbit super stream.
61+
* @param consumerName the logical name to enable offset tracking.
62+
* @return the spec
63+
* @since 6.1
64+
*/
65+
public RabbitStreamInboundChannelAdapterSpec superName(String superStream, String consumerName) {
66+
return superName(superStream, consumerName, 1);
67+
}
68+
69+
/**
70+
* Configure a name for Rabbit super stream to consume from.
71+
* @param superStream the name of Rabbit super stream.
72+
* @param consumerName the logical name to enable offset tracking.
73+
* @param consumers the number of consumers.
74+
* @return the spec
75+
* @since 6.1
76+
*/
77+
public RabbitStreamInboundChannelAdapterSpec superName(String superStream, String consumerName, int consumers) {
78+
((RabbitStreamMessageListenerContainerSpec) this.listenerContainerSpec)
79+
.superStream(superStream, consumerName, consumers);
80+
return this;
81+
}
82+
4783
public RabbitStreamInboundChannelAdapterSpec configureContainer(
4884
Consumer<RabbitStreamMessageListenerContainerSpec> configurer) {
4985

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageListenerContainerSpec.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -55,7 +55,20 @@ public class RabbitStreamMessageListenerContainerSpec extends
5555
* @return this spec.
5656
*/
5757
public RabbitStreamMessageListenerContainerSpec superStream(String superStream, String name) {
58-
this.target.superStream(superStream, name);
58+
return superStream(superStream, name, 1);
59+
}
60+
61+
/**
62+
* Enable Single Active Consumer on a Super Stream.
63+
* Mutually exclusive with {@link #queueName(String...)}.
64+
* @param superStream the stream.
65+
* @param name the consumer name.
66+
* @param consumers the number of consumers.
67+
* @return this spec.
68+
* @since 6.1
69+
*/
70+
public RabbitStreamMessageListenerContainerSpec superStream(String superStream, String name, int consumers) {
71+
this.target.superStream(superStream, name, consumers);
5972
return this;
6073
}
6174

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright 2014-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import com.rabbitmq.stream.Address;
20+
import com.rabbitmq.stream.Environment;
21+
import org.junit.jupiter.api.Test;
22+
23+
import org.springframework.amqp.core.Queue;
24+
import org.springframework.amqp.core.QueueBuilder;
25+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
26+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
27+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
28+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.integration.amqp.support.RabbitTestContainer;
33+
import org.springframework.integration.annotation.ServiceActivator;
34+
import org.springframework.integration.channel.QueueChannel;
35+
import org.springframework.integration.config.EnableIntegration;
36+
import org.springframework.integration.dsl.IntegrationFlow;
37+
import org.springframework.messaging.Message;
38+
import org.springframework.messaging.MessageChannel;
39+
import org.springframework.messaging.PollableChannel;
40+
import org.springframework.messaging.support.GenericMessage;
41+
import org.springframework.rabbit.stream.config.SuperStream;
42+
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
43+
import org.springframework.test.annotation.DirtiesContext;
44+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
45+
46+
import static org.assertj.core.api.Assertions.assertThat;
47+
48+
/**
49+
* @author Artem Bilan
50+
*
51+
* @since 6.1
52+
*/
53+
@SpringJUnitConfig
54+
@DirtiesContext
55+
public class RabbitStreamTests implements RabbitTestContainer {
56+
57+
@Autowired
58+
MessageChannel sendToRabbitStreamChannel;
59+
60+
@Autowired
61+
PollableChannel results;
62+
63+
@Test
64+
void rabbitStreamWithSpringIntegrationChannelAdapters() {
65+
var testData = "test data";
66+
this.sendToRabbitStreamChannel.send(new GenericMessage<>(testData));
67+
68+
Message<?> receive = results.receive(10_000);
69+
70+
assertThat(receive).isNotNull()
71+
.extracting(Message::getPayload)
72+
.isEqualTo(testData);
73+
}
74+
75+
@Autowired
76+
MessageChannel sendToRabbitSuperStreamChannel;
77+
78+
@Test
79+
void superStreamWithSpringIntegrationChannelAdapters() {
80+
var testData = "test super data";
81+
this.sendToRabbitSuperStreamChannel.send(new GenericMessage<>(testData));
82+
83+
Message<?> receive = results.receive(10_000);
84+
85+
assertThat(receive).isNotNull()
86+
.extracting(Message::getPayload)
87+
.isEqualTo(testData);
88+
}
89+
90+
@Configuration
91+
@EnableIntegration
92+
public static class ContextConfiguration {
93+
94+
@Bean
95+
ConnectionFactory rabbitConnectionFactory() {
96+
return new CachingConnectionFactory(RabbitTestContainer.amqpPort());
97+
}
98+
99+
@Bean
100+
RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) {
101+
return new RabbitTemplate(rabbitConnectionFactory);
102+
}
103+
104+
@Bean
105+
Environment rabbitStreamEnvironment() {
106+
return Environment.builder()
107+
.addressResolver(add -> new Address("localhost", RabbitTestContainer.streamPort()))
108+
.build();
109+
}
110+
111+
@Bean(initMethod = "initialize")
112+
RabbitAdmin rabbitAdmin(ConnectionFactory rabbitConnectionFactory) {
113+
return new RabbitAdmin(rabbitConnectionFactory);
114+
}
115+
116+
@Bean
117+
Queue stream() {
118+
return QueueBuilder.durable("test.stream1")
119+
.stream()
120+
.build();
121+
}
122+
123+
@Bean
124+
@ServiceActivator(inputChannel = "sendToRabbitStreamChannel")
125+
RabbitStreamMessageHandlerSpec rabbitStreamMessageHandler(Environment env, Queue stream) {
126+
return RabbitStream.outboundStreamAdapter(env, stream.getName()).sync(true);
127+
}
128+
129+
@Bean
130+
IntegrationFlow rabbitStreamConsumer(Environment env, Queue stream) {
131+
return IntegrationFlow.from(
132+
RabbitStream.inboundAdapter(env)
133+
.streamName(stream.getName()))
134+
.channel("results")
135+
.get();
136+
}
137+
138+
@Bean
139+
QueueChannel results() {
140+
return new QueueChannel();
141+
}
142+
143+
@Bean
144+
SuperStream superStream() {
145+
return new SuperStream("test.superStream1", 3);
146+
}
147+
148+
@Bean
149+
@ServiceActivator(inputChannel = "sendToRabbitSuperStreamChannel")
150+
AmqpOutboundChannelAdapterSpec rabbitSuperStreamMessageHandler(RabbitTemplate rabbitTemplate) {
151+
return Amqp.outboundAdapter(rabbitTemplate)
152+
.exchangeName("test.superStream1")
153+
.routingKey("1");
154+
}
155+
156+
@Bean
157+
IntegrationFlow superStreamConsumer(Environment env) {
158+
return IntegrationFlow.from(
159+
RabbitStream.inboundAdapter(env)
160+
.superName("test.superStream1", "mySuperConsumer"))
161+
.channel("results")
162+
.get();
163+
}
164+
165+
}
166+
167+
}

src/reference/asciidoc/amqp.adoc

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,7 +1426,7 @@ image::images/spring-integration-amqp-sample-graph.png[align="center"]
14261426

14271427
Version 6.0 introduced support for RabbitMQ Stream Queues.
14281428

1429-
The DSL factory class for these endpoints is `Rabbit`.
1429+
The DSL factory class for these endpoints is a `RabbitStream`.
14301430

14311431
[[rmq-stream-inbound-channel-adapter]]
14321432
==== RabbitMQ Stream Inbound Channel Adapter
@@ -1435,22 +1435,17 @@ The DSL factory class for these endpoints is `Rabbit`.
14351435
[source, java]
14361436
----
14371437
@Bean
1438-
IntegrationFlow flow(Environment env) {
1439-
@Bean
1440-
IntegrationFlow simpleStream(Environment env) {
1441-
return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
1442-
.configureContainer(container -> container.queueName("my.stream")))
1443-
// ...
1444-
.get();
1445-
}
1446-
1447-
@Bean
1448-
IntegrationFlow superStream(Environment env) {
1449-
return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
1450-
.configureContainer(container -> container.superStream("my.stream", "my.consumer")))
1451-
// ...
1452-
.get();
1453-
}
1438+
IntegrationFlow simpleStream(Environment env) {
1439+
return IntegrationFlow.from(RabbitStream.inboundAdapter(env).streamName("my.stream"))
1440+
// ...
1441+
.get();
1442+
}
1443+
1444+
@Bean
1445+
IntegrationFlow superStream(Environment env) {
1446+
return IntegrationFlow.from(RabbitStream.inboundAdapter(env).superStream("my.super.stream", "my.consumer"))
1447+
// ...
1448+
.get();
14541449
}
14551450
----
14561451
====
@@ -1462,10 +1457,10 @@ IntegrationFlow flow(Environment env) {
14621457
[source, java]
14631458
----
14641459
@Bean
1465-
IntegrationFlow outbound(RabbitStreamTemplate template) {
1460+
IntegrationFlow outbound(Environment env) {
14661461
return f -> f
14671462
// ...
1468-
.handle(RabbitStream.outboundStreamAdapter(template));
1463+
.handle(RabbitStream.outboundStreamAdapter(env, "my.stream"));
14691464
14701465
}
14711466
----

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,9 @@ See <<./mail.adoc#mail-inbound, Mail-receiving Channel Adapter>> for more inform
6969

7070
The `FileReadingMessageSource` now exposes `watchMaxDepth` and `watchDirPredicate` options for the `WatchService`.
7171
See <<./file.adoc#watch-service-directory-scanner, `WatchServiceDirectoryScanner`>> for more information.
72+
73+
[[x6.1-amqp]]
74+
=== AMQP Changes
75+
76+
The Java DSL API for Rabbit Streams (the `RabbitStream` factory) exposes additional properties for simple configurations.
77+
See <<./amqp.adoc#rmq-streams, `RabbitMQ Stream Queue Support`>> for more information.

0 commit comments

Comments
 (0)