Skip to content

Commit 34f901f

Browse files
authored
Improve split() DSL (#8666)
* Introduce a `SplitterSpec` which can accept possible splitter variants: `expression`, `function`, `ref` etc. This way we are going to have a complex endpoint configuration only with a single method argument. * Use `SplitterSpec` in a newly introduced `splitWith()` * Deprecate those `split()` methods which use `SplitterEndpointSpec`. This method are complex enough because of their several arguments * Refactor some common `MessageHandler` options initialization into the `ConsumerEndpointSpec.doGet()` * Disable failing now `MethodInvokingMessageProcessorTests.testCollectionArgument()`: or Jackson problem, or fresh SF * Groovy DSL will be fixed in the separate PR: https://stackoverflow.com/questions/76595843/groovy-selects-a-defaultgroovymethods-split-instead-of-mine-one
1 parent 0798c8d commit 34f901f

File tree

26 files changed

+607
-124
lines changed

26 files changed

+607
-124
lines changed

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceIntegrationTests.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -108,8 +108,7 @@ public BoundRabbitChannelAdvice advice(RabbitTemplate template) {
108108
@Bean
109109
public IntegrationFlow flow(RabbitTemplate template, BoundRabbitChannelAdvice advice) {
110110
return IntegrationFlow.from(Gate.class)
111-
.split(s -> s.delimiters(",")
112-
.advice(advice))
111+
.splitWith(s -> s.delimiters(",").advice(advice))
113112
.<String, String>transform(String::toUpperCase)
114113
.handle(Amqp.outboundAdapter(template).routingKey(QUEUE))
115114
.get();

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public RabbitTemplate template() throws Exception {
118118
@Bean
119119
public IntegrationFlow flow(RabbitTemplate template) {
120120
return IntegrationFlow.from(Gate.class)
121-
.split(s -> s.delimiters(",")
121+
.splitWith(s -> s.delimiters(",")
122122
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
123123
.<String, String>transform(String::toUpperCase)
124124
.handle(Amqp.outboundAdapter(template).routingKey("rk"))

spring-integration-core/src/main/java/org/springframework/integration/JavaUtils.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -80,7 +80,7 @@ public <T> JavaUtils acceptIfNotNull(@Nullable T value, Consumer<T> consumer) {
8080
* @return this.
8181
* @since 5.2
8282
*/
83-
public JavaUtils acceptIfHasText(String value, Consumer<String> consumer) {
83+
public JavaUtils acceptIfHasText(@Nullable String value, Consumer<String> consumer) {
8484
if (StringUtils.hasText(value)) {
8585
consumer.accept(value);
8686
}
@@ -95,7 +95,7 @@ public JavaUtils acceptIfHasText(String value, Consumer<String> consumer) {
9595
* @return this.
9696
* @since 5.2
9797
*/
98-
public <T> JavaUtils acceptIfNotEmpty(List<T> value, Consumer<List<T>> consumer) {
98+
public <T> JavaUtils acceptIfNotEmpty(@Nullable List<T> value, Consumer<List<T>> consumer) {
9999
if (!CollectionUtils.isEmpty(value)) {
100100
consumer.accept(value);
101101
}
@@ -110,7 +110,7 @@ public <T> JavaUtils acceptIfNotEmpty(List<T> value, Consumer<List<T>> consumer)
110110
* @return this.
111111
* @since 5.2
112112
*/
113-
public <T> JavaUtils acceptIfNotEmpty(T[] value, Consumer<T[]> consumer) {
113+
public <T> JavaUtils acceptIfNotEmpty(@Nullable T[] value, Consumer<T[]> consumer) {
114114
if (!ObjectUtils.isEmpty(value)) {
115115
consumer.accept(value);
116116
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/BarrierSpec.java

+1-41
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@
1818

1919
import reactor.util.function.Tuple2;
2020

21-
import org.springframework.core.Ordered;
2221
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2322
import org.springframework.integration.aggregator.BarrierMessageHandler;
2423
import org.springframework.integration.aggregator.CorrelationStrategy;
@@ -44,14 +43,6 @@ public class BarrierSpec extends ConsumerEndpointSpec<BarrierSpec, BarrierMessag
4443
private CorrelationStrategy correlationStrategy =
4544
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID);
4645

47-
private boolean requiresReply;
48-
49-
private long sendTimeout = -1;
50-
51-
private int order = Ordered.LOWEST_PRECEDENCE;
52-
53-
private boolean async;
54-
5546
protected BarrierSpec(long timeout) {
5647
super(null);
5748
this.timeout = timeout;
@@ -69,40 +60,9 @@ public BarrierSpec correlationStrategy(CorrelationStrategy correlationStrategy)
6960
return this;
7061
}
7162

72-
@Override
73-
public BarrierSpec requiresReply(boolean requiresReply) {
74-
this.requiresReply = requiresReply;
75-
return this;
76-
}
77-
78-
@Override
79-
public BarrierSpec sendTimeout(long sendTimeout) {
80-
this.sendTimeout = sendTimeout;
81-
return this;
82-
}
83-
84-
@Override
85-
public BarrierSpec order(int order) {
86-
this.order = order;
87-
return this;
88-
}
89-
90-
@Override
91-
public BarrierSpec async(boolean async) {
92-
this.async = async;
93-
return this;
94-
}
95-
9663
@Override
9764
public Tuple2<ConsumerEndpointFactoryBean, BarrierMessageHandler> doGet() {
9865
this.handler = new BarrierMessageHandler(this.timeout, this.outputProcessor, this.correlationStrategy);
99-
if (!this.adviceChain.isEmpty()) {
100-
this.handler.setAdviceChain(this.adviceChain);
101-
}
102-
this.handler.setRequiresReply(this.requiresReply);
103-
this.handler.setSendTimeout(this.sendTimeout);
104-
this.handler.setAsync(this.async);
105-
this.handler.setOrder(this.order);
10666
return super.doGet();
10767
}
10868

spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java

+59-12
Original file line numberDiff line numberDiff line change
@@ -1405,7 +1405,7 @@ public B enrichHeaders(Consumer<HeaderEnricherSpec> headerEnricherConfigurer) {
14051405
* @return the current {@link BaseIntegrationFlowDefinition}.
14061406
*/
14071407
public B split() {
1408-
return split((Consumer<SplitterEndpointSpec<DefaultMessageSplitter>>) null);
1408+
return splitWith((splitterSpec) -> { });
14091409
}
14101410

14111411
/**
@@ -1420,21 +1420,47 @@ public B split() {
14201420
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options
14211421
* and for {@link DefaultMessageSplitter}.
14221422
* @return the current {@link BaseIntegrationFlowDefinition}.
1423+
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
14231424
* @see SplitterEndpointSpec
14241425
*/
1426+
@Deprecated(since = "6.2", forRemoval = true)
1427+
@SuppressWarnings("removal")
14251428
public B split(@Nullable Consumer<SplitterEndpointSpec<DefaultMessageSplitter>> endpointConfigurer) {
14261429
return split(new DefaultMessageSplitter(), endpointConfigurer);
14271430
}
14281431

1432+
1433+
/**
1434+
* Populate the splitter with provided options to the current integration flow position:
1435+
* <pre class="code">
1436+
* {@code
1437+
* .splitWith(s -> s.applySequence(false).delimiters(","))
1438+
* }
1439+
* </pre>
1440+
* or with the refenrence to POJO service method call:
1441+
* <pre class="code">
1442+
* {@code
1443+
* .splitWith(s -> s.ref("someService").method("someMethod"))
1444+
* }
1445+
* </pre>
1446+
* @param splitterConfigurer the {@link Consumer} to provide options splitter endpoint.
1447+
* @return the current {@link BaseIntegrationFlowDefinition}.
1448+
* @since 6.2
1449+
* @see SplitterSpec
1450+
*/
1451+
public B splitWith(Consumer<SplitterSpec> splitterConfigurer) {
1452+
return register(new SplitterSpec(), splitterConfigurer);
1453+
}
1454+
14291455
/**
14301456
* Populate the {@link ExpressionEvaluatingSplitter} with provided SpEL expression.
14311457
* @param expression the splitter SpEL expression.
14321458
* and for {@link ExpressionEvaluatingSplitter}.
14331459
* @return the current {@link BaseIntegrationFlowDefinition}.
1434-
* @see SplitterEndpointSpec
1460+
* @see SplitterSpec
14351461
*/
14361462
public B split(String expression) {
1437-
return split(expression, (Consumer<SplitterEndpointSpec<ExpressionEvaluatingSplitter>>) null);
1463+
return splitWith((splitterSpec) -> splitterSpec.expression(expression));
14381464
}
14391465

14401466
/**
@@ -1443,8 +1469,11 @@ public B split(String expression) {
14431469
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options
14441470
* and for {@link ExpressionEvaluatingSplitter}.
14451471
* @return the current {@link BaseIntegrationFlowDefinition}.
1472+
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
14461473
* @see SplitterEndpointSpec
14471474
*/
1475+
@Deprecated(since = "6.2", forRemoval = true)
1476+
@SuppressWarnings("removal")
14481477
public B split(String expression,
14491478
@Nullable Consumer<SplitterEndpointSpec<ExpressionEvaluatingSplitter>> endpointConfigurer) {
14501479

@@ -1472,7 +1501,7 @@ public B split(Object service) {
14721501
* @see MethodInvokingSplitter
14731502
*/
14741503
public B split(Object service, @Nullable String methodName) {
1475-
return split(service, methodName, null);
1504+
return splitWith((splitterSpec) -> splitterSpec.ref(service).method(methodName));
14761505
}
14771506

14781507
/**
@@ -1484,9 +1513,12 @@ public B split(Object service, @Nullable String methodName) {
14841513
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options
14851514
* and for {@link MethodInvokingSplitter}.
14861515
* @return the current {@link BaseIntegrationFlowDefinition}.
1516+
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
14871517
* @see SplitterEndpointSpec
14881518
* @see MethodInvokingSplitter
14891519
*/
1520+
@Deprecated(since = "6.2", forRemoval = true)
1521+
@SuppressWarnings("removal")
14901522
public B split(Object service, @Nullable String methodName,
14911523
@Nullable Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
14921524

@@ -1508,7 +1540,7 @@ public B split(Object service, @Nullable String methodName,
15081540
* @return the current {@link BaseIntegrationFlowDefinition}.
15091541
*/
15101542
public B split(String beanName, @Nullable String methodName) {
1511-
return split(beanName, methodName, null);
1543+
return splitWith((splitterSpec) -> splitterSpec.refName(beanName).method(methodName));
15121544
}
15131545

15141546
/**
@@ -1520,8 +1552,11 @@ public B split(String beanName, @Nullable String methodName) {
15201552
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options
15211553
* and for {@link MethodInvokingSplitter}.
15221554
* @return the current {@link BaseIntegrationFlowDefinition}.
1555+
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
15231556
* @see SplitterEndpointSpec
15241557
*/
1558+
@Deprecated(since = "6.2", forRemoval = true)
1559+
@SuppressWarnings("removal")
15251560
public B split(String beanName, @Nullable String methodName,
15261561
@Nullable Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
15271562

@@ -1540,10 +1575,10 @@ public B split(String beanName, @Nullable String methodName,
15401575
* </pre>
15411576
* @param messageProcessorSpec the splitter {@link MessageProcessorSpec}.
15421577
* @return the current {@link BaseIntegrationFlowDefinition}.
1543-
* @see SplitterEndpointSpec
1578+
* @see SplitterSpec
15441579
*/
15451580
public B split(MessageProcessorSpec<?> messageProcessorSpec) {
1546-
return split(messageProcessorSpec, (Consumer<SplitterEndpointSpec<MethodInvokingSplitter>>) null);
1581+
return splitWith((splitterSpec) -> splitterSpec.ref(messageProcessorSpec));
15471582
}
15481583

15491584
/**
@@ -1561,8 +1596,11 @@ public B split(MessageProcessorSpec<?> messageProcessorSpec) {
15611596
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options
15621597
* and for {@link MethodInvokingSplitter}.
15631598
* @return the current {@link BaseIntegrationFlowDefinition}.
1599+
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
15641600
* @see SplitterEndpointSpec
15651601
*/
1602+
@Deprecated(since = "6.2", forRemoval = true)
1603+
@SuppressWarnings("removal")
15661604
public B split(MessageProcessorSpec<?> messageProcessorSpec,
15671605
@Nullable Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
15681606

@@ -1595,7 +1633,7 @@ public B split(MessageProcessorSpec<?> messageProcessorSpec,
15951633
* @see LambdaMessageProcessor
15961634
*/
15971635
public <P> B split(Class<P> expectedType, Function<P, ?> splitter) {
1598-
return split(expectedType, splitter, null);
1636+
return splitWith((splitterSpec) -> splitterSpec.function(splitter).expectedType(expectedType));
15991637
}
16001638

16011639
/**
@@ -1621,9 +1659,12 @@ public <P> B split(Class<P> expectedType, Function<P, ?> splitter) {
16211659
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
16221660
* @param <P> the payload type or {@code Message.class}.
16231661
* @return the current {@link BaseIntegrationFlowDefinition}.
1662+
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
16241663
* @see LambdaMessageProcessor
16251664
* @see SplitterEndpointSpec
16261665
*/
1666+
@Deprecated(since = "6.2", forRemoval = true)
1667+
@SuppressWarnings("removal")
16271668
public <P> B split(@Nullable Class<P> expectedType, Function<P, ?> splitter,
16281669
@Nullable Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
16291670

@@ -1640,10 +1681,10 @@ public <P> B split(@Nullable Class<P> expectedType, Function<P, ?> splitter,
16401681
* @param splitterMessageHandlerSpec the {@link MessageHandlerSpec} to populate.
16411682
* @param <S> the {@link AbstractMessageSplitter}
16421683
* @return the current {@link BaseIntegrationFlowDefinition}.
1643-
* @see SplitterEndpointSpec
1684+
* @see SplitterSpec
16441685
*/
16451686
public <S extends AbstractMessageSplitter> B split(MessageHandlerSpec<?, S> splitterMessageHandlerSpec) {
1646-
return split(splitterMessageHandlerSpec, (Consumer<SplitterEndpointSpec<S>>) null);
1687+
return splitWith((splitterSpec) -> splitterSpec.ref(splitterMessageHandlerSpec));
16471688
}
16481689

16491690
/**
@@ -1653,8 +1694,11 @@ public <S extends AbstractMessageSplitter> B split(MessageHandlerSpec<?, S> spli
16531694
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
16541695
* @param <S> the {@link AbstractMessageSplitter}
16551696
* @return the current {@link BaseIntegrationFlowDefinition}.
1697+
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
16561698
* @see SplitterEndpointSpec
16571699
*/
1700+
@Deprecated(since = "6.2", forRemoval = true)
1701+
@SuppressWarnings("removal")
16581702
public <S extends AbstractMessageSplitter> B split(MessageHandlerSpec<?, S> splitterMessageHandlerSpec,
16591703
@Nullable Consumer<SplitterEndpointSpec<S>> endpointConfigurer) {
16601704

@@ -1667,10 +1711,10 @@ public <S extends AbstractMessageSplitter> B split(MessageHandlerSpec<?, S> spli
16671711
* flow position.
16681712
* @param splitter the {@link AbstractMessageSplitter} to populate.
16691713
* @return the current {@link BaseIntegrationFlowDefinition}.
1670-
* @see SplitterEndpointSpec
1714+
* @see SplitterSpec
16711715
*/
16721716
public B split(AbstractMessageSplitter splitter) {
1673-
return split(splitter, (Consumer<SplitterEndpointSpec<AbstractMessageSplitter>>) null);
1717+
return splitWith((splitterSpec) -> splitterSpec.ref(splitter));
16741718
}
16751719

16761720
/**
@@ -1680,8 +1724,11 @@ public B split(AbstractMessageSplitter splitter) {
16801724
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
16811725
* @param <S> the {@link AbstractMessageSplitter}
16821726
* @return the current {@link BaseIntegrationFlowDefinition}.
1727+
* @deprecated since 6.2 in favor of {@link #splitWith(Consumer)}.
16831728
* @see SplitterEndpointSpec
16841729
*/
1730+
@Deprecated(since = "6.2", forRemoval = true)
1731+
@SuppressWarnings("removal")
16851732
public <S extends AbstractMessageSplitter> B split(S splitter,
16861733
@Nullable Consumer<SplitterEndpointSpec<S>> endpointConfigurer) {
16871734

0 commit comments

Comments
 (0)