Skip to content

Commit 2f0b377

Browse files
artembilangaryrussell
authored andcommitted
INT-4091: Aggregator: Removal for Empty Groups
JIRA: https://jira.spring.io/browse/INT-4091 When `empty-group-min-timeout` is configured and `expireGroupsUponCompletion == false` and normal or partial sequences group release happens, schedule the group for removal after `empty-group-min-timeout`, since it is empty already. That lets to avoid `MessageGroupStoreReaper` configuration just for cleaning empty groups. * Retrieve the group `lastModified` to check if group is still valid for removal * Remove `ScheduledFeature` in the remove task * Polishing for debug messages to reflect the current logic * Reschedule empty group removal task in case of `InterruptedException` on the `lock.lockInterruptibly()` * Fix typos in docs Fix race condition between `groupStore.expireMessageGroups()`and `TaskScheduler` in the `AbstractCorrelatingMessageHandlerTests.testReaperReapsAnEmptyGroupAfterConfiguredDelay()` Also check groupSize for removal decision Address PR comments
1 parent 9ae8f32 commit 2f0b377

File tree

7 files changed

+141
-22
lines changed

7 files changed

+141
-22
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
9191

9292
private final Comparator<Message<?>> sequenceNumberComparator = new SequenceNumberComparator();
9393

94-
private final Map<UUID, ScheduledFuture<?>> expireGroupScheduledFutures = new HashMap<UUID, ScheduledFuture<?>>();
94+
private final Map<UUID, ScheduledFuture<?>> expireGroupScheduledFutures = new HashMap<>();
9595

9696
private final MessageGroupProcessor outputProcessor;
9797

@@ -163,8 +163,7 @@ public void setLockRegistry(LockRegistry lockRegistry) {
163163
public final void setMessageStore(MessageGroupStore store) {
164164
this.messageStore = store;
165165
store.registerMessageGroupExpiryCallback(
166-
(messageGroupStore, group) ->
167-
AbstractCorrelatingMessageHandler.this.forceReleaseProcessor.processMessageGroup(group));
166+
(messageGroupStore, group) -> this.forceReleaseProcessor.processMessageGroup(group));
168167
}
169168

