Skip to content

Commit cfd7704

Browse files
committed
spring-projectsGH-8582:Transactional support in PostgresSubscribableChannel
1 parent 6d7ee46 commit cfd7704

File tree

3 files changed

+250
-46
lines changed

3 files changed

+250
-46
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 84 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
2828
import java.util.concurrent.Future;
29+
import java.util.concurrent.RejectedExecutionException;
2930
import java.util.concurrent.TimeUnit;
3031

3132
import org.postgresql.PGNotification;
@@ -59,6 +60,7 @@
5960
*
6061
* @author Rafael Winterhalter
6162
* @author Artem Bilan
63+
* @author Igor Lovich
6264
*
6365
* @since 6.0
6466
*/
@@ -75,15 +77,17 @@ public final class PostgresChannelMessageTableSubscriber implements SmartLifecyc
7577
@Nullable
7678
private ExecutorService executor;
7779

78-
private CountDownLatch latch = new CountDownLatch(0);
80+
private boolean userProvidedExecutor = false;
7981

80-
private Future<?> future = CompletableFuture.completedFuture(null);
82+
private CountDownLatch latch = new CountDownLatch(0);
8183

8284
@Nullable
8385
private volatile PgConnection connection;
86+
private Future<?> future = CompletableFuture.completedFuture(null);
8487

