Skip to content

Commit feb4705

Browse files
authored
Improve Java DSL for Rabbit Streams (#8598)
* Improve Java DSL for Rabbit Streams * Expose simple properties for `streamName` and `superStream` on the `RabbitStreamInboundChannelAdapterSpec` * Add `superStream(String superStream, String name, int consumers)` option * Add `outboundStreamAdapter(Environment environment, String streamName)` factory for simple use-cases * Add `RabbitStreamTests` integration test to cover Rabbit Streams support and demonstrate respective Java DSL * Mention the change in the docs * * Fix typos in code and docs
1 parent fa178c3 commit feb4705

File tree

6 files changed

+261
-31
lines changed

6 files changed

+261
-31
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStream.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,13 +19,16 @@
1919
import com.rabbitmq.stream.Codec;
2020
import com.rabbitmq.stream.Environment;
2121

22+
import org.springframework.lang.Nullable;
2223
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
2324
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
2425

2526
/**
2627
* Factory class for RabbitMQ components.
2728
*
2829
* @author Gary Russell
30+
* @author Artem Bilan
31+
*
2932
* @since 6.0
3033
*
3134
*/
@@ -37,8 +40,8 @@ private RabbitStream() {
3740
/**
3841
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
3942
* with the provided {@link StreamListenerContainer}.
40-
* Note: only endpoint options are available from spec.
41-
* The {@code listenerContainer} options should be specified
43+
* The {@code streamName} or {@code superStream} must be provided after creation of this spec;
44+
* or the {@code listenerContainer} options should be specified
4245
* on the provided {@link StreamListenerContainer} using
4346
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
4447
* @param listenerContainer the listenerContainer.
@@ -51,32 +54,43 @@ public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(StreamListene
5154
/**
5255
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
5356
* with the provided {@link Environment}.
54-
* Note: only endpoint options are available from spec.
55-
* The {@code listenerContainer} options should be specified
57+
* The {@code streamName} or {@code superStream} must be provided after creation of this spec;
58+
* or the {@code listenerContainer} options should be specified
5659
* on the provided {@link StreamListenerContainer} using
5760
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
5861
* @param environment the environment.
5962
* @return the RabbitInboundChannelAdapterSLCSpec.
6063
*/
6164
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment) {
62-
return new RabbitStreamInboundChannelAdapterSpec(environment, null);
65+
return inboundAdapter(environment, null);
6366
}
6467

6568
/**
6669
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
6770
* with the provided {@link Environment}.
68-
* Note: only endpoint options are available from spec.
69-
* The {@code listenerContainer} options should be specified
71+
* The {@code streamName} or {@code superStream} must be provided after creation of this spec;
72+
* or the {@code listenerContainer} options should be specified
7073
* on the provided {@link StreamListenerContainer} using
7174
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
7275
* @param environment the environment.
7376
* @param codec the codec.
7477
* @return the RabbitInboundChannelAdapterSLCSpec.
7578
*/
76-
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment, Codec codec) {
79+
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment, @Nullable Codec codec) {
7780
return new RabbitStreamInboundChannelAdapterSpec(environment, codec);
7881
}
7982

