Skip to content

Commit 459f163

Browse files
committed
spring-projectsGH-8760 PostgresJdbcChannelMessageStore using DELETE ... RETURNING
Fixes spring-projects#8760 * Add PostgresJdbcChannelMessageStore and tests
1 parent 44433ed commit 459f163

File tree

8 files changed

+197
-15
lines changed

8 files changed

+197
-15
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ public MessageGroup addMessageToGroup(Object groupId, final Message<?> message)
486486
* @param input Parameter may be null
487487
* @return Returns null when the input is null otherwise the UUID as String.
488488
*/
489-
private String getKey(Object input) {
489+
protected String getKey(Object input) {
490490
return input == null ? null : UUIDConverter.getUUID(input).toString();
491491
}
492492

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.store;
18+
19+
import org.springframework.integration.jdbc.store.channel.ChannelMessageStoreQueryProvider;
20+
import org.springframework.integration.jdbc.store.channel.DeleteReturningPostgresChannelMessageStoreQueryProvider;
21+
import org.springframework.integration.store.MessageStore;
22+
import org.springframework.messaging.Message;
23+
import org.springframework.util.Assert;
24+
25+
import javax.sql.DataSource;
26+
27+
/**
28+
* Implementation of {@link MessageStore} for Postgres using a single statement to poll for messages.
29+
*
30+
* @author Johannes Edmeier
31+
* @since 6.2
32+
*/
33+
public class PostgresJdbcMessageStore extends JdbcChannelMessageStore {
34+
35+
public PostgresJdbcMessageStore() {
36+
super();
37+
}
38+
39+
public PostgresJdbcMessageStore(DataSource dataSource) {
40+
super(dataSource);
41+
}
42+
43+
@Override
44+
public Message<?> pollMessageFromGroup(Object groupId) {
45+
return doPollForMessage(getKey(groupId));
46+
}
47+
48+
@Override
49+
public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider) {
50+
Assert.isInstanceOf(DeleteReturningPostgresChannelMessageStoreQueryProvider.class, channelMessageStoreQueryProvider,
51+
"The provided channelMessageStoreQueryProvider must be an instance of DeleteReturningPostgresChannelMessageStoreQueryProvider");
52+
super.setChannelMessageStoreQueryProvider(channelMessageStoreQueryProvider);
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package org.springframework.integration.jdbc.store.channel;
2+
3+
/**
4+
* @author Johannes Edmeier
5+
*
6+
* @since 6.2
7+
*/
8+
public class DeleteReturningPostgresChannelMessageStoreQueryProvider extends PostgresChannelMessageStoreQueryProvider {
9+
10+
@Override
11+
public String getPollFromGroupExcludeIdsQuery() {
12+
return """
13+
delete
14+
from %PREFIX%CHANNEL_MESSAGE
15+
where CTID = (select CTID
16+
from %PREFIX%CHANNEL_MESSAGE
17+
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
18+
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
19+
and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids)
20+
order by CREATED_DATE, MESSAGE_SEQUENCE
21+
limit 1)
22+
returning MESSAGE_ID, MESSAGE_BYTES;
23+
""";
24+
}
25+
26+
@Override
27+
public String getPollFromGroupQuery() {
28+
return """
29+
delete
30+
from %PREFIX%CHANNEL_MESSAGE
31+
where CTID = (select CTID
32+
from %PREFIX%CHANNEL_MESSAGE
33+
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
34+
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
35+
order by CREATED_DATE, MESSAGE_SEQUENCE
36+
limit 1)
37+
returning MESSAGE_ID, MESSAGE_BYTES;
38+
""";
39+
}
40+
41+
@Override
42+
public String getPriorityPollFromGroupExcludeIdsQuery() {
43+
return """
44+
delete
45+
from %PREFIX%CHANNEL_MESSAGE
46+
where CTID = (select CTID
47+
from %PREFIX%CHANNEL_MESSAGE
48+
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
49+
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
50+
and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids)
51+
order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
52+
limit 1)
53+
returning MESSAGE_ID, MESSAGE_BYTES;
54+
""";
55+
}
56+
57+
@Override
58+
public String getPriorityPollFromGroupQuery() {
59+
return """
60+
delete
61+
from %PREFIX%CHANNEL_MESSAGE
62+
where CTID = (select CTID
63+
from %PREFIX%CHANNEL_MESSAGE
64+
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
65+
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
66+
order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
67+
limit 1)
68+
returning MESSAGE_ID, MESSAGE_BYTES;
69+
""";
70+
}
71+
72+
}
Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
-- Autogenerated: do not edit this file
22

3-
DROP INDEX INT_MESSAGE_IX1 ;
4-
DROP INDEX INT_CHANNEL_MSG_DATE_IDX ;
5-
DROP INDEX INT_CHANNEL_MSG_PRIORITY_IDX ;
6-
DROP TABLE INT_MESSAGE ;
7-
DROP TABLE INT_MESSAGE_GROUP ;
8-
DROP TABLE INT_GROUP_TO_MESSAGE ;
9-
DROP TABLE INT_LOCK ;
10-
DROP TABLE INT_CHANNEL_MESSAGE ;
11-
DROP TABLE INT_METADATA_STORE ;
12-
DROP SEQUENCE INT_MESSAGE_SEQ ;
3+
DROP INDEX IF EXISTS INT_MESSAGE_IX1 ;
4+
DROP INDEX IF EXISTS INT_CHANNEL_MSG_DATE_IDX ;
5+
DROP INDEX IF EXISTS INT_CHANNEL_MSG_PRIORITY_IDX ;
6+
DROP TABLE IF EXISTS INT_MESSAGE ;
7+
DROP TABLE IF EXISTS INT_MESSAGE_GROUP ;
8+
DROP TABLE IF EXISTS INT_GROUP_TO_MESSAGE ;
9+
DROP TABLE IF EXISTS INT_LOCK ;
10+
DROP TABLE IF EXISTS INT_CHANNEL_MESSAGE ;
11+
DROP TABLE IF EXISTS INT_METADATA_STORE ;
12+
DROP SEQUENCE IF EXISTS INT_MESSAGE_SEQ ;

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DataSource-postgres-context.xml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,18 @@
1010

1111
<bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource"
1212
destroy-method="close">
13-
<property name="driverClassName" value="org.postgresql.Driver" />
13+
<property name="driverClassName" value="org.testcontainers.jdbc.ContainerDatabaseDriver" />
1414
<property name="initialSize" value="10"/>
15-
<property name="url"
16-
value="jdbc:postgresql:integration" />
15+
<property name="url" value="jdbc:tc:postgresql:13://hostname/testdb" />
1716
<property name="username" value="postgres" />
1817
<property name="password" value="postgres" />
1918
</bean>
2019

21-
<bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>
20+
<jdbc:initialize-database data-source="dataSource">
21+
<jdbc:script location="classpath:org/springframework/integration/jdbc/schema-drop-postgresql.sql" />
22+
<jdbc:script location="classpath:org/springframework/integration/jdbc/schema-postgresql.sql" />
23+
</jdbc:initialize-database>
24+
25+
<bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.DeleteReturningPostgresChannelMessageStoreQueryProvider"/>
2226

2327
</beans>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<beans xmlns="http://www.springframework.org/schema/beans"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
5+
6+
<import resource="classpath:org/springframework/integration/jdbc/store/channel/DataSource-postgres-context.xml"/>
7+
8+
</beans>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.store.channel;
18+
19+
import org.junit.jupiter.api.BeforeEach;
20+
import org.springframework.integration.jdbc.store.PostgresJdbcMessageStore;
21+
import org.springframework.test.context.ContextConfiguration;
22+
23+
/**
24+
* @author Johannes Edmeier
25+
*/
26+
@ContextConfiguration
27+
public class PostgresJdbcChannelMessageStoreTests extends AbstractJdbcChannelMessageStoreTests {
28+
@BeforeEach
29+
@Override
30+
public void init() {
31+
messageStore = new PostgresJdbcMessageStore(dataSource);
32+
messageStore.setRegion(REGION);
33+
messageStore.setChannelMessageStoreQueryProvider(queryProvider);
34+
messageStore.afterPropertiesSet();
35+
messageStore.removeMessageGroup("AbstractJdbcChannelMessageStoreTests");
36+
}
37+
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,12 @@
77

88
<import resource="classpath:org/springframework/integration/jdbc/store/channel/TxTimeoutMessageStoreTests-context.xml"/>
99

10+
<bean id="store" class="org.springframework.integration.jdbc.store.PostgresJdbcMessageStore">
11+
<property name="dataSource" ref="dataSource"/>
12+
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
13+
<property name="region" value="TX_TIMEOUT"/>
14+
<property name="usingIdCache" value="true"/>
15+
</bean>
16+
1017
</beans>
1118

0 commit comments

Comments
 (0)