170169
public void setCorrelationStrategy(CorrelationStrategy correlationStrategy) {
@@ -254,9 +253,7 @@ private MessageGroupProcessor createGroupTimeoutProcessor() {
254253

255254
if (this.groupTimeoutExpression != null && !CollectionUtils.isEmpty(this.forceReleaseAdviceChain)) {
256255
ProxyFactory proxyFactory = new ProxyFactory(processor);
257-
for (Advice advice : this.forceReleaseAdviceChain) {
258-
proxyFactory.addAdvice(advice);
259-
}
256+
this.forceReleaseAdviceChain.forEach(proxyFactory::addAdvice);
260257
return (MessageGroupProcessor) proxyFactory.getProxy(getApplicationContext().getClassLoader());
261258
}
262259
return processor;
@@ -405,7 +402,7 @@ protected void handleMessageInternal(Message<?> message) throws Exception {
405402
if (scheduledFuture != null) {
406403
boolean canceled = scheduledFuture.cancel(true);
407404
if (canceled && this.logger.isDebugEnabled()) {
408-
this.logger.debug("Cancel 'forceComplete' scheduling for MessageGroup with Correlation Key [ "
405+
this.logger.debug("Cancel 'ScheduledFuture' for MessageGroup with Correlation Key [ "
409406
+ correlationKey + "].");
410407
}
411408
}
@@ -430,6 +427,9 @@ protected void handleMessageInternal(Message<?> message) throws Exception {
430427
// processing messages
431428
this.afterRelease(messageGroup, completedMessages);
432429
}
430+
if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
431+
removeEmptyGroupAfterTimeout(messageGroup, this.minimumTimeoutForEmptyGroups);
432+
}
433433
}
434434
else {
435435
scheduleGroupToForceComplete(messageGroup);
@@ -444,6 +444,57 @@ protected void handleMessageInternal(Message<?> message) throws Exception {
444444
}
445445
}
446446

447+
protected boolean isExpireGroupsUponCompletion() {
448+
return false;
449+
}
450+
451+
private void removeEmptyGroupAfterTimeout(MessageGroup messageGroup, long timeout) {
452+
Object groupId = messageGroup.getGroupId();
453+
UUID groupUuid = UUIDConverter.getUUID(groupId);
454+
ScheduledFuture<?> scheduledFuture = getTaskScheduler()
455+
.schedule(() -> {
456+
Lock lock = this.lockRegistry.obtain(groupUuid.toString());
457+
458+
try {
459+
lock.lockInterruptibly();
460+
try {
461+
this.expireGroupScheduledFutures.remove(groupUuid);
462+
/*
463+
* Obtain a fresh state for group from the MessageStore,
464+
* since it could be changed while we have waited for lock.
465+
*/
466+
MessageGroup groupNow = this.messageStore.getMessageGroup(groupUuid);
467+
boolean removeGroup = groupNow.size() == 0 &&
468+
groupNow.getLastModified()
469+
<= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups);
470+
if (removeGroup) {
471+
if (this.logger.isDebugEnabled()) {
472+
this.logger.debug("Removing empty group: " + groupUuid);
473+
}
474+
this.messageStore.removeMessageGroup(groupId);
475+
}
476+
}
477+
finally {
478+
lock.unlock();
479+
}
480+
}
481+
catch (InterruptedException e) {
482+
Thread.currentThread().interrupt();
483+
if (this.logger.isDebugEnabled()) {
484+
this.logger.debug("Thread was interrupted while trying to obtain lock."
485+
+ "Rescheduling empty MessageGroup [ " + groupId + "] for removal.");
486+
}
487+
removeEmptyGroupAfterTimeout(messageGroup, timeout);
488+
}
489+
490+
}, new Date(System.currentTimeMillis() + timeout));
491+
492+
if (this.logger.isDebugEnabled()) {
493+
this.logger.debug("Schedule empty MessageGroup [ " + groupId + "] for removal.");
494+
}
495+
this.expireGroupScheduledFutures.put(groupUuid, scheduledFuture);
496+
}
497+
447498
private void scheduleGroupToForceComplete(MessageGroup messageGroup) {
448499
final Long groupTimeout = obtainGroupTimeout(messageGroup);
449500
/*
@@ -506,7 +557,7 @@ private void discardMessage(Message<?> message) {
506557
* @param completedMessages The completed messages.
507558
* @param timeout True if the release/discard was due to a timeout.
508559
*/
509-
protected void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout) {
560+
protected void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout) {
510561
afterRelease(group, completedMessages);
511562
}
512563

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ public void setExpireGroupsUponCompletion(boolean expireGroupsUponCompletion) {
6161
this.expireGroupsUponCompletion = expireGroupsUponCompletion;
6262
}
6363

64+
@Override
65+
protected boolean isExpireGroupsUponCompletion() {
66+
return this.expireGroupsUponCompletion;
67+
}
68+
6469
@Override
6570
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {
6671
Object groupId = messageGroup.getGroupId();

spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration-5.0.xsd

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3834,15 +3834,16 @@
38343834
<xsd:attribute name="empty-group-min-timeout" type="xsd:string">
38353835
<xsd:annotation>
38363836
<xsd:documentation>
3837-
Only applies if a MessageGroupStoreReaper is configured for this Correlation
3838-
Endpoint's MessageStore.
3839-
By default, when a MessageGroupStoreReaper is configured to expire partial
3840-
groups, empty groups are also removed. Empty groups exist after a group
3841-
is released normally. This is to enable the detection and discarding of
3842-
late-arriving messages. If you wish to run empty group deletion on a longer
3837+
Empty groups exist after a group is released normally.
3838+
This is to enable the detection and discarding of late-arriving messages.
3839+
If you wish to run empty group deletion on a longer
38433840
schedule than expiring partial groups, set this property. Empty groups will
38443841
then not be removed from the MessageStore until they have not been modified
38453842
for at least this number of milliseconds.
3843+
If this is set, the group is scheduled for removal after normal
3844+
or partial sequences group release.
3845+
When a MessageGroupStoreReaper is configured to expire partial
3846+
groups, empty groups are also removed, but using this value.
38463847
Note that the actual time to expire an
38473848
empty group will also be affected by the reaper's 'timeout'
38483849
property and it could be as much as this value plus the timeout.

spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.springframework.integration.test.util.TestUtils;
4848
import org.springframework.messaging.Message;
4949
import org.springframework.messaging.support.GenericMessage;
50+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
5051

5152
/**
5253
* @author Gary Russell
@@ -118,7 +119,7 @@ protected void afterRelease(MessageGroup group, Collection<Message<?>> completed
118119
handler.setDiscardChannel(discards);
119120
handler.setSendPartialResultOnExpiry(true);
120121

121-
Message<String> message = MessageBuilder.withPayload("foo")
122+
Message<String> message = MessageBuilder.withPayload("foo")
122123
.setCorrelationId("qux")
123124
.build();
124125
// partial group that will be reaped
@@ -159,7 +160,7 @@ public void testReaperReapsAnEmptyGroup() throws Exception {
159160
});
160161
handler.setReleaseStrategy(group -> group.size() == 1);
161162

162-
Message<String> message = MessageBuilder.withPayload("foo")
163+
Message<String> message = MessageBuilder.withPayload("foo")
163164
.setCorrelationId("bar")
164165
.build();
165166
handler.handleMessage(message);
@@ -186,20 +187,32 @@ public void testReaperReapsAnEmptyGroupAfterConfiguredDelay() throws Exception {
186187
});
187188
handler.setReleaseStrategy(group -> group.size() == 1);
188189

189-
handler.setMinimumTimeoutForEmptyGroups(1000);
190-
191-
Message<String> message = MessageBuilder.withPayload("foo")
190+
Message<String> message = MessageBuilder.withPayload("foo")
192191
.setCorrelationId("bar")
193192
.build();
194193
handler.handleMessage(message);
195194

195+
handler.setMinimumTimeoutForEmptyGroups(100);
196+
196197
assertEquals(1, outputMessages.size());
197198

198199
assertEquals(1, TestUtils.getPropertyValue(handler, "messageStore.groupIdToMessageGroup", Map.class).size());
199200
groupStore.expireMessageGroups(0);
200201
assertEquals(1, TestUtils.getPropertyValue(handler, "messageStore.groupIdToMessageGroup", Map.class).size());
201-
Thread.sleep(1010);
202-
groupStore.expireMessageGroups(0);
202+
203+
int n = 0;
204+
205+
while (n++ < 200) {
206+
groupStore.expireMessageGroups(0);
207+
if (TestUtils.getPropertyValue(handler, "messageStore.groupIdToMessageGroup", Map.class).size() > 0) {
208+
Thread.sleep(50);
209+
}
210+
else {
211+
break;
212+
}
213+
}
214+
215+
assertTrue(n < 200);
203216
assertEquals(0, TestUtils.getPropertyValue(handler, "messageStore.groupIdToMessageGroup", Map.class).size());
204217
}
205218

@@ -321,6 +334,7 @@ public void testInt3483DeadlockOnMessageStoreRemoveMessageGroup() throws Interru
321334
handler.setReleaseStrategy(group -> true);
322335
handler.setExpireGroupsUponTimeout(false);
323336
SimpleMessageStore messageStore = new SimpleMessageStore() {
337+
324338
@Override
325339
public void removeMessageGroup(Object groupId) {
326340
throw new RuntimeException("intentional");
@@ -354,8 +368,51 @@ public void removeMessageGroup(Object groupId) {
354368
/* Since MessageGroup had been marked as 'complete', but hasn't been removed because of exception,
355369
the second message is discarded
356370
*/
357-
Message<?> receive = discardChannel.receive(1000);
371+
Message<?> receive = discardChannel.receive(10000);
358372
assertNotNull(receive);
359373
}
360374

375+
@Test
376+
public void testScheduleRemoveAnEmptyGroupAfterConfiguredDelay() throws Exception {
377+
final MessageGroupStore groupStore = new SimpleMessageStore();
378+
AggregatingMessageHandler handler = new AggregatingMessageHandler(group -> group, groupStore);
379+
380+
final List<Message<?>> outputMessages = new ArrayList<Message<?>>();
381+
handler.setOutputChannel((message, timeout) -> {
382+
/*
383+
* Executes when group 'bar' completes normally
384+
*/
385+
outputMessages.add(message);
386+
return true;
387+
});
388+
handler.setReleaseStrategy(group -> group.size() == 1);
389+
390+
handler.setMinimumTimeoutForEmptyGroups(100);
391+
392+
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
393+
taskScheduler.afterPropertiesSet();
394+
handler.setTaskScheduler(taskScheduler);
395+
396+
Message<String> message = MessageBuilder.withPayload("foo")
397+
.setCorrelationId("bar")
398+
.build();
399+
handler.handleMessage(message);
400+
401+
assertEquals(1, outputMessages.size());
402+
403+
assertEquals(1, TestUtils.getPropertyValue(handler, "messageStore.groupIdToMessageGroup", Map.class).size());
404+
405+
Thread.sleep(100);
406+
407+
int n = 0;
408+
409+
while (TestUtils.getPropertyValue(handler, "messageStore.groupIdToMessageGroup", Map.class).size() > 0
410+
&& n++ < 200) {
411+
Thread.sleep(50);
412+
}
413+
414+
assertTrue(n < 200);
415+
assertEquals(0, TestUtils.getPropertyValue(handler, "messageStore.groupIdToMessageGroup", Map.class).size());
416+
}
417+
361418
}

