Skip to content

Commit 0197628

Browse files
committed
Adds an implementation for a Postres-compatible notification listener for a JdbcChannelMessageStore.
1 parent 8c9662a commit 0197628

File tree

6 files changed

+245
-0
lines changed

6 files changed

+245
-0
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ project('spring-integration-jdbc') {
713713
dependencies {
714714
api project(':spring-integration-core')
715715
api 'org.springframework:spring-jdbc'
716+
optionalApi "org.postgresql:postgresql:$postgresVersion"
716717

717718
testImplementation "com.h2database:h2:$h2Version"
718719
testImplementation "org.hsqldb:hsqldb:$hsqldbVersion"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.springframework.integration.jdbc.channel;
2+
3+
import org.postgresql.jdbc.PgConnection;
4+
5+
import java.sql.SQLException;
6+
7+
@FunctionalInterface
8+
public interface PgConnectionSupplier {
9+
10+
PgConnection get() throws SQLException;
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package org.springframework.integration.jdbc.channel;
2+
3+
import org.postgresql.PGNotification;
4+
import org.postgresql.jdbc.PgConnection;
5+
import org.springframework.context.SmartLifecycle;
6+
import org.springframework.core.log.LogAccessor;
7+
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
8+
import org.springframework.integration.util.UUIDConverter;
9+
import org.springframework.lang.Nullable;
10+
import org.springframework.util.Assert;
11+
12+
import java.sql.Statement;
13+
import java.util.Map;
14+
import java.util.Set;
15+
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.Executor;
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
20+
public final class PostgresChannelMessageTableSubscriber implements SmartLifecycle {
21+
22+
private static final LogAccessor LOGGER = new LogAccessor(PostgresChannelMessageTableSubscriber.class);
23+
24+
private final PgConnectionSupplier connectionSupplier;
25+
26+
private final String tablePrefix;
27+
28+
@Nullable
29+
private Executor executor;
30+
31+
private final AtomicBoolean running = new AtomicBoolean();
32+
33+
private volatile Object identity;
34+
35+
private final Map<String, Set<PostgresChannelMessageTableSubscription>> subscriptions = new ConcurrentHashMap<>();
36+
37+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier) {
38+
this(connectionSupplier, JdbcChannelMessageStore.DEFAULT_TABLE_PREFIX);
39+
}
40+
41+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix) {
42+
Assert.notNull(connectionSupplier, "A connectionSupplier must be provided.");
43+
Assert.notNull(tablePrefix, "A table prefix must be set.");
44+
this.connectionSupplier = connectionSupplier;
45+
this.tablePrefix = tablePrefix;
46+
}
47+
48+
public void setExecutor(Executor executor) {
49+
this.executor = executor;
50+
}
51+
52+
public boolean subscribe(PostgresChannelMessageTableSubscription subscription) {
53+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.computeIfAbsent(subscription.getRegion() + getKey(subscription.getGroupId()), ignored -> ConcurrentHashMap.newKeySet());
54+
return subscriptions.add(subscription);
55+
}
56+
57+
public boolean unsubscribe(PostgresChannelMessageTableSubscription subscription) {
58+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.get(subscription.getRegion() + getKey(subscription.getGroupId()));
59+
return subscriptions != null && subscriptions.remove(subscription);
60+
}
61+
62+
@Override
63+
public void start() {
64+
if (running.getAndSet(true)) {
65+
return;
66+
}
67+
Executor executor = this.executor;
68+
if (executor == null) {
69+
executor = Executors.newSingleThreadExecutor(job -> {
70+
Thread t = new Thread(job);
71+
t.setDaemon(true);
72+
t.setName("postgres-channel-message-table-subscriber");
73+
return t;
74+
});
75+
this.executor = executor;
76+
}
77+
// Avoid that stop/start sequence reactivates previously stopped thread.
78+
Object current = new Object();
79+
identity = current;
80+
executor.execute(() -> {
81+
try {
82+
while (isActive(current)) {
83+
try {
84+
PgConnection conn = this.connectionSupplier.get();
85+
try (Statement stmt = conn.createStatement()) {
86+
stmt.execute("LISTEN " + this.tablePrefix.toLowerCase() + "channel_message_notify");
87+
} catch (Throwable t) {
88+
try {
89+
conn.close();
90+
} catch (Throwable suppressed) {
91+
t.addSuppressed(suppressed);
92+
}
93+
throw t;
94+
}
95+
try {
96+
while (isActive(current)) {
97+
PGNotification[] notifications = conn.getNotifications(0);
98+
// Unfortunately, there is no good way of interrupting a notification poll.
99+
if (!isActive(current)) {
100+
return;
101+
}
102+
if (notifications != null) {
103+
for (PGNotification notification : notifications) {
104+
String parameter = notification.getParameter();
105+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.get(parameter);
106+
if (subscriptions == null) {
107+
continue;
108+
}
109+
for (PostgresChannelMessageTableSubscription subscription : subscriptions) {
110+
subscription.onPossibleUpdate();
111+
}
112+
}
113+
}
114+
}
115+
} finally {
116+
conn.close();
117+
}
118+
} catch (Exception e) {
119+
// The getNotifications method does not throw a meaningful message on interruption.
120+
// Therefore, we do not log an error, unless it occurred while active.
121+
if (isActive(current)) {
122+
LOGGER.error(e, "Failed to poll notifications from Postgres database");
123+
}
124+
} catch (Throwable t) {
125+
LOGGER.error(t, "Failed to poll notifications from Postgres database");
126+
return;
127+
}
128+
}
129+
} finally {
130+
running.set(false);
131+
}
132+
});
133+
}
134+
135+
private boolean isActive(Object identity) {
136+
if (identity != this.identity) {
137+
return false;
138+
} else if (!running.get()) {
139+
return false;
140+
} else if (Thread.interrupted()) {
141+
running.set(false);
142+
return false;
143+
} else {
144+
return true;
145+
}
146+
}
147+
148+
@Override
149+
public void stop() {
150+
running.set(false);
151+
}
152+
153+
@Override
154+
public boolean isRunning() {
155+
return running.get();
156+
}
157+
158+
private String getKey(Object input) {
159+
return input == null ? null : UUIDConverter.getUUID(input).toString();
160+
}
161+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.springframework.integration.jdbc.channel;
2+
3+
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
4+
import org.springframework.messaging.Message;
5+
import org.springframework.messaging.MessageHandler;
6+
7+
public final class PostgresChannelMessageTableSubscription {
8+
9+
private final Object groupId;
10+
11+
private final String region;
12+
13+
private final Runnable onPossibleUpdate;
14+
15+
public PostgresChannelMessageTableSubscription(Object groupId, Runnable onPossibleUpdate) {
16+
this(groupId, JdbcChannelMessageStore.DEFAULT_REGION, onPossibleUpdate);
17+
}
18+
19+
public PostgresChannelMessageTableSubscription(Object groupId, String region, Runnable onPossibleUpdate) {
20+
this.groupId = groupId;
21+
this.region = region;
22+
this.onPossibleUpdate = onPossibleUpdate;
23+
}
24+
25+
public Object getGroupId() {
26+
return groupId;
27+
}
28+
29+
public String getRegion() {
30+
return region;
31+
}
32+
33+
public void onPossibleUpdate() {
34+
onPossibleUpdate.run();
35+
}
36+
37+
public static PostgresChannelMessageTableSubscription ofMessageStore(JdbcChannelMessageStore messageStore,
38+
Object groupId,
39+
MessageHandler handler) {
40+
return new PostgresChannelMessageTableSubscription(groupId, messageStore.getRegion(), () -> {
41+
Message<?> message = messageStore.pollMessageFromGroup(groupId);
42+
while (message != null) {
43+
handler.handleMessage(message);
44+
message = messageStore.pollMessageFromGroup(groupId);
45+
}
46+
});
47+
}
48+
}

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,14 @@ public void setRegion(String region) {
273273
this.region = region;
274274
}
275275

276+
/**
277+
* Returns the current region that was set or {@link #DEFAULT_REGION}, which is the default.
278+
* @return the set region name
279+
*/
280+
public String getRegion() {
281+
return region;
282+
}
283+
276284
/**
277285
* A converter for serializing messages to byte arrays for storage.
278286
* @param serializer The serializer to set

spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,19 @@ CREATE TABLE INT_METADATA_STORE (
5858
REGION VARCHAR(100) NOT NULL,
5959
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
6060
);
61+
62+
-- This is only needed if using PostgresChannelMessageSubscriber
63+
-- CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
64+
-- RETURNS TRIGGER AS
65+
-- $BODY$
66+
-- BEGIN
67+
-- EXECUTE 'NOTIFY int_channel_message_notify, ''' || QUOTE_LITERAL(NEW.REGION || NEW.GROUP_KEY) || '''';
68+
-- RETURN NEW;
69+
-- END;
70+
-- $BODY$
71+
-- LANGUAGE PLPGSQL;
72+
--
73+
-- CREATE TRIGGER INT_CHANNEL_MESSAGE_NOTIFY_TRG
74+
-- AFTER INSERT ON INT_CHANNEL_MESSAGE
75+
-- FOR EACH ROW
76+
-- EXECUTE PROCEDURE INT_CHANNEL_MESSAGE_NOTIFY_FCT()

0 commit comments

Comments
 (0)