diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/ScatterGatherParser.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/ScatterGatherParser.java index dbf854d0022..7831f87e1f2 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/ScatterGatherParser.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/ScatterGatherParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2020 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -89,7 +89,7 @@ private void scatter(ParserContext parserContext, String scatterChannel, boolean builder.addConstructorArgReference(scatterChannel); } else { - BeanDefinition scattererDefinition = null; + BeanDefinition scattererDefinition; if (!hasScatterer) { scattererDefinition = new RootBeanDefinition(RecipientListRouter.class); } @@ -103,6 +103,11 @@ private void scatter(ParserContext parserContext, String scatterChannel, boolean if (hasScatterer && scatterer.hasAttribute(ID_ATTRIBUTE)) { scattererId = scatterer.getAttribute(ID_ATTRIBUTE); } + + if (!scatterer.hasAttribute("apply-sequence")) { + scattererDefinition.getPropertyValues().addPropertyValue("applySequence", true); + } + parserContext.getRegistry().registerBeanDefinition(scattererId, scattererDefinition); // NOSONAR not null builder.addConstructorArgValue(new RuntimeBeanReference(scattererId)); } @@ -113,7 +118,7 @@ private void gather(Element element, ParserContext parserContext, BeanDefinition Element gatherer = DomUtils.getChildElementByTagName(element, "gatherer"); - BeanDefinition gathererDefinition = null; + BeanDefinition gathererDefinition; if (gatherer == null) { try { gatherer = DOCUMENT_BUILDER_FACTORY.newDocumentBuilder().newDocument().createElement("aggregator"); @@ -125,7 +130,7 @@ private void gather(Element element, ParserContext parserContext, BeanDefinition } gathererDefinition = GATHERER_PARSER.parse(gatherer, // NOSONAR new ParserContext(parserContext.getReaderContext(), - parserContext.getDelegate(), scatterGatherDefinition)); + parserContext.getDelegate(), scatterGatherDefinition)); String gathererId = id + ".gatherer"; if (gatherer != null && gatherer.hasAttribute(ID_ATTRIBUTE)) { gathererId = gatherer.getAttribute(ID_ATTRIBUTE); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java index 104de5ee29f..121e1bcfb3e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java @@ -2749,6 +2749,7 @@ public B scatterGather(Consumer scatterer, @Nullable Co * Populate a {@link ScatterGatherHandler} to the current integration flow position * based on the provided {@link RecipientListRouterSpec} for scattering function * and {@link AggregatorSpec} for gathering function. + * For convenience, the {@link RecipientListRouterSpec#applySequence(boolean)} is set to true by default. * @param scatterer the {@link Consumer} for {@link RecipientListRouterSpec} to configure scatterer. * @param gatherer the {@link Consumer} for {@link AggregatorSpec} to configure gatherer. * @param scatterGather the {@link Consumer} for {@link ScatterGatherSpec} to configure @@ -2760,6 +2761,7 @@ public B scatterGather(Consumer scatterer, @Nullable Co Assert.notNull(scatterer, "'scatterer' must not be null"); RecipientListRouterSpec recipientListRouterSpec = new RecipientListRouterSpec(); + recipientListRouterSpec.applySequence(true); scatterer.accept(recipientListRouterSpec); AggregatorSpec aggregatorSpec = new AggregatorSpec(); if (gatherer != null) { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java index f3ba2398ec4..e7f9a616c1b 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -857,7 +857,6 @@ public QueueChannel alwaysRecipient() { public IntegrationFlow scatterGatherFlow() { return f -> f .scatterGather(scatterer -> scatterer - .applySequence(true) .recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10)) .recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10)) .recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10)), @@ -878,8 +877,7 @@ public IntegrationFlow nestedScatterGatherFlow() { .scatterGather( scatterer -> scatterer .recipientFlow(f1 -> f1.handle((p, h) -> p + " - flow 1")) - .recipientFlow(f2 -> f2.handle((p, h) -> p + " - flow 2")) - .applySequence(true), + .recipientFlow(f2 -> f2.handle((p, h) -> p + " - flow 2")), gatherer -> gatherer .outputProcessor(mg -> mg .getMessages() @@ -900,7 +898,6 @@ public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskE return f -> f .scatterGather( scatterer -> scatterer - .applySequence(true) .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1")) .recipientFlow(f2 -> f2 .channel(c -> c.executor(taskExecutor)) @@ -923,7 +920,6 @@ public Message processAsyncScatterError(MessagingException payload) { public IntegrationFlow propagateErrorFromGatherer(TaskExecutor taskExecutor) { return IntegrationFlows.from(Function.class) .scatterGather(s -> s - .applySequence(true) .recipientFlow(subFlow -> subFlow .channel(c -> c.executor(taskExecutor)) .transform(p -> "foo")), @@ -943,9 +939,9 @@ public PollableChannel scatterGatherWireTapChannel() { @Bean public IntegrationFlow scatterGatherInSubFlow() { - return flow -> flow.scatterGather(s -> s.applySequence(true) + return flow -> flow.scatterGather(s -> s .recipientFlow(inflow -> inflow.wireTap(scatterGatherWireTapChannel()) - .scatterGather(s1 -> s1.applySequence(true) + .scatterGather(s1 -> s1 .recipientFlow(IntegrationFlowDefinition::bridge) .recipientFlow("sequencetest"::equals, IntegrationFlowDefinition::bridge), diff --git a/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherParserTests-context.xml b/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherParserTests-context.xml index a62e3b7de87..b7292545f03 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherParserTests-context.xml +++ b/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherParserTests-context.xml @@ -14,7 +14,7 @@ - + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherTests-context.xml b/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherTests-context.xml index 5255b2eaf95..5754fce70f5 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherTests-context.xml +++ b/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherTests-context.xml @@ -30,7 +30,7 @@ - + diff --git a/spring-integration-jmx/src/test/java/org/springframework/integration/monitor/ScatterGatherHandlerIntegrationTests.java b/spring-integration-jmx/src/test/java/org/springframework/integration/monitor/ScatterGatherHandlerIntegrationTests.java index e8e046e61c6..6af6141a320 100644 --- a/spring-integration-jmx/src/test/java/org/springframework/integration/monitor/ScatterGatherHandlerIntegrationTests.java +++ b/spring-integration-jmx/src/test/java/org/springframework/integration/monitor/ScatterGatherHandlerIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,7 @@ import java.util.List; import java.util.concurrent.Executor; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -54,16 +53,14 @@ import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; /** * @author Artem Bilan * @author Gary Russell * @since 4.1 */ -@ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) +@SpringJUnitConfig @DirtiesContext public class ScatterGatherHandlerIntegrationTests { diff --git a/src/reference/asciidoc/scatter-gather.adoc b/src/reference/asciidoc/scatter-gather.adoc index 66567b0f83c..8923421efed 100644 --- a/src/reference/asciidoc/scatter-gather.adoc +++ b/src/reference/asciidoc/scatter-gather.adoc @@ -35,6 +35,9 @@ Unlike the `PublishSubscribeChannel` variant (the auction variant), having a `re With `apply-sequence="true"`, the default `sequenceSize` is supplied, and the `aggregator` can release the group correctly. The distribution option is mutually exclusive with the auction option. +NOTE: The `applySequence=true` is required only for plain Java configuration based on the `ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer)` constructor configuration since the framework cannot mutate externally provided components. +For convenience, the XML and Java DSL for `Scatter-Gather` sets `applySequence` to true starting with version 6.0. + For both the auction and the distribution variants, the request (scatter) message is enriched with the `gatherResultChannel` header to wait for a reply message from the `aggregator`. By default, all suppliers should send their result to the `replyChannel` header (usually by omitting the `output-channel` from the ultimate endpoint). @@ -183,7 +186,6 @@ public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskE return f -> f .scatterGather( scatterer -> scatterer - .applySequence(true) .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1")) .recipientFlow(f2 -> f2 .channel(c -> c.executor(taskExecutor)) diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 54b7ec8088d..9fd8c489a5e 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -31,6 +31,10 @@ The messaging annotations don't require a `poller` attribute as an array of `@Po See <<./configuration.adoc#annotations,Annotation Support>> for more information. +For convenience, the XML and Java DSL for Scatter-Gather, based on the `RecipientListRouter`, now sets an `applySequence = true`, so the gatherer part can rely on the default correlation strategies. + +See <<./scatter-gather.adoc#scatter-gather,Scatter-Gather>> for more information. + [[x6.0-http]] === HTTP Changes