Skip to content

Commit 906c25b

Browse files
committed
spring-projectsGH-3785: Close stream for persistent collection
Fixes spring-projects#3785 * Fix `CollectionArgumentResolver` and `PayloadsArgumentResolver` to close the `Stream` of message after its usage * Rework `AbstractKeyValueMessageStore.removeMessagesFromGroup()` to iterate input collection of messages not its stream to avoid the mentioned problem **Cherry-pick to `5.5.x`**
1 parent 860f9fe commit 906c25b

File tree

3 files changed

+27
-24
lines changed

3 files changed

+27
-24
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/support/CollectionArgumentResolver.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Collections;
2121
import java.util.Iterator;
2222
import java.util.stream.Collectors;
23+
import java.util.stream.Stream;
2324

2425
import org.springframework.core.MethodParameter;
2526
import org.springframework.core.convert.TypeDescriptor;
@@ -71,17 +72,16 @@ public Object resolveArgument(MethodParameter parameter, Message<?> message) {
7172
if (this.canProcessMessageList) {
7273
Assert.state(value instanceof Collection,
7374
"This Argument Resolver only supports messages with a payload of Collection<Message<?>>, "
74-
+ "payload is: " + value.getClass());
75+
+ "payload is: " + value.getClass());
7576

7677
Collection<Message<?>> messages = (Collection<Message<?>>) value;
7778

78-
if (Message.class.isAssignableFrom(parameter.nested().getNestedParameterType())) {
79-
value = messages;
80-
}
81-
else {
82-
value = messages.stream()
83-
.map(Message::getPayload)
84-
.collect(Collectors.toList());
79+
if (!Message.class.isAssignableFrom(parameter.nested().getNestedParameterType())) {
80+
try (Stream<Message<?>> messageStream = messages.stream()) {
81+
value = messageStream
82+
.map(Message::getPayload)
83+
.collect(Collectors.toList());
84+
}
8585
}
8686
}
8787

spring-integration-core/src/main/java/org/springframework/integration/handler/support/PayloadsArgumentResolver.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.stream.Collectors;
24+
import java.util.stream.Stream;
2425

2526
import org.springframework.core.MethodParameter;
2627
import org.springframework.core.convert.TypeDescriptor;
@@ -78,15 +79,17 @@ public Object resolveArgument(MethodParameter parameter, Message<?> message) {
7879
return evaluateExpression(expression, messages, parameter.getParameterType());
7980
}
8081
else {
81-
List<?> payloads = messages.stream()
82-
.map(Message::getPayload)
83-
.collect(Collectors.toList());
82+
try (Stream<Message<?>> messageStream = messages.stream()) {
83+
List<?> payloads = messageStream
84+
.map(Message::getPayload)
85+
.collect(Collectors.toList());
86+
return getEvaluationContext()
87+
.getTypeConverter()
88+
.convertValue(payloads,
89+
TypeDescriptor.forObject(payloads),
90+
TypeDescriptor.valueOf(parameter.getParameterType()));
91+
}
8492

85-
return getEvaluationContext()
86-
.getTypeConverter()
87-
.convertValue(payloads,
88-
TypeDescriptor.forObject(payloads),
89-
TypeDescriptor.valueOf(parameter.getParameterType()));
9093
}
9194
}
9295

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -244,17 +244,17 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
244244
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
245245
MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm;
246246

247-
List<UUID> ids =
248-
messages.stream()
249-
.map(messageToRemove -> messageToRemove.getHeaders().getId())
250-
.collect(Collectors.toList());
247+
List<UUID> ids = new ArrayList<>();
248+
for (Message<?> messageToRemove : messages) {
249+
ids.add(messageToRemove.getHeaders().getId());
250+
}
251251

252252
messageGroupMetadata.removeAll(ids);
253253

254-
List<Object> messageIds =
255-
ids.stream()
256-
.map(id -> this.messagePrefix + id)
257-
.collect(Collectors.toList());
254+
List<Object> messageIds = new ArrayList<>();
255+
for (UUID id : ids) {
256+
messageIds.add(this.messagePrefix + id);
257+
}
258258

259259
doRemoveAll(messageIds);
260260

0 commit comments

Comments
 (0)