83+
/**
84+
* Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter).
85+
* @param environment the environment.
86+
* @param streamName the name of stream to produce.
87+
* @return the RabbitStreamMessageHandlerSpec.
88+
* @since 6.1
89+
*/
90+
public static RabbitStreamMessageHandlerSpec outboundStreamAdapter(Environment environment, String streamName) {
91+
return outboundStreamAdapter(new RabbitStreamTemplate(environment, streamName));
92+
}
93+
8094
/**
8195
* Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter).
8296
* @param template the amqpTemplate.

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamInboundChannelAdapterSpec.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,6 +44,42 @@ protected RabbitStreamInboundChannelAdapterSpec(Environment environment, @Nullab
4444
super(new RabbitStreamMessageListenerContainerSpec(environment, codec));
4545
}
4646

47+
/**
48+
* Configure a name for Rabbit stream to consume from.
49+
* @param streamName the name of Rabbit stream.
50+
* @return the spec
51+
* @since 6.1
52+
*/
53+
public RabbitStreamInboundChannelAdapterSpec streamName(String streamName) {
54+
this.listenerContainerSpec.queueName(streamName);
55+
return this;
56+
}
57+
58+
/**
59+
* Configure a name for Rabbit super stream to consume from.
60+
* @param superStream the name of Rabbit super stream.
61+
* @param consumerName the logical name to enable offset tracking.
62+
* @return the spec
63+
* @since 6.1
64+
*/
65+
public RabbitStreamInboundChannelAdapterSpec superStream(String superStream, String consumerName) {
66+
return superStream(superStream, consumerName, 1);
67+
}
68+
69+
/**
70+
* Configure a name for Rabbit super stream to consume from.
71+
* @param superStream the name of Rabbit super stream.
72+
* @param consumerName the logical name to enable offset tracking.
73+
* @param consumers the number of consumers.
74+
* @return the spec
75+
* @since 6.1
76+
*/
77+
public RabbitStreamInboundChannelAdapterSpec superStream(String superStream, String consumerName, int consumers) {
78+
((RabbitStreamMessageListenerContainerSpec) this.listenerContainerSpec)
79+
.superStream(superStream, consumerName, consumers);
80+
return this;
81+
}
82+
4783
public RabbitStreamInboundChannelAdapterSpec configureContainer(
4884
Consumer<RabbitStreamMessageListenerContainerSpec> configurer) {
4985

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageListenerContainerSpec.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -55,7 +55,20 @@ public class RabbitStreamMessageListenerContainerSpec extends
5555
* @return this spec.
5656
*/
5757
public RabbitStreamMessageListenerContainerSpec superStream(String superStream, String name) {
58-
this.target.superStream(superStream, name);
58+
return superStream(superStream, name, 1);
59+
}
60+
61+
/**
62+
* Enable Single Active Consumer on a Super Stream.
63+
* Mutually exclusive with {@link #queueName(String...)}.
64+
* @param superStream the stream.
65+
* @param name the consumer name.
66+
* @param consumers the number of consumers.
67+
* @return this spec.
68+
* @since 6.1
69+
*/
70+
public RabbitStreamMessageListenerContainerSpec superStream(String superStream, String name, int consumers) {
71+
this.target.superStream(superStream, name, consumers);
5972
return this;
6073
}
6174

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Copyright 2014-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import com.rabbitmq.stream.Address;
20+
import com.rabbitmq.stream.Environment;
21+
import org.junit.jupiter.api.Test;
22+
23+
import org.springframework.amqp.core.Queue;
24+
import org.springframework.amqp.core.QueueBuilder;
25+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
26+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
27+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
28+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.integration.amqp.support.RabbitTestContainer;
33+
import org.springframework.integration.annotation.ServiceActivator;
34+
import org.springframework.integration.channel.QueueChannel;
35+
import org.springframework.integration.config.EnableIntegration;
36+
import org.springframework.integration.dsl.IntegrationFlow;
37+
import org.springframework.messaging.Message;
38+
import org.springframework.messaging.MessageChannel;
39+
import org.springframework.messaging.PollableChannel;
40+
import org.springframework.messaging.support.GenericMessage;
41+
import org.springframework.rabbit.stream.config.SuperStream;
42+
import org.springframework.test.annotation.DirtiesContext;
43+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
44+
45+
import static org.assertj.core.api.Assertions.assertThat;
46+
47+
/**
48+
* @author Artem Bilan
49+
*
50+
* @since 6.1
51+
*/
52+
@SpringJUnitConfig
53+
@DirtiesContext
54+
public class RabbitStreamTests implements RabbitTestContainer {
55+
56+
@Autowired
57+
MessageChannel sendToRabbitStreamChannel;
58+
59+
@Autowired
60+
PollableChannel results;
61+
62+
@Test
63+
void rabbitStreamWithSpringIntegrationChannelAdapters() {
64+
var testData = "test data";
65+
this.sendToRabbitStreamChannel.send(new GenericMessage<>(testData));
66+
67+
Message<?> receive = results.receive(10_000);
68+
69+
assertThat(receive).isNotNull()
70+
.extracting(Message::getPayload)
71+
.isEqualTo(testData);
72+
}
73+
74+
@Autowired
75+
MessageChannel sendToRabbitSuperStreamChannel;
76+
77+
@Test
78+
void superStreamWithSpringIntegrationChannelAdapters() {
79+
var testData = "test super data";
80+
this.sendToRabbitSuperStreamChannel.send(new GenericMessage<>(testData));
81+
82+
Message<?> receive = results.receive(10_000);
83+
84+
assertThat(receive).isNotNull()
85+
.extracting(Message::getPayload)
86+
.isEqualTo(testData);
87+
}
88+
89+
@Configuration
90+
@EnableIntegration
91+
public static class ContextConfiguration {
92+
93+
@Bean
94+
ConnectionFactory rabbitConnectionFactory() {
95+
return new CachingConnectionFactory(RabbitTestContainer.amqpPort());
96+
}
97+
98+
@Bean
99+
RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) {
100+
return new RabbitTemplate(rabbitConnectionFactory);
101+
}
102+
103+
@Bean
104+
Environment rabbitStreamEnvironment() {
105+
return Environment.builder()
106+
.addressResolver(add -> new Address("localhost", RabbitTestContainer.streamPort()))
107+
.build();
108+
}
109+
110+
@Bean(initMethod = "initialize")
111+
RabbitAdmin rabbitAdmin(ConnectionFactory rabbitConnectionFactory) {
112+
return new RabbitAdmin(rabbitConnectionFactory);
113+
}
114+
115+
@Bean
116+
Queue stream() {
117+
return QueueBuilder.durable("test.stream1")
118+
.stream()
119+
.build();
120+
}
121+
122+
@Bean
123+
@ServiceActivator(inputChannel = "sendToRabbitStreamChannel")
124+
RabbitStreamMessageHandlerSpec rabbitStreamMessageHandler(Environment env, Queue stream) {
125+
return RabbitStream.outboundStreamAdapter(env, stream.getName()).sync(true);
126+
}
127+
128+
@Bean
129+
IntegrationFlow rabbitStreamConsumer(Environment env, Queue stream) {
130+
return IntegrationFlow.from(
131+
RabbitStream.inboundAdapter(env)
132+
.streamName(stream.getName()))
133+
.channel("results")
134+
.get();
135+
}
136+
137+
@Bean
138+
QueueChannel results() {
139+
return new QueueChannel();
140+
}
141+
142+
@Bean
143+
SuperStream superStream() {
144+
return new SuperStream("test.superStream1", 3);
145+
}
146+
147+
@Bean
148+
@ServiceActivator(inputChannel = "sendToRabbitSuperStreamChannel")
149+
AmqpOutboundChannelAdapterSpec rabbitSuperStreamMessageHandler(RabbitTemplate rabbitTemplate) {
150+
return Amqp.outboundAdapter(rabbitTemplate)
151+
.exchangeName("test.superStream1")
152+
.routingKey("1");
153+
}
154+
155+
@Bean
156+
IntegrationFlow superStreamConsumer(Environment env) {
157+
return IntegrationFlow.from(
158+
RabbitStream.inboundAdapter(env)
159+
.superStream("test.superStream1", "mySuperConsumer"))
160+
.channel("results")
161+
.get();
162+
}
163+
164+
}
165+
166+
}

