Skip to content

Commit 56561e8

Browse files
committed
Fix releasePartialSequences value propagation for SequenceSizeReleaseStrategy.
Fixes: spring-projects#10003 Issue Link: spring-projects#10003 Signed-off-by: Jiandong Ma <[email protected]>
1 parent 05ee9ff commit 56561e8

File tree

2 files changed

+10
-9
lines changed

2 files changed

+10
-9
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) {
299299
*/
300300
public void setReleasePartialSequences(boolean releasePartialSequences) {
301301
if (!this.releaseStrategySet && releasePartialSequences) {
302-
setReleaseStrategy(new SequenceSizeReleaseStrategy());
302+
setReleaseStrategy(new SequenceSizeReleaseStrategy(releasePartialSequences));
303303
}
304304
this.releasePartialSequences = releasePartialSequences;
305305
}

spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -129,7 +129,6 @@ public void testAggPerf() throws InterruptedException, ExecutionException, Timeo
129129
}
130130

131131
@Test
132-
@Disabled("Time sensitive")
133132
public void testAggPerfDefaultPartial() throws InterruptedException, ExecutionException, TimeoutException {
134133
AggregatingMessageHandler handler = new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor());
135134
handler.setCorrelationStrategy(message -> "foo");
@@ -152,28 +151,30 @@ public void testAggPerfDefaultPartial() throws InterruptedException, ExecutionEx
152151
store.setMessageGroupFactory(messageGroupFactory);
153152

154153
handler.setMessageStore(store);
154+
handler.setBeanFactory(mock(BeanFactory.class));
155+
handler.afterPropertiesSet();
155156

156157
StopWatch stopwatch = new StopWatch();
157158
stopwatch.start();
158-
for (int i = 0; i < 120000; i++) {
159-
if (i % 10000 == 0) {
159+
for (int i = 0; i < 1200; i++) {
160+
if (i % 100 == 0) {
160161
stopwatch.stop();
161162
logger.warn("Sent " + i + " in " + stopwatch.getTotalTimeSeconds() +
162-
" (10k in " + stopwatch.lastTaskInfo().getTimeMillis() + "ms)");
163+
" (100 in " + stopwatch.lastTaskInfo().getTimeMillis() + "ms)");
163164
stopwatch.start();
164165
}
165166
handler.handleMessage(MessageBuilder.withPayload("foo")
166-
.setSequenceSize(120000)
167+
.setSequenceSize(1200)
167168
.setSequenceNumber(i + 1)
168169
.build());
169170
}
170171
stopwatch.stop();
171-
logger.warn("Sent " + 120000 + " in " + stopwatch.getTotalTimeSeconds() +
172+
logger.warn("Sent " + 1200 + " in " + stopwatch.getTotalTimeSeconds() +
172173
" (10k in " + stopwatch.lastTaskInfo().getTimeMillis() + "ms)");
173174

174175
Collection<?> result = resultFuture.get(10, TimeUnit.SECONDS);
175176
assertThat(result).isNotNull();
176-
assertThat(result.size()).isEqualTo(120000);
177+
assertThat(result.size()).isEqualTo(1);
177178
assertThat(stopwatch.getTotalTimeSeconds()).isLessThan(60.0); // actually < 2.0, was many minutes
178179
}
179180

0 commit comments

Comments
 (0)