From 71c8b6ff8526ede7a54e0cfc6805495fcff4b17d Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 26 Sep 2022 10:42:55 -0400 Subject: [PATCH 1/5] GH-3626: RabbitMQ Stream Support Resolves https://github.com/spring-projects/spring-integration/issues/3626 - spec for stream listener container - move message handler from scst to here; add spec --- build.gradle | 6 +- .../AbstractMessageListenerContainerSpec.java | 8 +- .../integration/amqp/dsl/Amqp.java | 59 ++++- .../dsl/AmqpInboundChannelAdapterSpec.java | 10 +- .../dsl/MessageListenerContainerSpec.java | 47 ++++ .../RabbitInboundChannelAdapterSLCSpec.java | 55 +++++ .../dsl/RabbitStreamMessageHandlerSpec.java | 114 +++++++++ ...bitStreamMessageListenerContainerSpec.java | 110 +++++++++ .../outbound/RabbitStreamMessageHandler.java | 231 ++++++++++++++++++ .../amqp/support/MappingUtils.java | 33 ++- .../integration/amqp/dsl/AmqpTests.java | 19 ++ .../RabbitStreamMessageHandlerTests.java | 122 +++++++++ .../amqp/support/RabbitTestContainer.java | 56 +++++ .../IntegrationManagementConfigurer.java | 1 - 14 files changed, 853 insertions(+), 18 deletions(-) create mode 100644 spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/MessageListenerContainerSpec.java create mode 100644 spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitInboundChannelAdapterSLCSpec.java create mode 100644 spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java create mode 100644 spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageListenerContainerSpec.java create mode 100644 spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java create mode 100644 spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java create mode 100644 spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/RabbitTestContainer.java diff --git a/build.gradle b/build.gradle index 516e7804297..03e9aca6dd6 100644 --- a/build.gradle +++ b/build.gradle @@ -99,7 +99,7 @@ ext { rsocketVersion = '1.1.3' servletApiVersion = '5.0.0' smackVersion = '4.4.6' - springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '3.0.0-M4' + springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '3.0.0-SNAPSHOT' springDataVersion = project.hasProperty('springDataVersion') ? project.springDataVersion : '2022.0.0-M6' springGraphqlVersion = '1.1.0-M1' springKafkaVersion = '3.0.0-M6' @@ -470,12 +470,16 @@ project('spring-integration-amqp') { api("org.springframework.amqp:spring-rabbit:$springAmqpVersion") { exclude group: 'org.springframework' } + optionalApi("org.springframework.amqp:spring-rabbit-stream:$springAmqpVersion") { + exclude group: 'org.springframework' + } testImplementation("org.springframework.amqp:spring-rabbit-junit:$springAmqpVersion") { exclude group: 'org.springframework' } testImplementation project(':spring-integration-stream') testImplementation 'org.springframework:spring-web' + testImplementation 'org.testcontainers:rabbitmq' } } diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AbstractMessageListenerContainerSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AbstractMessageListenerContainerSpec.java index 2e60c34f596..5e5eb31215f 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AbstractMessageListenerContainerSpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AbstractMessageListenerContainerSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,14 +28,14 @@ import org.springframework.amqp.rabbit.support.MessagePropertiesConverter; import org.springframework.amqp.support.ConditionalExceptionLogger; import org.springframework.amqp.support.ConsumerTagStrategy; -import org.springframework.integration.dsl.IntegrationComponentSpec; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.interceptor.TransactionAttribute; import org.springframework.util.ErrorHandler; import org.springframework.util.backoff.BackOff; /** - * Base class for container specs. + * Base class for container specs for containers that extend + * {@link AbstractMessageListenerContainer}. * * @param the current spec extension type * @param the listener container type @@ -48,7 +48,7 @@ */ public abstract class AbstractMessageListenerContainerSpec, C extends AbstractMessageListenerContainer> - extends IntegrationComponentSpec { + extends MessageListenerContainerSpec { public AbstractMessageListenerContainerSpec(C listenerContainer) { this.target = listenerContainer; diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java index 33b7a704667..1425c929d2c 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2021 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,11 @@ import org.springframework.integration.amqp.channel.PollableAmqpChannel; import org.springframework.integration.amqp.inbound.AmqpMessageSource.AmqpAckCallbackFactory; import org.springframework.lang.Nullable; +import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; + +import com.rabbitmq.stream.Codec; +import com.rabbitmq.stream.Environment; /** * Factory class for AMQP components. @@ -239,6 +244,49 @@ public static AmqpInboundChannelAdapterDMLCSpec inboundAdapter(DirectMessageList return new AmqpInboundChannelAdapterDMLCSpec(listenerContainer); } + /** + * Create an initial {@link RabbitInboundChannelAdapterSLCSpec} + * with the provided {@link StreamListenerContainer}. + * Note: only endpoint options are available from spec. + * The {@code listenerContainer} options should be specified + * on the provided {@link StreamListenerContainer} using + * {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}. + * @param listenerContainer the listenerContainer. + * @return the RabbitInboundChannelAdapterSLCSpec. + */ + public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(StreamListenerContainer listenerContainer) { + return new RabbitInboundChannelAdapterSLCSpec(listenerContainer); + } + + /** + * Create an initial {@link RabbitInboundChannelAdapterSLCSpec} + * with the provided {@link Environment}. + * Note: only endpoint options are available from spec. + * The {@code listenerContainer} options should be specified + * on the provided {@link StreamListenerContainer} using + * {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}. + * @param environment the environment. + * @return the RabbitInboundChannelAdapterSLCSpec. + */ + public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(Environment environment) { + return new RabbitInboundChannelAdapterSLCSpec(environment, null); + } + + /** + * Create an initial {@link RabbitInboundChannelAdapterSLCSpec} + * with the provided {@link Environment}. + * Note: only endpoint options are available from spec. + * The {@code listenerContainer} options should be specified + * on the provided {@link StreamListenerContainer} using + * {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}. + * @param environment the environment. + * @param codec the codec. + * @return the RabbitInboundChannelAdapterSLCSpec. + */ + public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(Environment environment, Codec codec) { + return new RabbitInboundChannelAdapterSLCSpec(environment, codec); + } + /** * Create an initial AmqpOutboundEndpointSpec (adapter). * @param amqpTemplate the amqpTemplate. @@ -248,6 +296,15 @@ public static AmqpOutboundChannelAdapterSpec outboundAdapter(AmqpTemplate amqpTe return new AmqpOutboundChannelAdapterSpec(amqpTemplate); } + /** + * Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter). + * @param template the amqpTemplate. + * @return the RabbitStreamMessageHandlerSpec. + */ + public static RabbitStreamMessageHandlerSpec outboundStreamAdapter(RabbitStreamTemplate template) { + return new RabbitStreamMessageHandlerSpec(template); + } + /** * Create an initial AmqpOutboundEndpointSpec (gateway). * @param amqpTemplate the amqpTemplate. diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSpec.java index 057be05453d..c3042ad6c2a 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2020 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ import java.util.Collections; import java.util.Map; -import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter; import org.springframework.integration.dsl.ComponentsRegistration; @@ -36,13 +36,13 @@ * @since 5.0 */ public abstract class AmqpInboundChannelAdapterSpec - , C extends AbstractMessageListenerContainer> + , C extends MessageListenerContainer> extends AmqpBaseInboundChannelAdapterSpec implements ComponentsRegistration { - protected final AbstractMessageListenerContainerSpec listenerContainerSpec; // NOSONAR final + protected final MessageListenerContainerSpec listenerContainerSpec; // NOSONAR final - protected AmqpInboundChannelAdapterSpec(AbstractMessageListenerContainerSpec listenerContainerSpec) { + protected AmqpInboundChannelAdapterSpec(MessageListenerContainerSpec listenerContainerSpec) { super(new AmqpInboundChannelAdapter(listenerContainerSpec.get())); this.listenerContainerSpec = listenerContainerSpec; } diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/MessageListenerContainerSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/MessageListenerContainerSpec.java new file mode 100644 index 00000000000..0c748671dfd --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/MessageListenerContainerSpec.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.dsl; + +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.integration.dsl.IntegrationComponentSpec; + +/** + * Base class for container specs. + * + * @param the current spec extension type + * @param the listener container type + * + * @author Gary Russell + * + * @since 6.0 + * + */ +public abstract class MessageListenerContainerSpec, + C extends MessageListenerContainer> + extends IntegrationComponentSpec { + + /** + * Set the queue names. + * @param queueNames the queue names. + * @return this spec. + */ + public S queueName(String... queueNames) { + this.target.setQueueNames(queueNames); + return _this(); + } + +} diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitInboundChannelAdapterSLCSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitInboundChannelAdapterSLCSpec.java new file mode 100644 index 00000000000..081b8650062 --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitInboundChannelAdapterSLCSpec.java @@ -0,0 +1,55 @@ +/* + * Copyright 2017-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.dsl; + +import java.util.function.Consumer; + +import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; +import org.springframework.lang.Nullable; +import org.springframework.rabbit.stream.listener.StreamListenerContainer; + +import com.rabbitmq.stream.Codec; +import com.rabbitmq.stream.Environment; + +/** + * Spec for an inbound channel adapter with a {@link DirectMessageListenerContainer}. + * + * @author Gary Russell + * @author Artem Bilan + * + * @since 6.0 + * + */ +public class RabbitInboundChannelAdapterSLCSpec + extends AmqpInboundChannelAdapterSpec { + + protected RabbitInboundChannelAdapterSLCSpec(StreamListenerContainer listenerContainer) { + super(new RabbitStreamMessageListenerContainerSpec(listenerContainer)); + } + + protected RabbitInboundChannelAdapterSLCSpec(Environment environment, @Nullable Codec codec) { + super(new RabbitStreamMessageListenerContainerSpec(environment, codec)); + } + + public RabbitInboundChannelAdapterSLCSpec configureContainer( + Consumer configurer) { + + configurer.accept((RabbitStreamMessageListenerContainerSpec) this.listenerContainerSpec); + return this; + } + +} diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java new file mode 100644 index 00000000000..c9a93b25deb --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java @@ -0,0 +1,114 @@ +/* + * Copyright 2016-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.dsl; + +import org.springframework.integration.amqp.outbound.AbstractAmqpOutboundEndpoint; +import org.springframework.integration.amqp.outbound.RabbitStreamMessageHandler; +import org.springframework.integration.amqp.support.AmqpHeaderMapper; +import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.rabbit.stream.producer.RabbitStreamOperations; + +/** + * The base {@link MessageHandlerSpec} for {@link AbstractAmqpOutboundEndpoint}s. + * + * @author Gary Russell + * + * @since 6.0 + */ +public class RabbitStreamMessageHandlerSpec + extends MessageHandlerSpec { + + private final DefaultAmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper(); + + RabbitStreamMessageHandlerSpec(RabbitStreamOperations operations) { + this.target = new RabbitStreamMessageHandler(operations); + } + + /** + * Set a custom {@link AmqpHeaderMapper} for mapping request and reply headers. + * @param headerMapper the {@link AmqpHeaderMapper} to use. + * @return the spec + */ + public RabbitStreamMessageHandlerSpec headerMapper(AmqpHeaderMapper headerMapper) { + this.target.setHeaderMapper(headerMapper); + return this; + } + + /** + * Provide the header names that should be mapped from a request to a + * {@link org.springframework.messaging.MessageHeaders}. + * @param headers The request header names. + * @return the spec + */ + public RabbitStreamMessageHandlerSpec mappedRequestHeaders(String... headers) { + this.headerMapper.setRequestHeaderNames(headers); + return this; + } + + /** + * Determine whether the headers are + * mapped before the message is converted, or afterwards. + * @param headersLast true to map headers last. + * @return the spec. + * @see AbstractAmqpOutboundEndpoint#setHeadersMappedLast(boolean) + */ + public RabbitStreamMessageHandlerSpec headersMappedLast(boolean headersLast) { + this.target.setHeadersMappedLast(headersLast); + return this; + } + + /** + * Set a callback to be invoked when a send is successful. + * @param callback the callback. + */ + public RabbitStreamMessageHandlerSpec successCallback(RabbitStreamMessageHandler.SuccessCallback callback) { + this.target.setSuccessCallback(callback); + return this; + } + + /** + * Set a callback to be invoked when a send fails. + * @param callback the callback. + */ + public RabbitStreamMessageHandlerSpec failureCallback(RabbitStreamMessageHandler.FailureCallback callback) { + this.target.setFailureCallback(callback); + return this; + } + + /** + * Set to true to wait for a confirmation. + * @param sync true to wait. + * @see #setConfirmTimeout(long) + */ + public RabbitStreamMessageHandlerSpec sync(boolean sync) { + this.target.setSync(sync); + return this; + } + + /** + * Set a timeout for the confirm result. + * @param timeout the approximate timeout. + * @return the spec. + * @see #sync(boolean) + */ + public RabbitStreamMessageHandlerSpec confirmTimeout(long timeout) { + this.target.setConfirmTimeout(timeout); + return this; + } + +} diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageListenerContainerSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageListenerContainerSpec.java new file mode 100644 index 00000000000..dff3de9e19d --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageListenerContainerSpec.java @@ -0,0 +1,110 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.dsl; + +import java.util.function.Consumer; + +import org.aopalliance.aop.Advice; + +import org.springframework.lang.Nullable; +import org.springframework.rabbit.stream.listener.ConsumerCustomizer; +import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; + +import com.rabbitmq.stream.Codec; +import com.rabbitmq.stream.Environment; + +/** + * Spec for {@link StreamListenerContainer}. + * + * @author Gary Russell + * @since 6.0 + * + */ +public class RabbitStreamMessageListenerContainerSpec extends + MessageListenerContainerSpec { + + RabbitStreamMessageListenerContainerSpec(StreamListenerContainer container) { + this.target = container; + } + + RabbitStreamMessageListenerContainerSpec(Environment environment, @Nullable Codec codec) { + this.target = new StreamListenerContainer(environment, codec); + } + + /** + * Set the Stream queue name; + * Mutually exclusive with {@link #superStream(String, String)}. + * @return this spec. + */ + public RabbitStreamMessageListenerContainerSpec queueName(String queueName) { + return super.queueName(queueName); + } + + /** + * Enable Single Active Consumer on a Super Stream. + * Mutually exclusive with {@link #setQueueName(String...)}. + * @param superStream the stream. + * @param name the consumer name. + * @return this spec. + */ + public RabbitStreamMessageListenerContainerSpec superStream(String superStream, String name) { + this.target.superStream(superStream, name); + return this; + } + + /** + * Set a stream message converter. + * @param converter the converter. + * @return this spec. + */ + public RabbitStreamMessageListenerContainerSpec streamConverter(StreamMessageConverter converter) { + this.target.setStreamConverter(converter); + return this; + } + + /** + * Set a consumer customizer. + * @param customizer the customizer. + * @return this spec. + */ + public RabbitStreamMessageListenerContainerSpec consumerCustomizer(ConsumerCustomizer customizer) { + this.target.setConsumerCustomizer(customizer); + return this; + } + + /** + * @param adviceChain the adviceChain. + * @return the spec. + * @see StreamListenerContainer#setAdviceChain(Advice[]) + */ + public RabbitStreamMessageListenerContainerSpec adviceChain(Advice... adviceChain) { + this.target.setAdviceChain(adviceChain); + return this; + } + + /** + * Perform additional configuration of the container. + * @param consumer a consumer for the container. + * @return this spec. + */ + public RabbitStreamMessageListenerContainerSpec configure(Consumer consumer) { + consumer.accept(this.target); + return this; + } + +} diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java new file mode 100644 index 00000000000..c41670e9f38 --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java @@ -0,0 +1,231 @@ +/* + * Copyright 2021-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.outbound; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.Lifecycle; +import org.springframework.integration.amqp.support.AmqpHeaderMapper; +import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.amqp.support.MappingUtils; +import org.springframework.integration.handler.AbstractMessageHandler; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHandlingException; +import org.springframework.rabbit.stream.producer.RabbitStreamOperations; +import org.springframework.rabbit.stream.support.StreamMessageProperties; +import org.springframework.util.Assert; + +/** + * {@link MessageHandler} based on {@link RabbitStreamOperations}. + * + * @author Gary Russell + * @author Chris Bono + * @since 6.0 + * + */ +public class RabbitStreamMessageHandler extends AbstractMessageHandler implements Lifecycle { + + private static final int DEFAULT_CONFIRM_TIMEOUT = 10_000; + + private final RabbitStreamOperations streamOperations; + + private boolean sync; + + private long confirmTimeout = DEFAULT_CONFIRM_TIMEOUT; + + private SuccessCallback successCallback = msg -> { }; + + private FailureCallback failureCallback = (msg, ex) -> { }; + + private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper(); + + private boolean headersMappedLast; + + /** + * Create an instance with the provided {@link RabbitStreamOperations}. + * @param streamOperations the operations. + */ + public RabbitStreamMessageHandler(RabbitStreamOperations streamOperations) { + Assert.notNull(streamOperations, "'streamOperations' cannot be null"); + this.streamOperations = streamOperations; + } + + /** + * Set a callback to be invoked when a send is successful. + * @param successCallback the callback. + */ + public void setSuccessCallback(SuccessCallback successCallback) { + Assert.notNull(successCallback, "'successCallback' cannot be null"); + this.successCallback = successCallback; + } + + /** + * Set a callback to be invoked when a send fails. + * @param failureCallback the callback. + */ + public void setFailureCallback(FailureCallback failureCallback) { + Assert.notNull(failureCallback, "'failureCallback' cannot be null"); + this.failureCallback = failureCallback; + } + + /** + * Set to true to wait for a confirmation. + * @param sync true to wait. + * @see #setConfirmTimeout(long) + */ + public void setSync(boolean sync) { + this.sync = sync; + } + + /** + * Set the confirm timeout. + * @param confirmTimeout the timeout. + * @see #setSync(boolean) + */ + public void setConfirmTimeout(long confirmTimeout) { + this.confirmTimeout = confirmTimeout; + } + + /** + * Set a custom {@link AmqpHeaderMapper} for mapping request and reply headers. + * Defaults to {@link DefaultAmqpHeaderMapper#outboundMapper()}. + * @param headerMapper the {@link AmqpHeaderMapper} to use. + */ + public void setHeaderMapper(AmqpHeaderMapper headerMapper) { + Assert.notNull(headerMapper, "headerMapper must not be null"); + this.headerMapper = headerMapper; + } + + /** + * When mapping headers for the outbound message, determine whether the headers are + * mapped before the message is converted, or afterwards. This only affects headers + * that might be added by the message converter. When false, the converter's headers + * win; when true, any headers added by the converter will be overridden (if the + * source message has a header that maps to those headers). You might wish to set this + * to true, for example, when using a + * {@link org.springframework.amqp.support.converter.SimpleMessageConverter} with a + * String payload that contains json; the converter will set the content type to + * {@code text/plain} which can be overridden to {@code application/json} by setting + * the {@link AmqpHeaders#CONTENT_TYPE} message header. Default: false. + * @param headersMappedLast true if headers are mapped after conversion. + */ + public void setHeadersMappedLast(boolean headersMappedLast) { + this.headersMappedLast = headersMappedLast; + } + + /** + * Return the {@link RabbitStreamOperations}. + * @return the operations. + */ + public RabbitStreamOperations getStreamOperations() { + return this.streamOperations; + } + + @Override + protected void handleMessageInternal(Message requestMessage) { + CompletableFuture future; + com.rabbitmq.stream.Message streamMessage; + if (requestMessage.getPayload() instanceof com.rabbitmq.stream.Message) { + streamMessage = (com.rabbitmq.stream.Message) requestMessage.getPayload(); + } + else { + MessageConverter converter = this.streamOperations.messageConverter(); + org.springframework.amqp.core.Message amqpMessage = mapMessage(requestMessage, converter, + this.headerMapper, this.headersMappedLast); + streamMessage = this.streamOperations.streamMessageConverter().fromMessage(amqpMessage); + } + future = this.streamOperations.send(streamMessage); + handleConfirms(requestMessage, future); + } + + private void handleConfirms(Message message, CompletableFuture future) { + future.whenComplete((bool, ex) -> { + if (ex != null) { + this.failureCallback.failure(message, ex); + } + else { + this.successCallback.onSuccess(message); + } + }); + if (this.sync) { + try { + future.get(this.confirmTimeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new MessageHandlingException(message, ex); + } + catch (ExecutionException | TimeoutException ex) { + throw new MessageHandlingException(message, ex); + } + } + } + + private static org.springframework.amqp.core.Message mapMessage(Message message, + MessageConverter converter, AmqpHeaderMapper headerMapper, boolean headersMappedLast) { + + MessageProperties amqpMessageProperties = new StreamMessageProperties(); + return MappingUtils.mapMessage(message, converter, headerMapper, headersMappedLast, headersMappedLast, + amqpMessageProperties); + } + + @Override + public void start() { + } + + @Override + public void stop() { + this.streamOperations.close(); + } + + @Override + public boolean isRunning() { + return true; + } + + /** + * Callback for when publishing succeeds. + */ + public interface SuccessCallback { + /** + * Called when the future completes with success. + * Note that Exceptions raised by this method are ignored. + * @param result the result of the future + */ + void onSuccess(Message result); + } + + /** + * Callback for when publishing fails. + */ + public interface FailureCallback { + /** + * Message publish failure. + * @param message the message. + * @param throwable the throwable. + */ + void failure(Message message, Throwable throwable); + } + +} diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/MappingUtils.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/MappingUtils.java index 9259440971d..361d57fbb6b 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/MappingUtils.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/MappingUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,17 +73,39 @@ public static org.springframework.amqp.core.Message mapMessage(Message reques * @since 5.1.9 */ public static org.springframework.amqp.core.Message mapReplyMessage(Message replyMessage, - MessageConverter converter, AmqpHeaderMapper headerMapper, MessageDeliveryMode defaultDeliveryMode, - boolean headersMappedLast) { + MessageConverter converter, AmqpHeaderMapper headerMapper, + @Nullable MessageDeliveryMode defaultDeliveryMode, boolean headersMappedLast) { return doMapMessage(replyMessage, converter, headerMapper, defaultDeliveryMode, headersMappedLast, true); } private static org.springframework.amqp.core.Message doMapMessage(Message message, - MessageConverter converter, AmqpHeaderMapper headerMapper, MessageDeliveryMode defaultDeliveryMode, - boolean headersMappedLast, boolean reply) { + MessageConverter converter, AmqpHeaderMapper headerMapper, + @Nullable MessageDeliveryMode defaultDeliveryMode, boolean headersMappedLast, boolean reply) { MessageProperties amqpMessageProperties = new MessageProperties(); + org.springframework.amqp.core.Message amqpMessage = mapMessage(message, converter, headerMapper, + headersMappedLast, reply, amqpMessageProperties); + checkDeliveryMode(message, amqpMessageProperties, defaultDeliveryMode); + return amqpMessage; + } + + /** + * Map a reply o.s.m.Message to an o.s.a.core.Message. When using a + * {@link ContentTypeDelegatingMessageConverter}, {@link AmqpHeaders#CONTENT_TYPE} and + * {@link MessageHeaders#CONTENT_TYPE} will be used for the selection, with the AMQP + * header taking precedence. + * @param replyMessage the reply message. + * @param converter the message converter to use. + * @param headerMapper the header mapper to use. + * @param headersMappedLast true if headers are mapped after conversion. + * @return the mapped Message. + * @since 6.0 + */ + public static org.springframework.amqp.core.Message mapMessage(Message message, MessageConverter converter, + AmqpHeaderMapper headerMapper, boolean headersMappedLast, boolean reply, + MessageProperties amqpMessageProperties) { + org.springframework.amqp.core.Message amqpMessage; if (!headersMappedLast) { mapHeaders(message.getHeaders(), amqpMessageProperties, headerMapper, reply); @@ -98,7 +120,6 @@ private static org.springframework.amqp.core.Message doMapMessage(Message mes if (headersMappedLast) { mapHeaders(message.getHeaders(), amqpMessageProperties, headerMapper, reply); } - checkDeliveryMode(message, amqpMessageProperties, defaultDeliveryMode); return amqpMessage; } diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java index 816acd3fe4b..5b8915ffe24 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java @@ -17,6 +17,10 @@ package org.springframework.integration.amqp.dsl; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.util.Collections; import java.util.HashMap; @@ -67,9 +71,13 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; +import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import com.rabbitmq.stream.ConsumerBuilder; +import com.rabbitmq.stream.Environment; + /** * @author Artem Bilan * @author Gary Russell @@ -263,6 +271,17 @@ void testContentTypeOverrideWithReplyHeadersMappedLast() { registration.destroy(); } + @Test + void streamContainer() { + Environment env = mock(Environment.class); + given(env.consumerBuilder()).willReturn(mock(ConsumerBuilder.class)); + RabbitInboundChannelAdapterSLCSpec inboundAdapter = Amqp.inboundAdapter(env); + ConsumerCustomizer customizer = mock(ConsumerCustomizer.class); + inboundAdapter.configureContainer(container -> container.consumerCustomizer(customizer)); + inboundAdapter.start(); + verify(customizer).accept(any(), any()); + } + @Configuration @EnableIntegration public static class ContextConfiguration { diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java new file mode 100644 index 00000000000..5d762c97919 --- /dev/null +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java @@ -0,0 +1,122 @@ +/* + * Copyright 2021-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.outbound; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.RabbitMQContainer; + +import org.springframework.integration.amqp.dsl.Amqp; +import org.springframework.integration.amqp.support.RabbitTestContainer; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; + +import com.rabbitmq.stream.Address; +import com.rabbitmq.stream.Consumer; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.OffsetSpecification; + +/** + * @author Gary Russell + * @author Chris Bono + * @since 6.0 + */ +public class RabbitStreamMessageHandlerTests { + + private static final RabbitMQContainer RABBITMQ = RabbitTestContainer.sharedInstance(); + + @Test + void convertAndSend() throws InterruptedException { + Environment env = Environment.builder() + .lazyInitialization(true) + .addressResolver(add -> new Address("localhost", RABBITMQ.getMappedPort(5552))) + .build(); + try { + env.deleteStream("stream.stream"); + } + catch (Exception e) { + } + env.streamCreator().stream("stream.stream").create(); + RabbitStreamTemplate streamTemplate = new RabbitStreamTemplate(env, "stream.stream"); + + RabbitStreamMessageHandler handler = Amqp.outboundStreamAdapter(streamTemplate) + .sync(true) + .get(); + + handler.handleMessage(MessageBuilder.withPayload("foo") + .setHeader("bar", "baz") + .build()); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference received = new AtomicReference<>(); + Consumer consumer = env.consumerBuilder().stream("stream.stream") + .offset(OffsetSpecification.first()) + .messageHandler((context, msg) -> { + received.set(msg); + latch.countDown(); + }) + .build(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(received.get()).isNotNull(); + assertThat(received.get().getBodyAsBinary()).isEqualTo("foo".getBytes()); + assertThat((String) received.get().getApplicationProperties().get("bar")).isEqualTo("baz"); + consumer.close(); + handler.stop(); + } + + @Test + void sendNative() throws InterruptedException { + Environment env = Environment.builder() + .lazyInitialization(true) + .build(); + try { + env.deleteStream("stream.stream"); + } + catch (Exception e) { + } + env.streamCreator().stream("stream.stream").create(); + RabbitStreamTemplate streamTemplate = new RabbitStreamTemplate(env, "stream.stream"); + RabbitStreamMessageHandler handler = new RabbitStreamMessageHandler(streamTemplate); + handler.setSync(true); + handler.handleMessage(MessageBuilder.withPayload(streamTemplate.messageBuilder() + .addData("foo".getBytes()) + .applicationProperties().entry("bar", "baz") + .messageBuilder() + .build()) + .build()); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference received = new AtomicReference<>(); + Consumer consumer = env.consumerBuilder().stream("stream.stream") + .offset(OffsetSpecification.first()) + .messageHandler((context, msg) -> { + received.set(msg); + latch.countDown(); + }) + .build(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(received.get()).isNotNull(); + assertThat(received.get().getBodyAsBinary()).isEqualTo("foo".getBytes()); + assertThat((String) received.get().getApplicationProperties().get("bar")).isEqualTo("baz"); + consumer.close(); + handler.stop(); + } + +} diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/RabbitTestContainer.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/RabbitTestContainer.java new file mode 100644 index 00000000000..f41b474c0b9 --- /dev/null +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/RabbitTestContainer.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.support; + +import java.time.Duration; + +import org.testcontainers.containers.RabbitMQContainer; + +/** + * Provides a static {@link RabbitMQContainer} that can be shared across test classes. + * + * @author Chris Bono + */ +public final class RabbitTestContainer { + + private static final RabbitMQContainer RABBITMQ; + + static { + String image = "rabbitmq:management"; + String cache = System.getenv().get("IMAGE_CACHE"); + if (cache != null) { + image = cache + image; + } + RABBITMQ = new RabbitMQContainer(image) + .withExposedPorts(5672, 15672, 5552) + .withEnv("RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS", "-rabbitmq_stream advertised_host localhost") + .withPluginsEnabled("rabbitmq_stream") + .withStartupTimeout(Duration.ofMinutes(2)); + RABBITMQ.start(); + } + + private RabbitTestContainer() { + } + + /** + * Should be called early by test that wants to ensure a shared {@link RabbitMQContainer} is up and running. + */ + public static RabbitMQContainer sharedInstance() { + return RABBITMQ; + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationManagementConfigurer.java b/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationManagementConfigurer.java index aaffea57d14..208e414b9fc 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationManagementConfigurer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationManagementConfigurer.java @@ -162,7 +162,6 @@ private MetricsCaptor obtainMetricsCaptor() { return this.metricsCaptor; } - @Nullable private void setupObservationRegistry() { if (this.observationRegistry == null && this.observationRegistryProvider != null) { this.observationRegistry = this.observationRegistryProvider.getIfUnique(); From aa7370370530cd1369adb8ce2cc8b096ec6ce14d Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 26 Sep 2022 11:36:02 -0400 Subject: [PATCH 2/5] Fix test. --- .../amqp/outbound/RabbitStreamMessageHandlerTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java index 5d762c97919..e9a7203278b 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java @@ -85,6 +85,7 @@ void convertAndSend() throws InterruptedException { @Test void sendNative() throws InterruptedException { Environment env = Environment.builder() + .addressResolver(add -> new Address("localhost", RABBITMQ.getMappedPort(5552))) .lazyInitialization(true) .build(); try { From 68e0783ba9e62d1881f8c806e82992af44f72861 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 26 Sep 2022 12:53:31 -0400 Subject: [PATCH 3/5] Polishing and Docs. --- .../integration/amqp/dsl/Amqp.java | 57 -------- .../integration/amqp/dsl/RabbitStream.java | 89 ++++++++++++ ...abbitStreamInboundChannelAdapterSpec.java} | 15 +-- .../dsl/RabbitStreamMessageHandlerSpec.java | 61 ++++++--- .../outbound/RabbitStreamMessageHandler.java | 127 ++++++++++-------- .../integration/amqp/dsl/AmqpTests.java | 2 +- .../RabbitStreamMessageHandlerTests.java | 17 +-- .../amqp/support/RabbitTestContainer.java | 33 ++--- src/reference/asciidoc/amqp.adoc | 54 +++++++- src/reference/asciidoc/whats-new.adoc | 5 + 10 files changed, 295 insertions(+), 165 deletions(-) create mode 100644 spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStream.java rename spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/{RabbitInboundChannelAdapterSLCSpec.java => RabbitStreamInboundChannelAdapterSpec.java} (67%) diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java index 1425c929d2c..38d2cc45d61 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java @@ -25,11 +25,6 @@ import org.springframework.integration.amqp.channel.PollableAmqpChannel; import org.springframework.integration.amqp.inbound.AmqpMessageSource.AmqpAckCallbackFactory; import org.springframework.lang.Nullable; -import org.springframework.rabbit.stream.listener.StreamListenerContainer; -import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; - -import com.rabbitmq.stream.Codec; -import com.rabbitmq.stream.Environment; /** * Factory class for AMQP components. @@ -244,49 +239,6 @@ public static AmqpInboundChannelAdapterDMLCSpec inboundAdapter(DirectMessageList return new AmqpInboundChannelAdapterDMLCSpec(listenerContainer); } - /** - * Create an initial {@link RabbitInboundChannelAdapterSLCSpec} - * with the provided {@link StreamListenerContainer}. - * Note: only endpoint options are available from spec. - * The {@code listenerContainer} options should be specified - * on the provided {@link StreamListenerContainer} using - * {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}. - * @param listenerContainer the listenerContainer. - * @return the RabbitInboundChannelAdapterSLCSpec. - */ - public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(StreamListenerContainer listenerContainer) { - return new RabbitInboundChannelAdapterSLCSpec(listenerContainer); - } - - /** - * Create an initial {@link RabbitInboundChannelAdapterSLCSpec} - * with the provided {@link Environment}. - * Note: only endpoint options are available from spec. - * The {@code listenerContainer} options should be specified - * on the provided {@link StreamListenerContainer} using - * {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}. - * @param environment the environment. - * @return the RabbitInboundChannelAdapterSLCSpec. - */ - public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(Environment environment) { - return new RabbitInboundChannelAdapterSLCSpec(environment, null); - } - - /** - * Create an initial {@link RabbitInboundChannelAdapterSLCSpec} - * with the provided {@link Environment}. - * Note: only endpoint options are available from spec. - * The {@code listenerContainer} options should be specified - * on the provided {@link StreamListenerContainer} using - * {@link RabbitInboundChannelAdapterSLCSpec#configureContainer(java.util.function.Consumer)}. - * @param environment the environment. - * @param codec the codec. - * @return the RabbitInboundChannelAdapterSLCSpec. - */ - public static RabbitInboundChannelAdapterSLCSpec inboundAdapter(Environment environment, Codec codec) { - return new RabbitInboundChannelAdapterSLCSpec(environment, codec); - } - /** * Create an initial AmqpOutboundEndpointSpec (adapter). * @param amqpTemplate the amqpTemplate. @@ -296,15 +248,6 @@ public static AmqpOutboundChannelAdapterSpec outboundAdapter(AmqpTemplate amqpTe return new AmqpOutboundChannelAdapterSpec(amqpTemplate); } - /** - * Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter). - * @param template the amqpTemplate. - * @return the RabbitStreamMessageHandlerSpec. - */ - public static RabbitStreamMessageHandlerSpec outboundStreamAdapter(RabbitStreamTemplate template) { - return new RabbitStreamMessageHandlerSpec(template); - } - /** * Create an initial AmqpOutboundEndpointSpec (gateway). * @param amqpTemplate the amqpTemplate. diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStream.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStream.java new file mode 100644 index 00000000000..dd0c760738e --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStream.java @@ -0,0 +1,89 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.dsl; + +import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; + +import com.rabbitmq.stream.Codec; +import com.rabbitmq.stream.Environment; + +/** + * Factory class for RabbitMQ components. + * + * @author Gary Russell + * @since 6.0 + * + */ +public final class RabbitStream { + + private RabbitStream() { + } + + /** + * Create an initial {@link RabbitStreamInboundChannelAdapterSpec} + * with the provided {@link StreamListenerContainer}. + * Note: only endpoint options are available from spec. + * The {@code listenerContainer} options should be specified + * on the provided {@link StreamListenerContainer} using + * {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}. + * @param listenerContainer the listenerContainer. + * @return the RabbitInboundChannelAdapterSLCSpec. + */ + public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(StreamListenerContainer listenerContainer) { + return new RabbitStreamInboundChannelAdapterSpec(listenerContainer); + } + + /** + * Create an initial {@link RabbitStreamInboundChannelAdapterSpec} + * with the provided {@link Environment}. + * Note: only endpoint options are available from spec. + * The {@code listenerContainer} options should be specified + * on the provided {@link StreamListenerContainer} using + * {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}. + * @param environment the environment. + * @return the RabbitInboundChannelAdapterSLCSpec. + */ + public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment) { + return new RabbitStreamInboundChannelAdapterSpec(environment, null); + } + + /** + * Create an initial {@link RabbitStreamInboundChannelAdapterSpec} + * with the provided {@link Environment}. + * Note: only endpoint options are available from spec. + * The {@code listenerContainer} options should be specified + * on the provided {@link StreamListenerContainer} using + * {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}. + * @param environment the environment. + * @param codec the codec. + * @return the RabbitInboundChannelAdapterSLCSpec. + */ + public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment, Codec codec) { + return new RabbitStreamInboundChannelAdapterSpec(environment, codec); + } + + /** + * Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter). + * @param template the amqpTemplate. + * @return the RabbitStreamMessageHandlerSpec. + */ + public static RabbitStreamMessageHandlerSpec outboundStreamAdapter(RabbitStreamTemplate template) { + return new RabbitStreamMessageHandlerSpec(template); + } + +} diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitInboundChannelAdapterSLCSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamInboundChannelAdapterSpec.java similarity index 67% rename from spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitInboundChannelAdapterSLCSpec.java rename to spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamInboundChannelAdapterSpec.java index 081b8650062..09c9f54a237 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitInboundChannelAdapterSLCSpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamInboundChannelAdapterSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ import java.util.function.Consumer; -import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; import org.springframework.lang.Nullable; import org.springframework.rabbit.stream.listener.StreamListenerContainer; @@ -26,7 +25,7 @@ import com.rabbitmq.stream.Environment; /** - * Spec for an inbound channel adapter with a {@link DirectMessageListenerContainer}. + * Spec for an inbound channel adapter with a {@link StreamListenerContainer}. * * @author Gary Russell * @author Artem Bilan @@ -34,18 +33,18 @@ * @since 6.0 * */ -public class RabbitInboundChannelAdapterSLCSpec - extends AmqpInboundChannelAdapterSpec { +public class RabbitStreamInboundChannelAdapterSpec + extends AmqpInboundChannelAdapterSpec { - protected RabbitInboundChannelAdapterSLCSpec(StreamListenerContainer listenerContainer) { + protected RabbitStreamInboundChannelAdapterSpec(StreamListenerContainer listenerContainer) { super(new RabbitStreamMessageListenerContainerSpec(listenerContainer)); } - protected RabbitInboundChannelAdapterSLCSpec(Environment environment, @Nullable Codec codec) { + protected RabbitStreamInboundChannelAdapterSpec(Environment environment, @Nullable Codec codec) { super(new RabbitStreamMessageListenerContainerSpec(environment, codec)); } - public RabbitInboundChannelAdapterSLCSpec configureContainer( + public RabbitStreamInboundChannelAdapterSpec configureContainer( Consumer configurer) { configurer.accept((RabbitStreamMessageListenerContainerSpec) this.listenerContainerSpec); diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java index c9a93b25deb..fde1aef5212 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,15 +16,15 @@ package org.springframework.integration.amqp.dsl; -import org.springframework.integration.amqp.outbound.AbstractAmqpOutboundEndpoint; import org.springframework.integration.amqp.outbound.RabbitStreamMessageHandler; import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.messaging.MessageChannel; import org.springframework.rabbit.stream.producer.RabbitStreamOperations; /** - * The base {@link MessageHandlerSpec} for {@link AbstractAmqpOutboundEndpoint}s. + * The base {@link MessageHandlerSpec} for {@link RabbitStreamMessageHandler}s. * * @author Gary Russell * @@ -42,7 +42,7 @@ public class RabbitStreamMessageHandlerSpec /** * Set a custom {@link AmqpHeaderMapper} for mapping request and reply headers. * @param headerMapper the {@link AmqpHeaderMapper} to use. - * @return the spec + * @return this spec. */ public RabbitStreamMessageHandlerSpec headerMapper(AmqpHeaderMapper headerMapper) { this.target.setHeaderMapper(headerMapper); @@ -53,7 +53,7 @@ public RabbitStreamMessageHandlerSpec headerMapper(AmqpHeaderMapper headerMapper * Provide the header names that should be mapped from a request to a * {@link org.springframework.messaging.MessageHeaders}. * @param headers The request header names. - * @return the spec + * @return this spec. */ public RabbitStreamMessageHandlerSpec mappedRequestHeaders(String... headers) { this.headerMapper.setRequestHeaderNames(headers); @@ -64,8 +64,8 @@ public RabbitStreamMessageHandlerSpec mappedRequestHeaders(String... headers) { * Determine whether the headers are * mapped before the message is converted, or afterwards. * @param headersLast true to map headers last. - * @return the spec. - * @see AbstractAmqpOutboundEndpoint#setHeadersMappedLast(boolean) + * @return this spec. + * @see RabbitStreamMessageHandler#setHeadersMappedLast(boolean) */ public RabbitStreamMessageHandlerSpec headersMappedLast(boolean headersLast) { this.target.setHeadersMappedLast(headersLast); @@ -73,26 +73,55 @@ public RabbitStreamMessageHandlerSpec headersMappedLast(boolean headersLast) { } /** - * Set a callback to be invoked when a send is successful. - * @param callback the callback. + * Set the success channel. + * @param channel the channel. + * @return this spec. */ - public RabbitStreamMessageHandlerSpec successCallback(RabbitStreamMessageHandler.SuccessCallback callback) { - this.target.setSuccessCallback(callback); + public RabbitStreamMessageHandlerSpec sendSuccessChannel(MessageChannel channel) { + this.target.setSendSuccessChannel(channel); return this; } /** - * Set a callback to be invoked when a send fails. - * @param callback the callback. + * Set the failure channel. After a send failure, an + * {@link org.springframework.messaging.support.ErrorMessage} will be sent + * to this channel with a payload of the exception with the + * failed message. + * @param channel the channel. + * @return this spec. */ - public RabbitStreamMessageHandlerSpec failureCallback(RabbitStreamMessageHandler.FailureCallback callback) { - this.target.setFailureCallback(callback); + public RabbitStreamMessageHandlerSpec sendFailureChannel(MessageChannel channel) { + this.target.setSendFailureChannel(channel); + return this; + } + + /** + * Set the success channel. + * @param channel the channel. + * @return this spec. + */ + public RabbitStreamMessageHandlerSpec sendSuccessChannel(String channel) { + this.target.setSendSuccessChannelName(channel); + return this; + } + + /** + * Set the failure channel. After a send failure, an + * {@link org.springframework.messaging.support.ErrorMessage} will be sent + * to this channel with a payload of the exception with the + * failed message. + * @param channel the channel. + * @return this spec. + */ + public RabbitStreamMessageHandlerSpec sendFailureChannel(String channel) { + this.target.setSendFailureChannelName(channel); return this; } /** * Set to true to wait for a confirmation. * @param sync true to wait. + * @return this spec. * @see #setConfirmTimeout(long) */ public RabbitStreamMessageHandlerSpec sync(boolean sync) { @@ -103,7 +132,7 @@ public RabbitStreamMessageHandlerSpec sync(boolean sync) { /** * Set a timeout for the confirm result. * @param timeout the approximate timeout. - * @return the spec. + * @return this spec. * @see #sync(boolean) */ public RabbitStreamMessageHandlerSpec confirmTimeout(long timeout) { diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java index c41670e9f38..ec74dcc842d 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,14 +24,15 @@ import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.amqp.support.converter.MessageConverter; -import org.springframework.context.Lifecycle; import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; import org.springframework.integration.amqp.support.MappingUtils; -import org.springframework.integration.handler.AbstractMessageHandler; +import org.springframework.integration.handler.AbstractMessageProducingHandler; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandlingException; +import org.springframework.messaging.support.ErrorMessage; import org.springframework.rabbit.stream.producer.RabbitStreamOperations; import org.springframework.rabbit.stream.support.StreamMessageProperties; import org.springframework.util.Assert; @@ -44,7 +45,7 @@ * @since 6.0 * */ -public class RabbitStreamMessageHandler extends AbstractMessageHandler implements Lifecycle { +public class RabbitStreamMessageHandler extends AbstractMessageProducingHandler { private static final int DEFAULT_CONFIRM_TIMEOUT = 10_000; @@ -54,9 +55,13 @@ public class RabbitStreamMessageHandler extends AbstractMessageHandler implement private long confirmTimeout = DEFAULT_CONFIRM_TIMEOUT; - private SuccessCallback successCallback = msg -> { }; + private MessageChannel sendFailureChannel; - private FailureCallback failureCallback = (msg, ex) -> { }; + private String sendFailureChannelName; + + private MessageChannel sendSuccessChannel; + + private String sendSuccessChannelName; private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper(); @@ -72,21 +77,41 @@ public RabbitStreamMessageHandler(RabbitStreamOperations streamOperations) { } /** - * Set a callback to be invoked when a send is successful. - * @param successCallback the callback. + * Set the failure channel. After a send failure, an + * {@link org.springframework.messaging.support.ErrorMessage} will be sent + * to this channel with a payload of the exception with the + * failed message. + * @param sendFailureChannel the failure channel. + */ + public void setSendFailureChannel(MessageChannel sendFailureChannel) { + this.sendFailureChannel = sendFailureChannel; + } + + /** + * Set the failure channel name. After a send failure, an + * {@link org.springframework.messaging.support.ErrorMessage} will be sent + * to this channel with a payload of the exception with the + * failed message. + * @param sendFailureChannelName the failure channel name. + */ + public void setSendFailureChannelName(String sendFailureChannelName) { + this.sendFailureChannelName = sendFailureChannelName; + } + + /** + * Set the success channel. + * @param sendSuccessChannel the success channel. */ - public void setSuccessCallback(SuccessCallback successCallback) { - Assert.notNull(successCallback, "'successCallback' cannot be null"); - this.successCallback = successCallback; + public void setSendSuccessChannel(MessageChannel sendSuccessChannel) { + this.sendSuccessChannel = sendSuccessChannel; } /** - * Set a callback to be invoked when a send fails. - * @param failureCallback the callback. + * Set the Success channel name. + * @param sendSuccessChannelName the success channel name. */ - public void setFailureCallback(FailureCallback failureCallback) { - Assert.notNull(failureCallback, "'failureCallback' cannot be null"); - this.failureCallback = failureCallback; + public void setSendSuccessChannelName(String sendSuccessChannelName) { + this.sendSuccessChannelName = sendSuccessChannelName; } /** @@ -142,6 +167,28 @@ public RabbitStreamOperations getStreamOperations() { return this.streamOperations; } + protected MessageChannel getSendFailureChannel() { + if (this.sendFailureChannel != null) { + return this.sendFailureChannel; + } + else if (this.sendFailureChannelName != null) { + this.sendFailureChannel = getChannelResolver().resolveDestination(this.sendFailureChannelName); + return this.sendFailureChannel; + } + return null; + } + + protected MessageChannel getSendSuccessChannel() { + if (this.sendSuccessChannel != null) { + return this.sendSuccessChannel; + } + else if (this.sendSuccessChannelName != null) { + this.sendSuccessChannel = getChannelResolver().resolveDestination(this.sendSuccessChannelName); + return this.sendSuccessChannel; + } + return null; + } + @Override protected void handleMessageInternal(Message requestMessage) { CompletableFuture future; @@ -162,10 +209,16 @@ protected void handleMessageInternal(Message requestMessage) { private void handleConfirms(Message message, CompletableFuture future) { future.whenComplete((bool, ex) -> { if (ex != null) { - this.failureCallback.failure(message, ex); + MessageChannel failures = getSendFailureChannel(); + if (failures != null) { + this.messagingTemplate.send(failures, new ErrorMessage(ex, message)); + } } else { - this.successCallback.onSuccess(message); + MessageChannel successes = getSendSuccessChannel(); + if (successes != null) { + this.messagingTemplate.send(successes, message); + } } }); if (this.sync) { @@ -190,42 +243,4 @@ private static org.springframework.amqp.core.Message mapMessage(Message messa amqpMessageProperties); } - @Override - public void start() { - } - - @Override - public void stop() { - this.streamOperations.close(); - } - - @Override - public boolean isRunning() { - return true; - } - - /** - * Callback for when publishing succeeds. - */ - public interface SuccessCallback { - /** - * Called when the future completes with success. - * Note that Exceptions raised by this method are ignored. - * @param result the result of the future - */ - void onSuccess(Message result); - } - - /** - * Callback for when publishing fails. - */ - public interface FailureCallback { - /** - * Message publish failure. - * @param message the message. - * @param throwable the throwable. - */ - void failure(Message message, Throwable throwable); - } - } diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java index 5b8915ffe24..5d305e5c570 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java @@ -275,7 +275,7 @@ void testContentTypeOverrideWithReplyHeadersMappedLast() { void streamContainer() { Environment env = mock(Environment.class); given(env.consumerBuilder()).willReturn(mock(ConsumerBuilder.class)); - RabbitInboundChannelAdapterSLCSpec inboundAdapter = Amqp.inboundAdapter(env); + RabbitStreamInboundChannelAdapterSpec inboundAdapter = RabbitStream.inboundAdapter(env); ConsumerCustomizer customizer = mock(ConsumerCustomizer.class); inboundAdapter.configureContainer(container -> container.consumerCustomizer(customizer)); inboundAdapter.start(); diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java index e9a7203278b..711d5eb9965 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java @@ -23,9 +23,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.RabbitMQContainer; -import org.springframework.integration.amqp.dsl.Amqp; +import org.springframework.integration.amqp.dsl.RabbitStream; import org.springframework.integration.amqp.support.RabbitTestContainer; import org.springframework.integration.support.MessageBuilder; import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; @@ -40,15 +39,13 @@ * @author Chris Bono * @since 6.0 */ -public class RabbitStreamMessageHandlerTests { - - private static final RabbitMQContainer RABBITMQ = RabbitTestContainer.sharedInstance(); +public class RabbitStreamMessageHandlerTests implements RabbitTestContainer { @Test void convertAndSend() throws InterruptedException { Environment env = Environment.builder() .lazyInitialization(true) - .addressResolver(add -> new Address("localhost", RABBITMQ.getMappedPort(5552))) + .addressResolver(add -> new Address("localhost", RabbitTestContainer.streamPort())) .build(); try { env.deleteStream("stream.stream"); @@ -58,7 +55,7 @@ void convertAndSend() throws InterruptedException { env.streamCreator().stream("stream.stream").create(); RabbitStreamTemplate streamTemplate = new RabbitStreamTemplate(env, "stream.stream"); - RabbitStreamMessageHandler handler = Amqp.outboundStreamAdapter(streamTemplate) + RabbitStreamMessageHandler handler = RabbitStream.outboundStreamAdapter(streamTemplate) .sync(true) .get(); @@ -79,13 +76,13 @@ void convertAndSend() throws InterruptedException { assertThat(received.get().getBodyAsBinary()).isEqualTo("foo".getBytes()); assertThat((String) received.get().getApplicationProperties().get("bar")).isEqualTo("baz"); consumer.close(); - handler.stop(); + streamTemplate.close(); } @Test void sendNative() throws InterruptedException { Environment env = Environment.builder() - .addressResolver(add -> new Address("localhost", RABBITMQ.getMappedPort(5552))) + .addressResolver(add -> new Address("localhost", RabbitTestContainer.streamPort())) .lazyInitialization(true) .build(); try { @@ -117,7 +114,7 @@ void sendNative() throws InterruptedException { assertThat(received.get().getBodyAsBinary()).isEqualTo("foo".getBytes()); assertThat((String) received.get().getApplicationProperties().get("bar")).isEqualTo("baz"); consumer.close(); - handler.stop(); + streamTemplate.close(); } } diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/RabbitTestContainer.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/RabbitTestContainer.java index f41b474c0b9..2c9de59d12a 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/RabbitTestContainer.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/RabbitTestContainer.java @@ -18,39 +18,40 @@ import java.time.Duration; +import org.junit.jupiter.api.BeforeAll; import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.junit.jupiter.Testcontainers; /** * Provides a static {@link RabbitMQContainer} that can be shared across test classes. * * @author Chris Bono + * @author Gary Russell */ -public final class RabbitTestContainer { +@Testcontainers(disabledWithoutDocker = true) +public interface RabbitTestContainer { - private static final RabbitMQContainer RABBITMQ; - - static { - String image = "rabbitmq:management"; - String cache = System.getenv().get("IMAGE_CACHE"); - if (cache != null) { - image = cache + image; - } - RABBITMQ = new RabbitMQContainer(image) + RabbitMQContainer RABBITMQ = new RabbitMQContainer("rabbitmq:management") .withExposedPorts(5672, 15672, 5552) .withEnv("RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS", "-rabbitmq_stream advertised_host localhost") .withPluginsEnabled("rabbitmq_stream") .withStartupTimeout(Duration.ofMinutes(2)); + + @BeforeAll + static void startContainer() { RABBITMQ.start(); } - private RabbitTestContainer() { + static int amqpPort() { + return RABBITMQ.getMappedPort(5672); + } + + static int managementPort() { + return RABBITMQ.getMappedPort(15672); } - /** - * Should be called early by test that wants to ensure a shared {@link RabbitMQContainer} is up and running. - */ - public static RabbitMQContainer sharedInstance() { - return RABBITMQ; + static int streamPort() { + return RABBITMQ.getMappedPort(5552); } } diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index ddd53a104ab..1d6e1b04630 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -1,5 +1,5 @@ [[amqp]] -== AMQP Support +== AMQP (RabbitMQ) Support Spring Integration provides channel adapters for receiving and sending messages by using the Advanced Message Queuing Protocol (AMQP). @@ -29,6 +29,8 @@ The following adapters are available: * <> * <> * <> +* <> +* <> Spring Integration also provides a point-to-point message channel and a publish-subscribe message channel backed by AMQP Exchanges and Queues. @@ -1417,3 +1419,53 @@ In return, that message is retrieved by Spring Integration and printed to the co The following image illustrates the basic set of Spring Integration components used in this sample. .The Spring Integration graph of the AMQP sample image::images/spring-integration-amqp-sample-graph.png[] + +[[rmq-streams]] +=== RabbitMQ Stream Queue Support + +Version 6.0 introduced support for RabbitMQ Stream Queues. + +The DSL factory class for these endpoints is `Rabbit`. + +[[rmq-stream-inbound-channel-adapter]] +==== RabbitMQ Stream Inbound Channel Adapter + +==== +[source, java] +---- +@Bean +IntegrationFlow flow(Environment env) { + @Bean + IntegrationFlow simpleStream(Environment env) { + return IntegrationFlow.from(RabbitStream.inboundAdapter(env) + .configureContainer(container -> container.queueName("my.stream"))) + // ... + .get(); + } + + @Bean + IntegrationFlow superStream(Environment env) { + return IntegrationFlow.from(RabbitStream.inboundAdapter(env) + .configureContainer(container -> container.superStream("my.stream", "my.consumer"))) + // ... + .get(); + } +} +---- +==== + +[[rmq-stream-inbound-channel-adapter]] +==== RabbitMQ Stream Outbound Channel Adapter + +==== +[source, java] +---- +@Bean +IntegrationFlow outbound(RabbitStreamTemplate template) { + return f -> f + // ... + .handle(RabbitStream.outboundStreamAdapter(template)); + +} +---- +==== diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index e7f3db9f73a..d899849aa5c 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -104,3 +104,8 @@ See <<./jdbc.adoc#jdbc-lock-registry,JDBC Lock Registry>> for more information. The `lookupHost` property of the `AbstractConnectionFactory` and `DatagramPacketMessageMapper` is now set to `false` by default to avoid delays in the environments where DNS is not configured. See <<./ip.adoc#ip,TCP and UDP Support>> for more information. + +=== RabbitMQ Changes + +The AMQP module has been enhanced to provide support for inbound and outbound channel adapters using RabbitMQ Stream Queues. +See <<./amqp.adocrmq-streams>> for more information. From 22bcc3b82f9c0b058478d867fe4532c86c32d3b5 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 26 Sep 2022 13:29:02 -0400 Subject: [PATCH 4/5] Fix anchors in doc. --- src/reference/asciidoc/amqp.adoc | 2 +- src/reference/asciidoc/whats-new.adoc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 1d6e1b04630..21a67fd485b 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -1454,7 +1454,7 @@ IntegrationFlow flow(Environment env) { ---- ==== -[[rmq-stream-inbound-channel-adapter]] +[[rmq-stream-outbound-channel-adapter]] ==== RabbitMQ Stream Outbound Channel Adapter ==== diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index d899849aa5c..8616e99aa4d 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -108,4 +108,4 @@ See <<./ip.adoc#ip,TCP and UDP Support>> for more information. === RabbitMQ Changes The AMQP module has been enhanced to provide support for inbound and outbound channel adapters using RabbitMQ Stream Queues. -See <<./amqp.adocrmq-streams>> for more information. +See <<./amqp.adoc#rmq-streams,RabbitMQ Stream Queue Support>> for more information. From ca339b068934c17999474b5c872921221ba2702e Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 26 Sep 2022 16:37:27 -0400 Subject: [PATCH 5/5] Don't Extend AbstractMessageProducingHandler. --- .../amqp/outbound/RabbitStreamMessageHandler.java | 7 +++++-- src/reference/asciidoc/whats-new.adoc | 10 +++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java index ec74dcc842d..5abe2c38624 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java @@ -27,7 +27,8 @@ import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; import org.springframework.integration.amqp.support.MappingUtils; -import org.springframework.integration.handler.AbstractMessageProducingHandler; +import org.springframework.integration.core.MessagingTemplate; +import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -45,12 +46,14 @@ * @since 6.0 * */ -public class RabbitStreamMessageHandler extends AbstractMessageProducingHandler { +public class RabbitStreamMessageHandler extends AbstractMessageHandler { private static final int DEFAULT_CONFIRM_TIMEOUT = 10_000; private final RabbitStreamOperations streamOperations; + private final MessagingTemplate messagingTemplate = new MessagingTemplate(); + private boolean sync; private long confirmTimeout = DEFAULT_CONFIRM_TIMEOUT; diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 8616e99aa4d..18ddc403b43 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -44,6 +44,11 @@ A `PostgresSubscribableChannel` allows to receive push notifications via `Postgr See <<./jdbc.adoc#postgresql-push,PostgreSQL: Receiving Push Notifications>> for more information. +[[x6.0-rmq]] +==== RabbitMQ Stream Queue Support + +The AMQP module has been enhanced to provide support for inbound and outbound channel adapters using RabbitMQ Stream Queues. +See <<./amqp.adoc#rmq-streams,RabbitMQ Stream Queue Support>> for more information. [[x6.0-general]] === General Changes @@ -104,8 +109,3 @@ See <<./jdbc.adoc#jdbc-lock-registry,JDBC Lock Registry>> for more information. The `lookupHost` property of the `AbstractConnectionFactory` and `DatagramPacketMessageMapper` is now set to `false` by default to avoid delays in the environments where DNS is not configured. See <<./ip.adoc#ip,TCP and UDP Support>> for more information. - -=== RabbitMQ Changes - -The AMQP module has been enhanced to provide support for inbound and outbound channel adapters using RabbitMQ Stream Queues. -See <<./amqp.adoc#rmq-streams,RabbitMQ Stream Queue Support>> for more information.