Skip to content

Commit 68e0783

Browse files
committed
Polishing and Docs.
1 parent aa73703 commit 68e0783

File tree

10 files changed

+295
-165
lines changed

10 files changed

+295
-165
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,6 @@
2525
import org.springframework.integration.amqp.channel.PollableAmqpChannel;
2626
import org.springframework.integration.amqp.inbound.AmqpMessageSource.AmqpAckCallbackFactory;
2727
import org.springframework.lang.Nullable;
28-
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
29-
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
30-
31-
import com.rabbitmq.stream.Codec;
32-
import com.rabbitmq.stream.Environment;
3328

3429
/**
3530
* Factory class for AMQP components.
@@ -244,49 +239,6 @@ public static AmqpInboundChannelAdapterDMLCSpec inboundAdapter(DirectMessageList
244239
return new AmqpInboundChannelAdapterDMLCSpec(listenerContainer);
245240
}
246241

247-
/**
248-
* Create an initial {@link RabbitInboundChannelAdapterSLCSpec}
249-
* with the provided {@link StreamListenerContainer}.
250-
* Note: only endpoint options are available from spec.
251-
* The {@code listenerContainer} options should be specified
252-
* on the provided {@link StreamListenerContainer} using
253-
* {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}.
254-
* @param listenerContainer the listenerContainer.
255-
* @return the RabbitInboundChannelAdapterSLCSpec.
256-
*/
257-
public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(StreamListenerContainer listenerContainer) {
258-
return new RabbitInboundChannelAdapterSLCSpec(listenerContainer);
259-
}
260-
261-
/**
262-
* Create an initial {@link RabbitInboundChannelAdapterSLCSpec}
263-
* with the provided {@link Environment}.
264-
* Note: only endpoint options are available from spec.
265-
* The {@code listenerContainer} options should be specified
266-
* on the provided {@link StreamListenerContainer} using
267-
* {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}.
268-
* @param environment the environment.
269-
* @return the RabbitInboundChannelAdapterSLCSpec.
270-
*/
271-
public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(Environment environment) {
272-
return new RabbitInboundChannelAdapterSLCSpec(environment, null);
273-
}
274-
275-
/**
276-
* Create an initial {@link RabbitInboundChannelAdapterSLCSpec}
277-
* with the provided {@link Environment}.
278-
* Note: only endpoint options are available from spec.
279-
* The {@code listenerContainer} options should be specified
280-
* on the provided {@link StreamListenerContainer} using
281-
* {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}.
282-
* @param environment the environment.
283-
* @param codec the codec.
284-
* @return the RabbitInboundChannelAdapterSLCSpec.
285-
*/
286-
public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(Environment environment, Codec codec) {
287-
return new RabbitInboundChannelAdapterSLCSpec(environment, codec);
288-
}
289-
290242
/**
291243
* Create an initial AmqpOutboundEndpointSpec (adapter).
292244
* @param amqpTemplate the amqpTemplate.
@@ -296,15 +248,6 @@ public static AmqpOutboundChannelAdapterSpec outboundAdapter(AmqpTemplate amqpTe
296248
return new AmqpOutboundChannelAdapterSpec(amqpTemplate);
297249
}
298250

299-
/**
300-
* Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter).
301-
* @param template the amqpTemplate.
302-
* @return the RabbitStreamMessageHandlerSpec.
303-
*/
304-
public static RabbitStreamMessageHandlerSpec outboundStreamAdapter(RabbitStreamTemplate template) {
305-
return new RabbitStreamMessageHandlerSpec(template);
306-
}
307-
308251
/**
309252
* Create an initial AmqpOutboundEndpointSpec (gateway).
310253
* @param amqpTemplate the amqpTemplate.
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
20+
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
21+
22+
import com.rabbitmq.stream.Codec;
23+
import com.rabbitmq.stream.Environment;
24+
25+
/**
26+
* Factory class for RabbitMQ components.
27+
*
28+
* @author Gary Russell
29+
* @since 6.0
30+
*
31+
*/
32+
public final class RabbitStream {
33+
34+
private RabbitStream() {
35+
}
36+
37+
/**
38+
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
39+
* with the provided {@link StreamListenerContainer}.
40+
* Note: only endpoint options are available from spec.
41+
* The {@code listenerContainer} options should be specified
42+
* on the provided {@link StreamListenerContainer} using
43+
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
44+
* @param listenerContainer the listenerContainer.
45+
* @return the RabbitInboundChannelAdapterSLCSpec.
46+
*/
47+
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(StreamListenerContainer listenerContainer) {
48+
return new RabbitStreamInboundChannelAdapterSpec(listenerContainer);
49+
}
50+
51+
/**
52+
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
53+
* with the provided {@link Environment}.
54+
* Note: only endpoint options are available from spec.
55+
* The {@code listenerContainer} options should be specified
56+
* on the provided {@link StreamListenerContainer} using
57+
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
58+
* @param environment the environment.
59+
* @return the RabbitInboundChannelAdapterSLCSpec.
60+
*/
61+
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment) {
62+
return new RabbitStreamInboundChannelAdapterSpec(environment, null);
63+
}
64+
65+
/**
66+
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
67+
* with the provided {@link Environment}.
68+
* Note: only endpoint options are available from spec.
69+
* The {@code listenerContainer} options should be specified
70+
* on the provided {@link StreamListenerContainer} using
71+
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
72+
* @param environment the environment.
73+
* @param codec the codec.
74+
* @return the RabbitInboundChannelAdapterSLCSpec.
75+
*/
76+
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment, Codec codec) {
77+
return new RabbitStreamInboundChannelAdapterSpec(environment, codec);
78+
}
79+
80+
/**
81+
* Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter).
82+
* @param template the amqpTemplate.
83+
* @return the RabbitStreamMessageHandlerSpec.
84+
*/
85+
public static RabbitStreamMessageHandlerSpec outboundStreamAdapter(RabbitStreamTemplate template) {
86+
return new RabbitStreamMessageHandlerSpec(template);
87+
}
88+
89+
}
Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,34 +18,33 @@
1818

