diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/support/CollectionArgumentResolver.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/CollectionArgumentResolver.java index 2ddbe21a95a..389a728aeba 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/support/CollectionArgumentResolver.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/CollectionArgumentResolver.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.springframework.core.MethodParameter; import org.springframework.core.convert.TypeDescriptor; @@ -71,17 +72,16 @@ public Object resolveArgument(MethodParameter parameter, Message message) { if (this.canProcessMessageList) { Assert.state(value instanceof Collection, "This Argument Resolver only supports messages with a payload of Collection>, " - + "payload is: " + value.getClass()); + + "payload is: " + value.getClass()); Collection> messages = (Collection>) value; - if (Message.class.isAssignableFrom(parameter.nested().getNestedParameterType())) { - value = messages; - } - else { - value = messages.stream() - .map(Message::getPayload) - .collect(Collectors.toList()); + if (!Message.class.isAssignableFrom(parameter.nested().getNestedParameterType())) { + try (Stream> messageStream = messages.stream()) { + value = messageStream + .map(Message::getPayload) + .collect(Collectors.toList()); + } } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/support/PayloadsArgumentResolver.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/PayloadsArgumentResolver.java index 1e1c219dcb8..28bdfc6bf2f 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/support/PayloadsArgumentResolver.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/PayloadsArgumentResolver.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.springframework.core.MethodParameter; import org.springframework.core.convert.TypeDescriptor; @@ -78,15 +79,17 @@ public Object resolveArgument(MethodParameter parameter, Message message) { return evaluateExpression(expression, messages, parameter.getParameterType()); } else { - List payloads = messages.stream() - .map(Message::getPayload) - .collect(Collectors.toList()); + try (Stream> messageStream = messages.stream()) { + List payloads = messageStream + .map(Message::getPayload) + .collect(Collectors.toList()); + return getEvaluationContext() + .getTypeConverter() + .convertValue(payloads, + TypeDescriptor.forObject(payloads), + TypeDescriptor.valueOf(parameter.getParameterType())); + } - return getEvaluationContext() - .getTypeConverter() - .convertValue(payloads, - TypeDescriptor.forObject(payloads), - TypeDescriptor.valueOf(parameter.getParameterType())); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java index 351f052852e..68505a830b9 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java @@ -244,17 +244,17 @@ public void removeMessagesFromGroup(Object groupId, Collection> messa Assert.isInstanceOf(MessageGroupMetadata.class, mgm); MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm; - List ids = - messages.stream() - .map(messageToRemove -> messageToRemove.getHeaders().getId()) - .collect(Collectors.toList()); + List ids = new ArrayList<>(); + for (Message messageToRemove : messages) { + ids.add(messageToRemove.getHeaders().getId()); + } messageGroupMetadata.removeAll(ids); - List messageIds = - ids.stream() - .map(id -> this.messagePrefix + id) - .collect(Collectors.toList()); + List messageIds = new ArrayList<>(); + for (UUID id : ids) { + messageIds.add(this.messagePrefix + id); + } doRemoveAll(messageIds); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java index 90591e6193c..48c90ed6c39 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java @@ -156,6 +156,9 @@ public interface MessageGroupStore extends BasicMessageGroupStore { /** * Return a stream for messages stored in the provided group. + * The persistent implementations return a Stream which has + * to be closed once fully processed (e.g. through a try-with-resources clause). + * By default, it streams a result of {@link #getMessagesForGroup(Object)}. * @param groupId the group id to retrieve messages. * @return the {@link Stream} for messages in this group. * @since 5.5 diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/JdbcMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/JdbcMessageStoreTests.java index 0499453c68c..651ad9de8d4 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/JdbcMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/JdbcMessageStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,10 @@ import java.io.BufferedReader; import java.io.InputStreamReader; +import java.lang.reflect.Method; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Properties; import java.util.UUID; @@ -30,12 +32,23 @@ import javax.sql.DataSource; +import org.apache.commons.dbcp2.DataSourceConnectionFactory; +import org.apache.commons.dbcp2.PoolableConnection; +import org.apache.commons.dbcp2.PoolableConnectionFactory; +import org.apache.commons.dbcp2.PoolingDataSource; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.springframework.core.MethodParameter; +import org.springframework.core.annotation.SynthesizingMethodParameter; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.handler.support.CollectionArgumentResolver; import org.springframework.integration.history.MessageHistory; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.support.MessageBuilder; @@ -47,6 +60,7 @@ import org.springframework.messaging.support.GenericMessage; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; /** @@ -90,7 +104,6 @@ public void testAddAndGet() { @Test public void testWithMessageHistory() { - Message message = new GenericMessage<>("Hello"); DirectChannel fooChannel = new DirectChannel(); fooChannel.setBeanName("fooChannel"); @@ -185,14 +198,14 @@ public void testAddAndUpdateWithChange() { } @Test - public void testAddAndRemoveMessageGroup() throws Exception { + public void testAddAndRemoveMessageGroup() { Message message = MessageBuilder.withPayload("foo").build(); message = messageStore.addMessage(message); assertThat(messageStore.removeMessage(message.getHeaders().getId())).isNotNull(); } @Test - public void testAddAndGetMessageGroup() throws Exception { + public void testAddAndGetMessageGroup() { String groupId = "X"; Message message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build(); long now = System.currentTimeMillis(); @@ -203,7 +216,7 @@ public void testAddAndGetMessageGroup() throws Exception { } @Test - public void testAddAndRemoveMessageFromMessageGroup() throws Exception { + public void testAddAndRemoveMessageFromMessageGroup() { String groupId = "X"; Message message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build(); messageStore.addMessagesToGroup(groupId, message); @@ -213,7 +226,7 @@ public void testAddAndRemoveMessageFromMessageGroup() throws Exception { } @Test - public void testAddAndRemoveMessagesFromMessageGroup() throws Exception { + public void testAddAndRemoveMessagesFromMessageGroup() { String groupId = "X"; this.messageStore.setRemoveBatchSize(10); List> messages = new ArrayList<>(); @@ -230,7 +243,7 @@ public void testAddAndRemoveMessagesFromMessageGroup() throws Exception { } @Test - public void testRemoveMessageGroup() throws Exception { + public void testRemoveMessageGroup() { JdbcTemplate template = new JdbcTemplate(this.dataSource); template.afterPropertiesSet(); String groupId = "X"; @@ -247,7 +260,7 @@ public void testRemoveMessageGroup() throws Exception { } @Test - public void testCompleteMessageGroup() throws Exception { + public void testCompleteMessageGroup() { String groupId = "X"; Message message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build(); messageStore.addMessagesToGroup(groupId, message); @@ -319,7 +332,7 @@ public void testExpireMessageGroupOnCreateOnly() throws Exception { template.afterPropertiesSet(); template.update("UPDATE INT_MESSAGE_GROUP set CREATED_DATE=? where GROUP_KEY=? and REGION=?", - (PreparedStatementSetter) ps -> { + ps -> { ps.setTimestamp(1, new Timestamp(System.currentTimeMillis() - 10000)); ps.setString(2, UUIDConverter.getUUID(groupId).toString()); ps.setString(3, "DEFAULT"); @@ -333,7 +346,7 @@ public void testExpireMessageGroupOnCreateOnly() throws Exception { } @Test - public void testExpireMessageGroupOnIdleOnly() throws Exception { + public void testExpireMessageGroupOnIdleOnly() { String groupId = "X"; Message message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build(); messageStore.setTimeoutOnIdle(true); @@ -406,7 +419,6 @@ public void testMessagePollingFromTheGroup() throws Exception { @Test public void testSameMessageToMultipleGroups() { - final String group1Id = "group1"; final String group2Id = "group2"; @@ -436,7 +448,7 @@ public void testSameMessageToMultipleGroups() { } @Test - public void testSameMessageAndGroupToMultipleRegions() throws Exception { + public void testSameMessageAndGroupToMultipleRegions() { final String groupId = "myGroup"; final String region1 = "region1"; @@ -474,7 +486,7 @@ public void testSameMessageAndGroupToMultipleRegions() throws Exception { } @Test - public void testCompletedNotExpiredGroupINT3037() throws Exception { + public void testCompletedNotExpiredGroupINT3037() { /* * based on the aggregator scenario as follows; * @@ -528,4 +540,44 @@ public void testMessageGroupCondition() { assertThat(this.messageStore.getMessageGroup(groupId).getCondition()).isEqualTo("testCondition"); } + + @Test + @Transactional(propagation = Propagation.NEVER) + public void testMessageGroupStreamNoConnectionPoolLeak() throws NoSuchMethodException { + DataSourceConnectionFactory connFactory = new DataSourceConnectionFactory(this.dataSource); + PoolableConnectionFactory poolFactory = new PoolableConnectionFactory(connFactory, null); + GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); + config.setMaxTotal(2); + config.setMaxWaitMillis(500); + ObjectPool connPool = new GenericObjectPool<>(poolFactory, config); + poolFactory.setPool(connPool); + PoolingDataSource poolingDataSource = new PoolingDataSource<>(connPool); + + JdbcMessageStore pooledMessageStore = new JdbcMessageStore(poolingDataSource); + + CollectionArgumentResolver collectionArgumentResolver = new CollectionArgumentResolver(true); + collectionArgumentResolver.setBeanFactory(new DefaultListableBeanFactory()); + Method methodForCollectionOfPayloads = getClass().getMethod("methodForCollectionOfPayloads", Collection.class); + MethodParameter methodParameter = SynthesizingMethodParameter.forExecutable(methodForCollectionOfPayloads, 0); + + String groupId = "X"; + Message message = MessageBuilder.withPayload("test data").build(); + pooledMessageStore.addMessagesToGroup(groupId, message); + + // Before the stream close fix in the 'CollectionArgumentResolver' + // it failed with "Cannot get a connection, pool error Timeout waiting for idle object" + for (int i = 0; i < 3; i++) { + Object result = + collectionArgumentResolver.resolveArgument(methodParameter, + new GenericMessage<>(pooledMessageStore.getMessageGroup(groupId).getMessages())); + + assertThat(result).isInstanceOf(Collection.class).asList().hasSize(1).contains("test data"); + } + + pooledMessageStore.removeMessageGroup(groupId); + } + + public void methodForCollectionOfPayloads(Collection payloads) { + } + }