Skip to content

GH-3592: Scatter-Gather: applySeq=true by default #3795

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2020 the original author or authors.
* Copyright 2014-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -89,7 +89,7 @@ private void scatter(ParserContext parserContext, String scatterChannel, boolean
builder.addConstructorArgReference(scatterChannel);
}
else {
BeanDefinition scattererDefinition = null;
BeanDefinition scattererDefinition;
if (!hasScatterer) {
scattererDefinition = new RootBeanDefinition(RecipientListRouter.class);
}
Expand All @@ -103,6 +103,11 @@ private void scatter(ParserContext parserContext, String scatterChannel, boolean
if (hasScatterer && scatterer.hasAttribute(ID_ATTRIBUTE)) {
scattererId = scatterer.getAttribute(ID_ATTRIBUTE);
}

if (!scatterer.hasAttribute("apply-sequence")) {
scattererDefinition.getPropertyValues().addPropertyValue("applySequence", true);
}

parserContext.getRegistry().registerBeanDefinition(scattererId, scattererDefinition); // NOSONAR not null
builder.addConstructorArgValue(new RuntimeBeanReference(scattererId));
}
Expand All @@ -113,7 +118,7 @@ private void gather(Element element, ParserContext parserContext, BeanDefinition

Element gatherer = DomUtils.getChildElementByTagName(element, "gatherer");

BeanDefinition gathererDefinition = null;
BeanDefinition gathererDefinition;
if (gatherer == null) {
try {
gatherer = DOCUMENT_BUILDER_FACTORY.newDocumentBuilder().newDocument().createElement("aggregator");
Expand All @@ -125,7 +130,7 @@ private void gather(Element element, ParserContext parserContext, BeanDefinition
}
gathererDefinition = GATHERER_PARSER.parse(gatherer, // NOSONAR
new ParserContext(parserContext.getReaderContext(),
parserContext.getDelegate(), scatterGatherDefinition));
parserContext.getDelegate(), scatterGatherDefinition));
String gathererId = id + ".gatherer";
if (gatherer != null && gatherer.hasAttribute(ID_ATTRIBUTE)) {
gathererId = gatherer.getAttribute(ID_ATTRIBUTE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2749,6 +2749,7 @@ public B scatterGather(Consumer<RecipientListRouterSpec> scatterer, @Nullable Co
* Populate a {@link ScatterGatherHandler} to the current integration flow position
* based on the provided {@link RecipientListRouterSpec} for scattering function
* and {@link AggregatorSpec} for gathering function.
* For convenience, the {@link RecipientListRouterSpec#applySequence(boolean)} is set to true by default.
* @param scatterer the {@link Consumer} for {@link RecipientListRouterSpec} to configure scatterer.
* @param gatherer the {@link Consumer} for {@link AggregatorSpec} to configure gatherer.
* @param scatterGather the {@link Consumer} for {@link ScatterGatherSpec} to configure
Expand All @@ -2760,6 +2761,7 @@ public B scatterGather(Consumer<RecipientListRouterSpec> scatterer, @Nullable Co

Assert.notNull(scatterer, "'scatterer' must not be null");
RecipientListRouterSpec recipientListRouterSpec = new RecipientListRouterSpec();
recipientListRouterSpec.applySequence(true);
scatterer.accept(recipientListRouterSpec);
AggregatorSpec aggregatorSpec = new AggregatorSpec();
if (gatherer != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -857,7 +857,6 @@ public QueueChannel alwaysRecipient() {
public IntegrationFlow scatterGatherFlow() {
return f -> f
.scatterGather(scatterer -> scatterer
.applySequence(true)
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10)),
Expand All @@ -878,8 +877,7 @@ public IntegrationFlow nestedScatterGatherFlow() {
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.handle((p, h) -> p + " - flow 1"))
.recipientFlow(f2 -> f2.handle((p, h) -> p + " - flow 2"))
.applySequence(true),
.recipientFlow(f2 -> f2.handle((p, h) -> p + " - flow 2")),
gatherer -> gatherer
.outputProcessor(mg -> mg
.getMessages()
Expand All @@ -900,7 +898,6 @@ public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskE
return f -> f
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
Expand All @@ -923,7 +920,6 @@ public Message<?> processAsyncScatterError(MessagingException payload) {
public IntegrationFlow propagateErrorFromGatherer(TaskExecutor taskExecutor) {
return IntegrationFlows.from(Function.class)
.scatterGather(s -> s
.applySequence(true)
.recipientFlow(subFlow -> subFlow
.channel(c -> c.executor(taskExecutor))
.transform(p -> "foo")),
Expand All @@ -943,9 +939,9 @@ public PollableChannel scatterGatherWireTapChannel() {

@Bean
public IntegrationFlow scatterGatherInSubFlow() {
return flow -> flow.scatterGather(s -> s.applySequence(true)
return flow -> flow.scatterGather(s -> s
.recipientFlow(inflow -> inflow.wireTap(scatterGatherWireTapChannel())
.scatterGather(s1 -> s1.applySequence(true)
.scatterGather(s1 -> s1
.recipientFlow(IntegrationFlowDefinition::bridge)
.recipientFlow("sequencetest"::equals,
IntegrationFlowDefinition::bridge),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore"/>

<int:scatter-gather id="scatterGather2" input-channel="input2" gather-channel="gatherChannel" gather-timeout="100">
<int:scatterer id="myScatterer" apply-sequence="true">
<int:scatterer id="myScatterer">
<int:recipient channel="distributionChannel"/>
</int:scatterer>
<int:gatherer id="myGatherer" message-store="messageStore"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

<!--Distribution scenario-->
<scatter-gather input-channel="inputDistribution" output-channel="output" gather-channel="gatherChannel">
<scatterer apply-sequence="true">
<scatterer>
<recipient channel="distribution1Channel"/>
<recipient channel="distribution2Channel"/>
<recipient channel="distribution3Channel"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,8 +22,7 @@
import java.util.List;
import java.util.concurrent.Executor;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -54,16 +53,14 @@
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Artem Bilan
* @author Gary Russell
* @since 4.1
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@SpringJUnitConfig
@DirtiesContext
public class ScatterGatherHandlerIntegrationTests {

Expand Down
4 changes: 3 additions & 1 deletion src/reference/asciidoc/scatter-gather.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ Unlike the `PublishSubscribeChannel` variant (the auction variant), having a `re
With `apply-sequence="true"`, the default `sequenceSize` is supplied, and the `aggregator` can release the group correctly.
The distribution option is mutually exclusive with the auction option.

NOTE: The `applySequence=true` is required only for plain Java configuration based on the `ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer)` constructor configuration since the framework cannot mutate externally provided components.
For convenience, the XML and Java DSL for `Scatter-Gather` sets `applySequence` to true starting with version 6.0.

For both the auction and the distribution variants, the request (scatter) message is enriched with the `gatherResultChannel` header to wait for a reply message from the `aggregator`.

By default, all suppliers should send their result to the `replyChannel` header (usually by omitting the `output-channel` from the ultimate endpoint).
Expand Down Expand Up @@ -183,7 +186,6 @@ public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskE
return f -> f
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ The messaging annotations don't require a `poller` attribute as an array of `@Po

See <<./configuration.adoc#annotations,Annotation Support>> for more information.

For convenience, the XML and Java DSL for Scatter-Gather, based on the `RecipientListRouter`, now sets an `applySequence = true`, so the gatherer part can rely on the default correlation strategies.

See <<./scatter-gather.adoc#scatter-gather,Scatter-Gather>> for more information.

[[x6.0-http]]
=== HTTP Changes

Expand Down