Skip to content

Commit 41c6066

Browse files
committed
Adds a sample implementation for a JDBC subscribable channel using Postres LISTEN/NOTIFY.
1 parent 8c9662a commit 41c6066

File tree

6 files changed

+244
-0
lines changed

6 files changed

+244
-0
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ project('spring-integration-jdbc') {
713713
dependencies {
714714
api project(':spring-integration-core')
715715
api 'org.springframework:spring-jdbc'
716+
compileOnly "org.postgresql:postgresql:$postgresVersion"
716717

717718
testImplementation "com.h2database:h2:$h2Version"
718719
testImplementation "org.hsqldb:hsqldb:$hsqldbVersion"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.springframework.integration.jdbc.channel;
2+
3+
import org.postgresql.jdbc.PgConnection;
4+
5+
import java.sql.SQLException;
6+
7+
@FunctionalInterface
8+
public interface PgConnectionSupplier {
9+
10+
PgConnection get() throws SQLException;
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package org.springframework.integration.jdbc.channel;
2+
3+
import org.postgresql.PGNotification;
4+
import org.postgresql.jdbc.PgConnection;
5+
import org.springframework.context.SmartLifecycle;
6+
import org.springframework.core.log.LogAccessor;
7+
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
8+
import org.springframework.integration.util.UUIDConverter;
9+
import org.springframework.util.Assert;
10+
11+
import java.sql.Statement;
12+
import java.util.Map;
13+
import java.util.Set;
14+
import java.util.concurrent.ConcurrentHashMap;
15+
import java.util.concurrent.Executor;
16+
import java.util.concurrent.Executors;
17+
import java.util.concurrent.atomic.AtomicBoolean;
18+
19+
public class PostgresChannelMessageTableSubscriber implements SmartLifecycle {
20+
21+
private final LogAccessor logger = new LogAccessor(this.getClass()); // NOSONAR final
22+
23+
private final PgConnectionSupplier connectionSupplier;
24+
25+
private final Executor executor;
26+
27+
private final String tablePrefix;
28+
29+
private final AtomicBoolean running = new AtomicBoolean();
30+
31+
private volatile Object identity;
32+
33+
private final Map<String, Set<PostgresChannelMessageTableSubscription>> subscriptions = new ConcurrentHashMap<>();
34+
35+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier) {
36+
this(connectionSupplier, JdbcChannelMessageStore.DEFAULT_TABLE_PREFIX);
37+
}
38+
39+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix) {
40+
this(connectionSupplier, Executors.newSingleThreadExecutor(job -> {
41+
Thread t = new Thread(job);
42+
t.setDaemon(true);
43+
t.setName("");
44+
return t;
45+
}), tablePrefix);
46+
}
47+
48+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, Executor executor) {
49+
this(connectionSupplier, executor, JdbcChannelMessageStore.DEFAULT_TABLE_PREFIX);
50+
}
51+
52+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, Executor executor, String tablePrefix) {
53+
Assert.notNull(connectionSupplier, "A connectionSupplier must be provided.");
54+
Assert.notNull(executor, "A executor must be provided.");
55+
Assert.notNull(tablePrefix, "A table prefix must be set.");
56+
this.connectionSupplier = connectionSupplier;
57+
this.executor = executor;
58+
this.tablePrefix = tablePrefix;
59+
}
60+
61+
public boolean subscribe(PostgresChannelMessageTableSubscription subscription) {
62+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.computeIfAbsent(getKey(subscription.getGroupId()) + subscription.getRegion(), ignored -> ConcurrentHashMap.newKeySet());
63+
return subscriptions.add(subscription);
64+
}
65+
66+
public boolean unsubscribe(PostgresChannelMessageTableSubscription subscription) {
67+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.get(getKey(subscription.getGroupId()) + subscription.getRegion());
68+
return subscriptions != null && subscriptions.remove(subscription);
69+
}
70+
71+
@Override
72+
public void start() {
73+
if (running.getAndSet(true)) {
74+
return;
75+
}
76+
// Avoid that stop/start sequence reactivates previously stopped thread.
77+
Object current = new Object();
78+
identity = current;
79+
executor.execute(() -> {
80+
try {
81+
while (isActive(current)) {
82+
try {
83+
PgConnection conn = this.connectionSupplier.get();
84+
try (Statement stmt = conn.createStatement()) {
85+
stmt.execute("LISTEN " + this.tablePrefix.toLowerCase() + "channel_message_notify");
86+
} catch (Throwable t) {
87+
try {
88+
conn.close();
89+
} catch (Throwable suppressed) {
90+
t.addSuppressed(suppressed);
91+
}
92+
throw t;
93+
}
94+
try {
95+
while (isActive(current)) {
96+
PGNotification[] notifications = conn.getNotifications(0);
97+
// Unfortunately, there is no good way of interrupting a notification poll.
98+
if (!isActive(current)) {
99+
return;
100+
}
101+
if (notifications != null) {
102+
for (PGNotification notification : notifications) {
103+
String parameter = notification.getParameter();
104+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.get(parameter);
105+
if (subscriptions == null) {
106+
continue;
107+
}
108+
for (PostgresChannelMessageTableSubscription subscription : subscriptions) {
109+
subscription.onPossibleUpdate();
110+
}
111+
}
112+
}
113+
}
114+
} finally {
115+
conn.close();
116+
}
117+
} catch (Exception e) {
118+
// The getNotifications method does not throw a meaningful message on interruption.
119+
// Therefore, we do not log an error, unless it occurred while active.
120+
if (isActive(current)) {
121+
logger.error(e, "Failed to poll notifications from Postgres database");
122+
}
123+
} catch (Throwable t) {
124+
logger.error(t, "Failed to poll notifications from Postgres database");
125+
return;
126+
}
127+
}
128+
} finally {
129+
running.set(false);
130+
}
131+
});
132+
}
133+
134+
private boolean isActive(Object identity) {
135+
if (identity != this.identity) {
136+
return false;
137+
} else if (!running.get()) {
138+
return false;
139+
} else if (Thread.interrupted()) {
140+
running.set(false);
141+
return false;
142+
} else {
143+
return true;
144+
}
145+
}
146+
147+
@Override
148+
public void stop() {
149+
running.set(false);
150+
}
151+
152+
@Override
153+
public boolean isRunning() {
154+
return running.get();
155+
}
156+
157+
private String getKey(Object input) {
158+
return input == null ? null : UUIDConverter.getUUID(input).toString();
159+
}
160+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.springframework.integration.jdbc.channel;
2+
3+
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
4+
import org.springframework.messaging.Message;
5+
import org.springframework.messaging.MessageHandler;
6+
7+
public final class PostgresChannelMessageTableSubscription {
8+
9+
private final Object groupId;
10+
11+
private final String region;
12+
13+
private final Runnable onPossibleUpdate;
14+
15+
public PostgresChannelMessageTableSubscription(Object groupId, Runnable onPossibleUpdate) {
16+
this(groupId, JdbcChannelMessageStore.DEFAULT_REGION, onPossibleUpdate);
17+
}
18+
19+
public PostgresChannelMessageTableSubscription(Object groupId, String region, Runnable onPossibleUpdate) {
20+
this.groupId = groupId;
21+
this.region = region;
22+
this.onPossibleUpdate = onPossibleUpdate;
23+
}
24+
25+
public Object getGroupId() {
26+
return groupId;
27+
}
28+
29+
public String getRegion() {
30+
return region;
31+
}
32+
33+
public void onPossibleUpdate() {
34+
onPossibleUpdate.run();
35+
}
36+
37+
public static PostgresChannelMessageTableSubscription ofMessageStore(JdbcChannelMessageStore messageStore,
38+
Object groupId,
39+
MessageHandler handler) {
40+
return new PostgresChannelMessageTableSubscription(groupId, messageStore.getRegion(), () -> {
41+
Message<?> message = messageStore.pollMessageFromGroup(groupId);
42+
while (message != null) {
43+
handler.handleMessage(message);
44+
message = messageStore.pollMessageFromGroup(groupId);
45+
}
46+
});
47+
}
48+
}

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,14 @@ public void setRegion(String region) {
273273
this.region = region;
274274
}
275275

