diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStream.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStream.java index 338427c11be..c1413574d34 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStream.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStream.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import com.rabbitmq.stream.Codec; import com.rabbitmq.stream.Environment; +import org.springframework.lang.Nullable; import org.springframework.rabbit.stream.listener.StreamListenerContainer; import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; @@ -26,6 +27,8 @@ * Factory class for RabbitMQ components. * * @author Gary Russell + * @author Artem Bilan + * * @since 6.0 * */ @@ -37,8 +40,8 @@ private RabbitStream() { /** * Create an initial {@link RabbitStreamInboundChannelAdapterSpec} * with the provided {@link StreamListenerContainer}. - * Note: only endpoint options are available from spec. - * The {@code listenerContainer} options should be specified + * The {@code streamName} or {@code superStream} must be provided after creation of this spec; + * or the {@code listenerContainer} options should be specified * on the provided {@link StreamListenerContainer} using * {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}. * @param listenerContainer the listenerContainer. @@ -51,32 +54,43 @@ public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(StreamListene /** * Create an initial {@link RabbitStreamInboundChannelAdapterSpec} * with the provided {@link Environment}. - * Note: only endpoint options are available from spec. - * The {@code listenerContainer} options should be specified + * The {@code streamName} or {@code superStream} must be provided after creation of this spec; + * or the {@code listenerContainer} options should be specified * on the provided {@link StreamListenerContainer} using * {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}. * @param environment the environment. * @return the RabbitInboundChannelAdapterSLCSpec. */ public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment) { - return new RabbitStreamInboundChannelAdapterSpec(environment, null); + return inboundAdapter(environment, null); } /** * Create an initial {@link RabbitStreamInboundChannelAdapterSpec} * with the provided {@link Environment}. - * Note: only endpoint options are available from spec. - * The {@code listenerContainer} options should be specified + * The {@code streamName} or {@code superStream} must be provided after creation of this spec; + * or the {@code listenerContainer} options should be specified * on the provided {@link StreamListenerContainer} using * {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}. * @param environment the environment. * @param codec the codec. * @return the RabbitInboundChannelAdapterSLCSpec. */ - public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment, Codec codec) { + public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment, @Nullable Codec codec) { return new RabbitStreamInboundChannelAdapterSpec(environment, codec); } + /** + * Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter). + * @param environment the environment. + * @param streamName the name of stream to produce. + * @return the RabbitStreamMessageHandlerSpec. + * @since 6.1 + */ + public static RabbitStreamMessageHandlerSpec outboundStreamAdapter(Environment environment, String streamName) { + return outboundStreamAdapter(new RabbitStreamTemplate(environment, streamName)); + } + /** * Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter). * @param template the amqpTemplate. diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamInboundChannelAdapterSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamInboundChannelAdapterSpec.java index 18f4106fe54..525474b95b4 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamInboundChannelAdapterSpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamInboundChannelAdapterSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,42 @@ protected RabbitStreamInboundChannelAdapterSpec(Environment environment, @Nullab super(new RabbitStreamMessageListenerContainerSpec(environment, codec)); } + /** + * Configure a name for Rabbit stream to consume from. + * @param streamName the name of Rabbit stream. + * @return the spec + * @since 6.1 + */ + public RabbitStreamInboundChannelAdapterSpec streamName(String streamName) { + this.listenerContainerSpec.queueName(streamName); + return this; + } + + /** + * Configure a name for Rabbit super stream to consume from. + * @param superStream the name of Rabbit super stream. + * @param consumerName the logical name to enable offset tracking. + * @return the spec + * @since 6.1 + */ + public RabbitStreamInboundChannelAdapterSpec superStream(String superStream, String consumerName) { + return superStream(superStream, consumerName, 1); + } + + /** + * Configure a name for Rabbit super stream to consume from. + * @param superStream the name of Rabbit super stream. + * @param consumerName the logical name to enable offset tracking. + * @param consumers the number of consumers. + * @return the spec + * @since 6.1 + */ + public RabbitStreamInboundChannelAdapterSpec superStream(String superStream, String consumerName, int consumers) { + ((RabbitStreamMessageListenerContainerSpec) this.listenerContainerSpec) + .superStream(superStream, consumerName, consumers); + return this; + } + public RabbitStreamInboundChannelAdapterSpec configureContainer( Consumer configurer) { diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageListenerContainerSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageListenerContainerSpec.java index 06e44b11c57..0b55bd46b47 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageListenerContainerSpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageListenerContainerSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,7 +55,20 @@ public class RabbitStreamMessageListenerContainerSpec extends * @return this spec. */ public RabbitStreamMessageListenerContainerSpec superStream(String superStream, String name) { - this.target.superStream(superStream, name); + return superStream(superStream, name, 1); + } + + /** + * Enable Single Active Consumer on a Super Stream. + * Mutually exclusive with {@link #queueName(String...)}. + * @param superStream the stream. + * @param name the consumer name. + * @param consumers the number of consumers. + * @return this spec. + * @since 6.1 + */ + public RabbitStreamMessageListenerContainerSpec superStream(String superStream, String name, int consumers) { + this.target.superStream(superStream, name, consumers); return this; } diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/RabbitStreamTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/RabbitStreamTests.java new file mode 100644 index 00000000000..9674d8e9d4a --- /dev/null +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/RabbitStreamTests.java @@ -0,0 +1,166 @@ +/* + * Copyright 2014-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.dsl; + +import com.rabbitmq.stream.Address; +import com.rabbitmq.stream.Environment; +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.amqp.support.RabbitTestContainer; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.rabbit.stream.config.SuperStream; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Artem Bilan + * + * @since 6.1 + */ +@SpringJUnitConfig +@DirtiesContext +public class RabbitStreamTests implements RabbitTestContainer { + + @Autowired + MessageChannel sendToRabbitStreamChannel; + + @Autowired + PollableChannel results; + + @Test + void rabbitStreamWithSpringIntegrationChannelAdapters() { + var testData = "test data"; + this.sendToRabbitStreamChannel.send(new GenericMessage<>(testData)); + + Message receive = results.receive(10_000); + + assertThat(receive).isNotNull() + .extracting(Message::getPayload) + .isEqualTo(testData); + } + + @Autowired + MessageChannel sendToRabbitSuperStreamChannel; + + @Test + void superStreamWithSpringIntegrationChannelAdapters() { + var testData = "test super data"; + this.sendToRabbitSuperStreamChannel.send(new GenericMessage<>(testData)); + + Message receive = results.receive(10_000); + + assertThat(receive).isNotNull() + .extracting(Message::getPayload) + .isEqualTo(testData); + } + + @Configuration + @EnableIntegration + public static class ContextConfiguration { + + @Bean + ConnectionFactory rabbitConnectionFactory() { + return new CachingConnectionFactory(RabbitTestContainer.amqpPort()); + } + + @Bean + RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) { + return new RabbitTemplate(rabbitConnectionFactory); + } + + @Bean + Environment rabbitStreamEnvironment() { + return Environment.builder() + .addressResolver(add -> new Address("localhost", RabbitTestContainer.streamPort())) + .build(); + } + + @Bean(initMethod = "initialize") + RabbitAdmin rabbitAdmin(ConnectionFactory rabbitConnectionFactory) { + return new RabbitAdmin(rabbitConnectionFactory); + } + + @Bean + Queue stream() { + return QueueBuilder.durable("test.stream1") + .stream() + .build(); + } + + @Bean + @ServiceActivator(inputChannel = "sendToRabbitStreamChannel") + RabbitStreamMessageHandlerSpec rabbitStreamMessageHandler(Environment env, Queue stream) { + return RabbitStream.outboundStreamAdapter(env, stream.getName()).sync(true); + } + + @Bean + IntegrationFlow rabbitStreamConsumer(Environment env, Queue stream) { + return IntegrationFlow.from( + RabbitStream.inboundAdapter(env) + .streamName(stream.getName())) + .channel("results") + .get(); + } + + @Bean + QueueChannel results() { + return new QueueChannel(); + } + + @Bean + SuperStream superStream() { + return new SuperStream("test.superStream1", 3); + } + + @Bean + @ServiceActivator(inputChannel = "sendToRabbitSuperStreamChannel") + AmqpOutboundChannelAdapterSpec rabbitSuperStreamMessageHandler(RabbitTemplate rabbitTemplate) { + return Amqp.outboundAdapter(rabbitTemplate) + .exchangeName("test.superStream1") + .routingKey("1"); + } + + @Bean + IntegrationFlow superStreamConsumer(Environment env) { + return IntegrationFlow.from( + RabbitStream.inboundAdapter(env) + .superStream("test.superStream1", "mySuperConsumer")) + .channel("results") + .get(); + } + + } + +} diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 06cffb56510..3307953fda7 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -1426,7 +1426,7 @@ image::images/spring-integration-amqp-sample-graph.png[align="center"] Version 6.0 introduced support for RabbitMQ Stream Queues. -The DSL factory class for these endpoints is `Rabbit`. +The DSL factory class for these endpoints is `RabbitStream`. [[rmq-stream-inbound-channel-adapter]] ==== RabbitMQ Stream Inbound Channel Adapter @@ -1435,22 +1435,17 @@ The DSL factory class for these endpoints is `Rabbit`. [source, java] ---- @Bean -IntegrationFlow flow(Environment env) { - @Bean - IntegrationFlow simpleStream(Environment env) { - return IntegrationFlow.from(RabbitStream.inboundAdapter(env) - .configureContainer(container -> container.queueName("my.stream"))) - // ... - .get(); - } - - @Bean - IntegrationFlow superStream(Environment env) { - return IntegrationFlow.from(RabbitStream.inboundAdapter(env) - .configureContainer(container -> container.superStream("my.stream", "my.consumer"))) - // ... - .get(); - } +IntegrationFlow simpleStream(Environment env) { + return IntegrationFlow.from(RabbitStream.inboundAdapter(env).streamName("my.stream")) + // ... + .get(); +} + +@Bean +IntegrationFlow superStream(Environment env) { + return IntegrationFlow.from(RabbitStream.inboundAdapter(env).superStream("my.super.stream", "my.consumer")) + // ... + .get(); } ---- ==== @@ -1462,10 +1457,10 @@ IntegrationFlow flow(Environment env) { [source, java] ---- @Bean -IntegrationFlow outbound(RabbitStreamTemplate template) { +IntegrationFlow outbound(Environment env) { return f -> f // ... - .handle(RabbitStream.outboundStreamAdapter(template)); + .handle(RabbitStream.outboundStreamAdapter(env, "my.stream")); } ---- diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 5e24137617f..c5b60ae53fb 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -69,3 +69,9 @@ See <<./mail.adoc#mail-inbound, Mail-receiving Channel Adapter>> for more inform The `FileReadingMessageSource` now exposes `watchMaxDepth` and `watchDirPredicate` options for the `WatchService`. See <<./file.adoc#watch-service-directory-scanner, `WatchServiceDirectoryScanner`>> for more information. + +[[x6.1-amqp]] +=== AMQP Changes + +The Java DSL API for Rabbit Streams (the `RabbitStream` factory) exposes additional properties for simple configurations. +See <<./amqp.adoc#rmq-streams, `RabbitMQ Stream Queue Support`>> for more information.