src/reference/asciidoc/aggregator.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,9 @@ Since _version 4.1_, you can now control this behavior using `expire-groups-upon
505505
NOTE: When a group is timed out, the `ReleaseStrategy` is given one more opportunity to release the group; if it does so, and `expire-groups-upon-timeout` is false, then expiration is controlled by `expire-groups-upon-completion`.
506506
If the group is not released by the release strategy during timeout, then the expiration is controlled by the `expire-groups-upon-timeout`.
507507
Timed-out groups are either discarded, or a partial release occurs (based on `send-partial-result-on-expiry`).
508+
509+
Starting with _version 5.0_ empty groups are also scheduled for removal after `empty-group-min-timeout`.
510+
If `expireGroupsUponCompletion == false` and `minimumTimeoutForEmptyGroups > 0`, the task to remove the group is scheduled, when normal or partial sequences release happens.
508511
=====
509512

510513
Using a `ref` attribute is generally recommended if a custom aggregator handler implementation may be referenced in other `<aggregator>` definitions.

src/reference/asciidoc/resequencer.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ Late arriving messages will be immediately discarded.
161161
Set this to `true` to remove the group completely; then, late arriving messages will start a new group and won't be discarded until the group again times out.
162162
The new group will never be released normally because of the "hole" in the sequence range that caused the timeout.
163163
Empty groups can be expired (completely removed) later using a `MessageGroupStoreReaper` together with the `empty-group-min-timeout` attribute.
164+
Starting with _version 5.0_ empty groups are also scheduled for removal after `empty-group-min-timeout`.
164165
Default: 'false'.
165166

166167
NOTE: Since there is no custom behavior to be implemented in Java classes for resequencers, there is no annotation support for it.

src/reference/asciidoc/whats-new.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,5 @@ See <<http-header-mapping>> for more information.
8181
==== Aggregator Performance Changes
8282

8383
Aggregators now use a `SimpleSequenceSizeReleaseStrategy` by default, which is more efficient, especially with large groups.
84+
Empty groups are now scheduled for removal after `empty-group-min-timeout`.
8485
See <<aggregator>> for more information.

0 commit comments

Comments
 (0)