Skip to content

Commit 6d7aebc

Browse files
artembilangaryrussell
authored andcommitted
GH-3592: Scatter-Gather: applySeq=true by default
Fixes #3592 * Configure XML parser & Java DSL for Scatter-Gather, based on the `RecipientListRouter` to set an `applySequence` to `true` by default. This will make a `gatherer` part to fully rely on the default correlation strategies
1 parent db287cf commit 6d7aebc

File tree

8 files changed

+28
-22
lines changed

8 files changed

+28
-22
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/xml/ScatterGatherParser.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -89,7 +89,7 @@ private void scatter(ParserContext parserContext, String scatterChannel, boolean
8989
builder.addConstructorArgReference(scatterChannel);
9090
}
9191
else {
92-
BeanDefinition scattererDefinition = null;
92+
BeanDefinition scattererDefinition;
9393
if (!hasScatterer) {
9494
scattererDefinition = new RootBeanDefinition(RecipientListRouter.class);
9595
}
@@ -103,6 +103,11 @@ private void scatter(ParserContext parserContext, String scatterChannel, boolean
103103
if (hasScatterer && scatterer.hasAttribute(ID_ATTRIBUTE)) {
104104
scattererId = scatterer.getAttribute(ID_ATTRIBUTE);
105105
}
106+
107+
if (!scatterer.hasAttribute("apply-sequence")) {
108+
scattererDefinition.getPropertyValues().addPropertyValue("applySequence", true);
109+
}
110+
106111
parserContext.getRegistry().registerBeanDefinition(scattererId, scattererDefinition); // NOSONAR not null
107112
builder.addConstructorArgValue(new RuntimeBeanReference(scattererId));
108113
}
@@ -113,7 +118,7 @@ private void gather(Element element, ParserContext parserContext, BeanDefinition
113118

114119
Element gatherer = DomUtils.getChildElementByTagName(element, "gatherer");
115120

116-
BeanDefinition gathererDefinition = null;
121+
BeanDefinition gathererDefinition;
117122
if (gatherer == null) {
118123
try {
119124
gatherer = DOCUMENT_BUILDER_FACTORY.newDocumentBuilder().newDocument().createElement("aggregator");
@@ -125,7 +130,7 @@ private void gather(Element element, ParserContext parserContext, BeanDefinition
125130
}
126131
gathererDefinition = GATHERER_PARSER.parse(gatherer, // NOSONAR
127132
new ParserContext(parserContext.getReaderContext(),
128-
parserContext.getDelegate(), scatterGatherDefinition));
133+
parserContext.getDelegate(), scatterGatherDefinition));
129134
String gathererId = id + ".gatherer";
130135
if (gatherer != null && gatherer.hasAttribute(ID_ATTRIBUTE)) {
131136
gathererId = gatherer.getAttribute(ID_ATTRIBUTE);

spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java

+2
Original file line numberDiff line numberDiff line change
@@ -2749,6 +2749,7 @@ public B scatterGather(Consumer<RecipientListRouterSpec> scatterer, @Nullable Co
27492749
* Populate a {@link ScatterGatherHandler} to the current integration flow position
27502750
* based on the provided {@link RecipientListRouterSpec} for scattering function
27512751
* and {@link AggregatorSpec} for gathering function.
2752+
* For convenience, the {@link RecipientListRouterSpec#applySequence(boolean)} is set to true by default.
27522753
* @param scatterer the {@link Consumer} for {@link RecipientListRouterSpec} to configure scatterer.
27532754
* @param gatherer the {@link Consumer} for {@link AggregatorSpec} to configure gatherer.
27542755
* @param scatterGather the {@link Consumer} for {@link ScatterGatherSpec} to configure
@@ -2760,6 +2761,7 @@ public B scatterGather(Consumer<RecipientListRouterSpec> scatterer, @Nullable Co
27602761

27612762
Assert.notNull(scatterer, "'scatterer' must not be null");
27622763
RecipientListRouterSpec recipientListRouterSpec = new RecipientListRouterSpec();
2764+
recipientListRouterSpec.applySequence(true);
27632765
scatterer.accept(recipientListRouterSpec);
27642766
AggregatorSpec aggregatorSpec = new AggregatorSpec();
27652767
if (gatherer != null) {

spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java

+4-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -857,7 +857,6 @@ public QueueChannel alwaysRecipient() {
857857
public IntegrationFlow scatterGatherFlow() {
858858
return f -> f
859859
.scatterGather(scatterer -> scatterer
860-
.applySequence(true)
861860
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
862861
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
863862
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10)),
@@ -878,8 +877,7 @@ public IntegrationFlow nestedScatterGatherFlow() {
878877
.scatterGather(
879878
scatterer -> scatterer
880879
.recipientFlow(f1 -> f1.handle((p, h) -> p + " - flow 1"))
881-
.recipientFlow(f2 -> f2.handle((p, h) -> p + " - flow 2"))
882-
.applySequence(true),
880+
.recipientFlow(f2 -> f2.handle((p, h) -> p + " - flow 2")),
883881
gatherer -> gatherer
884882
.outputProcessor(mg -> mg
885883
.getMessages()
@@ -900,7 +898,6 @@ public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskE
900898
return f -> f
901899
.scatterGather(
902900
scatterer -> scatterer
903-
.applySequence(true)
904901
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
905902
.recipientFlow(f2 -> f2
906903
.channel(c -> c.executor(taskExecutor))
@@ -923,7 +920,6 @@ public Message<?> processAsyncScatterError(MessagingException payload) {
923920
public IntegrationFlow propagateErrorFromGatherer(TaskExecutor taskExecutor) {
924921
return IntegrationFlows.from(Function.class)
925922
.scatterGather(s -> s
926-
.applySequence(true)
927923
.recipientFlow(subFlow -> subFlow
928924
.channel(c -> c.executor(taskExecutor))
929925
.transform(p -> "foo")),
@@ -943,9 +939,9 @@ public PollableChannel scatterGatherWireTapChannel() {
943939

944940
@Bean
945941
public IntegrationFlow scatterGatherInSubFlow() {
946-
return flow -> flow.scatterGather(s -> s.applySequence(true)
942+
return flow -> flow.scatterGather(s -> s
947943
.recipientFlow(inflow -> inflow.wireTap(scatterGatherWireTapChannel())
948-
.scatterGather(s1 -> s1.applySequence(true)
944+
.scatterGather(s1 -> s1
949945
.recipientFlow(IntegrationFlowDefinition::bridge)
950946
.recipientFlow("sequencetest"::equals,
951947
IntegrationFlowDefinition::bridge),

spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherParserTests-context.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore"/>
1515

1616
<int:scatter-gather id="scatterGather2" input-channel="input2" gather-channel="gatherChannel" gather-timeout="100">
17-
<int:scatterer id="myScatterer" apply-sequence="true">
17+
<int:scatterer id="myScatterer">
1818
<int:recipient channel="distributionChannel"/>
1919
</int:scatterer>
2020
<int:gatherer id="myGatherer" message-store="messageStore"/>

spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherTests-context.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
<!--Distribution scenario-->
3232
<scatter-gather input-channel="inputDistribution" output-channel="output" gather-channel="gatherChannel">
33-
<scatterer apply-sequence="true">
33+
<scatterer>
3434
<recipient channel="distribution1Channel"/>
3535
<recipient channel="distribution2Channel"/>
3636
<recipient channel="distribution3Channel"/>

spring-integration-jmx/src/test/java/org/springframework/integration/monitor/ScatterGatherHandlerIntegrationTests.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,8 +22,7 @@
2222
import java.util.List;
2323
import java.util.concurrent.Executor;
2424

25-
import org.junit.Test;
26-
import org.junit.runner.RunWith;
25+
import org.junit.jupiter.api.Test;
2726

2827
import org.springframework.beans.factory.annotation.Autowired;
2928
import org.springframework.context.annotation.Bean;
@@ -54,16 +53,14 @@
5453
import org.springframework.messaging.SubscribableChannel;
5554
import org.springframework.messaging.support.MessageBuilder;
5655
import org.springframework.test.annotation.DirtiesContext;
57-
import org.springframework.test.context.ContextConfiguration;
58-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
56+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
5957

6058
/**
6159
* @author Artem Bilan
6260
* @author Gary Russell
6361
* @since 4.1
6462
*/
65-
@ContextConfiguration
66-
@RunWith(SpringJUnit4ClassRunner.class)
63+
@SpringJUnitConfig
6764
@DirtiesContext
6865
public class ScatterGatherHandlerIntegrationTests {
6966

src/reference/asciidoc/scatter-gather.adoc

+3-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ Unlike the `PublishSubscribeChannel` variant (the auction variant), having a `re
3535
With `apply-sequence="true"`, the default `sequenceSize` is supplied, and the `aggregator` can release the group correctly.
3636
The distribution option is mutually exclusive with the auction option.
3737

38+
NOTE: The `applySequence=true` is required only for plain Java configuration based on the `ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer)` constructor configuration since the framework cannot mutate externally provided components.
39+
For convenience, the XML and Java DSL for `Scatter-Gather` sets `applySequence` to true starting with version 6.0.
40+
3841
For both the auction and the distribution variants, the request (scatter) message is enriched with the `gatherResultChannel` header to wait for a reply message from the `aggregator`.
3942

4043
By default, all suppliers should send their result to the `replyChannel` header (usually by omitting the `output-channel` from the ultimate endpoint).
@@ -183,7 +186,6 @@ public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskE
183186
return f -> f
184187
.scatterGather(
185188
scatterer -> scatterer
186-
.applySequence(true)
187189
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
188190
.recipientFlow(f2 -> f2
189191
.channel(c -> c.executor(taskExecutor))

src/reference/asciidoc/whats-new.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ The messaging annotations don't require a `poller` attribute as an array of `@Po
3737

3838
See <<./configuration.adoc#annotations,Annotation Support>> for more information.
3939

40+
For convenience, the XML and Java DSL for Scatter-Gather, based on the `RecipientListRouter`, now sets an `applySequence = true`, so the gatherer part can rely on the default correlation strategies.
41+
42+
See <<./scatter-gather.adoc#scatter-gather,Scatter-Gather>> for more information.
43+
4044
[[x6.0-http]]
4145
=== HTTP Changes
4246

0 commit comments

Comments
 (0)