Skip to content

Commit 92de4ca

Browse files
committed
* Introduce PostgresChannelMessageTableSubscriber.Subscription contract
* Implement a `PostgresSubscribableChannel`
1 parent ab121ed commit 92de4ca

File tree

2 files changed

+104
-8
lines changed

2 files changed

+104
-8
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public final class PostgresChannelMessageTableSubscriber implements SmartLifecyc
5050
@Nullable
5151
private volatile PgConnection connection;
5252

53-
private final Map<String, Set<PostgresChannelMessageTableSubscription>> subscriptions = new ConcurrentHashMap<>();
53+
private final Map<String, Set<Subscription>> subscriptions = new ConcurrentHashMap<>();
5454

5555
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier) {
5656
this(connectionSupplier, JdbcChannelMessageStore.DEFAULT_TABLE_PREFIX);
@@ -73,13 +73,13 @@ public SubscribableChannel toSubscribableChannel(
7373
return PostgresChannelMessageTableSubscription.asSubscribableChannel(this, messageStore, groupId);
7474
}
7575

76-
boolean subscribe(PostgresChannelMessageTableSubscription subscription) {
77-
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.computeIfAbsent(subscription.getRegion() + " " + getKey(subscription.getGroupId()), ignored -> ConcurrentHashMap.newKeySet());
76+
boolean subscribe(Subscription subscription) {
77+
Set<Subscription> subscriptions = this.subscriptions.computeIfAbsent(subscription.getRegion() + " " + getKey(subscription.getGroupId()), ignored -> ConcurrentHashMap.newKeySet());
7878
return subscriptions.add(subscription);
7979
}
8080

81-
boolean unsubscribe(PostgresChannelMessageTableSubscription subscription) {
82-
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.get(subscription.getRegion() + " " + getKey(subscription.getGroupId()));
81+
boolean unsubscribe(Subscription subscription) {
82+
Set<Subscription> subscriptions = this.subscriptions.get(subscription.getRegion() + " " + getKey(subscription.getGroupId()));
8383
return subscriptions != null && subscriptions.remove(subscription);
8484
}
8585

@@ -127,12 +127,12 @@ public synchronized void start() {
127127
if (notifications != null) {
128128
for (PGNotification notification : notifications) {
129129
String parameter = notification.getParameter();
130-
Set<PostgresChannelMessageTableSubscription> subscriptions = this.subscriptions.get(parameter);
130+
Set<Subscription> subscriptions = this.subscriptions.get(parameter);
131131
if (subscriptions == null) {
132132
continue;
133133
}
134-
for (PostgresChannelMessageTableSubscription subscription : subscriptions) {
135-
subscription.onPossibleUpdate();
134+
for (Subscription subscription : subscriptions) {
135+
subscription.notifyUpdate();
136136
}
137137
}
138138
}
@@ -195,4 +195,15 @@ public boolean isRunning() {
195195
private String getKey(Object input) {
196196
return input == null ? null : UUIDConverter.getUUID(input).toString();
197197
}
198+
199+
public interface Subscription {
200+
201+
void notifyUpdate();
202+
203+
String getRegion();
204+
205+
Object getGroupId();
206+
207+
}
208+
198209
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package org.springframework.integration.jdbc.channel;
2+
3+
import java.util.concurrent.Executor;
4+
5+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
6+
import org.springframework.integration.channel.AbstractSubscribableChannel;
7+
import org.springframework.integration.dispatcher.MessageDispatcher;
8+
import org.springframework.integration.dispatcher.UnicastingDispatcher;
9+
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
10+
import org.springframework.messaging.Message;
11+
import org.springframework.messaging.MessageHandler;
12+
13+
public class PostgresSubscribableChannel extends AbstractSubscribableChannel
14+
implements PostgresChannelMessageTableSubscriber.Subscription {
15+
16+
private final JdbcChannelMessageStore jdbcChannelMessageStore;
17+
18+
private final Object groupId;
19+
20+
private final PostgresChannelMessageTableSubscriber messageTableSubscriber;
21+
22+
private UnicastingDispatcher dispatcher = new UnicastingDispatcher(new SimpleAsyncTaskExecutor());
23+
24+
public PostgresSubscribableChannel(JdbcChannelMessageStore jdbcChannelMessageStore,
25+
Object groupId, PostgresChannelMessageTableSubscriber messageTableSubscriber) {
26+
27+
this.jdbcChannelMessageStore = jdbcChannelMessageStore;
28+
this.groupId = groupId;
29+
this.messageTableSubscriber = messageTableSubscriber;
30+
}
31+
32+
public void setDispatcherExecutor(Executor executor) {
33+
this.dispatcher = new UnicastingDispatcher(executor);
34+
}
35+
36+
@Override
37+
public boolean subscribe(MessageHandler handler) {
38+
boolean subscribed = super.subscribe(handler);
39+
if (this.dispatcher.getHandlerCount() == 1) {
40+
this.messageTableSubscriber.subscribe(this);
41+
notifyUpdate();
42+
}
43+
return subscribed;
44+
}
45+
46+
@Override
47+
public boolean unsubscribe(MessageHandler handle) {
48+
boolean unsubscribed = super.unsubscribe(handle);
49+
if (this.dispatcher.getHandlerCount() == 0) {
50+
this.messageTableSubscriber.unsubscribe(this);
51+
notifyUpdate();
52+
}
53+
return unsubscribed;
54+
}
55+
56+
@Override
57+
protected MessageDispatcher getDispatcher() {
58+
return this.dispatcher;
59+
}
60+
61+
@Override
62+
protected boolean doSend(Message<?> message, long timeout) {
63+
this.jdbcChannelMessageStore.addMessageToGroup(this.groupId, message);
64+
return true;
65+
}
66+
67+
@Override
68+
public void notifyUpdate() {
69+
Message<?> message;
70+
while ((message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)) != null) {
71+
this.dispatcher.dispatch(message);
72+
}
73+
}
74+
75+
@Override
76+
public String getRegion() {
77+
return this.jdbcChannelMessageStore.getRegion();
78+
}
79+
80+
@Override
81+
public Object getGroupId() {
82+
return this.groupId;
83+
}
84+
85+
}

0 commit comments

Comments
 (0)