Skip to content

Commit 1755cc6

Browse files
committed
spring-projectsGH-8760 PostgresJdbcChannelMessageStore using DELETE ... RETURNING
Fixes spring-projects#8760 * Add PostgresJdbcChannelMessageStore and tests
1 parent 44433ed commit 1755cc6

File tree

7 files changed

+237
-12
lines changed

7 files changed

+237
-12
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ public MessageGroup addMessageToGroup(Object groupId, final Message<?> message)
486486
* @param input Parameter may be null
487487
* @return Returns null when the input is null otherwise the UUID as String.
488488
*/
489-
private String getKey(Object input) {
489+
protected String getKey(Object input) {
490490
return input == null ? null : UUIDConverter.getUUID(input).toString();
491491
}
492492

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.store;
18+
19+
import org.springframework.integration.jdbc.store.channel.ChannelMessageStoreQueryProvider;
20+
import org.springframework.integration.jdbc.store.channel.DeleteReturningPostgresChannelMessageStoreQueryProvider;
21+
import org.springframework.integration.store.MessageStore;
22+
import org.springframework.messaging.Message;
23+
import org.springframework.util.Assert;
24+
25+
import javax.sql.DataSource;
26+
27+
/**
28+
* Implementation of {@link MessageStore} for Postgres using a single statement to poll for messages.
29+
*
30+
* @author Johannes Edmeier
31+
* @since 6.2
32+
*/
33+
public class PostgresJdbcChannelMessageStore extends JdbcChannelMessageStore {
34+
35+
public PostgresJdbcChannelMessageStore() {
36+
super();
37+
}
38+
39+
public PostgresJdbcChannelMessageStore(DataSource dataSource) {
40+
super(dataSource);
41+
}
42+
43+
@Override
44+
public Message<?> pollMessageFromGroup(Object groupId) {
45+
return doPollForMessage(getKey(groupId));
46+
}
47+
48+
@Override
49+
public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider) {
50+
Assert.isInstanceOf(DeleteReturningPostgresChannelMessageStoreQueryProvider.class, channelMessageStoreQueryProvider,
51+
"The provided channelMessageStoreQueryProvider must be an instance of DeleteReturningPostgresChannelMessageStoreQueryProvider");
52+
super.setChannelMessageStoreQueryProvider(channelMessageStoreQueryProvider);
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package org.springframework.integration.jdbc.store.channel;
2+
3+
/**
4+
* @author Johannes Edmeier
5+
*
6+
* @since 6.2
7+
*/
8+
public class DeleteReturningPostgresChannelMessageStoreQueryProvider extends PostgresChannelMessageStoreQueryProvider {
9+
10+
@Override
11+
public String getPollFromGroupExcludeIdsQuery() {
12+
return """
13+
delete
14+
from %PREFIX%CHANNEL_MESSAGE
15+
where CTID = (select CTID
16+
from %PREFIX%CHANNEL_MESSAGE
17+
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
18+
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
19+
and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids)
20+
order by CREATED_DATE, MESSAGE_SEQUENCE
21+
limit 1)
22+
returning MESSAGE_ID, MESSAGE_BYTES;
23+
""";
24+
}
25+
26+
@Override
27+
public String getPollFromGroupQuery() {
28+
return """
29+
delete
30+
from %PREFIX%CHANNEL_MESSAGE
31+
where CTID = (select CTID
32+
from %PREFIX%CHANNEL_MESSAGE
33+
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
34+
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
35+
order by CREATED_DATE, MESSAGE_SEQUENCE
36+
limit 1)
37+
returning MESSAGE_ID, MESSAGE_BYTES;
38+
""";
39+
}
40+
41+
@Override
42+
public String getPriorityPollFromGroupExcludeIdsQuery() {
43+
return """
44+
delete
45+
from %PREFIX%CHANNEL_MESSAGE
46+
where CTID = (select CTID
47+
from %PREFIX%CHANNEL_MESSAGE
48+
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
49+
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
50+
and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids)
51+
order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
52+
limit 1)
53+
returning MESSAGE_ID, MESSAGE_BYTES;
54+
""";
55+
}
56+
57+
@Override
58+
public String getPriorityPollFromGroupQuery() {
59+
return """
60+
delete
61+
from %PREFIX%CHANNEL_MESSAGE
62+
where CTID = (select CTID
63+
from %PREFIX%CHANNEL_MESSAGE
64+
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
65+
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
66+
order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
67+
limit 1)
68+
returning MESSAGE_ID, MESSAGE_BYTES;
69+
""";
70+
}
71+
72+
}
Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
-- Autogenerated: do not edit this file
22

3-
DROP INDEX INT_MESSAGE_IX1 ;
4-
DROP INDEX INT_CHANNEL_MSG_DATE_IDX ;
5-
DROP INDEX INT_CHANNEL_MSG_PRIORITY_IDX ;
6-
DROP TABLE INT_MESSAGE ;
7-
DROP TABLE INT_MESSAGE_GROUP ;
8-
DROP TABLE INT_GROUP_TO_MESSAGE ;
9-
DROP TABLE INT_LOCK ;
10-
DROP TABLE INT_CHANNEL_MESSAGE ;
11-
DROP TABLE INT_METADATA_STORE ;
12-
DROP SEQUENCE INT_MESSAGE_SEQ ;
3+
DROP INDEX IF EXISTS INT_MESSAGE_IX1 ;
4+
DROP INDEX IF EXISTS INT_CHANNEL_MSG_DATE_IDX ;
5+
DROP INDEX IF EXISTS INT_CHANNEL_MSG_PRIORITY_IDX ;
6+
DROP TABLE IF EXISTS INT_MESSAGE ;
7+
DROP TABLE IF EXISTS INT_MESSAGE_GROUP ;
8+
DROP TABLE IF EXISTS INT_GROUP_TO_MESSAGE ;
9+
DROP TABLE IF EXISTS INT_LOCK ;
10+
DROP TABLE IF EXISTS INT_CHANNEL_MESSAGE ;
11+
DROP TABLE IF EXISTS INT_METADATA_STORE ;
12+
DROP SEQUENCE IF EXISTS INT_MESSAGE_SEQ ;

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DataSource-postgres-context.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
<property name="password" value="postgres" />
1919
</bean>
2020

21-
<bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>
21+
<jdbc:initialize-database data-source="dataSource">
22+
<jdbc:script location="classpath:org/springframework/integration/jdbc/schema-drop-postgresql.sql" />
23+
<jdbc:script location="classpath:org/springframework/integration/jdbc/schema-postgresql.sql" />
24+
</jdbc:initialize-database>
25+
26+
<bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.DeleteReturningPostgresChannelMessageStoreQueryProvider"/>
2227

2328
</beans>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.store.channel;
18+
19+
import org.apache.commons.dbcp2.BasicDataSource;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.springframework.context.annotation.Bean;
22+
import org.springframework.context.annotation.Configuration;
23+
import org.springframework.core.io.ClassPathResource;
24+
import org.springframework.integration.config.EnableIntegration;
25+
import org.springframework.integration.jdbc.channel.PostgresContainerTest;
26+
import org.springframework.integration.jdbc.store.PostgresJdbcChannelMessageStore;
27+
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
28+
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
29+
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
30+
import org.springframework.jdbc.datasource.init.ScriptUtils;
31+
import org.springframework.test.context.ContextConfiguration;
32+
import org.springframework.transaction.PlatformTransactionManager;
33+
34+
import javax.sql.DataSource;
35+
36+
/**
37+
* @author Johannes Edmeier
38+
*/
39+
@ContextConfiguration
40+
public class PostgresJdbcChannelMessageStoreTests extends AbstractJdbcChannelMessageStoreTests implements PostgresContainerTest {
41+
@BeforeEach
42+
@Override
43+
public void init() {
44+
messageStore = new PostgresJdbcChannelMessageStore(dataSource);
45+
messageStore.setRegion(REGION);
46+
messageStore.setChannelMessageStoreQueryProvider(queryProvider);
47+
messageStore.afterPropertiesSet();
48+
messageStore.removeMessageGroup("AbstractJdbcChannelMessageStoreTests");
49+
}
50+
51+
@Configuration
52+
@EnableIntegration
53+
public static class Config {
54+
55+
@Bean
56+
public DataSource dataSource() {
57+
BasicDataSource dataSource = new BasicDataSource();
58+
dataSource.setUrl(PostgresContainerTest.getJdbcUrl());
59+
dataSource.setUsername(PostgresContainerTest.getUsername());
60+
dataSource.setPassword(PostgresContainerTest.getPassword());
61+
return dataSource;
62+
}
63+
64+
@Bean
65+
DataSourceInitializer dataSourceInitializer(DataSource dataSource) {
66+
DataSourceInitializer dataSourceInitializer = new DataSourceInitializer();
67+
dataSourceInitializer.setDataSource(dataSource);
68+
ResourceDatabasePopulator databasePopulator =
69+
new ResourceDatabasePopulator(new ClassPathResource("org/springframework/integration/jdbc/schema-drop-postgresql.sql"),
70+
new ClassPathResource("org/springframework/integration/jdbc/schema-postgresql.sql"));
71+
databasePopulator.setSeparator(ScriptUtils.EOF_STATEMENT_SEPARATOR);
72+
dataSourceInitializer.setDatabasePopulator(
73+
databasePopulator);
74+
return dataSourceInitializer;
75+
}
76+
77+
@Bean
78+
PlatformTransactionManager transactionManager(DataSource dataSource) {
79+
return new DataSourceTransactionManager(dataSource);
80+
}
81+
82+
@Bean
83+
DeleteReturningPostgresChannelMessageStoreQueryProvider queryProvider() {
84+
return new DeleteReturningPostgresChannelMessageStoreQueryProvider();
85+
}
86+
}
87+
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,12 @@
77

88
<import resource="classpath:org/springframework/integration/jdbc/store/channel/TxTimeoutMessageStoreTests-context.xml"/>
99

10+
<bean id="store" class="org.springframework.integration.jdbc.store.PostgresJdbcChannelMessageStore">
11+
<property name="dataSource" ref="dataSource"/>
12+
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
13+
<property name="region" value="TX_TIMEOUT"/>
14+
<property name="usingIdCache" value="true"/>
15+
</bean>
16+
1017
</beans>
1118

0 commit comments

Comments
 (0)