Skip to content

Commit 07a1720

Browse files
artembilangaryrussell
authored andcommitted
GH-8685: Re-fetch group after setting condition (#8686)
* GH-8685: Re-fetch group after setting condition Fixes #8685 The `AbstractCorrelatingMessageHandler` updates the group metadata in DB not only for provided `condition`, but also a `lastModified` field. A subsequent scheduling for group timeout takes the `lastModified` to compare with the value in the store after re-fetching group in task. This does not reflect reality since adding `condition` modifies the data in DB, but in-memory state remains the same. * Re-fetch a group from the store in the `AbstractCorrelatingMessageHandler.setGroupConditionIfAny()`. * Verify expected behavior via new `ConfigurableMongoDbMessageGroupStoreTests.groupIsForceReleaseAfterTimeoutWhenGroupConditionIsSet()` **Cherry-pick to `6.1.x`, `6.0.x` & `5.5.x`** * * Fix Checkstyle violation in the test
1 parent 758997e commit 07a1720

File tree

3 files changed

+38
-6
lines changed

3 files changed

+38
-6
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -565,7 +565,7 @@ private boolean processMessageForGroup(Message<?> message, Object correlationKey
565565
this.logger.trace(() -> "Adding message to group [ " + messageGroupToLog + "]");
566566
messageGroup = store(correlationKey, message);
567567

568-
setGroupConditionIfAny(message, messageGroup);
568+
messageGroup = setGroupConditionIfAny(message, messageGroup);
569569

570570
if (this.releaseStrategy.canRelease(messageGroup)) {
571571
Collection<Message<?>> completedMessages = null;
@@ -604,12 +604,19 @@ private void cancelScheduledFutureIfAny(Object correlationKey, UUID groupIdUuid,
604604
}
605605
}
606606

607-
private void setGroupConditionIfAny(Message<?> message, MessageGroup messageGroup) {
607+
private MessageGroup setGroupConditionIfAny(Message<?> message, MessageGroup messageGroup) {
608+
MessageGroup messageGroupToUse = messageGroup;
609+
608610
if (this.groupConditionSupplier != null) {
609-
String condition = this.groupConditionSupplier.apply(message, messageGroup.getCondition());
610-
this.messageStore.setGroupCondition(messageGroup.getGroupId(), condition);
611-
messageGroup.setCondition(condition);
611+
String condition = this.groupConditionSupplier.apply(message, messageGroupToUse.getCondition());
612+
this.messageStore.setGroupCondition(messageGroupToUse.getGroupId(), condition);
613+
messageGroupToUse = this.messageStore.getMessageGroup(messageGroupToUse.getGroupId());
614+
if (this.sequenceAware) {
615+
messageGroupToUse = new SequenceAwareMessageGroup(messageGroupToUse);
616+
}
612617
}
618+
619+
return messageGroupToUse;
613620
}
614621

615622
protected boolean isExpireGroupsUponCompletion() {

spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageGroupStoreTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,29 @@ public void messageGroupStoreLazyLoadPerformance() {
8181
// System. out .println(watch.prettyPrint()); // checkstyle
8282
}
8383

84+
@Test
85+
void groupIsForceReleaseAfterTimeoutWhenGroupConditionIsSet() {
86+
try (var context = new ClassPathXmlApplicationContext("mongo-aggregator-configurable-config.xml", getClass())) {
87+
MessageChannel input = context.getBean("inputChannel", MessageChannel.class);
88+
QueueChannel output = context.getBean("outputChannel", QueueChannel.class);
89+
90+
Message<?> message = MessageBuilder.withPayload("test")
91+
.setSequenceNumber(1)
92+
.setSequenceSize(10)
93+
.setCorrelationId("test")
94+
.build();
95+
96+
input.send(message);
97+
98+
Message<?> receive = output.receive(10_000);
99+
100+
assertThat(receive)
101+
.extracting("payload")
102+
.asList()
103+
.hasSize(1);
104+
}
105+
}
106+
84107
private void performLazyLoadEagerTest(StopWatch watch, int sequenceSize, boolean lazyLoad) {
85108
ClassPathXmlApplicationContext context =
86109
new ClassPathXmlApplicationContext("mongo-aggregator-configurable-config.xml", getClass());

spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/mongo-aggregator-configurable-config.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
<int:aggregator input-channel="inputChannel" output-channel="outputChannel" message-store="mongoStore"
1313
release-strategy="releaseStrategy"
14+
group-timeout="500"
15+
send-partial-result-on-expiry="true"
1416
group-condition-supplier="conditionSupplier"/>
1517

1618
<util:constant id="releaseStrategy"

0 commit comments

Comments
 (0)