Skip to content

Commit 71c8b6f

Browse files
committed
GH-3626: RabbitMQ Stream Support
Resolves #3626 - spec for stream listener container - move message handler from scst to here; add spec
1 parent cbfa150 commit 71c8b6f

File tree

14 files changed

+853
-18
lines changed

14 files changed

+853
-18
lines changed

build.gradle

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ ext {
9999
rsocketVersion = '1.1.3'
100100
servletApiVersion = '5.0.0'
101101
smackVersion = '4.4.6'
102-
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '3.0.0-M4'
102+
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '3.0.0-SNAPSHOT'
103103
springDataVersion = project.hasProperty('springDataVersion') ? project.springDataVersion : '2022.0.0-M6'
104104
springGraphqlVersion = '1.1.0-M1'
105105
springKafkaVersion = '3.0.0-M6'
@@ -470,12 +470,16 @@ project('spring-integration-amqp') {
470470
api("org.springframework.amqp:spring-rabbit:$springAmqpVersion") {
471471
exclude group: 'org.springframework'
472472
}
473+
optionalApi("org.springframework.amqp:spring-rabbit-stream:$springAmqpVersion") {
474+
exclude group: 'org.springframework'
475+
}
473476

474477
testImplementation("org.springframework.amqp:spring-rabbit-junit:$springAmqpVersion") {
475478
exclude group: 'org.springframework'
476479
}
477480
testImplementation project(':spring-integration-stream')
478481
testImplementation 'org.springframework:spring-web'
482+
testImplementation 'org.testcontainers:rabbitmq'
479483
}
480484
}
481485

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AbstractMessageListenerContainerSpec.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,14 +28,14 @@
2828
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
2929
import org.springframework.amqp.support.ConditionalExceptionLogger;
3030
import org.springframework.amqp.support.ConsumerTagStrategy;
31-
import org.springframework.integration.dsl.IntegrationComponentSpec;
3231
import org.springframework.transaction.PlatformTransactionManager;
3332
import org.springframework.transaction.interceptor.TransactionAttribute;
3433
import org.springframework.util.ErrorHandler;
3534
import org.springframework.util.backoff.BackOff;
3635