src/reference/asciidoc/amqp.adoc

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,7 +1426,7 @@ image::images/spring-integration-amqp-sample-graph.png[align="center"]
14261426

14271427
Version 6.0 introduced support for RabbitMQ Stream Queues.
14281428

1429-
The DSL factory class for these endpoints is `Rabbit`.
1429+
The DSL factory class for these endpoints is `RabbitStream`.
14301430

14311431
[[rmq-stream-inbound-channel-adapter]]
14321432
==== RabbitMQ Stream Inbound Channel Adapter
@@ -1435,22 +1435,17 @@ The DSL factory class for these endpoints is `Rabbit`.
14351435
[source, java]
14361436
----
14371437
@Bean
1438-
IntegrationFlow flow(Environment env) {
1439-
@Bean
1440-
IntegrationFlow simpleStream(Environment env) {
1441-
return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
1442-
.configureContainer(container -> container.queueName("my.stream")))
1443-
// ...
1444-
.get();
1445-
}
1446-
1447-
@Bean
1448-
IntegrationFlow superStream(Environment env) {
1449-
return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
1450-
.configureContainer(container -> container.superStream("my.stream", "my.consumer")))
1451-
// ...
1452-
.get();
1453-
}
1438+
IntegrationFlow simpleStream(Environment env) {
1439+
return IntegrationFlow.from(RabbitStream.inboundAdapter(env).streamName("my.stream"))
1440+
// ...
1441+
.get();
1442+
}
1443+
1444+
@Bean
1445+
IntegrationFlow superStream(Environment env) {
1446+
return IntegrationFlow.from(RabbitStream.inboundAdapter(env).superStream("my.super.stream", "my.consumer"))
1447+
// ...
1448+
.get();
14541449
}
14551450
----
14561451
====
@@ -1462,10 +1457,10 @@ IntegrationFlow flow(Environment env) {
14621457
[source, java]
14631458
----
14641459
@Bean
1465-
IntegrationFlow outbound(RabbitStreamTemplate template) {
1460+
IntegrationFlow outbound(Environment env) {
14661461
return f -> f
14671462
// ...
1468-
.handle(RabbitStream.outboundStreamAdapter(template));
1463+
.handle(RabbitStream.outboundStreamAdapter(env, "my.stream"));
14691464
14701465
}
14711466
----

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,9 @@ See <<./mail.adoc#mail-inbound, Mail-receiving Channel Adapter>> for more inform
6969

7070
The `FileReadingMessageSource` now exposes `watchMaxDepth` and `watchDirPredicate` options for the `WatchService`.
7171
See <<./file.adoc#watch-service-directory-scanner, `WatchServiceDirectoryScanner`>> for more information.
72+
73+
[[x6.1-amqp]]
74+
=== AMQP Changes
75+
76+
The Java DSL API for Rabbit Streams (the `RabbitStream` factory) exposes additional properties for simple configurations.
77+
See <<./amqp.adoc#rmq-streams, `RabbitMQ Stream Queue Support`>> for more information.

0 commit comments

Comments
 (0)