Skip to content

Improve Java DSL for Rabbit Streams #8598

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,13 +19,16 @@
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.Environment;

import org.springframework.lang.Nullable;
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;

/**
* Factory class for RabbitMQ components.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 6.0
*
*/
Expand All @@ -37,8 +40,8 @@ private RabbitStream() {
/**
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
* with the provided {@link StreamListenerContainer}.
* Note: only endpoint options are available from spec.
* The {@code listenerContainer} options should be specified
* The {@code streamName} or {@code superStream} must be provided after creation of this spec;
* or the {@code listenerContainer} options should be specified
* on the provided {@link StreamListenerContainer} using
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
* @param listenerContainer the listenerContainer.
Expand All @@ -51,32 +54,43 @@ public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(StreamListene
/**
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
* with the provided {@link Environment}.
* Note: only endpoint options are available from spec.
* The {@code listenerContainer} options should be specified
* The {@code streamName} or {@code superStream} must be provided after creation of this spec;
* or the {@code listenerContainer} options should be specified
* on the provided {@link StreamListenerContainer} using
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
* @param environment the environment.
* @return the RabbitInboundChannelAdapterSLCSpec.
*/
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment) {
return new RabbitStreamInboundChannelAdapterSpec(environment, null);
return inboundAdapter(environment, null);
}