276+
/**
277+
* Returns the current region that was set or {@link #DEFAULT_REGION}, which is the default.
278+
* @return the set region name
279+
*/
280+
public String getRegion() {
281+
return region;
282+
}
283+
276284
/**
277285
* A converter for serializing messages to byte arrays for storage.
278286
* @param serializer The serializer to set

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,19 @@ CREATE TABLE INT_METADATA_STORE (
5858
REGION VARCHAR(100) NOT NULL,
5959
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
6060
);
61+
62+
-- This is only needed if using PostgresChannelMessageSubscriber
63+
-- CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
64+
-- RETURNS TRIGGER AS
65+
-- $BODY$
66+
-- BEGIN
67+
-- EXECUTE 'NOTIFY int_channel_message_notify, ''' || QUOTE_LITERAL(NEW.GROUP_KEY || NEW.REGION) || '''';
68+
-- RETURN NEW;
69+
-- END;
70+
-- $BODY$
71+
-- LANGUAGE PLPGSQL;
72+
--
73+
-- CREATE TRIGGER INT_CHANNEL_MESSAGE_NOTIFY_TRG
74+
-- AFTER INSERT ON INT_CHANNEL_MESSAGE
75+
-- FOR EACH ROW
76+
-- EXECUTE PROCEDURE INT_CHANNEL_MESSAGE_NOTIFY_FCT()

0 commit comments

Comments
 (0)