Make the default behavior of Scatter-Gather more intuitive, making performance optimization optional #3592

Closed
swiss-chris opened this issue Jul 16, 2021 · 13 comments · Fixed by #3795

Comments

@swiss-chris
Contributor

Expected Behavior

The default behavior of Scatter-Gather is:

  • applySequence is automatically set to true and can be explicitly set to false if desired
  • the aggregator release-strategy is to wait for all messages (using sequenceSize)
  • the output processor (payload aggregator of the abstract MessageGroupProcessor) recognizes when all messages contain Collections of the same type and returns a new collection containing all elements of all aggregated collections.

In summary, the default behaviour should be the simplest use case, requiring zero configuration and zero special knowledge of the internals of how Scatter-Gather was implemented in Spring Integration.

  • I tell the scatterer where to get the data and I transform the individual responses into a uniform data type.
  • I then expect the gatherer to wait for all the data from the scatterer to return (that's why I need applySequence() to be true by default and releaseStrategy() to wait for all messages by default). If the returned data are lists (which I would argue is a common use case), the aggregator should be capable of automatically returning a new collection containing all the elements of the individual lists.

Any deviation from this standard simple case can be configured as desired and requires special knowledge of the internals of this spring integration (or dsl) library.

Current Behavior

  • If you don't provide applySequence(true), the result is typically an exception at runtime mentioning a missing correlation strategy
  • The default release strategy is to release each message as it arrives (see Context for why this is contrary to the EIP definition of the Scatter-Gather pattern). Also, there's no simple way to just tell the releaseStrategy to wait for all messages to arrive. Instead I have to hard-code something like group -> group.size() == 2, which makes this brittle under refactoring if I add another recipientFlow to the scatterer. At the very least I would expect an option to just tell the aggregator to wait for all messages instead of having to hard-code how many subflows there are.
  • when receiving multiple lists of responses from the scatterer, the default aggregator behaviour is to return a list of lists, instead of a single list containing all aggregated elements
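
The brittleness described in the second bullet can be sketched in plain Java. This is a simplified model, not Spring Integration code: Reply, HARD_CODED and SEQUENCE_SIZE are hypothetical names, and the sketch assumes each reply carries the number of recipients as a sequence-size value.

```java
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java model of two release checks; not Spring Integration code. */
final class ReleaseStrategySketch {

    /** Hypothetical stand-in for a gathered reply that carries a sequence size. */
    static final class Reply {
        final String payload;
        final int sequenceSize; // number of recipients the request was scattered to

        Reply(String payload, int sequenceSize) {
            this.payload = payload;
            this.sequenceSize = sequenceSize;
        }
    }

    /** Brittle: has to be edited whenever a recipient flow is added or removed. */
    static final Predicate<List<Reply>> HARD_CODED = group -> group.size() == 2;

    /** Releases once the group holds as many replies as the sequence size announces. */
    static final Predicate<List<Reply>> SEQUENCE_SIZE =
            group -> !group.isEmpty() && group.size() == group.get(0).sequenceSize;
}
```

With three recipients, the hard-coded check releases after the second reply and never matches the complete group, while the sequence-size check only matches once all three replies are present.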

Hence using Scatter Gather for a simple intuitive default case (see context below) already requires special knowledge of the internals of how this was implemented in spring integration.

Context

According to this EIP definition of Scatter-Gather (https://www.enterpriseintegrationpatterns.com/BroadcastAggregate.html), "The Scatter-Gather routes a request message to the a number of recipients. It then uses an Aggregator to collect the responses and distill them into a single response message."

I have emphasized "distill them into a single response message" because this is what I would expect Scatter-Gather to do by default and what inspires my suggestions for the default behaviour above.

The reason for the current default behavior was given at https://stackoverflow.com/a/68403033/349169: "The scatterer part of this component is fully based on the Recipient List Router which comes with false for that option by default. So, for consistency and runtime optimization we keep it false in scatter-gather as well".

In reply to "for runtime optimization":
Spring and especially Spring Boot are known and loved for being zero-configuration frameworks that reduce tedious boilerplate code and provide useful defaults that can be extended when desired. I would argue that any successful framework should prioritize intuitive default behaviors over premature optimization, offering a low entry bar for using the framework and leaving technical internals for those occasions when we need to optimize beyond the simple default use cases.

In reply to "for consistency":
According to the EIP quote above, the idea of the scatter gather pattern and in particular the aggregator is to "distill" the gathered messages "into a single response message". As a user of the scatter gather functionality, I would expect that I can use it and it "just works" according to this explanation. All I want to tell it for the default case is 1. go get information here, here and here, and 2. give me back everything you received. The second part shouldn't need any configuration at all in any case where the return data is all of the same type. Just collect the received messages and return them in a single message. End of story. If the goal of a scatter gather pattern is to "distill them into a single response message", the default behaviour of the releaseStrategy should be to wait for all messages instead of releasing each arriving message individually as multiple response messages.

All deviations from this simple intuitive case can be dealt with through additional configuration, such as setting applySequence() to false, setting releaseStrategy() to some other logic, and providing a custom outputProcessor().

@swiss-chris swiss-chris added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels Jul 16, 2021
@artembilan artembilan added this to the Backlog milestone Jul 16, 2021
@artembilan
Member

Since you know my position on the matter from my answer to your SO question, we probably need to gather more opinions from other community and team members.
Although I still favor consistency with the existing solutions and optimal memory use and performance, which really lowers the entry bar for Scatter-Gather because your experience here is fully based on the existing Recipient List Router and Aggregator, I'm open to the requested changes if that is really what the majority would like to see for this component.

Note: the EIP pattern description is just a recommendation for how it could work; it does not assume any internal specifics of how those replies are gathered into a single response message.

@swiss-chris
Contributor Author

Absolutely. This is just my opinion and I'd love to hear what other opinions are out there from other users / developers.

@garyrussell
Contributor

I tend to concur that applySequence should be true by default and that a sequence-size release strategy should be configured by default. The fact that we use an RLR internally is an implementation detail.

However, this would be a behavior change so would have to wait until the next minor or major release.

I don't agree that a collection of collections should be "flatMapped" into a single collection by default, because

a. The collections might contain different types
b. Even if they are the same type, the caller may need to know the origin of each member of the collection.

EIP ... distill them into a single response message

But it doesn't attempt to discuss what the format of such a message is.
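
Point (b) can be illustrated with a small plain-Java sketch. It is not framework code; AggregationShapeSketch, keepNested and flatten are hypothetical names chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java comparison of the two aggregation shapes discussed here. */
final class AggregationShapeSketch {

    /** Keeps one inner list per reply, so the boundary between replies survives. */
    static <T> List<List<T>> keepNested(List<List<T>> replies) {
        return new ArrayList<>(replies);
    }

    /** Merges all replies into one list; which reply an element came from is lost. */
    static <T> List<T> flatten(List<List<T>> replies) {
        List<T> merged = new ArrayList<>();
        for (List<T> reply : replies) {
            merged.addAll(reply);
        }
        return merged;
    }
}
```

After flatten, a caller can no longer tell which recipient produced a given element unless the elements themselves carry that information; keepNested preserves one inner list per reply.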

@swiss-chris
Contributor Author

I would be quite happy with that change !

@artembilan
Member

OK. Scheduled for the next 6.0.

Thank you all for your vision and opinion!

@artembilan artembilan modified the milestones: Backlog, 6.0.x Jul 19, 2021
@artembilan artembilan added in: core and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Jul 19, 2021
@swiss-chris
Contributor Author

Hi @artembilan
What's the status of this issue ? Will it still be included in 6.0 ?

@artembilan
Member

Hi @swiss-chris !

Yes, this issue is still planned.
We are just too short-handed to manage everything.

A contribution is welcome though: https://github.com/spring-projects/spring-integration/blob/main/CONTRIBUTING.adoc !

@swiss-chris
Contributor Author

swiss-chris commented May 9, 2022

@garyrussell @artembilan
I just used .scatterGather() again today for the first time in a while, and I still believe it would be highly helpful and productivity increasing if there was an easy way to just ask the gatherer to implement an intuitive, default collecting aggregator.

In our project we have this handy class, but after not using it for 6 months, I forgot that it exists and lost 30 minutes trying to remember how we used to do this.

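import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor;
import org.springframework.integration.store.MessageGroup;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class UnifyingListMessageGroupsProcessor<T> extends AbstractAggregatingMessageGroupProcessor {

    // cdi: I'm surprised Spring Integration DSL doesn't do this automagically...
    // Assumes every gathered payload is a List<T>; a non-List payload fails with a ClassCastException.
    @Override
    @SuppressWarnings("unchecked")
    protected List<T> aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
        return group
                .streamMessages()
                .map(Message::getPayload)
                .map(list -> (List<T>) list)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }
}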

We then use it something like this:

    @Bean
    IntegrationFlow someFlow(
        ReleaseStrategy defaultReleaseStrategy /* we use new SimpleSequenceSizeReleaseStrategy() here */, 
        UnifyingListMessageGroupsProcessor<MyType> unifyingOutputProcessor
    ) {
        final String logCat = LoggingUtils.getLogCat(new Object() {});
        return IntegrationFlows
                .from(SOME_CHANNEL)
                .gateway(flow -> flow
                        .scatterGather(scatterer ->
                                scatterer
                                        .applySequence(true)
                                        .recipientFlow(getA())
                                        .recipientFlow(getB()),
                                gatherer -> gatherer
                                        .releaseStrategy(defaultReleaseStrategy)
                                        .outputProcessor(unifyingOutputProcessor)))
                .gateway(someProcessing())
                .log(INFO, logCat, m -> "Finished flow for channel " + SOME_CHANNEL)
                .nullChannel();
    }

Maybe if you don't like a default behavior for gatherer.outputProcessor(), then there could be an alternative gatherer.defaultOutputProcessor() or some other fancy way to let the user choose if they want to implement their own.

One additional bit of background info. AFAIK there is no other/better way to accomplish this very common use case. We want to get data from 2 sources, but the rest of the flow should be the same for the results from both sources. We always use scatterGather() for this and we always need the default UnifyingListMessageGroupsProcessor behavior. If you know of a better/simpler way to do this, I'm happy to change my position about wanting this default behavior for scatterGather. But if scatterGather remains the easiest way to implement this, I still think it would be nice to offer this functionality without having to write your own output processor every time, or having to re-implement a generic one like the one we now use for every project.

@artembilan
Member

@swiss-chris ,

that

                .map(Message::getPayload)
                .map(list -> (List<T>) list)

is not a default behavior of the framework. We really have no idea what your payload is, so we cannot make that decision for you.
The fact that the payload of one of your gathered replies is a List is just an implementation detail of your project.
Therefore it is not as generic from the framework's perspective as you describe it.

Not sure why you'd like the framework to do something on the matter by default...

@swiss-chris
Contributor Author

I see your point. I wasn't considering non-List messages...

@artembilan artembilan modified the milestones: 6.0.x, 6.0 M3 May 9, 2022
@artembilan artembilan self-assigned this May 9, 2022
artembilan added a commit to artembilan/spring-integration that referenced this issue May 9, 2022
Fixes spring-projects#3592

* Configure XML parser & Java DSL for Scatter-Gather, based on the
`RecipientListRouter` to set an `applySequence` to `true` by default.
This will make a `gatherer` part to fully rely on the default correlation
strategies
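
As a rough illustration of what this commit message describes, the plain-Java sketch below models a scatterer that stamps sequence details (a shared correlation id, a sequence number and a sequence size) on each copy of the request, and a gatherer that groups replies by correlation id. It is a simplified model rather than the framework's implementation: ApplySequenceSketch, Stamped, scatter and correlate are hypothetical names, and the 1-based numbering is an arbitrary choice for the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of sequence details and header-based correlation. */
final class ApplySequenceSketch {

    /** Hypothetical message copy carrying the sequence details. */
    static final class Stamped {
        final String recipient;
        final String correlationId;
        final int sequenceNumber;
        final int sequenceSize;

        Stamped(String recipient, String correlationId, int sequenceNumber, int sequenceSize) {
            this.recipient = recipient;
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
        }
    }

    /** Models applySequence = true: one stamped copy of the request per recipient. */
    static List<Stamped> scatter(String requestId, List<String> recipients) {
        List<Stamped> copies = new ArrayList<>();
        for (int i = 0; i < recipients.size(); i++) {
            copies.add(new Stamped(recipients.get(i), requestId, i + 1, recipients.size()));
        }
        return copies;
    }

    /** Models header-based correlation: replies with the same correlation id share a group. */
    static Map<String, List<Stamped>> correlate(List<Stamped> replies) {
        Map<String, List<Stamped>> groups = new LinkedHashMap<>();
        for (Stamped reply : replies) {
            groups.computeIfAbsent(reply.correlationId, id -> new ArrayList<>()).add(reply);
        }
        return groups;
    }
}
```

Without the stamped values, the gatherer in this model would have nothing to group by, which corresponds to the missing-correlation error reported at the top of this issue when applySequence was left at false.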
garyrussell pushed a commit that referenced this issue May 10, 2022
Fixes #3592
@swiss-chris
Contributor Author

swiss-chris commented May 10, 2022

@garyrussell and @artembilan thanks for this change artembilan@e770823

@garyrussell If I understood correctly, you were also in favor of configuring "a sequence size release strategy by default": #3592 (comment)

Are you still considering adding this to v6 ?

@artembilan
Member

"a sequence size release strategy by default"

It was always there by default anyway. See AbstractCorrelatingMessageHandler:

		this.releaseStrategy =
				releaseStrategy == null
						? new SimpleSequenceSizeReleaseStrategy()
						: releaseStrategy;

I'm not sure why this is a question...

@swiss-chris
Contributor Author

You're right. For some reason I was convinced that this didn't work out of the box, but I just checked and it does work as you say. In that case, please ignore my suggestion.


3 participants