Skip to content

Commit 26816a3

Browse files
authored
GH-3626: RabbitMQ Stream Support (#3895)
Resolves #3626 - spec for stream listener container - move message handler from scst to here; add spec * Fix test. * Polishing and Docs. * Fix anchors in doc. * Don't Extend `AbstractMessageProducingHandler`
1 parent cbfa150 commit 26816a3

File tree

17 files changed

+988
-19
lines changed

17 files changed

+988
-19
lines changed

build.gradle

+5-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ ext {
9999
rsocketVersion = '1.1.3'
100100
servletApiVersion = '5.0.0'
101101
smackVersion = '4.4.6'
102-
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '3.0.0-M4'
102+
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '3.0.0-SNAPSHOT'
103103
springDataVersion = project.hasProperty('springDataVersion') ? project.springDataVersion : '2022.0.0-M6'
104104
springGraphqlVersion = '1.1.0-M1'
105105
springKafkaVersion = '3.0.0-M6'
@@ -470,12 +470,16 @@ project('spring-integration-amqp') {
470470
api("org.springframework.amqp:spring-rabbit:$springAmqpVersion") {
471471
exclude group: 'org.springframework'
472472
}
473+
optionalApi("org.springframework.amqp:spring-rabbit-stream:$springAmqpVersion") {
474+
exclude group: 'org.springframework'
475+
}
473476

474477
testImplementation("org.springframework.amqp:spring-rabbit-junit:$springAmqpVersion") {
475478
exclude group: 'org.springframework'
476479
}
477480
testImplementation project(':spring-integration-stream')
478481
testImplementation 'org.springframework:spring-web'
482+
testImplementation 'org.testcontainers:rabbitmq'
479483
}
480484
}
481485

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AbstractMessageListenerContainerSpec.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,14 +28,14 @@
2828
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
2929
import org.springframework.amqp.support.ConditionalExceptionLogger;
3030
import org.springframework.amqp.support.ConsumerTagStrategy;
31-
import org.springframework.integration.dsl.IntegrationComponentSpec;
3231
import org.springframework.transaction.PlatformTransactionManager;
3332
import org.springframework.transaction.interceptor.TransactionAttribute;
3433
import org.springframework.util.ErrorHandler;
3534
import org.springframework.util.backoff.BackOff;
3635

3736
/**
38-
* Base class for container specs.
37+
* Base class for container specs for containers that extend
38+
* {@link AbstractMessageListenerContainer}.
3939
*
4040
* @param <S> the current spec extension type
4141
* @param <C> the listener container type
@@ -48,7 +48,7 @@
4848
*/
4949
public abstract class AbstractMessageListenerContainerSpec<S extends AbstractMessageListenerContainerSpec<S, C>,
5050
C extends AbstractMessageListenerContainer>
51-
extends IntegrationComponentSpec<S, C> {
51+
extends MessageListenerContainerSpec<S, C> {
5252

5353
public AbstractMessageListenerContainerSpec(C listenerContainer) {
5454
this.target = listenerContainer;

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSpec.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@
1919
import java.util.Collections;
2020
import java.util.Map;
2121

22-
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
22+
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
2323
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
2424
import org.springframework.integration.dsl.ComponentsRegistration;
2525

@@ -36,13 +36,13 @@
3636
* @since 5.0
3737
*/
3838
public abstract class AmqpInboundChannelAdapterSpec
39-
<S extends AmqpInboundChannelAdapterSpec<S, C>, C extends AbstractMessageListenerContainer>
39+
<S extends AmqpInboundChannelAdapterSpec<S, C>, C extends MessageListenerContainer>
4040
extends AmqpBaseInboundChannelAdapterSpec<S>
4141
implements ComponentsRegistration {
4242

43-
protected final AbstractMessageListenerContainerSpec<?, C> listenerContainerSpec; // NOSONAR final
43+
protected final MessageListenerContainerSpec<?, C> listenerContainerSpec; // NOSONAR final
4444

45-
protected AmqpInboundChannelAdapterSpec(AbstractMessageListenerContainerSpec<?, C> listenerContainerSpec) {
45+
protected AmqpInboundChannelAdapterSpec(MessageListenerContainerSpec<?, C> listenerContainerSpec) {
4646
super(new AmqpInboundChannelAdapter(listenerContainerSpec.get()));
4747
this.listenerContainerSpec = listenerContainerSpec;
4848
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
20+
import org.springframework.integration.dsl.IntegrationComponentSpec;
21+
22+
/**
23+
* Base class for container specs.
24+
*
25+
* @param <S> the current spec extension type
26+
* @param <C> the listener container type
27+
*
28+
* @author Gary Russell
29+
*
30+
* @since 6.0
31+
*
32+
*/
33+
public abstract class MessageListenerContainerSpec<S extends MessageListenerContainerSpec<S, C>,
34+
C extends MessageListenerContainer>
35+
extends IntegrationComponentSpec<S, C> {
36+
37+
/**
38+
* Set the queue names.
39+
* @param queueNames the queue names.
40+
* @return this spec.
41+
*/
42+
public S queueName(String... queueNames) {
43+
this.target.setQueueNames(queueNames);
44+
return _this();
45+
}
46+
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
20+
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
21+
22+
import com.rabbitmq.stream.Codec;
23+
import com.rabbitmq.stream.Environment;
24+
25+
/**
26+
* Factory class for RabbitMQ components.
27+
*
28+
* @author Gary Russell
29+
* @since 6.0
30+
*
31+
*/
32+
public final class RabbitStream {
33+
34+
private RabbitStream() {
35+
}
36+
37+
/**
38+
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
39+
* with the provided {@link StreamListenerContainer}.
40+
* Note: only endpoint options are available from spec.
41+
* The {@code listenerContainer} options should be specified
42+
* on the provided {@link StreamListenerContainer} using
43+
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
44+
* @param listenerContainer the listenerContainer.
45+
* @return the RabbitInboundChannelAdapterSLCSpec.
46+
*/
47+
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(StreamListenerContainer listenerContainer) {
48+
return new RabbitStreamInboundChannelAdapterSpec(listenerContainer);
49+
}
50+
51+
/**
52+
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
53+
* with the provided {@link Environment}.
54+
* Note: only endpoint options are available from spec.
55+
* The {@code listenerContainer} options should be specified
56+
* on the provided {@link StreamListenerContainer} using
57+
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
58+
* @param environment the environment.
59+
* @return the RabbitInboundChannelAdapterSLCSpec.
60+
*/
61+
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment) {
62+
return new RabbitStreamInboundChannelAdapterSpec(environment, null);
63+
}
64+
65+
/**
66+
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
67+
* with the provided {@link Environment}.
68+
* Note: only endpoint options are available from spec.
69+
* The {@code listenerContainer} options should be specified
70+
* on the provided {@link StreamListenerContainer} using
71+
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
72+
* @param environment the environment.
73+
* @param codec the codec.
74+
* @return the RabbitInboundChannelAdapterSLCSpec.
75+
*/
76+
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment, Codec codec) {
77+
return new RabbitStreamInboundChannelAdapterSpec(environment, codec);
78+
}
79+
80+
/**
81+
* Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter).
82+
* @param template the amqpTemplate.
83+
* @return the RabbitStreamMessageHandlerSpec.
84+
*/
85+
public static RabbitStreamMessageHandlerSpec outboundStreamAdapter(RabbitStreamTemplate template) {
86+
return new RabbitStreamMessageHandlerSpec(template);
87+
}
88+
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2017-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import java.util.function.Consumer;
20+
21+
import org.springframework.lang.Nullable;
22+
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
23+
24+
import com.rabbitmq.stream.Codec;
25+
import com.rabbitmq.stream.Environment;
26+
27+
/**
28+
* Spec for an inbound channel adapter with a {@link StreamListenerContainer}.
29+
*
30+
* @author Gary Russell
31+
* @author Artem Bilan
32+
*
33+
* @since 6.0
34+
*
35+
*/
36+
public class RabbitStreamInboundChannelAdapterSpec
37+
extends AmqpInboundChannelAdapterSpec<RabbitStreamInboundChannelAdapterSpec, StreamListenerContainer> {
38+
39+
protected RabbitStreamInboundChannelAdapterSpec(StreamListenerContainer listenerContainer) {
40+
super(new RabbitStreamMessageListenerContainerSpec(listenerContainer));
41+
}
42+
43+
protected RabbitStreamInboundChannelAdapterSpec(Environment environment, @Nullable Codec codec) {
44+
super(new RabbitStreamMessageListenerContainerSpec(environment, codec));
45+
}
46+
47+
public RabbitStreamInboundChannelAdapterSpec configureContainer(
48+
Consumer<RabbitStreamMessageListenerContainerSpec> configurer) {
49+
50+
configurer.accept((RabbitStreamMessageListenerContainerSpec) this.listenerContainerSpec);
51+
return this;
52+
}
53+
54+
}

0 commit comments

Comments
 (0)