Skip to content

Commit ab121ed

Browse files
committed
Adds an implementation for a Postres-compatible notification listener for a JdbcChannelMessageStore.
1 parent 8c9662a commit ab121ed

File tree

6 files changed

+412
-0
lines changed

6 files changed

+412
-0
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ project('spring-integration-jdbc') {
713713
dependencies {
714714
api project(':spring-integration-core')
715715
api 'org.springframework:spring-jdbc'
716+
optionalApi "org.postgresql:postgresql:$postgresVersion"
716717

717718
testImplementation "com.h2database:h2:$h2Version"
718719
testImplementation "org.hsqldb:hsqldb:$hsqldbVersion"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2002-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.channel;
18+
19+
import java.sql.SQLException;
20+
21+
import org.postgresql.jdbc.PgConnection;
22+
23+
@FunctionalInterface
24+
public interface PgConnectionSupplier {
25+
26+
PgConnection get() throws SQLException;
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Copyright 2002-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.channel;
18+
19+
import org.postgresql.PGNotification;
20+
import org.postgresql.jdbc.PgConnection;
21+
import org.springframework.context.SmartLifecycle;
22+
import org.springframework.core.log.LogAccessor;
23+
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
24+
import org.springframework.integration.util.UUIDConverter;
25+
import org.springframework.lang.Nullable;
26+
import org.springframework.messaging.SubscribableChannel;
27+
import org.springframework.util.Assert;
28+
29+
import java.sql.SQLException;
30+
import java.sql.Statement;
31+
import java.util.Map;
32+
import java.util.Set;
33+
import java.util.concurrent.*;
34+
35+
public final class PostgresChannelMessageTableSubscriber implements SmartLifecycle {
36+
37+
private static final LogAccessor LOGGER = new LogAccessor(PostgresChannelMessageTableSubscriber.class);
38+
39+
private final PgConnectionSupplier connectionSupplier;
40+
41+
private final String tablePrefix;
42+
43+
@Nullable
44+
private ExecutorService executor;
45+
46+
private CountDownLatch latch = new CountDownLatch(0);
47+
48+
private Future<?> future = CompletableFuture.completedFuture(null);
49+
50+
@Nullable
51+
private volatile PgConnection connection;
52+
53+
private final Map<String, Set<PostgresChannelMessageTableSubscription>> subscriptions = new ConcurrentHashMap<>();
54+
55+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier) {
56+
this(connectionSupplier, JdbcChannelMessageStore.DEFAULT_TABLE_PREFIX);
57+
}
58+
59+
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix) {
60+
Assert.notNull(connectionSupplier, "A connectionSupplier must be provided.");
61+
Assert.notNull(tablePrefix, "A table prefix must be set.");
62+
this.connectionSupplier = connectionSupplier;
63+
this.tablePrefix = tablePrefix;
64+
}
65+
66+
public void setExecutor(ExecutorService executor) {
67+
this.executor = executor;
68+
}
69+
70+
public SubscribableChannel toSubscribableChannel(
71+
JdbcChannelMessageStore messageStore,
72+
Object groupId) {
73+
return PostgresChannelMessageTableSubscription.asSubscribableChannel(this, messageStore, groupId);
74+
}
75+
76+
boolean subscribe(PostgresChannelMessageTableSubscription subscription) {
77+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.computeIfAbsent(subscription.getRegion() + " " + getKey(subscription.getGroupId()), ignored -> ConcurrentHashMap.newKeySet());
78+
return subscriptions.add(subscription);
79+
}
80+
81+
boolean unsubscribe(PostgresChannelMessageTableSubscription subscription) {
82+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.get(subscription.getRegion() + " " + getKey(subscription.getGroupId()));
83+
return subscriptions != null && subscriptions.remove(subscription);
84+
}
85+
86+
@Override
87+
public synchronized void start() {
88+
if (this.latch.getCount() > 0) {
89+
return;
90+
}
91+
ExecutorService executor = this.executor;
92+
if (executor == null) {
93+
executor = Executors.newSingleThreadExecutor(
94+
job -> {
95+
Thread t = new Thread(job);
96+
t.setDaemon(true);
97+
t.setName("postgres-channel-message-table-subscriber");
98+
return t;
99+
}
100+
);
101+
this.executor = executor;
102+
}
103+
this.latch = new CountDownLatch(1);
104+
this.future = executor.submit(() -> {
105+
try {
106+
while (isActive()) {
107+
try {
108+
PgConnection conn = this.connectionSupplier.get();
109+
try (Statement stmt = conn.createStatement()) {
110+
stmt.execute("LISTEN " + this.tablePrefix.toLowerCase() + "channel_message_notify");
111+
} catch (Throwable t) {
112+
try {
113+
conn.close();
114+
} catch (Throwable suppressed) {
115+
t.addSuppressed(suppressed);
116+
}
117+
throw t;
118+
}
119+
try {
120+
this.connection = conn;
121+
while (isActive()) {
122+
PGNotification[] notifications = conn.getNotifications(0);
123+
// Unfortunately, there is no good way of interrupting a notification poll but by closing its connection.
124+
if (!isActive()) {
125+
return;
126+
}
127+
if (notifications != null) {
128+
for (PGNotification notification : notifications) {
129+
String parameter = notification.getParameter();
130+
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.get(parameter);
131+
if (subscriptions == null) {
132+
continue;
133+
}
134+
for (PostgresChannelMessageTableSubscription subscription : subscriptions) {
135+
subscription.onPossibleUpdate();
136+
}
137+
}
138+
}
139+
}
140+
} finally {
141+
conn.close();
142+
}
143+
} catch (Exception e) {
144+
// The getNotifications method does not throw a meaningful message on interruption.
145+
// Therefore, we do not log an error, unless it occurred while active.
146+
if (isActive()) {
147+
LOGGER.error(e, "Failed to poll notifications from Postgres database");
148+
}
149+
} catch (Throwable t) {
150+
LOGGER.error(t, "Failed to poll notifications from Postgres database");
151+
return;
152+
}
153+
}
154+
} finally {
155+
this.latch.countDown();
156+
}
157+
});
158+
}
159+
160+
private boolean isActive() {
161+
if (Thread.interrupted()) {
162+
Thread.currentThread().interrupt();
163+
return false;
164+
}
165+
return true;
166+
}
167+
168+
@Override
169+
public synchronized void stop() {
170+
Future<?> future = this.future;
171+
if (future.isDone()) {
172+
return;
173+
}
174+
future.cancel(true);
175+
PgConnection conn = this.connection;
176+
if (conn != null) {
177+
try {
178+
conn.close();
179+
} catch (SQLException ignored) {
180+
}
181+
}
182+
try {
183+
if (!this.latch.await(5, TimeUnit.SECONDS)) {
184+
throw new IllegalStateException("Failed to stop " + PostgresChannelMessageTableSubscriber.class.getName());
185+
}
186+
} catch (InterruptedException ignored) {
187+
}
188+
}
189+
190+
@Override
191+
public boolean isRunning() {
192+
return this.latch.getCount() > 0;
193+
}
194+
195+
private String getKey(Object input) {
196+
return input == null ? null : UUIDConverter.getUUID(input).toString();
197+
}
198+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright 2002-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.channel;
18+
19+
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
20+
import org.springframework.messaging.Message;
21+
import org.springframework.messaging.MessageHandler;
22+
import org.springframework.messaging.SubscribableChannel;
23+
import org.springframework.util.Assert;
24+
25+
final class PostgresChannelMessageTableSubscription {
26+
27+
private final Object groupId;
28+
29+
private final String region;
30+
31+
private final Runnable onPossibleUpdate;
32+
33+
PostgresChannelMessageTableSubscription(Object groupId, String region, Runnable onPossibleUpdate) {
34+
this.groupId = groupId;
35+
this.region = region;
36+
this.onPossibleUpdate = onPossibleUpdate;
37+
}
38+
39+
Object getGroupId() {
40+
return this.groupId;
41+
}
42+
43+
String getRegion() {
44+
return this.region;
45+
}
46+
47+
void onPossibleUpdate() {
48+
this.onPossibleUpdate.run();
49+
}
50+
51+
@Override
52+
public boolean equals(Object o) {
53+
if (this == o) {
54+
return true;
55+
}
56+
if (o == null || getClass() != o.getClass()) {
57+
return false;
58+
}
59+
PostgresChannelMessageTableSubscription that = (PostgresChannelMessageTableSubscription) o;
60+
if (!this.groupId.equals(that.groupId)) {
61+
return false;
62+
}
63+
if (!this.region.equals(that.region)) {
64+
return false;
65+
}
66+
return this.onPossibleUpdate.equals(that.onPossibleUpdate);
67+
}
68+
69+
@Override
70+
public int hashCode() {
71+
int result = this.groupId.hashCode();
72+
result = 31 * result + this.region.hashCode();
73+
result = 31 * result + this.onPossibleUpdate.hashCode();
74+
return result;
75+
}
76+
77+
static SubscribableChannel asSubscribableChannel(
78+
PostgresChannelMessageTableSubscriber subscriber,
79+
JdbcChannelMessageStore messageStore,
80+
Object groupId) {
81+
return new SubscribableChannel() {
82+
@Override
83+
public boolean subscribe(MessageHandler handler) {
84+
Assert.isTrue(subscriber.isRunning(), PostgresChannelMessageTableSubscriber.class.getName() + " must be started");
85+
MessageHandlerDispatcher dispatcher = new MessageHandlerDispatcher(handler, messageStore, groupId);
86+
if (subscriber.subscribe(new PostgresChannelMessageTableSubscription(groupId,
87+
messageStore.getRegion(),
88+
dispatcher))) {
89+
dispatcher.run();
90+
return true;
91+
}
92+
else {
93+
return false;
94+
}
95+
}
96+
97+
@Override
98+
public boolean unsubscribe(MessageHandler handler) {
99+
return subscriber.unsubscribe(new PostgresChannelMessageTableSubscription(groupId,
100+
messageStore.getRegion(),
101+
new MessageHandlerDispatcher(handler, messageStore, groupId)));
102+
}
103+
104+
@Override
105+
public boolean send(Message<?> message, long timeout) {
106+
messageStore.addMessageToGroup(groupId, message);
107+
return true;
108+
}
109+
};
110+
}
111+
112+
private static final class MessageHandlerDispatcher implements Runnable {
113+
114+
private final MessageHandler handler;
115+
116+
private final JdbcChannelMessageStore messageStore;
117+
118+
private final Object groupId;
119+
120+
private MessageHandlerDispatcher(MessageHandler handler, JdbcChannelMessageStore messageStore, Object groupId) {
121+
this.handler = handler;
122+
this.messageStore = messageStore;
123+
this.groupId = groupId;
124+
}
125+
126+
@Override
127+
public void run() {
128+
Message<?> message = this.messageStore.pollMessageFromGroup(this.groupId);
129+
while (message != null) {
130+
this.handler.handleMessage(message);
131+
message = this.messageStore.pollMessageFromGroup(this.groupId);
132+
}
133+
}
134+
135+
@Override
136+
public boolean equals(Object o) {
137+
if (this == o) {
138+
return true;
139+
}
140+
if (o == null || getClass() != o.getClass()) {
141+
return false;
142+
}
143+
MessageHandlerDispatcher that = (MessageHandlerDispatcher) o;
144+
if (!this.handler.equals(that.handler)) {
145+
return false;
146+
}
147+
if (!this.messageStore.equals(that.messageStore)) {
148+
return false;
149+
}
150+
return this.groupId.equals(that.groupId);
151+
}
152+
153+
@Override
154+
public int hashCode() {
155+
int result = this.handler.hashCode();
156+
result = 31 * result + this.messageStore.hashCode();
157+
result = 31 * result + this.groupId.hashCode();
158+
return result;
159+
}
160+
}
161+
}

0 commit comments

Comments
 (0)