diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceIntegrationTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceIntegrationTests.java index fb437a52f9a..4550cdb1195 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceIntegrationTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -108,8 +108,7 @@ public BoundRabbitChannelAdvice advice(RabbitTemplate template) { @Bean public IntegrationFlow flow(RabbitTemplate template, BoundRabbitChannelAdvice advice) { return IntegrationFlow.from(Gate.class) - .split(s -> s.delimiters(",") - .advice(advice)) + .splitWith(s -> s.delimiters(",").advice(advice)) .transform(String::toUpperCase) .handle(Amqp.outboundAdapter(template).routingKey(QUEUE)) .get(); diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceTests.java index 0884e2141c9..d179dff3716 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceTests.java @@ -118,7 +118,7 @@ public RabbitTemplate template() throws Exception { @Bean public IntegrationFlow flow(RabbitTemplate template) { return IntegrationFlow.from(Gate.class) - .split(s -> s.delimiters(",") + .splitWith(s -> s.delimiters(",") .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10)))) .transform(String::toUpperCase) .handle(Amqp.outboundAdapter(template).routingKey("rk")) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/JavaUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/JavaUtils.java index 4c894e145fd..e39bc6aa63c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/JavaUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/JavaUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -80,7 +80,7 @@ public JavaUtils acceptIfNotNull(@Nullable T value, Consumer consumer) { * @return this. * @since 5.2 */ - public JavaUtils acceptIfHasText(String value, Consumer consumer) { + public JavaUtils acceptIfHasText(@Nullable String value, Consumer consumer) { if (StringUtils.hasText(value)) { consumer.accept(value); } @@ -95,7 +95,7 @@ public JavaUtils acceptIfHasText(String value, Consumer consumer) { * @return this. * @since 5.2 */ - public JavaUtils acceptIfNotEmpty(List value, Consumer> consumer) { + public JavaUtils acceptIfNotEmpty(@Nullable List value, Consumer> consumer) { if (!CollectionUtils.isEmpty(value)) { consumer.accept(value); } @@ -110,7 +110,7 @@ public JavaUtils acceptIfNotEmpty(List value, Consumer> consumer) * @return this. * @since 5.2 */ - public JavaUtils acceptIfNotEmpty(T[] value, Consumer consumer) { + public JavaUtils acceptIfNotEmpty(@Nullable T[] value, Consumer consumer) { if (!ObjectUtils.isEmpty(value)) { consumer.accept(value); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BarrierSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BarrierSpec.java index 6b1fdc061c1..3003841eae4 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BarrierSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BarrierSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ import reactor.util.function.Tuple2; -import org.springframework.core.Ordered; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.aggregator.BarrierMessageHandler; import org.springframework.integration.aggregator.CorrelationStrategy; @@ -44,14 +43,6 @@ public class BarrierSpec extends ConsumerEndpointSpec doGet() { this.handler = new BarrierMessageHandler(this.timeout, this.outputProcessor, this.correlationStrategy); - if (!this.adviceChain.isEmpty()) { - this.handler.setAdviceChain(this.adviceChain); - } - this.handler.setRequiresReply(this.requiresReply); - this.handler.setSendTimeout(this.sendTimeout); - this.handler.setAsync(this.async); - this.handler.setOrder(this.order); return super.doGet(); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java index 9447f575928..c2b8ea86265 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java @@ -1405,7 +1405,7 @@ public B enrichHeaders(Consumer headerEnricherConfigurer) { * @return the current {@link BaseIntegrationFlowDefinition}. */ public B split() { - return split((Consumer>) null); + return splitWith((splitterSpec) -> { }); } /** @@ -1420,21 +1420,47 @@ public B split() { * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options * and for {@link DefaultMessageSplitter}. * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}. * @see SplitterEndpointSpec */ + @Deprecated(since = "6.2", forRemoval = true) + @SuppressWarnings("removal") public B split(@Nullable Consumer> endpointConfigurer) { return split(new DefaultMessageSplitter(), endpointConfigurer); } + + /** + * Populate the splitter with provided options to the current integration flow position: + *
+	 * {@code
+	 *  .splitWith(s -> s.applySequence(false).delimiters(","))
+	 * }
+	 * 
+ * or with the refenrence to POJO service method call: + *
+	 * {@code
+	 *  .splitWith(s -> s.ref("someService").method("someMethod"))
+	 * }
+	 * 
+ * @param splitterConfigurer the {@link Consumer} to provide options splitter endpoint. + * @return the current {@link BaseIntegrationFlowDefinition}. + * @since 6.2 + * @see SplitterSpec + */ + public B splitWith(Consumer splitterConfigurer) { + return register(new SplitterSpec(), splitterConfigurer); + } + /** * Populate the {@link ExpressionEvaluatingSplitter} with provided SpEL expression. * @param expression the splitter SpEL expression. * and for {@link ExpressionEvaluatingSplitter}. * @return the current {@link BaseIntegrationFlowDefinition}. - * @see SplitterEndpointSpec + * @see SplitterSpec */ public B split(String expression) { - return split(expression, (Consumer>) null); + return splitWith((splitterSpec) -> splitterSpec.expression(expression)); } /** @@ -1443,8 +1469,11 @@ public B split(String expression) { * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options * and for {@link ExpressionEvaluatingSplitter}. * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}. * @see SplitterEndpointSpec */ + @Deprecated(since = "6.2", forRemoval = true) + @SuppressWarnings("removal") public B split(String expression, @Nullable Consumer> endpointConfigurer) { @@ -1472,7 +1501,7 @@ public B split(Object service) { * @see MethodInvokingSplitter */ public B split(Object service, @Nullable String methodName) { - return split(service, methodName, null); + return splitWith((splitterSpec) -> splitterSpec.ref(service).method(methodName)); } /** @@ -1484,9 +1513,12 @@ public B split(Object service, @Nullable String methodName) { * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options * and for {@link MethodInvokingSplitter}. * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}. * @see SplitterEndpointSpec * @see MethodInvokingSplitter */ + @Deprecated(since = "6.2", forRemoval = true) + @SuppressWarnings("removal") public B split(Object service, @Nullable String methodName, @Nullable Consumer> endpointConfigurer) { @@ -1508,7 +1540,7 @@ public B split(Object service, @Nullable String methodName, * @return the current {@link BaseIntegrationFlowDefinition}. */ public B split(String beanName, @Nullable String methodName) { - return split(beanName, methodName, null); + return splitWith((splitterSpec) -> splitterSpec.refName(beanName).method(methodName)); } /** @@ -1520,8 +1552,11 @@ public B split(String beanName, @Nullable String methodName) { * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options * and for {@link MethodInvokingSplitter}. * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}. * @see SplitterEndpointSpec */ + @Deprecated(since = "6.2", forRemoval = true) + @SuppressWarnings("removal") public B split(String beanName, @Nullable String methodName, @Nullable Consumer> endpointConfigurer) { @@ -1540,10 +1575,10 @@ public B split(String beanName, @Nullable String methodName, * * @param messageProcessorSpec the splitter {@link MessageProcessorSpec}. * @return the current {@link BaseIntegrationFlowDefinition}. - * @see SplitterEndpointSpec + * @see SplitterSpec */ public B split(MessageProcessorSpec messageProcessorSpec) { - return split(messageProcessorSpec, (Consumer>) null); + return splitWith((splitterSpec) -> splitterSpec.ref(messageProcessorSpec)); } /** @@ -1561,8 +1596,11 @@ public B split(MessageProcessorSpec messageProcessorSpec) { * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options * and for {@link MethodInvokingSplitter}. * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}. * @see SplitterEndpointSpec */ + @Deprecated(since = "6.2", forRemoval = true) + @SuppressWarnings("removal") public B split(MessageProcessorSpec messageProcessorSpec, @Nullable Consumer> endpointConfigurer) { @@ -1595,7 +1633,7 @@ public B split(MessageProcessorSpec messageProcessorSpec, * @see LambdaMessageProcessor */ public

B split(Class

expectedType, Function splitter) { - return split(expectedType, splitter, null); + return splitWith((splitterSpec) -> splitterSpec.function(splitter).expectedType(expectedType)); } /** @@ -1621,9 +1659,12 @@ public

B split(Class

expectedType, Function splitter) { * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @param

the payload type or {@code Message.class}. * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}. * @see LambdaMessageProcessor * @see SplitterEndpointSpec */ + @Deprecated(since = "6.2", forRemoval = true) + @SuppressWarnings("removal") public

B split(@Nullable Class

expectedType, Function splitter, @Nullable Consumer> endpointConfigurer) { @@ -1640,10 +1681,10 @@ public

B split(@Nullable Class

expectedType, Function splitter, * @param splitterMessageHandlerSpec the {@link MessageHandlerSpec} to populate. * @param the {@link AbstractMessageSplitter} * @return the current {@link BaseIntegrationFlowDefinition}. - * @see SplitterEndpointSpec + * @see SplitterSpec */ public B split(MessageHandlerSpec splitterMessageHandlerSpec) { - return split(splitterMessageHandlerSpec, (Consumer>) null); + return splitWith((splitterSpec) -> splitterSpec.ref(splitterMessageHandlerSpec)); } /** @@ -1653,8 +1694,11 @@ public B split(MessageHandlerSpec spli * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @param the {@link AbstractMessageSplitter} * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}. * @see SplitterEndpointSpec */ + @Deprecated(since = "6.2", forRemoval = true) + @SuppressWarnings("removal") public B split(MessageHandlerSpec splitterMessageHandlerSpec, @Nullable Consumer> endpointConfigurer) { @@ -1667,10 +1711,10 @@ public B split(MessageHandlerSpec spli * flow position. * @param splitter the {@link AbstractMessageSplitter} to populate. * @return the current {@link BaseIntegrationFlowDefinition}. - * @see SplitterEndpointSpec + * @see SplitterSpec */ public B split(AbstractMessageSplitter splitter) { - return split(splitter, (Consumer>) null); + return splitWith((splitterSpec) -> splitterSpec.ref(splitter)); } /** @@ -1680,8 +1724,11 @@ public B split(AbstractMessageSplitter splitter) { * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @param the {@link AbstractMessageSplitter} * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}. * @see SplitterEndpointSpec */ + @Deprecated(since = "6.2", forRemoval = true) + @SuppressWarnings("removal") public B split(S splitter, @Nullable Consumer> endpointConfigurer) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java index 61c4cecf499..c5942934c4e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java @@ -29,13 +29,13 @@ import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; +import org.springframework.integration.JavaUtils; import org.springframework.integration.config.ConsumerEndpointFactoryBean; import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.handler.AbstractMessageProducingHandler; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.handler.advice.HandleMessageAdviceAdapter; import org.springframework.integration.handler.advice.ReactiveRequestHandlerAdvice; -import org.springframework.integration.router.AbstractMessageRouter; import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.transaction.TransactionInterceptorBuilder; import org.springframework.lang.Nullable; @@ -62,6 +62,21 @@ public abstract class ConsumerEndpointSpec, protected final List adviceChain = new LinkedList<>(); // NOSONAR final + @Nullable + private Boolean requiresReply; + + @Nullable + private Long sendTimeout; + + @Nullable + private Integer order; + + @Nullable + private Boolean async; + + @Nullable + private String[] notPropagatedHeaders; + protected ConsumerEndpointSpec(@Nullable H messageHandler) { super(messageHandler, new ConsumerEndpointFactoryBean()); } @@ -242,12 +257,16 @@ public S customizeMonoReply(BiFunction, Mono, Publisher> * @see AbstractReplyProducingMessageHandler#setRequiresReply(boolean) */ public S requiresReply(boolean requiresReply) { - assertHandler(); - if (this.handler instanceof AbstractReplyProducingMessageHandler) { - ((AbstractReplyProducingMessageHandler) this.handler).setRequiresReply(requiresReply); + if (this.handler != null) { + if (this.handler instanceof AbstractReplyProducingMessageHandler producingHandler) { + producingHandler.setRequiresReply(requiresReply); + } + else { + this.logger.warn("'requiresReply' can be applied only for AbstractReplyProducingMessageHandler"); + } } else { - this.logger.warn("'requiresReply' can be applied only for AbstractReplyProducingMessageHandler"); + this.requiresReply = requiresReply; } return _this(); } @@ -258,16 +277,16 @@ public S requiresReply(boolean requiresReply) { * @see AbstractMessageProducingHandler#setSendTimeout(long) */ public S sendTimeout(long sendTimeout) { - assertHandler(); - if (this.handler instanceof AbstractMessageProducingHandler) { - ((AbstractMessageProducingHandler) this.handler).setSendTimeout(sendTimeout); - } - else if (this.handler instanceof AbstractMessageRouter) { - // This should probably go on the RouterSpec, but we put it here for consistency - ((AbstractMessageRouter) this.handler).setSendTimeout(sendTimeout); + if (this.handler != null) { + if (this.handler instanceof AbstractMessageProducingHandler producingHandler) { + producingHandler.setSendTimeout(sendTimeout); + } + else { + this.logger.warn("'sendTimeout' can be applied only for AbstractMessageProducingHandler"); + } } else { - this.logger.warn("'sendTimeout' can be applied only for AbstractMessageProducingHandler"); + this.sendTimeout = sendTimeout; } return _this(); } @@ -278,33 +297,41 @@ else if (this.handler instanceof AbstractMessageRouter) { * @see AbstractMessageHandler#setOrder(int) */ public S order(int order) { - assertHandler(); - if (this.handler instanceof AbstractMessageHandler) { - ((AbstractMessageHandler) this.handler).setOrder(order); + if (this.handler != null) { + if (this.handler instanceof AbstractMessageHandler abstractMessageHandler) { + abstractMessageHandler.setOrder(order); + } + else { + this.logger.warn("'order' can be applied only for AbstractMessageHandler"); + } } else { - this.logger.warn("'order' can be applied only for AbstractMessageHandler"); + this.order = order; } return _this(); } /** * Allow async replies. If the handler reply is a - * {@code org.springframework.util.concurrent.ListenableFuture}, send the output when + * {@link java.util.concurrent.CompletableFuture}, send the output when * it is satisfied rather than sending the future as the result. Ignored for handler * return types other than - * {@link org.springframework.util.concurrent.ListenableFuture}. + * {@link java.util.concurrent.CompletableFuture}. * @param async true to allow. * @return the endpoint spec. * @see AbstractMessageProducingHandler#setAsync(boolean) */ public S async(boolean async) { - assertHandler(); - if (this.handler instanceof AbstractMessageProducingHandler) { - ((AbstractMessageProducingHandler) this.handler).setAsync(async); + if (this.handler != null) { + if (this.handler instanceof AbstractMessageProducingHandler producingHandler) { + producingHandler.setAsync(async); + } + else { + this.logger.warn("'async' can be applied only for AbstractMessageProducingHandler"); + } } else { - this.logger.warn("'async' can be applied only for AbstractMessageProducingHandler"); + this.async = async; } return _this(); } @@ -318,12 +345,16 @@ public S async(boolean async) { * @see AbstractMessageProducingHandler#setNotPropagatedHeaders(String...) */ public S notPropagatedHeaders(String... headerPatterns) { - assertHandler(); - if (this.handler instanceof AbstractMessageProducingHandler) { - ((AbstractMessageProducingHandler) this.handler).setNotPropagatedHeaders(headerPatterns); + if (this.handler != null) { + if (this.handler instanceof AbstractMessageProducingHandler producingHandler) { + producingHandler.setNotPropagatedHeaders(headerPatterns); + } + else { + this.logger.warn("'headerPatterns' can be applied only for AbstractMessageProducingHandler"); + } } else { - this.logger.warn("'headerPatterns' can be applied only for AbstractMessageProducingHandler"); + this.notPropagatedHeaders = headerPatterns; } return _this(); } @@ -331,9 +362,17 @@ public S notPropagatedHeaders(String... headerPatterns) { @Override protected Tuple2 doGet() { this.endpointFactoryBean.setAdviceChain(this.adviceChain); - if (this.handler instanceof AbstractReplyProducingMessageHandler && !this.adviceChain.isEmpty()) { - ((AbstractReplyProducingMessageHandler) this.handler).setAdviceChain(this.adviceChain); + + if (this.handler instanceof AbstractReplyProducingMessageHandler producingMessageHandler) { + JavaUtils.INSTANCE + .acceptIfNotNull(this.requiresReply, producingMessageHandler::setRequiresReply) + .acceptIfNotNull(this.sendTimeout, producingMessageHandler::setSendTimeout) + .acceptIfNotNull(this.async, producingMessageHandler::setAsync) + .acceptIfNotNull(this.order, producingMessageHandler::setOrder) + .acceptIfNotEmpty(this.notPropagatedHeaders, producingMessageHandler::setNotPropagatedHeaders) + .acceptIfNotEmpty(this.adviceChain, producingMessageHandler::setAdviceChain); } + this.endpointFactoryBean.setHandler(this.handler); return super.doGet(); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java index 8f3f6e3bfd5..38b09a1b0d1 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java @@ -203,16 +203,18 @@ public

B handle(GenericHandler

handler, * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @param

the payload type. * @return the current {@link IntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}. * @see org.springframework.integration.handler.LambdaMessageProcessor * @see SplitterEndpointSpec */ + @Deprecated(since = "6.2", forRemoval = true) + @SuppressWarnings("removal") public

B split(Function splitter, Consumer> endpointConfigurer) { return split(null, splitter, endpointConfigurer); } - /** * Populate the {@link MethodInvokingRouter} for provided {@link Function} * with default options. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/RouterSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/RouterSpec.java index 8225ff4485c..7d0c66ca7f0 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/RouterSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/RouterSpec.java @@ -57,6 +57,12 @@ protected RouterSpec(R router) { this.mappingProvider = new RouterMappingProvider(this.handler); } + @Override + public RouterSpec sendTimeout(long sendTimeout) { + this.handler.setSendTimeout(sendTimeout); + return this; + } + /** * @param resolutionRequired the resolutionRequired. * @return the router spec. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/SplitterEndpointSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/SplitterEndpointSpec.java index d26695e7b7c..05bb5efac63 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/SplitterEndpointSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/SplitterEndpointSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,10 @@ * @author Artem Bilan * * @since 5.0 + * + * @deprecated since 6.2 in favor of {@link SplitterSpec} */ +@Deprecated(since = "6.2", forRemoval = true) public class SplitterEndpointSpec extends ConsumerEndpointSpec, S> { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/SplitterSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/SplitterSpec.java new file mode 100644 index 00000000000..156ef7698c5 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/SplitterSpec.java @@ -0,0 +1,308 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import reactor.util.function.Tuple2; + +import org.springframework.expression.Expression; +import org.springframework.integration.JavaUtils; +import org.springframework.integration.config.ConsumerEndpointFactoryBean; +import org.springframework.integration.handler.BeanNameMessageProcessor; +import org.springframework.integration.handler.LambdaMessageProcessor; +import org.springframework.integration.handler.MessageProcessor; +import org.springframework.integration.splitter.AbstractMessageSplitter; +import org.springframework.integration.splitter.DefaultMessageSplitter; +import org.springframework.integration.splitter.ExpressionEvaluatingSplitter; +import org.springframework.integration.splitter.MethodInvokingSplitter; +import org.springframework.integration.util.ClassUtils; +import org.springframework.lang.Nullable; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.util.Assert; + +/** + * A {@link ConsumerEndpointSpec} for an {@link AbstractMessageSplitter}. + * + * @author Artem Bilan + * + * @since 6.2 + */ +public class SplitterSpec extends ConsumerEndpointSpec { + + private final AtomicBoolean splitterSet = new AtomicBoolean(); + + private Expression expression; + + private Object ref; + + private String refName; + + @Nullable + private String method; + + private Function function; + + @Nullable + private Class expectedType; + + @Nullable + private String delimiters; + + @Nullable + private String discardChannelName; + + @Nullable + private MessageChannel discardChannel; + + @Nullable + private Boolean applySequence; + + protected SplitterSpec() { + super(null); + } + + /** + * Set delimiters to tokenize String values. The default is + * null indicating that no tokenizing should occur. + * If delimiters are provided, they will be applied to any String payload. + * Only applied if provided {@code splitter} is instance of {@link DefaultMessageSplitter}. + * @param delimiters The delimiters. + * @return the endpoint spec. + * @see DefaultMessageSplitter#setDelimiters(String) + */ + public SplitterSpec delimiters(String delimiters) { + this.delimiters = delimiters; + return this; + } + + /** + * Provide an expression to use an {@link ExpressionEvaluatingSplitter} for the target handler. + * @param expression the SpEL expression to use. + * @return the spec + */ + public SplitterSpec expression(String expression) { + return expression(PARSER.parseExpression(expression)); + } + + /** + * Provide an expression to use an {@link ExpressionEvaluatingSplitter} for the target handler. + * @param expression the SpEL expression to use. + * @return the spec + */ + public SplitterSpec expression(Expression expression) { + assertSplitterSet(); + this.expression = expression; + return this; + } + + /** + * Provide a service to use a {@link MethodInvokingSplitter} for the target handler. + * This option can be set to an {@link AbstractMessageSplitter} implementation, + * a {@link MessageHandlerSpec} providing an {@link AbstractMessageSplitter}, + * or {@link MessageProcessorSpec}. + * @param ref the service to call as a splitter POJO. + * @return the spec + */ + public SplitterSpec ref(Object ref) { + assertSplitterSet(); + this.ref = ref; + return this; + } + + /** + * Provide a bean name to use a {@link MethodInvokingSplitter} + * (based on {@link BeanNameMessageProcessor}) for the target handler. + * @param refName the bean name for service to call as a splitter POJO. + * @return the spec + */ + public SplitterSpec refName(String refName) { + assertSplitterSet(); + this.refName = refName; + return this; + } + + /** + * Provide a service method name to call. Optional. + * Use only together with {@link #ref(Object)} or {@link #refName(String)}. + * @param method the service method name to call. + * @return the spec + */ + public SplitterSpec method(@Nullable String method) { + this.method = method; + return this; + } + + /** + * Provide a {@link Function} as a direct delegate for {@link MethodInvokingSplitter}. + * @param function the {@link Function} instance to use. + * @param

the input type. + * @return the spec + */ + public

SplitterSpec function(Function function) { + assertSplitterSet(); + this.function = function; + return this; + } + + /** + * Set a {@link Function} input argument type. + * Can be a {@link org.springframework.messaging.Message}. + * Ignored for all other options, but {@link #function(Function)}. + * @param expectedType the {@link Function} input argument type. + * @return the spec. + */ + public SplitterSpec expectedType(@Nullable Class expectedType) { + this.expectedType = expectedType; + return this; + } + + /** + * Set the applySequence flag to the specified value. Defaults to {@code true}. + * @param applySequence the applySequence. + * @return the endpoint spec. + * @see AbstractMessageSplitter#setApplySequence(boolean) + */ + public SplitterSpec applySequence(boolean applySequence) { + this.applySequence = applySequence; + return this; + } + + /** + * Specify a channel where rejected Messages should be sent. If the discard + * channel is null (the default), rejected Messages will be dropped. + * A "Rejected Message" means that split function has returned an empty result (but not null): + * no items to iterate for sending. + * @param discardChannel The discard channel. + * @return the endpoint spec. + * @see DefaultMessageSplitter#setDelimiters(String) + */ + public SplitterSpec discardChannel(MessageChannel discardChannel) { + this.discardChannel = discardChannel; + return this; + } + + /** + * Configure a subflow to run for discarded messages instead of a + * {@link #discardChannel(MessageChannel)}. + * @param discardFlow the discard flow. + * @return the endpoint spec. + */ + public SplitterSpec discardFlow(IntegrationFlow discardFlow) { + return discardChannel(obtainInputChannelFromFlow(discardFlow)); + } + + /** + * Specify a channel bean name where rejected Messages should be sent. If the discard + * channel is null (the default), rejected Messages will be dropped. + * A "Rejected Message" means that split function has returned an empty result (but not null): + * no items to iterate for sending. + * @param discardChannelName The discard channel bean name. + * @return the endpoint spec. + * @see DefaultMessageSplitter#setDelimiters(String) + */ + public SplitterSpec discardChannel(String discardChannelName) { + this.discardChannelName = discardChannelName; + return this; + } + + private void assertSplitterSet() { + Assert.isTrue(this.splitterSet.compareAndSet(false, true), this::assertMessage); + } + + private String assertMessage() { + String currentSplitterValue = null; + if (this.expression != null) { + currentSplitterValue = "'expression'=" + this.expression; + } + else if (this.ref != null) { + currentSplitterValue = "'ref'=" + this.ref; + } + else if (this.refName != null) { + currentSplitterValue = "'refName'=" + this.refName; + } + else if (this.function != null) { + currentSplitterValue = "'function'=" + this.function; + } + return "Only one of the 'expression', 'ref', 'refName', 'function' can be set. " + + "Current one is " + currentSplitterValue; + } + + @Override + public Tuple2 doGet() { + AbstractMessageSplitter splitter = new DefaultMessageSplitter(); + if (this.expression != null) { + splitter = new ExpressionEvaluatingSplitter(this.expression); + } + else if (this.ref != null) { + if (this.method != null) { + splitter = new MethodInvokingSplitter(this.ref, this.method); + } + else if (this.ref instanceof MessageProcessorSpec messageProcessorSpec) { + MessageProcessor targetProcessor = messageProcessorSpec.getObject(); + this.componentsToRegister.put(targetProcessor, null); + splitter = new MethodInvokingSplitter(targetProcessor); + } + else if (this.ref instanceof MessageHandlerSpec messageHandlerSpec) { + MessageHandler messageHandler = messageHandlerSpec.getObject(); + Assert.isInstanceOf(AbstractMessageSplitter.class, messageHandler, + "Only the 'MessageHandlerSpec' producing an `AbstractMessageSplitter` can be used as a `ref`. " + + "All others should be used in a `.handle()`."); + splitter = (AbstractMessageSplitter) messageHandler; + } + else if (this.ref instanceof AbstractMessageSplitter messageSplitter) { + splitter = messageSplitter; + } + else { + splitter = new MethodInvokingSplitter(this.ref); + } + } + else if (this.refName != null) { + splitter = new MethodInvokingSplitter(new BeanNameMessageProcessor<>(this.refName, this.method)); + } + else if (this.function != null) { + splitter = wrapFunctionToSplitter(); + } + + if (this.delimiters != null) { + if (splitter instanceof DefaultMessageSplitter defaultMessageSplitter) { + defaultMessageSplitter.setDelimiters(this.delimiters); + } + else { + logger.warn("'delimiters' can be applied only for the DefaultMessageSplitter"); + } + } + + JavaUtils.INSTANCE + .acceptIfNotNull(this.discardChannel, splitter::setDiscardChannel) + .acceptIfHasText(this.discardChannelName, splitter::setDiscardChannelName) + .acceptIfNotNull(this.applySequence, splitter::setApplySequence); + + this.handler = splitter; + + return super.doGet(); + } + + private MethodInvokingSplitter wrapFunctionToSplitter() { + return ClassUtils.isLambda(this.function) + ? new MethodInvokingSplitter(new LambdaMessageProcessor(this.function, this.expectedType)) + : new MethodInvokingSplitter(this.function, ClassUtils.FUNCTION_APPLY_METHOD); + } + +} diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt index 4d81bac3216..4ef28eb191f 100644 --- a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt @@ -111,9 +111,7 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ } /** - * Inline function for [IntegrationFlowDefinition.transformWith] - * providing a `transform()` variant - * with reified generic type. + * Populate a transformer endpoint. * @since 6.2 */ fun transformWith(configurer: KotlinTransformerEndpointSpec.() -> Unit) { @@ -128,11 +126,24 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ this.delegate.split(P::class.java) { function(it) } } + /** + * Populate a splitter endpoint. + * @since 6.2 + */ + fun splitWith(configurer: KotlinSplitterSpec.() -> Unit) { + this.delegate.register(KotlinSplitterSpec(), configurer) + } /** * Inline function for [IntegrationFlowDefinition.split] providing a `split()` variant * with reified generic type. */ + @Deprecated("since 6.2", + ReplaceWith(""" + splitWith { + function {} + }""")) + @Suppress("DEPRECATION", "REMOVAL") inline fun split( crossinline function: (P) -> Any, crossinline configurer: KotlinSplitterEndpointSpec.() -> Unit @@ -708,10 +719,19 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ this.delegate.split() } + fun split(expression: String) { + this.delegate.split(expression) + } + /** - * Populate the [ExpressionEvaluatingSplitter] with provided - * SpEL expression. + * Populate the [ExpressionEvaluatingSplitter] with provided SpEL expression. */ + @Deprecated("since 6.2", + ReplaceWith(""" + splitWith { + expression() + }""")) + @Suppress("DEPRECATION", "REMOVAL") fun split( expression: String, endpointConfigurer: KotlinSplitterEndpointSpec.() -> Unit = {} @@ -733,6 +753,13 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ * `method` of the `bean` at runtime. * In addition, accept options for the integration endpoint using [KotlinSplitterEndpointSpec]. */ + @Deprecated("since 6.2", + ReplaceWith(""" + splitWith { + ref() + method() + }""")) + @Suppress("DEPRECATION", "REMOVAL") fun split( service: Any, methodName: String?, splitterConfigurer: KotlinSplitterEndpointSpec.() -> Unit @@ -754,6 +781,13 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ * `method` of the `bean` at runtime. * In addition, accept options for the integration endpoint using [KotlinSplitterEndpointSpec]. */ + @Deprecated("since 6.2", + ReplaceWith(""" + splitWith { + refName() + method() + }""")) + @Suppress("DEPRECATION", "REMOVAL") fun split( beanName: String, methodName: String?, splitterConfigurer: KotlinSplitterEndpointSpec.() -> Unit @@ -762,37 +796,67 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ this.delegate.split(beanName, methodName) { splitterConfigurer(KotlinSplitterEndpointSpec(it)) } } + fun split(messageProcessorSpec: MessageProcessorSpec<*>) { + this.delegate.split(messageProcessorSpec) + } + /** * Populate the [MethodInvokingSplitter] to evaluate the * [MessageProcessor] at runtime from provided [MessageProcessorSpec]. * In addition, accept options for the integration endpoint using [KotlinSplitterEndpointSpec]. */ + @Deprecated("since 6.2", + ReplaceWith(""" + splitWith { + ref() + }""")) + @Suppress("DEPRECATION", "REMOVAL") fun split( messageProcessorSpec: MessageProcessorSpec<*>, - splitterConfigurer: KotlinSplitterEndpointSpec.() -> Unit = {} + splitterConfigurer: KotlinSplitterEndpointSpec.() -> Unit ) { this.delegate.split(messageProcessorSpec) { splitterConfigurer(KotlinSplitterEndpointSpec(it)) } } + fun split(splitterMessageHandlerSpec: MessageHandlerSpec<*, out AbstractMessageSplitter>) { + this.delegate.split(splitterMessageHandlerSpec) + } + /** * Populate the provided [AbstractMessageSplitter] to the current integration flow position. */ + @Deprecated("since 6.2", + ReplaceWith(""" + splitWith { + ref() + }""")) + @Suppress("DEPRECATION", "REMOVAL") fun split( splitterMessageHandlerSpec: MessageHandlerSpec<*, S>, - splitterConfigurer: KotlinSplitterEndpointSpec.() -> Unit = {} + splitterConfigurer: KotlinSplitterEndpointSpec.() -> Unit ) { this.delegate.split(splitterMessageHandlerSpec) { splitterConfigurer(KotlinSplitterEndpointSpec(it)) } } + fun split(splitter: AbstractMessageSplitter) { + this.delegate.split(splitter) + } + /** * Populate the provided [AbstractMessageSplitter] to the current integration * flow position. */ + @Deprecated("since 6.2", + ReplaceWith(""" + splitWith { + ref() + }""")) + @Suppress("DEPRECATION", "REMOVAL") fun split( splitter: S, - splitterConfigurer: KotlinSplitterEndpointSpec.() -> Unit = {} + splitterConfigurer: KotlinSplitterEndpointSpec.() -> Unit ) { this.delegate.split(splitter) { splitterConfigurer(KotlinSplitterEndpointSpec(it)) } diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterEndpointSpec.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterEndpointSpec.kt index 8cf449be1fb..2d86179f49d 100644 --- a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterEndpointSpec.kt +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterEndpointSpec.kt @@ -28,6 +28,8 @@ import org.springframework.messaging.MessageChannel * * @since 5.3 */ +@Deprecated("since 6.2", ReplaceWith("KotlinSplitterSpec")) +@Suppress("REMOVAL", "DEPRECATION") class KotlinSplitterEndpointSpec(override val delegate: SplitterEndpointSpec) : KotlinConsumerEndpointSpec, H>(delegate) { diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterSpec.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterSpec.kt new file mode 100644 index 00000000000..e1edfc9c647 --- /dev/null +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinSplitterSpec.kt @@ -0,0 +1,43 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl + +/** + * A [SplitterSpec] wrapped for Kotlin DSL. + * + * @author Artem Bilan + * + * @since 6.2 + */ +class KotlinSplitterSpec : SplitterSpec() { + + /** + * Provide a Kotlin function as a direct delegate for + * [org.springframework.integration.splitter.MethodInvokingSplitter]. + * @param function the function instance to use. + * @param

the input type. + */ + inline fun function(crossinline function: (P) -> Any) { + expectedType(P::class.java) + function

{ function(it) } + } + + fun discardFlow(discardFlow: KotlinIntegrationFlowDefinition.() -> Unit) { + discardFlow {definition -> discardFlow(KotlinIntegrationFlowDefinition(definition)) } + } + +} diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java index 0ca1165f0f8..0072b58f449 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java @@ -128,7 +128,7 @@ void testFluxMessageChannelCleanUp() throws InterruptedException { CountDownLatch finishLatch = new CountDownLatch(1); IntegrationFlow testFlow = f -> f - .split(__ -> Flux.fromStream(IntStream.range(0, 100).boxed()), null) + .splitWith(s -> s.function(__ -> Flux.fromStream(IntStream.range(0, 100).boxed()))) .channel(flux) .aggregate(a -> a.releaseStrategy(m -> m.size() == 100).releaseLockBeforeSend(true)) .handle(__ -> finishLatch.countDown()); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/correlation/CorrelationHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/correlation/CorrelationHandlerTests.java index be136aaf10b..8b63145d1bd 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/correlation/CorrelationHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/correlation/CorrelationHandlerTests.java @@ -228,11 +228,17 @@ public TestSplitterPojo testSplitterData() { @SuppressWarnings("rawtypes") public IntegrationFlow splitResequenceFlow(MessageChannel executorChannel, TaskExecutor taskExecutor) { return f -> f.enrichHeaders(s -> s.header("FOO", "BAR")) - .split("testSplitterData", "buildList", c -> c.applySequence(false)) + .splitWith(s -> s + .applySequence(false) + .refName("testSplitterData") + .method("buildList")) .channel(executorChannel) - .split(Message.class, Message::getPayload, c -> c.applySequence(false)) + .splitWith(s -> s + .applySequence(false) + .function(Message::getPayload) + .expectedType(Message.class)) .channel(MessageChannels.executor(taskExecutor)) - .split(s -> s + .splitWith(s -> s .applySequence(false) .delimiters(",")) .channel(MessageChannels.executor(taskExecutor)) @@ -248,8 +254,9 @@ public IntegrationFlow splitResequenceFlow(MessageChannel executorChannel, TaskE public IntegrationFlow splitAggregateFlow() { return IntegrationFlow.from("splitAggregateInput", true) .transform(Transformers.toJson(ObjectToJsonTransformer.ResultType.NODE)) - .split((splitter) -> splitter - .discardFlow((subFlow) -> subFlow.channel((c) -> c.queue("discardChannel")))) + .splitWith((splitter) -> splitter + .discardFlow((subFlow) -> subFlow + .channel((c) -> c.queue("discardChannel")))) .channel(MessageChannels.flux()) .resequence() .aggregate() diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java index ae4bb8826bd..247a5e6c464 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java @@ -205,7 +205,7 @@ private Instant nextExecution(TriggerContext triggerContext) { @Override protected IntegrationFlowDefinition buildFlow() { return fromSupplier(this::messageSource, e -> e.poller(p -> p.trigger(this::nextExecution))) - .split(this, null, e -> e.applySequence(false)) + .splitWith(s -> s.applySequence(false).ref(this)) .transform(this) .aggregate(a -> a.processor(this, null)) .enrichHeaders(Collections.singletonMap("foo", "FOO")) @@ -268,7 +268,7 @@ public String handle(String payload) { @Override protected IntegrationFlowDefinition buildFlow() { return from("delaysBetweenPollsInput") - .split(splitter -> splitter.delimiters(",")) + .splitWith(splitter -> splitter.delimiters(",")) .channel(MessageChannels.queue()) .handle(this, "handle", e -> e.poller(poller -> poller.fixedDelay(500).maxMessagesPerPoll(1))) .channel(MessageChannels.queue("delaysBetweenPollsOutput")); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java index ef32cf87af6..e267a1f5c0a 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java @@ -191,7 +191,7 @@ void testFluxTransform() { QueueChannel resultChannel = new QueueChannel(); IntegrationFlow integrationFlow = f -> f - .split((splitter) -> splitter.delimiters(",")) + .splitWith((splitter) -> splitter.delimiters(",")) .fluxTransform(flux -> flux .map(Message::getPayload) .map(String::toUpperCase)) @@ -261,7 +261,7 @@ public Publisher> reactiveFlow() { public Publisher> pollableReactiveFlow() { return IntegrationFlow .from("inputChannel") - .split(s -> s.delimiters(",")) + .splitWith(s -> s.delimiters(",")) .transformWith(t -> t .transformer(Integer::parseInt) .reactive(flux -> flux.publishOn(Schedulers.parallel())) diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java index 470d2d1447d..abf7ba0fc90 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -872,7 +872,7 @@ public IntegrationFlow scatterGatherFlow() { @Bean public IntegrationFlow nestedScatterGatherFlow() { return f -> f - .split(s -> s.delimiters(" ")) + .splitWith(s -> s.delimiters(" ")) .scatterGather( scatterer -> scatterer .recipientFlow(f1 -> f1.handle((p, h) -> p + " - flow 1")) diff --git a/spring-integration-core/src/test/java/org/springframework/integration/handler/MethodInvokingMessageProcessorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/handler/MethodInvokingMessageProcessorTests.java index 3d48c89f8f3..c5377fc8229 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/handler/MethodInvokingMessageProcessorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/handler/MethodInvokingMessageProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1128,6 +1128,7 @@ public void testCompiledSpELForProxy() { @Test + @Disabled("Jackson does not understand a generic argument value") public void testCollectionArgument() throws JsonProcessingException { class A { diff --git a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt index 6bb5072998f..8e9094c7b73 100644 --- a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt +++ b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt @@ -261,7 +261,8 @@ class KotlinDslTests { } transform { it.uppercase() } split> { it.payload } - split({ it }) { + splitWith { + function{ it } id("splitterEndpoint") phase(257) } diff --git a/spring-integration-file/src/test/java/org/springframework/integration/file/dsl/FileTests.java b/spring-integration-file/src/test/java/org/springframework/integration/file/dsl/FileTests.java index 8bfc11d62ac..17c4e09b806 100644 --- a/spring-integration-file/src/test/java/org/springframework/integration/file/dsl/FileTests.java +++ b/spring-integration-file/src/test/java/org/springframework/integration/file/dsl/FileTests.java @@ -416,11 +416,12 @@ public IntegrationFlow fileSplitterFlow(BeanFactory beanFactory) { .addFilter(new AcceptOnceFileListFilter<>()) .addFilter(fileExpressionFileListFilter)), e -> e.poller(p -> p.fixedDelay(100))) - .split(Files.splitter() + .splitWith(s -> s + .id("fileSplitter") + .ref(Files.splitter() .markers() .charset(StandardCharsets.US_ASCII) - .applySequence(true), - e -> e.id("fileSplitter")) + .applySequence(true))) .channel(c -> c.queue("fileSplittingResultChannel")) .get(); } diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java index bc4ef5e0296..fd504945e52 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java @@ -349,7 +349,7 @@ public PollableChannel futuresChannel() { public IntegrationFlow sendToKafkaFlow( KafkaProducerMessageHandlerSpec kafkaMessageHandlerTopic2) { return f -> f - .split(p -> Stream.generate(() -> p).limit(101).iterator(), null) + .splitWith(s -> s.function(p -> Stream.generate(() -> p).limit(101).iterator())) .enrichHeaders(h -> h.header(KafkaIntegrationHeaders.FUTURE_TOKEN, "foo")) .publishSubscribeChannel(c -> c .subscribe(sf -> sf.handle( diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 2c64c9f7a35..6cc8f0c3799 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -1396,7 +1396,7 @@ Consider the following integration flow: @Bean public IntegrationFlow flow(RabbitTemplate template) { return IntegrationFlow.from(Gateway.class) - .split(s -> s.delimiters(",")) + .splitWith(s -> s.delimiters(",")) .transform(String::toUpperCase) .handle(Amqp.outboundAdapter(template).routingKey("rk")) .get(); @@ -1420,7 +1420,7 @@ The following example shows how to use `BoundRabbitChannelAdvice`: @Bean public IntegrationFlow flow(RabbitTemplate template) { return IntegrationFlow.from(Gateway.class) - .split(s -> s.delimiters(",") + .splitWith(s -> s.delimiters(",") .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10)))) .transform(String::toUpperCase) .handle(Amqp.outboundAdapter(template).routingKey("rk")) diff --git a/src/reference/asciidoc/dsl.adoc b/src/reference/asciidoc/dsl.adoc index 7afc3ffc48f..b0c6376c58b 100644 --- a/src/reference/asciidoc/dsl.adoc +++ b/src/reference/asciidoc/dsl.adoc @@ -491,14 +491,14 @@ To create a splitter, use the `split()` EIP method. By default, if the payload is an `Iterable`, an `Iterator`, an `Array`, a `Stream`, or a reactive `Publisher`, the `split()` method outputs each item as an individual message. It accepts a lambda, a SpEL expression, or any `AbstractMessageSplitter` implementation. Alternatively, you can use it without parameters to provide the `DefaultMessageSplitter`. -The following example shows how to use the `split()` method by providing a lambda: +The following example shows how to use the `splitWith()` method by providing a lambda: [source,java] ---- @Bean public IntegrationFlow splitFlow() { return IntegrationFlow.from("splitInput") - .split(s -> s.applySequence(false).delimiters(",")) + .splitWith(s -> s.applySequence(false).delimiters(",")) .channel(MessageChannels.executor(taskExecutor())) .get(); } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 53289da1d33..33f7d746d92 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -110,7 +110,7 @@ public ProducerFactory producerFactory() { @Bean public IntegrationFlow sendToKafkaFlow() { return f -> f - .split(p -> Stream.generate(() -> p).limit(101).iterator(), null) + .splitWith(s -> s.function(p -> Stream.generate(() -> p).limit(101).iterator())) .publishSubscribeChannel(c -> c .subscribe(sf -> sf.handle( kafkaMessageHandler(producerFactory(), TEST_TOPIC1) diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 17c6dbb83c4..f5be700435a 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -30,4 +30,4 @@ See <<./debezium.adoc#debezium, Debezium Support>> for more information. See <<./endpoint.adoc#endpoint-pollingconsumer, Polling Consumer>> for more information. - Java, Groovy and Kotlin DSLs have now context-specific methods in the `IntegationFlowDefinition` with a single `Consumer` argument to configure an endpoint and its handler with one builder and readable options. -See, for example, `transformWith()` in <<./dsl.adoc#java-dsl, Java DSL Chapter>>. +See, for example, `transformWith()`, `splitWith()` in <<./dsl.adoc#java-dsl, Java DSL Chapter>>.