8588
/**
8689
* Create a new subscriber using the {@link JdbcChannelMessageStore#DEFAULT_TABLE_PREFIX}.
90+
*
8791
* @param connectionSupplier The connection supplier for the targeted Postgres database.
8892
*/
8993
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier) {
@@ -92,7 +96,8 @@ public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupp
9296

9397
/**
9498
* Create a new subscriber.
95-
* @param tablePrefix The table prefix of the {@link JdbcChannelMessageStore} to subscribe to.
99+
*
100+
* @param tablePrefix The table prefix of the {@link JdbcChannelMessageStore} to subscribe to.
96101
* @param connectionSupplier The connection supplier for the targeted Postgres database.
97102
*/
98103
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix) {
@@ -106,14 +111,16 @@ public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupp
106111
* Define an executor to use for listening for new messages. Note that the Postgres SQL driver implements
107112
* listening for notifications as a blocking operation which will permanently block a thread of this executor
108113
* while running.
114+
*
109115
* @param executor The executor to use or {@code null} if an executor should be created by this class.
110116
*/
111117
public synchronized void setExecutor(@Nullable ExecutorService executor) {
112118
this.executor = executor;
113119
}
114120

115121
/**
116-
* Add a new subscription to this subscriber.
122+
* Add a new subscription to this subscriber.;
123+
*
117124
* @param subscription The subscription to register.
118125
* @return {@code true} if the subscription was not already added.
119126
*/
@@ -126,6 +133,7 @@ public boolean subscribe(Subscription subscription) {
126133

127134
/**
128135
* Remove a previous subscription from this subscriber.
136+
*
129137
* @param subscription The subscription to remove.
130138
* @return {@code true} if the subscription was previously registered and is now removed.
131139
*/
@@ -143,12 +151,17 @@ public synchronized void start() {
143151
ExecutorService executorToUse = this.executor;
144152
if (executorToUse == null) {
145153
CustomizableThreadFactory threadFactory =
146-
new CustomizableThreadFactory("postgres-channel-message-table-subscriber-");
154+
new CustomizableThreadFactory("postgres-channel-notifications-");
147155
threadFactory.setDaemon(true);
148-
executorToUse = Executors.newSingleThreadExecutor(threadFactory);
156+
executorToUse = Executors.newFixedThreadPool(2, threadFactory);
149157
this.executor = executorToUse;
150158
}
159+
else {
160+
this.userProvidedExecutor = true;
161+
}
151162
this.latch = new CountDownLatch(1);
163+
164+
CountDownLatch startingLatch = new CountDownLatch(1);
152165
this.future = executorToUse.submit(() -> {
153166
try {
154167
while (isActive()) {
@@ -166,11 +179,13 @@ public synchronized void start() {
166179
}
167180
throw ex;
168181
}
169-
this.subscriptionsMap.values()
170-
.forEach(subscriptions -> subscriptions.forEach(Subscription::notifyUpdate));
182+
this.subscriptionsMap.values().forEach(this::notifyAll);
183+
171184
try {
172185
this.connection = conn;
173186
while (isActive()) {
187+
startingLatch.countDown();
188+
174189
PGNotification[] notifications = conn.getNotifications(0);
175190
// Unfortunately, there is no good way of interrupting a notification
176191
// poll but by closing its connection.
@@ -184,9 +199,7 @@ public synchronized void start() {
184199
if (subscriptions == null) {
185200
continue;
186201
}
187-
for (Subscription subscription : subscriptions) {
188-
subscription.notifyUpdate();
189-
}
202+
notifyAll(subscriptions);
190203
}
191204
}
192205
}
@@ -208,6 +221,29 @@ public synchronized void start() {
208221
this.latch.countDown();
209222
}
210223
});
224+
225+
try {
226+
if (!startingLatch.await(5, TimeUnit.SECONDS)) {
227+
throw new IllegalStateException("Failed to start "
228+
+ PostgresChannelMessageTableSubscriber.class.getName());
229+
}
230+
}
231+
catch (InterruptedException e) {
232+
Thread.currentThread().interrupt();
233+
throw new IllegalStateException("Failed to start "
234+
+ PostgresChannelMessageTableSubscriber.class.getName(), e);
235+
}
236+
}
237+
238+
private void notifyAll(Set<Subscription> subscriptions) {
239+
subscriptions.forEach(it -> {
240+
try {
241+
this.executor.submit(it::notifyUpdate);
242+
}
243+
catch (RejectedExecutionException e) {
244+
LOGGER.warn(e, "Executor rejected submission of notification task");
245+
}
246+
});
211247
}
212248

213249
private boolean isActive() {
@@ -232,6 +268,11 @@ public synchronized void stop() {
232268
catch (SQLException ignored) {
233269
}
234270
}
271+
272+
if (!this.userProvidedExecutor) {
273+
shutdownAndAwaitTermination(this.executor);
274+
}
275+
235276
try {
236277
if (!this.latch.await(5, TimeUnit.SECONDS)) {
237278
throw new IllegalStateException("Failed to stop "
@@ -242,6 +283,35 @@ public synchronized void stop() {
242283
}
243284
}
244285

286+
287+
/**
288+
* Gracefully shutdown an executor service. Taken from @see ExecutorService javadoc
289+
*
290+
* @param pool The pool to shut down
291+
*/
292+
private void shutdownAndAwaitTermination(@Nullable ExecutorService pool) {
293+
if (pool == null) {
294+
return;
295+
}
296+
pool.shutdown(); // Disable new tasks from being submitted
297+
try {
298+
// Wait a while for existing tasks to terminate
299+
if (!pool.awaitTermination(2, TimeUnit.SECONDS)) {
300+
pool.shutdownNow(); // Cancel currently executing tasks
301+
// Wait a while for tasks to respond to being cancelled
302+
if (!pool.awaitTermination(2, TimeUnit.SECONDS)) {
303+
LOGGER.warn("Unable to shutdown the executor service");
304+
}
305+
}
306+
}
307+
catch (InterruptedException ie) {
308+
// (Re-)Cancel if current thread also interrupted
309+
pool.shutdownNow();
310+
// Preserve interrupt status
311+
Thread.currentThread().interrupt();
312+
}
313+
}
314+
245315
@Override
246316
public boolean isRunning() {
247317
return this.latch.getCount() > 0;
@@ -269,12 +339,14 @@ public interface Subscription {
269339

270340
/**
271341
* Return the region for which this subscription receives notifications.
342+
*
272343
* @return The relevant region of the {@link JdbcChannelMessageStore}.
273344
*/
274345
String getRegion();
275346

276347
/**
277348
* Return the group id for which this subscription receives notifications.
349+
*
278350
* @return The group id of the {@link PostgresSubscribableChannel}.
279351
*/
280352
Object getGroupId();

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,15 +16,17 @@
1616

1717
package org.springframework.integration.jdbc.channel;
1818

19-
import java.util.concurrent.Executor;
19+
import java.util.Optional;
2020

21-
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2221
import org.springframework.integration.channel.AbstractSubscribableChannel;
2322
import org.springframework.integration.dispatcher.MessageDispatcher;
2423
import org.springframework.integration.dispatcher.UnicastingDispatcher;
2524
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
2625
import org.springframework.messaging.Message;
2726
import org.springframework.messaging.MessageHandler;
27+
import org.springframework.retry.support.RetryTemplate;
28+
import org.springframework.transaction.PlatformTransactionManager;
29+
import org.springframework.transaction.support.TransactionTemplate;
2830
import org.springframework.util.Assert;
2931

3032
/**
@@ -39,6 +41,7 @@
3941
*
4042
* @author Rafael Winterhalter
4143
* @author Artem Bilan
44+
* @author Igor Lovich
4245
*
4346
* @since 6.0
4447
*/
@@ -47,11 +50,15 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
4750

4851
private final JdbcChannelMessageStore jdbcChannelMessageStore;
4952

53+
private TransactionTemplate transactionTemplate;
54+
5055
private final Object groupId;
5156

5257
private final PostgresChannelMessageTableSubscriber messageTableSubscriber;
5358

54-
private UnicastingDispatcher dispatcher = new UnicastingDispatcher(new SimpleAsyncTaskExecutor());
59+
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
60+
61+
private RetryTemplate retryTemplate = RetryTemplate.builder().maxAttempts(1).build();
5562

5663
/**
5764
* Create a subscribable channel for a Postgres database.
@@ -70,12 +77,22 @@ public PostgresSubscribableChannel(JdbcChannelMessageStore jdbcChannelMessageSto
7077
}
7178

7279
/**
73-
* Set the executor to use for dispatching newly received messages.
74-
* @param executor The executor to use.
80+
* Sets the transaction manager to use for message processing. Each message will be processed in a
81+
* separate transaction
82+
* @param transactionManager The transaction manager to use
7583
*/
76-
public void setDispatcherExecutor(Executor executor) {
77-
Assert.notNull(executor, "An executor must be provided.");
78-
this.dispatcher = new UnicastingDispatcher(executor);
84+
public void setTransactionManager(PlatformTransactionManager transactionManager) {
85+
Assert.notNull(transactionManager, "A platform transaction manager must be provided.");
86+
this.transactionTemplate = new TransactionTemplate(transactionManager);
87+
}
88+
89+
/**
90+
* Sets retry template to use for retries in case of exception in downstream processing
91+
* @param retryTemplate The retry template to use
92+
*/
93+
public void setRetryTemplate(RetryTemplate retryTemplate) {
94+
Assert.notNull(retryTemplate, "A retry template must be provided.");
95+
this.retryTemplate = retryTemplate;
7996
}
8097

8198
@Override
@@ -110,10 +127,29 @@ protected boolean doSend(Message<?> message, long timeout) {
110127

111128
@Override
112129
public void notifyUpdate() {
113-
Message<?> message;
114-
while ((message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)) != null) {
115-
this.dispatcher.dispatch(message);
116-
}
130+
Optional<Message<?>> dispatchedMessage;
131+
132+
do {
133+
if (this.transactionTemplate != null) {
134+
dispatchedMessage = this.retryTemplate.execute(context ->
135+
this.transactionTemplate.execute(status ->
136+
Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId))
137+
.map(this::dispatch)
138+
)
139+
);
140+
}
141+
else {
142+
dispatchedMessage = Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId))
143+
.map(message ->
144+
this.retryTemplate.execute(context -> dispatch(message))
145+
);
146+
}
147+
} while (dispatchedMessage.isPresent());
148+
}
149+
150+
private Message<?> dispatch(Message<?> message) {
151+
this.dispatcher.dispatch(message);
152+
return message;
117153
}
118154

119155
@Override

0 commit comments

Comments
 (0)