Skip to content

Commit 1fd9694

Browse files
committed
spring-projectsGH-8582:Transactional support in PostgresSubscribableChannel
1 parent 6d7ee46 commit 1fd9694

File tree

4 files changed

+248
-42
lines changed

4 files changed

+248
-42
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
2828
import java.util.concurrent.Future;
29+
import java.util.concurrent.RejectedExecutionException;
2930
import java.util.concurrent.TimeUnit;
3031

3132
import org.postgresql.PGNotification;
@@ -59,6 +60,7 @@
5960
*
6061
* @author Rafael Winterhalter
6162
* @author Artem Bilan
63+
* @author Igor Lovich
6264
*
6365
* @since 6.0
6466
*/
@@ -77,6 +79,8 @@ public final class PostgresChannelMessageTableSubscriber implements SmartLifecyc
7779

7880
private CountDownLatch latch = new CountDownLatch(0);
7981

82+
private boolean userProvidedExecutor = false;
83+
8084
private Future<?> future = CompletableFuture.completedFuture(null);
8185

8286
@Nullable
@@ -143,12 +147,17 @@ public synchronized void start() {
143147
ExecutorService executorToUse = this.executor;
144148
if (executorToUse == null) {
145149
CustomizableThreadFactory threadFactory =
146-
new CustomizableThreadFactory("postgres-channel-message-table-subscriber-");
150+
new CustomizableThreadFactory("postgres-channel-notifications-");
147151
threadFactory.setDaemon(true);
148-
executorToUse = Executors.newSingleThreadExecutor(threadFactory);
152+
executorToUse = Executors.newFixedThreadPool(2, threadFactory);
149153
this.executor = executorToUse;
150154
}
155+
else {
156+
this.userProvidedExecutor = true;
157+
}
151158
this.latch = new CountDownLatch(1);
159+
160+
CountDownLatch startingLatch = new CountDownLatch(1);
152161
this.future = executorToUse.submit(() -> {
153162
try {
154163
while (isActive()) {
@@ -166,11 +175,13 @@ public synchronized void start() {
166175
}
167176
throw ex;
168177
}
169-
this.subscriptionsMap.values()
170-
.forEach(subscriptions -> subscriptions.forEach(Subscription::notifyUpdate));
178+
this.subscriptionsMap.values().forEach(this::notifyAll);
179+
171180
try {
172181
this.connection = conn;
173182
while (isActive()) {
183+
startingLatch.countDown();
184+
174185
PGNotification[] notifications = conn.getNotifications(0);
175186
// Unfortunately, there is no good way of interrupting a notification
176187
// poll but by closing its connection.
@@ -184,9 +195,7 @@ public synchronized void start() {
184195
if (subscriptions == null) {
185196
continue;
186197
}
187-
for (Subscription subscription : subscriptions) {
188-
subscription.notifyUpdate();
189-
}
198+
notifyAll(subscriptions);
190199
}
191200
}
192201
}
@@ -208,6 +217,29 @@ public synchronized void start() {
208217
this.latch.countDown();
209218
}
210219
});
220+
221+
try {
222+
if (!startingLatch.await(5, TimeUnit.SECONDS)) {
223+
throw new IllegalStateException("Failed to start "
224+
+ PostgresChannelMessageTableSubscriber.class.getName());
225+
}
226+
}
227+
catch (InterruptedException e) {
228+
Thread.currentThread().interrupt();
229+
throw new IllegalStateException("Failed to start "
230+
+ PostgresChannelMessageTableSubscriber.class.getName(), e);
231+
}
232+
}
233+
234+
private void notifyAll(Set<Subscription> subscriptions) {
235+
subscriptions.forEach(it -> {
236+
try {
237+
this.executor.submit(it::notifyUpdate);
238+
}
239+
catch (RejectedExecutionException e) {
240+
LOGGER.warn(e, "Executor rejected submission of notification task");
241+
}
242+
});
211243
}
212244

213245
private boolean isActive() {
@@ -232,6 +264,11 @@ public synchronized void stop() {
232264
catch (SQLException ignored) {
233265
}
234266
}
267+
268+
if (!this.userProvidedExecutor) {
269+
shutdownAndAwaitTermination(this.executor);
270+
}
271+
235272
try {
236273
if (!this.latch.await(5, TimeUnit.SECONDS)) {
237274
throw new IllegalStateException("Failed to stop "
@@ -242,6 +279,35 @@ public synchronized void stop() {
242279
}
243280
}
244281

282+
283+
/**
284+
* Gracefully shutdown an executor service. Taken from @see ExecutorService javadoc
285+
*
286+
* @param pool The pool to shut down
287+
*/
288+
private void shutdownAndAwaitTermination(@Nullable ExecutorService pool) {
289+
if (pool == null) {
290+
return;
291+
}
292+
pool.shutdown(); // Disable new tasks from being submitted
293+
try {
294+
// Wait a while for existing tasks to terminate
295+
if (!pool.awaitTermination(2, TimeUnit.SECONDS)) {
296+
pool.shutdownNow(); // Cancel currently executing tasks
297+
// Wait a while for tasks to respond to being cancelled
298+
if (!pool.awaitTermination(2, TimeUnit.SECONDS)) {
299+
LOGGER.warn("Unable to shutdown the executor service");
300+
}
301+
}
302+
}
303+
catch (InterruptedException ie) {
304+
// (Re-)Cancel if current thread also interrupted
305+
pool.shutdownNow();
306+
// Preserve interrupt status
307+
Thread.currentThread().interrupt();
308+
}
309+
}
310+
245311
@Override
246312
public boolean isRunning() {
247313
return this.latch.getCount() > 0;

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,15 +16,17 @@
1616

1717
package org.springframework.integration.jdbc.channel;
1818

19-
import java.util.concurrent.Executor;
19+
import java.util.Optional;
2020

21-
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2221
import org.springframework.integration.channel.AbstractSubscribableChannel;
2322
import org.springframework.integration.dispatcher.MessageDispatcher;
2423
import org.springframework.integration.dispatcher.UnicastingDispatcher;
2524
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
2625
import org.springframework.messaging.Message;
2726
import org.springframework.messaging.MessageHandler;
27+
import org.springframework.retry.support.RetryTemplate;
28+
import org.springframework.transaction.PlatformTransactionManager;
29+
import org.springframework.transaction.support.TransactionTemplate;
2830
import org.springframework.util.Assert;
2931

3032
/**
@@ -39,6 +41,7 @@
3941
*
4042
* @author Rafael Winterhalter
4143
* @author Artem Bilan
44+
* @author Igor Lovich
4245
*
4346
* @since 6.0
4447
*/
@@ -47,11 +50,15 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
4750

4851
private final JdbcChannelMessageStore jdbcChannelMessageStore;
4952

53+
private TransactionTemplate transactionTemplate;
54+
5055
private final Object groupId;
5156

5257
private final PostgresChannelMessageTableSubscriber messageTableSubscriber;
5358

54-
private UnicastingDispatcher dispatcher = new UnicastingDispatcher(new SimpleAsyncTaskExecutor());
59+
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
60+
61+
private RetryTemplate retryTemplate = RetryTemplate.builder().maxAttempts(1).build();
5562

5663
/**
5764
* Create a subscribable channel for a Postgres database.
@@ -70,12 +77,22 @@ public PostgresSubscribableChannel(JdbcChannelMessageStore jdbcChannelMessageSto
7077
}
7178

7279
/**
73-
* Set the executor to use for dispatching newly received messages.
74-
* @param executor The executor to use.
80+
* Sets the transaction manager to use for message processing. Each message will be processed in a
81+
* separate transaction
82+
* @param transactionManager The transaction manager to use
7583
*/
76-
public void setDispatcherExecutor(Executor executor) {
77-
Assert.notNull(executor, "An executor must be provided.");
78-
this.dispatcher = new UnicastingDispatcher(executor);
84+
public void setTransactionManager(PlatformTransactionManager transactionManager) {
85+
Assert.notNull(transactionManager, "A platform transaction manager must be provided.");
86+
this.transactionTemplate = new TransactionTemplate(transactionManager);
87+
}
88+
89+
/**
90+
* Sets retry template to use for retries in case of exception in downstream processing
91+
* @param retryTemplate The retry template to use
92+
*/
93+
public void setRetryTemplate(RetryTemplate retryTemplate) {
94+
Assert.notNull(retryTemplate, "A retry template must be provided.");
95+
this.retryTemplate = retryTemplate;
7996
}
8097

8198
@Override
@@ -110,10 +127,29 @@ protected boolean doSend(Message<?> message, long timeout) {
110127

111128
@Override
112129
public void notifyUpdate() {
113-
Message<?> message;
114-
while ((message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)) != null) {
115-
this.dispatcher.dispatch(message);
116-
}
130+
Optional<Message<?>> dispatchedMessage;
131+
132+
do {
133+
if (this.transactionTemplate != null) {
134+
dispatchedMessage = this.retryTemplate.execute(context ->
135+
this.transactionTemplate.execute(status ->
136+
Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId))
137+
.map(this::dispatch)
138+
)
139+
);
140+
}
141+
else {
142+
dispatchedMessage = Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId))
143+
.map(message ->
144+
this.retryTemplate.execute(context -> dispatch(message))
145+
);
146+
}
147+
} while (dispatchedMessage.isPresent());
148+
}
149+
150+
private Message<?> dispatch(Message<?> message) {
151+
this.dispatcher.dispatch(message);
152+
return message;
117153
}
118154

119155
@Override

0 commit comments

Comments
 (0)