/**
* Create an initial {@link RabbitStreamInboundChannelAdapterSpec}
* with the provided {@link Environment}.
* Note: only endpoint options are available from spec.
* The {@code listenerContainer} options should be specified
* The {@code streamName} or {@code superStream} must be provided after creation of this spec;
* or the {@code listenerContainer} options should be specified
* on the provided {@link StreamListenerContainer} using
* {@link RabbitStreamInboundChannelAdapterSpec#configureContainer(java.util.function.Consumer)}.
* @param environment the environment.
* @param codec the codec.
* @return the RabbitInboundChannelAdapterSLCSpec.
*/
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment, Codec codec) {
public static RabbitStreamInboundChannelAdapterSpec inboundAdapter(Environment environment, @Nullable Codec codec) {
return new RabbitStreamInboundChannelAdapterSpec(environment, codec);
}

/**
* Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter).
* @param environment the environment.
* @param streamName the name of stream to produce.
* @return the RabbitStreamMessageHandlerSpec.
* @since 6.1
*/
public static RabbitStreamMessageHandlerSpec outboundStreamAdapter(Environment environment, String streamName) {
return outboundStreamAdapter(new RabbitStreamTemplate(environment, streamName));
}

/**
* Create an initial {@link RabbitStreamMessageHandlerSpec} (adapter).
* @param template the amqpTemplate.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,42 @@ protected RabbitStreamInboundChannelAdapterSpec(Environment environment, @Nullab
super(new RabbitStreamMessageListenerContainerSpec(environment, codec));
}

/**
* Configure a name for Rabbit stream to consume from.
* @param streamName the name of Rabbit stream.
* @return the spec
* @since 6.1
*/
public RabbitStreamInboundChannelAdapterSpec streamName(String streamName) {
this.listenerContainerSpec.queueName(streamName);
return this;
}

/**
* Configure a name for Rabbit super stream to consume from.
* @param superStream the name of Rabbit super stream.
* @param consumerName the logical name to enable offset tracking.
* @return the spec
* @since 6.1
*/
public RabbitStreamInboundChannelAdapterSpec superName(String superStream, String consumerName) {
return superName(superStream, consumerName, 1);
}

/**
* Configure a name for Rabbit super stream to consume from.
* @param superStream the name of Rabbit super stream.
* @param consumerName the logical name to enable offset tracking.
* @param consumers the number of consumers.
* @return the spec
* @since 6.1
*/
public RabbitStreamInboundChannelAdapterSpec superName(String superStream, String consumerName, int consumers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs say this is superStream (which I think is better).

Suggested change
/**
* Configure a name for Rabbit super stream to consume from.
* @param superStream the name of Rabbit super stream.
* @param consumerName the logical name to enable offset tracking.
* @return the spec
* @since 6.1
*/
public RabbitStreamInboundChannelAdapterSpec superName(String superStream, String consumerName) {
return superName(superStream, consumerName, 1);
}
/**
* Configure a name for Rabbit super stream to consume from.
* @param superStream the name of Rabbit super stream.
* @param consumerName the logical name to enable offset tracking.
* @param consumers the number of consumers.
* @return the spec
* @since 6.1
*/
public RabbitStreamInboundChannelAdapterSpec superName(String superStream, String consumerName, int consumers) {
/**
* Configure a name for Rabbit super stream to consume from.
* @param superStream the name of Rabbit super stream.
* @param consumerName the logical name to enable offset tracking.
* @return the spec
* @since 6.1
*/
public RabbitStreamInboundChannelAdapterSpec superStream(String superStream, String consumerName) {
return superStream(superStream, consumerName, 1);
}
/**
* Configure a name for Rabbit super stream to consume from.
* @param superStream the name of Rabbit super stream.
* @param consumerName the logical name to enable offset tracking.
* @param consumers the number of consumers.
* @return the spec
* @since 6.1
*/
public RabbitStreamInboundChannelAdapterSpec superStream(String superStream, String consumerName, int consumers) {

((RabbitStreamMessageListenerContainerSpec) this.listenerContainerSpec)
.superStream(superStream, consumerName, consumers);
return this;
}

public RabbitStreamInboundChannelAdapterSpec configureContainer(
Consumer<RabbitStreamMessageListenerContainerSpec> configurer) {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,7 +55,20 @@ public class RabbitStreamMessageListenerContainerSpec extends
* @return this spec.
*/
public RabbitStreamMessageListenerContainerSpec superStream(String superStream, String name) {
this.target.superStream(superStream, name);
return superStream(superStream, name, 1);
}

/**
* Enable Single Active Consumer on a Super Stream.
* Mutually exclusive with {@link #queueName(String...)}.
* @param superStream the stream.
* @param name the consumer name.
* @param consumers the number of consumers.
* @return this spec.
* @since 6.1
*/
public RabbitStreamMessageListenerContainerSpec superStream(String superStream, String name, int consumers) {
this.target.superStream(superStream, name, consumers);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.amqp.dsl;

import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.Environment;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.support.RabbitTestContainer;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.rabbit.stream.config.SuperStream;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Artem Bilan
*
* @since 6.1
*/
@SpringJUnitConfig
@DirtiesContext
public class RabbitStreamTests implements RabbitTestContainer {

@Autowired
MessageChannel sendToRabbitStreamChannel;

@Autowired
PollableChannel results;

@Test
void rabbitStreamWithSpringIntegrationChannelAdapters() {
var testData = "test data";
this.sendToRabbitStreamChannel.send(new GenericMessage<>(testData));

Message<?> receive = results.receive(10_000);

assertThat(receive).isNotNull()
.extracting(Message::getPayload)
.isEqualTo(testData);
}

@Autowired
MessageChannel sendToRabbitSuperStreamChannel;

@Test
void superStreamWithSpringIntegrationChannelAdapters() {
var testData = "test super data";
this.sendToRabbitSuperStreamChannel.send(new GenericMessage<>(testData));

Message<?> receive = results.receive(10_000);

assertThat(receive).isNotNull()
.extracting(Message::getPayload)
.isEqualTo(testData);
}

@Configuration
@EnableIntegration
public static class ContextConfiguration {

@Bean
ConnectionFactory rabbitConnectionFactory() {
return new CachingConnectionFactory(RabbitTestContainer.amqpPort());
}

@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) {
return new RabbitTemplate(rabbitConnectionFactory);
}

@Bean
Environment rabbitStreamEnvironment() {
return Environment.builder()
.addressResolver(add -> new Address("localhost", RabbitTestContainer.streamPort()))
.build();
}

@Bean(initMethod = "initialize")
RabbitAdmin rabbitAdmin(ConnectionFactory rabbitConnectionFactory) {
return new RabbitAdmin(rabbitConnectionFactory);
}

@Bean
Queue stream() {
return QueueBuilder.durable("test.stream1")
.stream()
.build();
}

@Bean
@ServiceActivator(inputChannel = "sendToRabbitStreamChannel")
RabbitStreamMessageHandlerSpec rabbitStreamMessageHandler(Environment env, Queue stream) {
return RabbitStream.outboundStreamAdapter(env, stream.getName()).sync(true);
}

@Bean
IntegrationFlow rabbitStreamConsumer(Environment env, Queue stream) {
return IntegrationFlow.from(
RabbitStream.inboundAdapter(env)
.streamName(stream.getName()))
.channel("results")
.get();
}

@Bean
QueueChannel results() {
return new QueueChannel();
}

@Bean
SuperStream superStream() {
return new SuperStream("test.superStream1", 3);
}

@Bean
@ServiceActivator(inputChannel = "sendToRabbitSuperStreamChannel")
AmqpOutboundChannelAdapterSpec rabbitSuperStreamMessageHandler(RabbitTemplate rabbitTemplate) {
return Amqp.outboundAdapter(rabbitTemplate)
.exchangeName("test.superStream1")
.routingKey("1");
}

@Bean
IntegrationFlow superStreamConsumer(Environment env) {
return IntegrationFlow.from(
RabbitStream.inboundAdapter(env)
.superName("test.superStream1", "mySuperConsumer"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.superName("test.superStream1", "mySuperConsumer"))
.superStream("test.superStream1", "mySuperConsumer"))

.channel("results")
.get();
}

}

}
33 changes: 14 additions & 19 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1426,7 +1426,7 @@ image::images/spring-integration-amqp-sample-graph.png[align="center"]

Version 6.0 introduced support for RabbitMQ Stream Queues.

The DSL factory class for these endpoints is `Rabbit`.
The DSL factory class for these endpoints is a `RabbitStream`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The DSL factory class for these endpoints is a `RabbitStream`.
The DSL factory class for these endpoints is `RabbitStream`.


[[rmq-stream-inbound-channel-adapter]]
==== RabbitMQ Stream Inbound Channel Adapter
Expand All @@ -1435,22 +1435,17 @@ The DSL factory class for these endpoints is `Rabbit`.
[source, java]
----
@Bean
IntegrationFlow flow(Environment env) {
@Bean
IntegrationFlow simpleStream(Environment env) {
return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
.configureContainer(container -> container.queueName("my.stream")))
// ...
.get();
}

@Bean
IntegrationFlow superStream(Environment env) {
return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
.configureContainer(container -> container.superStream("my.stream", "my.consumer")))
// ...
.get();
}
IntegrationFlow simpleStream(Environment env) {
return IntegrationFlow.from(RabbitStream.inboundAdapter(env).streamName("my.stream"))
// ...
.get();
}

@Bean
IntegrationFlow superStream(Environment env) {
return IntegrationFlow.from(RabbitStream.inboundAdapter(env).superStream("my.super.stream", "my.consumer"))
// ...
.get();
}
----
====
Expand All @@ -1462,10 +1457,10 @@ IntegrationFlow flow(Environment env) {
[source, java]
----
@Bean
IntegrationFlow outbound(RabbitStreamTemplate template) {
IntegrationFlow outbound(Environment env) {
return f -> f
// ...
.handle(RabbitStream.outboundStreamAdapter(template));
.handle(RabbitStream.outboundStreamAdapter(env, "my.stream"));

}
----
Expand Down
6 changes: 6 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ See <<./mail.adoc#mail-inbound, Mail-receiving Channel Adapter>> for more inform

The `FileReadingMessageSource` now exposes `watchMaxDepth` and `watchDirPredicate` options for the `WatchService`.
See <<./file.adoc#watch-service-directory-scanner, `WatchServiceDirectoryScanner`>> for more information.

[[x6.1-amqp]]
=== AMQP Changes

The Java DSL API for Rabbit Streams (the `RabbitStream` factory) exposes additional properties for simple configurations.
See <<./amqp.adoc#rmq-streams, `RabbitMQ Stream Queue Support`>> for more information.