Skip to content

Commit 5d39c70

Browse files
committed
Adds an implementation for a Postres-compatible notification listener for a JdbcChannelMessageStore.
1 parent 8c9662a commit 5d39c70

File tree

6 files changed

+323
-0
lines changed

6 files changed

+323
-0
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ project('spring-integration-jdbc') {
713713
dependencies {
714714
api project(':spring-integration-core')
715715
api 'org.springframework:spring-jdbc'
716+
optionalApi "org.postgresql:postgresql:$postgresVersion"
716717

717718
testImplementation "com.h2database:h2:$h2Version"
718719
testImplementation "org.hsqldb:hsqldb:$hsqldbVersion"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2002-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.channel;
18+
19+
import java.sql.SQLException;
20+
21+
import org.postgresql.jdbc.PgConnection;
22+
23+
@FunctionalInterface
24+
public interface PgConnectionSupplier {
25+
26+
PgConnection get() throws SQLException;
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Copyright 2002-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.channel;
18+
19+
import java.sql.SQLException;
20+
import java.sql.Statement;
21+
import java.util.Map;
22+
import java.util.Set;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
29+
import java.util.concurrent.TimeUnit;
30+
31+
import org.postgresql.PGNotification;
32+
import org.postgresql.jdbc.PgConnection;
33+
34+
import org.springframework.context.SmartLifecycle;
35+
import org.springframework.core.log.LogAccessor;
36+
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
37+
import org.springframework.integration.util.UUIDConverter;
38+
import org.springframework.lang.Nullable;
39+
import org.springframework.util.Assert;
40+
41+
public final class PostgresChannelMessageTableSubscriber implements SmartLifecycle {
42+
43+
private static final LogAccessor LOGGER = new LogAccessor(PostgresChannelMessageTableSubscriber.class);
44+
45+
private final PgConnectionSupplier connectionSupplier;
46+
47+
private final String tablePrefix;
48+
49+
@Nullable
50+
private ExecutorService executor;
51+
52+
private CountDownLatch latch = new CountDownLatch(0);
53+
54+
private Future<?> future = CompletableFuture.completedFuture(null);
55+
56+
@Nullable
57+
private volatile PgConnection connection;
58+
59+
private final Map<String, Set<PostgresChannelMessageTableSubscription>> subscriptions = new ConcurrentHashMap<>();
60+
61+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier) {
62+
this(connectionSupplier, JdbcChannelMessageStore.DEFAULT_TABLE_PREFIX);
63+
}
64+
65+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix) {
66+
Assert.notNull(connectionSupplier, "A connectionSupplier must be provided.");
67+
Assert.notNull(tablePrefix, "A table prefix must be set.");
68+
this.connectionSupplier = connectionSupplier;
69+
this.tablePrefix = tablePrefix;
70+
}
71+
72+
public void setExecutor(ExecutorService executor) {
73+
this.executor = executor;
74+
}
75+
76+
public boolean subscribe(PostgresChannelMessageTableSubscription subscription) {
77+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.computeIfAbsent(subscription.getRegion() + " " + getKey(subscription.getGroupId()), ignored -> ConcurrentHashMap.newKeySet());
78+
return subscriptions.add(subscription);
79+
}
80+
81+
public boolean unsubscribe(PostgresChannelMessageTableSubscription subscription) {
82+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.get(subscription.getRegion() + " " + getKey(subscription.getGroupId()));
83+
return subscriptions != null && subscriptions.remove(subscription);
84+
}
85+
86+
@Override
87+
public synchronized void start() {
88+
if (this.latch.getCount() > 0) {
89+
return;
90+
}
91+
ExecutorService executor = this.executor;
92+
if (executor == null) {
93+
executor = Executors.newSingleThreadExecutor(
94+
job -> {
95+
Thread t = new Thread(job);
96+
t.setDaemon(true);
97+
t.setName("postgres-channel-message-table-subscriber");
98+
return t;
99+
}
100+
);
101+
this.executor = executor;
102+
}
103+
this.latch = new CountDownLatch(1);
104+
this.future = executor.submit(() -> {
105+
try {
106+
while (isActive()) {
107+
try {
108+
PgConnection conn = this.connectionSupplier.get();
109+
try (Statement stmt = conn.createStatement()) {
110+
stmt.execute("LISTEN " + this.tablePrefix.toLowerCase() + "channel_message_notify");
111+
}
112+
catch (Throwable t) {
113+
try {
114+
conn.close();
115+
}
116+
catch (Throwable suppressed) {
117+
t.addSuppressed(suppressed);
118+
}
119+
throw t;
120+
}
121+
try {
122+
this.connection = conn;
123+
while (isActive()) {
124+
PGNotification[] notifications = conn.getNotifications(0);
125+
// Unfortunately, there is no good way of interrupting a notification poll but by closing its connection.
126+
if (!isActive()) {
127+
return;
128+
}
129+
if (notifications != null) {
130+
for (PGNotification notification : notifications) {
131+
String parameter = notification.getParameter();
132+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.get(parameter);
133+
if (subscriptions == null) {
134+
continue;
135+
}
136+
for (PostgresChannelMessageTableSubscription subscription : subscriptions) {
137+
subscription.onPossibleUpdate();
138+
}
139+
}
140+
}
141+
}
142+
}
143+
finally {
144+
conn.close();
145+
}
146+
}
147+
catch (Exception e) {
148+
// The getNotifications method does not throw a meaningful message on interruption.
149+
// Therefore, we do not log an error, unless it occurred while active.
150+
if (isActive()) {
151+
LOGGER.error(e, "Failed to poll notifications from Postgres database");
152+
}
153+
}
154+
catch (Throwable t) {
155+
LOGGER.error(t, "Failed to poll notifications from Postgres database");
156+
return;
157+
}
158+
}
159+
}
160+
finally {
161+
this.latch.countDown();
162+
}
163+
});
164+
}
165+
166+
private boolean isActive() {
167+
if (Thread.interrupted()) {
168+
Thread.currentThread().interrupt();
169+
return false;
170+
}
171+
return true;
172+
}
173+
174+
@Override
175+
public synchronized void stop() {
176+
Future<?> future = this.future;
177+
if (future.isDone()) {
178+
return;
179+
}
180+
future.cancel(true);
181+
PgConnection conn = this.connection;
182+
if (conn != null) {
183+
try {
184+
conn.close();
185+
}
186+
catch (SQLException ignored) {
187+
}
188+
}
189+
try {
190+
if (!this.latch.await(5, TimeUnit.SECONDS)) {
191+
throw new IllegalStateException("Failed to stop " + PostgresChannelMessageTableSubscriber.class.getName());
192+
}
193+
}
194+
catch (InterruptedException ignored) {
195+
}
196+
}
197+
198+
@Override
199+
public boolean isRunning() {
200+
return this.latch.getCount() > 0;
201+
}
202+
203+
private String getKey(Object input) {
204+
return input == null ? null : UUIDConverter.getUUID(input).toString();
205+
}
206+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2002-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.channel;
18+
19+
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
20+
import org.springframework.messaging.Message;
21+
import org.springframework.messaging.MessageHandler;
22+
23+
public final class PostgresChannelMessageTableSubscription {
24+
25+
private final Object groupId;
26+
27+
private final String region;
28+
29+
private final Runnable onPossibleUpdate;
30+
31+
public PostgresChannelMessageTableSubscription(Object groupId, Runnable onPossibleUpdate) {
32+
this(groupId, JdbcChannelMessageStore.DEFAULT_REGION, onPossibleUpdate);
33+
}
34+
35+
public PostgresChannelMessageTableSubscription(Object groupId, String region, Runnable onPossibleUpdate) {
36+
this.groupId = groupId;
37+
this.region = region;
38+
this.onPossibleUpdate = onPossibleUpdate;
39+
}
40+
41+
public Object getGroupId() {
42+
return this.groupId;
43+
}
44+
45+
public String getRegion() {
46+
return this.region;
47+
}
48+
49+
public void onPossibleUpdate() {
50+
this.onPossibleUpdate.run();
51+
}
52+
53+
public static PostgresChannelMessageTableSubscription ofMessageStore(
54+
JdbcChannelMessageStore messageStore,
55+
Object groupId,
56+
MessageHandler handler) {
57+
return new PostgresChannelMessageTableSubscription(groupId, messageStore.getRegion(), () -> {
58+
Message<?> message = messageStore.pollMessageFromGroup(groupId);
59+
while (message != null) {
60+
handler.handleMessage(message);
61+
message = messageStore.pollMessageFromGroup(groupId);
62+
}
63+
});
64+
}
65+
}

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,14 @@ public void setRegion(String region) {
273273
this.region = region;
274274
}
275275

276+
/**
277+
* Returns the current region that was set or {@link #DEFAULT_REGION}, which is the default.
278+
* @return the set region name
279+
*/
280+
public String getRegion() {
281+
return this.region;
282+
}
283+
276284
/**
277285
* A converter for serializing messages to byte arrays for storage.
278286
* @param serializer The serializer to set

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,19 @@ CREATE TABLE INT_METADATA_STORE (
5858
REGION VARCHAR(100) NOT NULL,
5959
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
6060
);
61+
62+
-- This is only needed if using PostgresChannelMessageSubscriber
63+
-- CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
64+
-- RETURNS TRIGGER AS
65+
-- $BODY$
66+
-- BEGIN
67+
-- EXECUTE 'NOTIFY int_channel_message_notify, ''' || QUOTE_LITERAL(NEW.REGION || ' ' || NEW.GROUP_KEY) || '''';
68+
-- RETURN NEW;
69+
-- END;
70+
-- $BODY$
71+
-- LANGUAGE PLPGSQL;
72+
--
73+
-- CREATE TRIGGER INT_CHANNEL_MESSAGE_NOTIFY_TRG
74+
-- AFTER INSERT ON INT_CHANNEL_MESSAGE
75+
-- FOR EACH ROW
76+
-- EXECUTE PROCEDURE INT_CHANNEL_MESSAGE_NOTIFY_FCT()

0 commit comments

Comments
 (0)