Skip to content

Commit e454f59

Browse files
authored
GH-3785: Close stream for persistent collection (#3786)
* GH-3785: Close stream for persistent collection Fixes #3785 * Fix `CollectionArgumentResolver` and `PayloadsArgumentResolver` to close the `Stream` of message after its usage * Rework `AbstractKeyValueMessageStore.removeMessagesFromGroup()` to iterate input collection of messages not its stream to avoid the mentioned problem **Cherry-pick to `5.5.x`** * * Add `JdbcMessageStoreTests.testMessageGroupStreamNoConnectionPoolLeak()` to ensure that no leaks in the connection pool anymore. * Improve `MessageGroupStore.streamMessagesForGroup()` JavaDocs about requirements to close the `Stream` from persistent message store impls
1 parent 860f9fe commit e454f59

File tree

5 files changed

+95
-37
lines changed

5 files changed

+95
-37
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/support/CollectionArgumentResolver.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Collections;
2121
import java.util.Iterator;
2222
import java.util.stream.Collectors;
23+
import java.util.stream.Stream;
2324

2425
import org.springframework.core.MethodParameter;
2526
import org.springframework.core.convert.TypeDescriptor;
@@ -71,17 +72,16 @@ public Object resolveArgument(MethodParameter parameter, Message<?> message) {
7172
if (this.canProcessMessageList) {
7273
Assert.state(value instanceof Collection,
7374
"This Argument Resolver only supports messages with a payload of Collection<Message<?>>, "
74-
+ "payload is: " + value.getClass());
75+
+ "payload is: " + value.getClass());
7576

7677
Collection<Message<?>> messages = (Collection<Message<?>>) value;
7778

78-
if (Message.class.isAssignableFrom(parameter.nested().getNestedParameterType())) {
79-
value = messages;
80-
}
81-
else {
82-
value = messages.stream()
83-
.map(Message::getPayload)
84-
.collect(Collectors.toList());
79+
if (!Message.class.isAssignableFrom(parameter.nested().getNestedParameterType())) {
80+
try (Stream<Message<?>> messageStream = messages.stream()) {
81+
value = messageStream
82+
.map(Message::getPayload)
83+
.collect(Collectors.toList());
84+
}
8585
}
8686
}
8787

spring-integration-core/src/main/java/org/springframework/integration/handler/support/PayloadsArgumentResolver.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.stream.Collectors;
24+
import java.util.stream.Stream;
2425

2526
import org.springframework.core.MethodParameter;
2627
import org.springframework.core.convert.TypeDescriptor;
@@ -78,15 +79,17 @@ public Object resolveArgument(MethodParameter parameter, Message<?> message) {
7879
return evaluateExpression(expression, messages, parameter.getParameterType());
7980
}
8081
else {
81-
List<?> payloads = messages.stream()
82-
.map(Message::getPayload)
83-
.collect(Collectors.toList());
82+
try (Stream<Message<?>> messageStream = messages.stream()) {
83+
List<?> payloads = messageStream
84+
.map(Message::getPayload)
85+
.collect(Collectors.toList());
86+
return getEvaluationContext()
87+
.getTypeConverter()
88+
.convertValue(payloads,
89+
TypeDescriptor.forObject(payloads),
90+
TypeDescriptor.valueOf(parameter.getParameterType()));
91+
}
8492

85-
return getEvaluationContext()
86-
.getTypeConverter()
87-
.convertValue(payloads,
88-
TypeDescriptor.forObject(payloads),
89-
TypeDescriptor.valueOf(parameter.getParameterType()));
9093
}
9194
}
9295

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -244,17 +244,17 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
244244
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
245245
MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm;
246246

247-
List<UUID> ids =
248-
messages.stream()
249-
.map(messageToRemove -> messageToRemove.getHeaders().getId())
250-
.collect(Collectors.toList());
247+
List<UUID> ids = new ArrayList<>();
248+
for (Message<?> messageToRemove : messages) {
249+
ids.add(messageToRemove.getHeaders().getId());
250+
}
251251

252252
messageGroupMetadata.removeAll(ids);
253253

254-
List<Object> messageIds =
255-
ids.stream()
256-
.map(id -> this.messagePrefix + id)
257-
.collect(Collectors.toList());
254+
List<Object> messageIds = new ArrayList<>();
255+
for (UUID id : ids) {
256+
messageIds.add(this.messagePrefix + id);
257+
}
258258

259259
doRemoveAll(messageIds);
260260

spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java

+3
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ public interface MessageGroupStore extends BasicMessageGroupStore {
156156

157157
/**
158158
* Return a stream for messages stored in the provided group.
159+
* The persistent implementations return a Stream which has
160+
* to be closed once fully processed (e.g. through a try-with-resources clause).
161+
* By default, it streams a result of {@link #getMessagesForGroup(Object)}.
159162
* @param groupId the group id to retrieve messages.
160163
* @return the {@link Stream} for messages in this group.
161164
* @since 5.5

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/JdbcMessageStoreTests.java

+65-13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,8 +20,10 @@
2020

2121
import java.io.BufferedReader;
2222
import java.io.InputStreamReader;
23+
import java.lang.reflect.Method;
2324
import java.sql.Timestamp;
2425
import java.util.ArrayList;
26+
import java.util.Collection;
2527
import java.util.List;
2628
import java.util.Properties;
2729
import java.util.UUID;
@@ -30,12 +32,23 @@
3032

3133
import javax.sql.DataSource;
3234

35+
import org.apache.commons.dbcp2.DataSourceConnectionFactory;
36+
import org.apache.commons.dbcp2.PoolableConnection;
37+
import org.apache.commons.dbcp2.PoolableConnectionFactory;
38+
import org.apache.commons.dbcp2.PoolingDataSource;
39+
import org.apache.commons.pool2.ObjectPool;
40+
import org.apache.commons.pool2.impl.GenericObjectPool;
41+
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
3342
import org.junit.jupiter.api.BeforeEach;
3443
import org.junit.jupiter.api.Test;
3544

3645
import org.springframework.beans.factory.annotation.Autowired;
46+
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
47+
import org.springframework.core.MethodParameter;
48+
import org.springframework.core.annotation.SynthesizingMethodParameter;
3749
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3850
import org.springframework.integration.channel.DirectChannel;
51+
import org.springframework.integration.handler.support.CollectionArgumentResolver;
3952
import org.springframework.integration.history.MessageHistory;
4053
import org.springframework.integration.store.MessageGroup;
4154
import org.springframework.integration.support.MessageBuilder;
@@ -47,6 +60,7 @@
4760
import org.springframework.messaging.support.GenericMessage;
4861
import org.springframework.test.annotation.DirtiesContext;
4962
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
63+
import org.springframework.transaction.annotation.Propagation;
5064
import org.springframework.transaction.annotation.Transactional;
5165

5266
/**
@@ -90,7 +104,6 @@ public void testAddAndGet() {
90104

91105
@Test
92106
public void testWithMessageHistory() {
93-
94107
Message<?> message = new GenericMessage<>("Hello");
95108
DirectChannel fooChannel = new DirectChannel();
96109
fooChannel.setBeanName("fooChannel");
@@ -185,14 +198,14 @@ public void testAddAndUpdateWithChange() {
185198
}
186199

187200
@Test
188-
public void testAddAndRemoveMessageGroup() throws Exception {
201+
public void testAddAndRemoveMessageGroup() {
189202
Message<String> message = MessageBuilder.withPayload("foo").build();
190203
message = messageStore.addMessage(message);
191204
assertThat(messageStore.removeMessage(message.getHeaders().getId())).isNotNull();
192205
}
193206

194207
@Test
195-
public void testAddAndGetMessageGroup() throws Exception {
208+
public void testAddAndGetMessageGroup() {
196209
String groupId = "X";
197210
Message<String> message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build();
198211
long now = System.currentTimeMillis();
@@ -203,7 +216,7 @@ public void testAddAndGetMessageGroup() throws Exception {
203216
}
204217

205218
@Test
206-
public void testAddAndRemoveMessageFromMessageGroup() throws Exception {
219+
public void testAddAndRemoveMessageFromMessageGroup() {
207220
String groupId = "X";
208221
Message<String> message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build();
209222
messageStore.addMessagesToGroup(groupId, message);
@@ -213,7 +226,7 @@ public void testAddAndRemoveMessageFromMessageGroup() throws Exception {
213226
}
214227

215228
@Test
216-
public void testAddAndRemoveMessagesFromMessageGroup() throws Exception {
229+
public void testAddAndRemoveMessagesFromMessageGroup() {
217230
String groupId = "X";
218231
this.messageStore.setRemoveBatchSize(10);
219232
List<Message<?>> messages = new ArrayList<>();
@@ -230,7 +243,7 @@ public void testAddAndRemoveMessagesFromMessageGroup() throws Exception {
230243
}
231244

232245
@Test
233-
public void testRemoveMessageGroup() throws Exception {
246+
public void testRemoveMessageGroup() {
234247
JdbcTemplate template = new JdbcTemplate(this.dataSource);
235248
template.afterPropertiesSet();
236249
String groupId = "X";
@@ -247,7 +260,7 @@ public void testRemoveMessageGroup() throws Exception {
247260
}
248261

249262
@Test
250-
public void testCompleteMessageGroup() throws Exception {
263+
public void testCompleteMessageGroup() {
251264
String groupId = "X";
252265
Message<String> message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build();
253266
messageStore.addMessagesToGroup(groupId, message);
@@ -319,7 +332,7 @@ public void testExpireMessageGroupOnCreateOnly() throws Exception {
319332
template.afterPropertiesSet();
320333

321334
template.update("UPDATE INT_MESSAGE_GROUP set CREATED_DATE=? where GROUP_KEY=? and REGION=?",
322-
(PreparedStatementSetter) ps -> {
335+
ps -> {
323336
ps.setTimestamp(1, new Timestamp(System.currentTimeMillis() - 10000));
324337
ps.setString(2, UUIDConverter.getUUID(groupId).toString());
325338
ps.setString(3, "DEFAULT");
@@ -333,7 +346,7 @@ public void testExpireMessageGroupOnCreateOnly() throws Exception {
333346
}
334347

335348
@Test
336-
public void testExpireMessageGroupOnIdleOnly() throws Exception {
349+
public void testExpireMessageGroupOnIdleOnly() {
337350
String groupId = "X";
338351
Message<String> message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build();
339352
messageStore.setTimeoutOnIdle(true);
@@ -406,7 +419,6 @@ public void testMessagePollingFromTheGroup() throws Exception {
406419

407420
@Test
408421
public void testSameMessageToMultipleGroups() {
409-
410422
final String group1Id = "group1";
411423
final String group2Id = "group2";
412424

@@ -436,7 +448,7 @@ public void testSameMessageToMultipleGroups() {
436448
}
437449

438450
@Test
439-
public void testSameMessageAndGroupToMultipleRegions() throws Exception {
451+
public void testSameMessageAndGroupToMultipleRegions() {
440452

441453
final String groupId = "myGroup";
442454
final String region1 = "region1";
@@ -474,7 +486,7 @@ public void testSameMessageAndGroupToMultipleRegions() throws Exception {
474486
}
475487

476488
@Test
477-
public void testCompletedNotExpiredGroupINT3037() throws Exception {
489+
public void testCompletedNotExpiredGroupINT3037() {
478490
/*
479491
* based on the aggregator scenario as follows;
480492
*
@@ -528,4 +540,44 @@ public void testMessageGroupCondition() {
528540
assertThat(this.messageStore.getMessageGroup(groupId).getCondition()).isEqualTo("testCondition");
529541
}
530542

543+
544+
@Test
545+
@Transactional(propagation = Propagation.NEVER)
546+
public void testMessageGroupStreamNoConnectionPoolLeak() throws NoSuchMethodException {
547+
DataSourceConnectionFactory connFactory = new DataSourceConnectionFactory(this.dataSource);
548+
PoolableConnectionFactory poolFactory = new PoolableConnectionFactory(connFactory, null);
549+
GenericObjectPoolConfig<PoolableConnection> config = new GenericObjectPoolConfig<>();
550+
config.setMaxTotal(2);
551+
config.setMaxWaitMillis(500);
552+
ObjectPool<PoolableConnection> connPool = new GenericObjectPool<>(poolFactory, config);
553+
poolFactory.setPool(connPool);
554+
PoolingDataSource<PoolableConnection> poolingDataSource = new PoolingDataSource<>(connPool);
555+
556+
JdbcMessageStore pooledMessageStore = new JdbcMessageStore(poolingDataSource);
557+
558+
CollectionArgumentResolver collectionArgumentResolver = new CollectionArgumentResolver(true);
559+
collectionArgumentResolver.setBeanFactory(new DefaultListableBeanFactory());
560+
Method methodForCollectionOfPayloads = getClass().getMethod("methodForCollectionOfPayloads", Collection.class);
561+
MethodParameter methodParameter = SynthesizingMethodParameter.forExecutable(methodForCollectionOfPayloads, 0);
562+
563+
String groupId = "X";
564+
Message<String> message = MessageBuilder.withPayload("test data").build();
565+
pooledMessageStore.addMessagesToGroup(groupId, message);
566+
567+
// Before the stream close fix in the 'CollectionArgumentResolver'
568+
// it failed with "Cannot get a connection, pool error Timeout waiting for idle object"
569+
for (int i = 0; i < 3; i++) {
570+
Object result =
571+
collectionArgumentResolver.resolveArgument(methodParameter,
572+
new GenericMessage<>(pooledMessageStore.getMessageGroup(groupId).getMessages()));
573+
574+
assertThat(result).isInstanceOf(Collection.class).asList().hasSize(1).contains("test data");
575+
}
576+
577+
pooledMessageStore.removeMessageGroup(groupId);
578+
}
579+
580+
public void methodForCollectionOfPayloads(Collection<String> payloads) {
581+
}
582+
531583
}

0 commit comments

Comments
 (0)