1919
import java.util.function.Consumer;
2020

21-
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
2221
import org.springframework.lang.Nullable;
2322
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
2423

2524
import com.rabbitmq.stream.Codec;
2625
import com.rabbitmq.stream.Environment;
2726

2827
/**
29-
* Spec for an inbound channel adapter with a {@link DirectMessageListenerContainer}.
28+
* Spec for an inbound channel adapter with a {@link StreamListenerContainer}.
3029
*
3130
* @author Gary Russell
3231
* @author Artem Bilan
3332
*
3433
* @since 6.0
3534
*
3635
*/
37-
public class RabbitInboundChannelAdapterSLCSpec
38-
extends AmqpInboundChannelAdapterSpec<RabbitInboundChannelAdapterSLCSpec, StreamListenerContainer> {
36+
public class RabbitStreamInboundChannelAdapterSpec
37+
extends AmqpInboundChannelAdapterSpec<RabbitStreamInboundChannelAdapterSpec, StreamListenerContainer> {
3938

40-
protected RabbitInboundChannelAdapterSLCSpec(StreamListenerContainer listenerContainer) {
39+
protected RabbitStreamInboundChannelAdapterSpec(StreamListenerContainer listenerContainer) {
4140
super(new RabbitStreamMessageListenerContainerSpec(listenerContainer));
4241
}
4342

44-
protected RabbitInboundChannelAdapterSLCSpec(Environment environment, @Nullable Codec codec) {
43+
protected RabbitStreamInboundChannelAdapterSpec(Environment environment, @Nullable Codec codec) {
4544
super(new RabbitStreamMessageListenerContainerSpec(environment, codec));
4645
}
4746

48-
public RabbitInboundChannelAdapterSLCSpec configureContainer(
47+
public RabbitStreamInboundChannelAdapterSpec configureContainer(
4948
Consumer<RabbitStreamMessageListenerContainerSpec> configurer) {
5049

5150
configurer.accept((RabbitStreamMessageListenerContainerSpec) this.listenerContainerSpec);

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,15 +16,15 @@
1616

1717
package org.springframework.integration.amqp.dsl;
1818

19-
import org.springframework.integration.amqp.outbound.AbstractAmqpOutboundEndpoint;
2019
import org.springframework.integration.amqp.outbound.RabbitStreamMessageHandler;
2120
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
2221
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
2322
import org.springframework.integration.dsl.MessageHandlerSpec;
23+
import org.springframework.messaging.MessageChannel;
2424
import org.springframework.rabbit.stream.producer.RabbitStreamOperations;
2525

2626
/**
27-
* The base {@link MessageHandlerSpec} for {@link AbstractAmqpOutboundEndpoint}s.
27+
* The base {@link MessageHandlerSpec} for {@link RabbitStreamMessageHandler}s.
2828
*
2929
* @author Gary Russell
3030
*
@@ -42,7 +42,7 @@ public class RabbitStreamMessageHandlerSpec
4242
/**
4343
* Set a custom {@link AmqpHeaderMapper} for mapping request and reply headers.
4444
* @param headerMapper the {@link AmqpHeaderMapper} to use.
45-
* @return the spec
45+
* @return this spec.
4646
*/
4747
public RabbitStreamMessageHandlerSpec headerMapper(AmqpHeaderMapper headerMapper) {
4848
this.target.setHeaderMapper(headerMapper);
@@ -53,7 +53,7 @@ public RabbitStreamMessageHandlerSpec headerMapper(AmqpHeaderMapper headerMapper
5353
* Provide the header names that should be mapped from a request to a
5454
* {@link org.springframework.messaging.MessageHeaders}.
5555
* @param headers The request header names.
56-
* @return the spec
56+
* @return this spec.
5757
*/
5858
public RabbitStreamMessageHandlerSpec mappedRequestHeaders(String... headers) {
5959
this.headerMapper.setRequestHeaderNames(headers);
@@ -64,35 +64,64 @@ public RabbitStreamMessageHandlerSpec mappedRequestHeaders(String... headers) {
6464
* Determine whether the headers are
6565
* mapped before the message is converted, or afterwards.
6666
* @param headersLast true to map headers last.
67-
* @return the spec.
68-
* @see AbstractAmqpOutboundEndpoint#setHeadersMappedLast(boolean)
67+
* @return this spec.
68+
* @see RabbitStreamMessageHandler#setHeadersMappedLast(boolean)
6969
*/
7070
public RabbitStreamMessageHandlerSpec headersMappedLast(boolean headersLast) {
7171
this.target.setHeadersMappedLast(headersLast);
7272
return this;
7373
}
7474

7575
/**
76-
* Set a callback to be invoked when a send is successful.
77-
* @param callback the callback.
76+
* Set the success channel.
77+
* @param channel the channel.
78+
* @return this spec.
7879
*/
79-
public RabbitStreamMessageHandlerSpec successCallback(RabbitStreamMessageHandler.SuccessCallback callback) {
80-
this.target.setSuccessCallback(callback);
80+
public RabbitStreamMessageHandlerSpec sendSuccessChannel(MessageChannel channel) {
81+
this.target.setSendSuccessChannel(channel);
8182
return this;
8283
}
8384

8485
/**
85-
* Set a callback to be invoked when a send fails.
86-
* @param callback the callback.
86+
* Set the failure channel. After a send failure, an
87+
* {@link org.springframework.messaging.support.ErrorMessage} will be sent
88+
* to this channel with a payload of the exception with the
89+
* failed message.
90+
* @param channel the channel.
91+
* @return this spec.
8792
*/
88-
public RabbitStreamMessageHandlerSpec failureCallback(RabbitStreamMessageHandler.FailureCallback callback) {
89-
this.target.setFailureCallback(callback);
93+
public RabbitStreamMessageHandlerSpec sendFailureChannel(MessageChannel channel) {
94+
this.target.setSendFailureChannel(channel);
95+
return this;
96+
}
97+
98+
/**
99+
* Set the success channel.
100+
* @param channel the channel.
101+
* @return this spec.
102+
*/
103+
public RabbitStreamMessageHandlerSpec sendSuccessChannel(String channel) {
104+
this.target.setSendSuccessChannelName(channel);
105+
return this;
106+
}
107+
108+
/**
109+
* Set the failure channel. After a send failure, an
110+
* {@link org.springframework.messaging.support.ErrorMessage} will be sent
111+
* to this channel with a payload of the exception with the
112+
* failed message.
113+
* @param channel the channel.
114+
* @return this spec.
115+
*/
116+
public RabbitStreamMessageHandlerSpec sendFailureChannel(String channel) {
117+
this.target.setSendFailureChannelName(channel);
90118
return this;
91119
}
92120

93121
/**
94122
* Set to true to wait for a confirmation.
95123
* @param sync true to wait.
124+
* @return this spec.
96125
* @see #setConfirmTimeout(long)
97126
*/
98127
public RabbitStreamMessageHandlerSpec sync(boolean sync) {
@@ -103,7 +132,7 @@ public RabbitStreamMessageHandlerSpec sync(boolean sync) {
103132
/**
104133
* Set a timeout for the confirm result.
105134
* @param timeout the approximate timeout.
106-
* @return the spec.
135+
* @return this spec.
107136
* @see #sync(boolean)
108137
*/
109138
public RabbitStreamMessageHandlerSpec confirmTimeout(long timeout) {

0 commit comments

Comments
 (0)