Skip to content

Improve split() DSL #8666

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -108,8 +108,7 @@ public BoundRabbitChannelAdvice advice(RabbitTemplate template) {
@Bean
public IntegrationFlow flow(RabbitTemplate template, BoundRabbitChannelAdvice advice) {
return IntegrationFlow.from(Gate.class)
.split(s -> s.delimiters(",")
.advice(advice))
.splitWith(s -> s.delimiters(",").advice(advice))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey(QUEUE))
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public RabbitTemplate template() throws Exception {
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gate.class)
.split(s -> s.delimiters(",")
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -80,7 +80,7 @@ public <T> JavaUtils acceptIfNotNull(@Nullable T value, Consumer<T> consumer) {
* @return this.
* @since 5.2
*/
public JavaUtils acceptIfHasText(String value, Consumer<String> consumer) {
public JavaUtils acceptIfHasText(@Nullable String value, Consumer<String> consumer) {
if (StringUtils.hasText(value)) {
consumer.accept(value);
}
Expand All @@ -95,7 +95,7 @@ public JavaUtils acceptIfHasText(String value, Consumer<String> consumer) {
* @return this.
* @since 5.2
*/
public <T> JavaUtils acceptIfNotEmpty(List<T> value, Consumer<List<T>> consumer) {
public <T> JavaUtils acceptIfNotEmpty(@Nullable List<T> value, Consumer<List<T>> consumer) {
if (!CollectionUtils.isEmpty(value)) {
consumer.accept(value);
}
Expand All @@ -110,7 +110,7 @@ public <T> JavaUtils acceptIfNotEmpty(List<T> value, Consumer<List<T>> consumer)
* @return this.
* @since 5.2
*/
public <T> JavaUtils acceptIfNotEmpty(T[] value, Consumer<T[]> consumer) {
public <T> JavaUtils acceptIfNotEmpty(@Nullable T[] value, Consumer<T[]> consumer) {
if (!ObjectUtils.isEmpty(value)) {
consumer.accept(value);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@

import reactor.util.function.Tuple2;

import org.springframework.core.Ordered;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.aggregator.BarrierMessageHandler;
import org.springframework.integration.aggregator.CorrelationStrategy;
Expand All @@ -44,14 +43,6 @@ public class BarrierSpec extends ConsumerEndpointSpec<BarrierSpec, BarrierMessag
private CorrelationStrategy correlationStrategy =
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID);

private boolean requiresReply;

private long sendTimeout = -1;

private int order = Ordered.LOWEST_PRECEDENCE;

private boolean async;

protected BarrierSpec(long timeout) {
super(null);
this.timeout = timeout;
Expand All @@ -69,40 +60,9 @@ public BarrierSpec correlationStrategy(CorrelationStrategy correlationStrategy)
return this;
}

@Override
public BarrierSpec requiresReply(boolean requiresReply) {
this.requiresReply = requiresReply;
return this;
}

@Override
public BarrierSpec sendTimeout(long sendTimeout) {
this.sendTimeout = sendTimeout;
return this;
}

@Override
public BarrierSpec order(int order) {
this.order = order;
return this;
}

@Override
public BarrierSpec async(boolean async) {
this.async = async;
return this;
}

@Override
public Tuple2<ConsumerEndpointFactoryBean, BarrierMessageHandler> doGet() {
this.handler = new BarrierMessageHandler(this.timeout, this.outputProcessor, this.correlationStrategy);
if (!this.adviceChain.isEmpty()) {
this.handler.setAdviceChain(this.adviceChain);
}
this.handler.setRequiresReply(this.requiresReply);
this.handler.setSendTimeout(this.sendTimeout);
this.handler.setAsync(this.async);
this.handler.setOrder(this.order);
return super.doGet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ public B enrichHeaders(Consumer<HeaderEnricherSpec> headerEnricherConfigurer) {
* @return the current {@link BaseIntegrationFlowDefinition}.
*/
public B split() {
return split((Consumer<SplitterEndpointSpec<DefaultMessageSplitter>>) null);
return splitWith((splitterSpec) -> { });
}

/**
Expand All @@ -1420,21 +1420,47 @@ public B split() {
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options
* and for {@link DefaultMessageSplitter}.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
* @see SplitterEndpointSpec
*/
@Deprecated(since = "6.2", forRemoval = true)
@SuppressWarnings("removal")
public B split(@Nullable Consumer<SplitterEndpointSpec<DefaultMessageSplitter>> endpointConfigurer) {
return split(new DefaultMessageSplitter(), endpointConfigurer);
}


/**
* Populate the splitter with provided options to the current integration flow position:
* <pre class="code">
* {@code
* .splitWith(s -> s.applySequence(false).delimiters(","))
* }
* </pre>
* or with the refenrence to POJO service method call:
* <pre class="code">
* {@code
* .splitWith(s -> s.ref("someService").method("someMethod"))
* }
* </pre>
* @param splitterConfigurer the {@link Consumer} to provide options splitter endpoint.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @since 6.2
* @see SplitterSpec
*/
public B splitWith(Consumer<SplitterSpec> splitterConfigurer) {
return register(new SplitterSpec(), splitterConfigurer);
}

/**
* Populate the {@link ExpressionEvaluatingSplitter} with provided SpEL expression.
* @param expression the splitter SpEL expression.
* and for {@link ExpressionEvaluatingSplitter}.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @see SplitterEndpointSpec
* @see SplitterSpec
*/
public B split(String expression) {
return split(expression, (Consumer<SplitterEndpointSpec<ExpressionEvaluatingSplitter>>) null);
return splitWith((splitterSpec) -> splitterSpec.expression(expression));
}

/**
Expand All @@ -1443,8 +1469,11 @@ public B split(String expression) {
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options
* and for {@link ExpressionEvaluatingSplitter}.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
* @see SplitterEndpointSpec
*/
@Deprecated(since = "6.2", forRemoval = true)
@SuppressWarnings("removal")
public B split(String expression,
@Nullable Consumer<SplitterEndpointSpec<ExpressionEvaluatingSplitter>> endpointConfigurer) {

Expand Down Expand Up @@ -1472,7 +1501,7 @@ public B split(Object service) {
* @see MethodInvokingSplitter
*/
public B split(Object service, @Nullable String methodName) {
return split(service, methodName, null);
return splitWith((splitterSpec) -> splitterSpec.ref(service).method(methodName));
}

/**
Expand All @@ -1484,9 +1513,12 @@ public B split(Object service, @Nullable String methodName) {
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options
* and for {@link MethodInvokingSplitter}.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
* @see SplitterEndpointSpec
* @see MethodInvokingSplitter
*/
@Deprecated(since = "6.2", forRemoval = true)
@SuppressWarnings("removal")
public B split(Object service, @Nullable String methodName,
@Nullable Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {

Expand All @@ -1508,7 +1540,7 @@ public B split(Object service, @Nullable String methodName,
* @return the current {@link BaseIntegrationFlowDefinition}.
*/
public B split(String beanName, @Nullable String methodName) {
return split(beanName, methodName, null);
return splitWith((splitterSpec) -> splitterSpec.refName(beanName).method(methodName));
}

/**
Expand All @@ -1520,8 +1552,11 @@ public B split(String beanName, @Nullable String methodName) {
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options
* and for {@link MethodInvokingSplitter}.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
* @see SplitterEndpointSpec
*/
@Deprecated(since = "6.2", forRemoval = true)
@SuppressWarnings("removal")
public B split(String beanName, @Nullable String methodName,
@Nullable Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {

Expand All @@ -1540,10 +1575,10 @@ public B split(String beanName, @Nullable String methodName,
* </pre>
* @param messageProcessorSpec the splitter {@link MessageProcessorSpec}.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @see SplitterEndpointSpec
* @see SplitterSpec
*/
public B split(MessageProcessorSpec<?> messageProcessorSpec) {
return split(messageProcessorSpec, (Consumer<SplitterEndpointSpec<MethodInvokingSplitter>>) null);
return splitWith((splitterSpec) -> splitterSpec.ref(messageProcessorSpec));
}

/**
Expand All @@ -1561,8 +1596,11 @@ public B split(MessageProcessorSpec<?> messageProcessorSpec) {
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options
* and for {@link MethodInvokingSplitter}.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
* @see SplitterEndpointSpec
*/
@Deprecated(since = "6.2", forRemoval = true)
@SuppressWarnings("removal")
public B split(MessageProcessorSpec<?> messageProcessorSpec,
@Nullable Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {

Expand Down Expand Up @@ -1595,7 +1633,7 @@ public B split(MessageProcessorSpec<?> messageProcessorSpec,
* @see LambdaMessageProcessor
*/
public <P> B split(Class<P> expectedType, Function<P, ?> splitter) {
return split(expectedType, splitter, null);
return splitWith((splitterSpec) -> splitterSpec.function(splitter).expectedType(expectedType));
}

/**
Expand All @@ -1621,9 +1659,12 @@ public <P> B split(Class<P> expectedType, Function<P, ?> splitter) {
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @param <P> the payload type or {@code Message.class}.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
* @see LambdaMessageProcessor
* @see SplitterEndpointSpec
*/
@Deprecated(since = "6.2", forRemoval = true)
@SuppressWarnings("removal")
public <P> B split(@Nullable Class<P> expectedType, Function<P, ?> splitter,
@Nullable Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {

Expand All @@ -1640,10 +1681,10 @@ public <P> B split(@Nullable Class<P> expectedType, Function<P, ?> splitter,
* @param splitterMessageHandlerSpec the {@link MessageHandlerSpec} to populate.
* @param <S> the {@link AbstractMessageSplitter}
* @return the current {@link BaseIntegrationFlowDefinition}.
* @see SplitterEndpointSpec
* @see SplitterSpec
*/
public <S extends AbstractMessageSplitter> B split(MessageHandlerSpec<?, S> splitterMessageHandlerSpec) {
return split(splitterMessageHandlerSpec, (Consumer<SplitterEndpointSpec<S>>) null);
return splitWith((splitterSpec) -> splitterSpec.ref(splitterMessageHandlerSpec));
}

/**
Expand All @@ -1653,8 +1694,11 @@ public <S extends AbstractMessageSplitter> B split(MessageHandlerSpec<?, S> spli
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @param <S> the {@link AbstractMessageSplitter}
* @return the current {@link BaseIntegrationFlowDefinition}.
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
* @see SplitterEndpointSpec
*/
@Deprecated(since = "6.2", forRemoval = true)
@SuppressWarnings("removal")
public <S extends AbstractMessageSplitter> B split(MessageHandlerSpec<?, S> splitterMessageHandlerSpec,
@Nullable Consumer<SplitterEndpointSpec<S>> endpointConfigurer) {

Expand All @@ -1667,10 +1711,10 @@ public <S extends AbstractMessageSplitter> B split(MessageHandlerSpec<?, S> spli
* flow position.
* @param splitter the {@link AbstractMessageSplitter} to populate.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @see SplitterEndpointSpec
* @see SplitterSpec
*/
public B split(AbstractMessageSplitter splitter) {
return split(splitter, (Consumer<SplitterEndpointSpec<AbstractMessageSplitter>>) null);
return splitWith((splitterSpec) -> splitterSpec.ref(splitter));
}

/**
Expand All @@ -1680,8 +1724,11 @@ public B split(AbstractMessageSplitter splitter) {
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @param <S> the {@link AbstractMessageSplitter}
* @return the current {@link BaseIntegrationFlowDefinition}.
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
* @see SplitterEndpointSpec
*/
@Deprecated(since = "6.2", forRemoval = true)
@SuppressWarnings("removal")
public <S extends AbstractMessageSplitter> B split(S splitter,
@Nullable Consumer<SplitterEndpointSpec<S>> endpointConfigurer) {

Expand Down
Loading