3736
/**
38-
* Base class for container specs.
37+
* Base class for container specs for containers that extend
38+
* {@link AbstractMessageListenerContainer}.
3939
*
4040
* @param <S> the current spec extension type
4141
* @param <C> the listener container type
@@ -48,7 +48,7 @@
4848
*/
4949
public abstract class AbstractMessageListenerContainerSpec<S extends AbstractMessageListenerContainerSpec<S, C>,
5050
C extends AbstractMessageListenerContainer>
51-
extends IntegrationComponentSpec<S, C> {
51+
extends MessageListenerContainerSpec<S, C> {
5252

5353
public AbstractMessageListenerContainerSpec(C listenerContainer) {
5454
this.target = listenerContainer;

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,11 @@
2525
import org.springframework.integration.amqp.channel.PollableAmqpChannel;
2626
import org.springframework.integration.amqp.inbound.AmqpMessageSource.AmqpAckCallbackFactory;
2727
import org.springframework.lang.Nullable;
28+
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
29+
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
30+
31+
import com.rabbitmq.stream.Codec;
32+
import com.rabbitmq.stream.Environment;
2833

2934
/**
3035
* Factory class for AMQP components.
@@ -239,6 +244,49 @@ public static AmqpInboundChannelAdapterDMLCSpec inboundAdapter(DirectMessageList
239244
return new AmqpInboundChannelAdapterDMLCSpec(listenerContainer);
240245
}
241246

247+
/**
248+
* Create an initial {@link RabbitInboundChannelAdapterSLCSpec}
249+
* with the provided {@link StreamListenerContainer}.
250+
* Note: only endpoint options are available from spec.
251+
* The {@code listenerContainer} options should be specified
252+
* on the provided {@link StreamListenerContainer} using
253+
* {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}.
254+
* @param listenerContainer the listenerContainer.
255+
* @return the RabbitInboundChannelAdapterSLCSpec.
256+
*/
257+
public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(StreamListenerContainer listenerContainer) {
258+
return new RabbitInboundChannelAdapterSLCSpec(listenerContainer);
259+
}
260+
261+
/**
262+
* Create an initial {@link RabbitInboundChannelAdapterSLCSpec}
263+
* with the provided {@link Environment}.
264+
* Note: only endpoint options are available from spec.
265+
* The {@code listenerContainer} options should be specified
266+
* on the provided {@link StreamListenerContainer} using
267+
* {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}.
268+
* @param environment the environment.
269+
* @return the RabbitInboundChannelAdapterSLCSpec.
270+
*/
271+
public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(Environment environment) {
272+
return new RabbitInboundChannelAdapterSLCSpec(environment, null);
273+
}
274+
275+
/**
276+
* Create an initial {@link RabbitInboundChannelAdapterSLCSpec}
277+
* with the provided {@link Environment}.
278+
* Note: only endpoint options are available from spec.
279+
* The {@code listenerContainer} options should be specified
280+
* on the provided {@link StreamListenerContainer} using
281+
* {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}.
282+
* @param environment the environment.
283+
* @param codec the codec.
284+
* @return the RabbitInboundChannelAdapterSLCSpec.
285+
*/
286+
public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(Environment environment, Codec codec) {
287+
return new RabbitInboundChannelAdapterSLCSpec(environment, codec);
288+
}
289+
242290
/**
243291
* Create an initial AmqpOutboundEndpointSpec (adapter).
244292
* @param amqpTemplate the amqpTemplate.
@@ -248,6 +296,15 @@ public static AmqpOutboundChannelAdapterSpec outboundAdapter(AmqpTemplate amqpTe
248296
return new AmqpOutboundChannelAdapterSpec(amqpTemplate);
249297
}
250298

299+
/**
300+
* Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter).
301+
* @param template the amqpTemplate.
302+
* @return the RabbitStreamMessageHandlerSpec.
303+
*/
304+
public static RabbitStreamMessageHandlerSpec outboundStreamAdapter(RabbitStreamTemplate template) {
305+
return new RabbitStreamMessageHandlerSpec(template);
306+
}
307+
251308
/**
252309
* Create an initial AmqpOutboundEndpointSpec (gateway).
253310
* @param amqpTemplate the amqpTemplate.

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSpec.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@
1919
import java.util.Collections;
2020
import java.util.Map;
2121

22-
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
22+
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
2323
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
2424
import org.springframework.integration.dsl.ComponentsRegistration;
2525

@@ -36,13 +36,13 @@
3636
* @since 5.0
3737
*/
3838
public abstract class AmqpInboundChannelAdapterSpec
39-
<S extends AmqpInboundChannelAdapterSpec<S, C>, C extends AbstractMessageListenerContainer>
39+
<S extends AmqpInboundChannelAdapterSpec<S, C>, C extends MessageListenerContainer>
4040
extends AmqpBaseInboundChannelAdapterSpec<S>
4141
implements ComponentsRegistration {
4242

43-
protected final AbstractMessageListenerContainerSpec<?, C> listenerContainerSpec; // NOSONAR final
43+
protected final MessageListenerContainerSpec<?, C> listenerContainerSpec; // NOSONAR final
4444

45-
protected AmqpInboundChannelAdapterSpec(AbstractMessageListenerContainerSpec<?, C> listenerContainerSpec) {
45+
protected AmqpInboundChannelAdapterSpec(MessageListenerContainerSpec<?, C> listenerContainerSpec) {
4646
super(new AmqpInboundChannelAdapter(listenerContainerSpec.get()));
4747
this.listenerContainerSpec = listenerContainerSpec;
4848
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
20+
import org.springframework.integration.dsl.IntegrationComponentSpec;
21+
22+
/**
23+
* Base class for container specs.
24+
*
25+
* @param <S> the current spec extension type
26+
* @param <C> the listener container type
27+
*
28+
* @author Gary Russell
29+
*
30+
* @since 6.0
31+
*
32+
*/
33+
public abstract class MessageListenerContainerSpec<S extends MessageListenerContainerSpec<S, C>,
34+
C extends MessageListenerContainer>
35+
extends IntegrationComponentSpec<S, C> {
36+
37+
/**
38+
* Set the queue names.
39+
* @param queueNames the queue names.
40+
* @return this spec.
41+
*/
42+
public S queueName(String... queueNames) {
43+
this.target.setQueueNames(queueNames);
44+
return _this();
45+
}
46+
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import java.util.function.Consumer;
20+
21+
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
22+
import org.springframework.lang.Nullable;
23+
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
24+
25+
import com.rabbitmq.stream.Codec;
26+
import com.rabbitmq.stream.Environment;
27+
28+
/**
29+
* Spec for an inbound channel adapter with a {@link DirectMessageListenerContainer}.
30+
*
31+
* @author Gary Russell
32+
* @author Artem Bilan
33+
*
34+
* @since 6.0
35+
*
36+
*/
37+
public class RabbitInboundChannelAdapterSLCSpec
38+
extends AmqpInboundChannelAdapterSpec<RabbitInboundChannelAdapterSLCSpec, StreamListenerContainer> {
39+
40+
protected RabbitInboundChannelAdapterSLCSpec(StreamListenerContainer listenerContainer) {
41+
super(new RabbitStreamMessageListenerContainerSpec(listenerContainer));
42+
}
43+
44+
protected RabbitInboundChannelAdapterSLCSpec(Environment environment, @Nullable Codec codec) {
45+
super(new RabbitStreamMessageListenerContainerSpec(environment, codec));
46+
}
47+
48+
public RabbitInboundChannelAdapterSLCSpec configureContainer(
49+
Consumer<RabbitStreamMessageListenerContainerSpec> configurer) {
50+
51+
configurer.accept((RabbitStreamMessageListenerContainerSpec) this.listenerContainerSpec);
52+
return this;
53+
}
54+
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2016-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import org.springframework.integration.amqp.outbound.AbstractAmqpOutboundEndpoint;
20+
import org.springframework.integration.amqp.outbound.RabbitStreamMessageHandler;
21+
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
22+
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
23+
import org.springframework.integration.dsl.MessageHandlerSpec;
24+
import org.springframework.rabbit.stream.producer.RabbitStreamOperations;
25+
26+
/**
27+
* The base {@link MessageHandlerSpec} for {@link AbstractAmqpOutboundEndpoint}s.
28+
*
29+
* @author Gary Russell
30+
*
31+
* @since 6.0
32+
*/
33+
public class RabbitStreamMessageHandlerSpec
34+
extends MessageHandlerSpec<RabbitStreamMessageHandlerSpec, RabbitStreamMessageHandler> {
35+
36+
private final DefaultAmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper();
37+
38+
RabbitStreamMessageHandlerSpec(RabbitStreamOperations operations) {
39+
this.target = new RabbitStreamMessageHandler(operations);
40+
}
41+
42+
/**
43+
* Set a custom {@link AmqpHeaderMapper} for mapping request and reply headers.
44+
* @param headerMapper the {@link AmqpHeaderMapper} to use.
45+
* @return the spec
46+
*/
47+
public RabbitStreamMessageHandlerSpec headerMapper(AmqpHeaderMapper headerMapper) {
48+
this.target.setHeaderMapper(headerMapper);
49+
return this;
50+
}
51+
52+
/**
53+
* Provide the header names that should be mapped from a request to a
54+
* {@link org.springframework.messaging.MessageHeaders}.
55+
* @param headers The request header names.
56+
* @return the spec
57+
*/
58+
public RabbitStreamMessageHandlerSpec mappedRequestHeaders(String... headers) {
59+
this.headerMapper.setRequestHeaderNames(headers);
60+
return this;
61+
}
62+
63+
/**
64+
* Determine whether the headers are
65+
* mapped before the message is converted, or afterwards.
66+
* @param headersLast true to map headers last.
67+
* @return the spec.
68+
* @see AbstractAmqpOutboundEndpoint#setHeadersMappedLast(boolean)
69+
*/
70+
public RabbitStreamMessageHandlerSpec headersMappedLast(boolean headersLast) {
71+
this.target.setHeadersMappedLast(headersLast);
72+
return this;
73+
}
74+
75+
/**
76+
* Set a callback to be invoked when a send is successful.
77+
* @param callback the callback.
78+
*/
79+
public RabbitStreamMessageHandlerSpec successCallback(RabbitStreamMessageHandler.SuccessCallback callback) {
80+
this.target.setSuccessCallback(callback);
81+
return this;
82+
}
83+
84+
/**
85+
* Set a callback to be invoked when a send fails.
86+
* @param callback the callback.
87+
*/
88+
public RabbitStreamMessageHandlerSpec failureCallback(RabbitStreamMessageHandler.FailureCallback callback) {
89+
this.target.setFailureCallback(callback);
90+
return this;
91+
}
92+
93+
/**
94+
* Set to true to wait for a confirmation.
95+
* @param sync true to wait.
96+
* @see #setConfirmTimeout(long)
97+
*/
98+
public RabbitStreamMessageHandlerSpec sync(boolean sync) {
99+
this.target.setSync(sync);
100+
return this;
101+
}
102+
103+
/**
104+
* Set a timeout for the confirm result.
105+
* @param timeout the approximate timeout.
106+
* @return the spec.
107+
* @see #sync(boolean)
108+
*/
109+
public RabbitStreamMessageHandlerSpec confirmTimeout(long timeout) {
110+
this.target.setConfirmTimeout(timeout);
111+
return this;
112+
}
113+
114+
}

0 commit comments

Comments
 (0)