Skip to content

Commit 9075453

Browse files
artembilangaryrussell
authored andcommitted
GH-3794: Use less memory with scheduled tasks
Fixes #3794 * Replace hard reference to message group with its `id` in the `AbstractCorrelatingMessageHandler.removeEmptyGroupAfterTimeout()` * Replace hard reference to message with its `id` in the `DelayHandler.rescheduleAt()` **Cherry-pick to `5.5.x`**
1 parent 6d7aebc commit 9075453

File tree

2 files changed

+30
-34
lines changed

2 files changed

+30
-34
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

+10-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -579,7 +579,7 @@ private boolean processMessageForGroup(Message<?> message, Object correlationKey
579579
afterRelease(messageGroup, completedMessages);
580580
}
581581
if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
582-
removeEmptyGroupAfterTimeout(messageGroup, this.minimumTimeoutForEmptyGroups);
582+
removeEmptyGroupAfterTimeout(groupIdUuid, this.minimumTimeoutForEmptyGroups);
583583
}
584584
}
585585
else {
@@ -616,29 +616,27 @@ protected boolean isExpireGroupsUponCompletion() {
616616
return false;
617617
}
618618

619-
private void removeEmptyGroupAfterTimeout(MessageGroup messageGroup, long timeout) {
620-
Object groupId = messageGroup.getGroupId();
621-
UUID groupUuid = UUIDConverter.getUUID(groupId);
619+
private void removeEmptyGroupAfterTimeout(UUID groupId, long timeout) {
622620
ScheduledFuture<?> scheduledFuture =
623621
getTaskScheduler()
624622
.schedule(() -> {
625-
Lock lock = this.lockRegistry.obtain(groupUuid.toString());
623+
Lock lock = this.lockRegistry.obtain(groupId.toString());
626624

627625
try {
628626
lock.lockInterruptibly();
629627
try {
630-
this.expireGroupScheduledFutures.remove(groupUuid);
628+
this.expireGroupScheduledFutures.remove(groupId);
631629
/*
632630
* Obtain a fresh state for group from the MessageStore,
633631
* since it could be changed while we have waited for lock.
634632
*/
635-
MessageGroup groupNow = this.messageStore.getMessageGroup(groupUuid);
633+
MessageGroup groupNow = this.messageStore.getMessageGroup(groupId);
636634
boolean removeGroup = groupNow.size() == 0 &&
637635
groupNow.getLastModified()
638636
<= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups);
639637
if (removeGroup) {
640-
this.logger.debug(() -> "Removing empty group: " + groupUuid);
641-
remove(messageGroup);
638+
this.logger.debug(() -> "Removing empty group: " + groupId);
639+
remove(groupNow);
642640
}
643641
}
644642
finally {
@@ -649,13 +647,13 @@ private void removeEmptyGroupAfterTimeout(MessageGroup messageGroup, long timeou
649647
Thread.currentThread().interrupt();
650648
this.logger.debug(() -> "Thread was interrupted while trying to obtain lock."
651649
+ "Rescheduling empty MessageGroup [ " + groupId + "] for removal.");
652-
removeEmptyGroupAfterTimeout(messageGroup, timeout);
650+
removeEmptyGroupAfterTimeout(groupId, timeout);
653651
}
654652

655653
}, new Date(System.currentTimeMillis() + timeout));
656654

657655
this.logger.debug(() -> "Schedule empty MessageGroup [ " + groupId + "] for removal.");
658-
this.expireGroupScheduledFutures.put(groupUuid, scheduledFuture);
656+
this.expireGroupScheduledFutures.put(groupId, scheduledFuture);
659657
}
660658

661659
private void scheduleGroupToForceComplete(MessageGroup messageGroup) {

spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java

+20-22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -425,24 +425,7 @@ private void releaseMessageAfterDelay(final Message<?> message, long delay) {
425425
this.messageStore.addMessageToGroup(this.messageGroupId, delayedMessage);
426426
}
427427

428-
Runnable releaseTask;
429-
430-
if (this.messageStore instanceof SimpleMessageStore) {
431-
final Message<?> messageToSchedule = delayedMessage;
432-
433-
releaseTask = () -> releaseMessage(messageToSchedule);
434-
}
435-
else {
436-
final UUID messageId = delayedMessage.getHeaders().getId();
437-
438-
releaseTask = () -> {
439-
Message<?> messageToRelease = getMessageById(messageId);
440-
if (messageToRelease != null) {
441-
releaseMessage(messageToRelease);
442-
}
443-
};
444-
}
445-
428+
Runnable releaseTask = releaseTaskForMessage(delayedMessage);
446429
Date startTime = new Date(messageWrapper.getRequestDate() + delay);
447430

448431
if (TransactionSynchronizationManager.isSynchronizationActive() &&
@@ -463,6 +446,21 @@ public void afterCommit() {
463446
}
464447
}
465448

449+
private Runnable releaseTaskForMessage(Message<?> delayedMessage) {
450+
if (this.messageStore instanceof SimpleMessageStore) {
451+
return () -> releaseMessage(delayedMessage);
452+
}
453+
else {
454+
UUID messageId = delayedMessage.getHeaders().getId();
455+
return () -> {
456+
Message<?> messageToRelease = getMessageById(messageId);
457+
if (messageToRelease != null) {
458+
releaseMessage(messageToRelease);
459+
}
460+
};
461+
}
462+
}
463+
466464
private Message<?> getMessageById(UUID messageId) {
467465
Message<?> theMessage = ((MessageStore) this.messageStore).getMessage(messageId);
468466

@@ -531,9 +529,9 @@ private void rescheduleNow(final Message<?> message) {
531529
rescheduleAt(message, new Date());
532530
}
533531

534-
protected void rescheduleAt(final Message<?> message, Date startTime) {
535-
getTaskScheduler()
536-
.schedule(() -> releaseMessage(message), startTime);
532+
protected void rescheduleAt(Message<?> message, Date startTime) {
533+
Runnable releaseTask = releaseTaskForMessage(message);
534+
getTaskScheduler().schedule(releaseTask, startTime);
537535
}
538536

539537
private void doReleaseMessage(Message<?> message) {

0 commit comments

Comments
 (0)