Skip to content

Commit f020321

Browse files
committed
spring-projectsGH-8582:Transactional support in PostgresSubscribableChannel
1 parent 6d7ee46 commit f020321

File tree

4 files changed

+210
-29
lines changed

4 files changed

+210
-29
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -59,6 +59,7 @@
5959
*
6060
* @author Rafael Winterhalter
6161
* @author Artem Bilan
62+
* @author Igor Lovich
6263
*
6364
* @since 6.0
6465
*/
@@ -149,6 +150,8 @@ public synchronized void start() {
149150
this.executor = executorToUse;
150151
}
151152
this.latch = new CountDownLatch(1);
153+
154+
CountDownLatch startingLatch = new CountDownLatch(1);
152155
this.future = executorToUse.submit(() -> {
153156
try {
154157
while (isActive()) {
@@ -171,6 +174,8 @@ public synchronized void start() {
171174
try {
172175
this.connection = conn;
173176
while (isActive()) {
177+
startingLatch.countDown();
178+
174179
PGNotification[] notifications = conn.getNotifications(0);
175180
// Unfortunately, there is no good way of interrupting a notification
176181
// poll but by closing its connection.
@@ -208,6 +213,18 @@ public synchronized void start() {
208213
this.latch.countDown();
209214
}
210215
});
216+
217+
try {
218+
if (!startingLatch.await(5, TimeUnit.SECONDS)) {
219+
throw new IllegalStateException("Failed to start "
220+
+ PostgresChannelMessageTableSubscriber.class.getName());
221+
}
222+
}
223+
catch (InterruptedException e) {
224+
Thread.currentThread().interrupt();
225+
throw new IllegalStateException("Failed to start "
226+
+ PostgresChannelMessageTableSubscriber.class.getName(), e);
227+
}
211228
}
212229

213230
private boolean isActive() {

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,15 +16,20 @@
1616

1717
package org.springframework.integration.jdbc.channel;
1818

19+
import java.util.Optional;
1920
import java.util.concurrent.Executor;
2021

22+
import org.springframework.core.log.LogAccessor;
2123
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2224
import org.springframework.integration.channel.AbstractSubscribableChannel;
2325
import org.springframework.integration.dispatcher.MessageDispatcher;
2426
import org.springframework.integration.dispatcher.UnicastingDispatcher;
2527
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
2628
import org.springframework.messaging.Message;
2729
import org.springframework.messaging.MessageHandler;
30+
import org.springframework.retry.support.RetryTemplate;
31+
import org.springframework.transaction.PlatformTransactionManager;
32+
import org.springframework.transaction.support.TransactionTemplate;
2833
import org.springframework.util.Assert;
2934

3035
/**
@@ -39,19 +44,28 @@
3944
*
4045
* @author Rafael Winterhalter
4146
* @author Artem Bilan
47+
* @author Igor Lovich
4248
*
4349
* @since 6.0
4450
*/
4551
public class PostgresSubscribableChannel extends AbstractSubscribableChannel
4652
implements PostgresChannelMessageTableSubscriber.Subscription {
4753

54+
private static final LogAccessor LOGGER = new LogAccessor(PostgresSubscribableChannel.class);
55+
4856
private final JdbcChannelMessageStore jdbcChannelMessageStore;
4957

58+
private TransactionTemplate transactionTemplate;
59+
5060
private final Object groupId;
5161

5262
private final PostgresChannelMessageTableSubscriber messageTableSubscriber;
5363

54-
private UnicastingDispatcher dispatcher = new UnicastingDispatcher(new SimpleAsyncTaskExecutor());
64+
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
65+
66+
private RetryTemplate retryTemplate = RetryTemplate.builder().maxAttempts(1).build();
67+
68+
private Executor executor = new SimpleAsyncTaskExecutor();
5569

5670
/**
5771
* Create a subscribable channel for a Postgres database.
@@ -75,7 +89,26 @@ public PostgresSubscribableChannel(JdbcChannelMessageStore jdbcChannelMessageSto
7589
*/
7690
public void setDispatcherExecutor(Executor executor) {
7791
Assert.notNull(executor, "An executor must be provided.");
78-
this.dispatcher = new UnicastingDispatcher(executor);
92+
this.executor = executor;
93+
}
94+
95+
/**
96+
* Sets the transaction manager to use for message processing. Each message will be processed in a
97+
* separate transaction
98+
* @param transactionManager The transaction manager to use
99+
*/
100+
public void setTransactionManager(PlatformTransactionManager transactionManager) {
101+
Assert.notNull(transactionManager, "A platform transaction manager must be provided.");
102+
this.transactionTemplate = new TransactionTemplate(transactionManager);
103+
}
104+
105+
/**
106+
* Sets retry template to use for retries in case of exception in downstream processing
107+
* @param retryTemplate The retry template to use
108+
*/
109+
public void setRetryTemplate(RetryTemplate retryTemplate) {
110+
Assert.notNull(retryTemplate, "A retry template must be provided.");
111+
this.retryTemplate = retryTemplate;
79112
}
80113

81114
@Override
@@ -110,10 +143,37 @@ protected boolean doSend(Message<?> message, long timeout) {
110143

111144
@Override
112145
public void notifyUpdate() {
113-
Message<?> message;
114-
while ((message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)) != null) {
115-
this.dispatcher.dispatch(message);
116-
}
146+
this.executor.execute(() -> {
147+
try {
148+
Optional<Message<?>> dispatchedMessage;
149+
150+
do {
151+
if (this.transactionTemplate != null) {
152+
dispatchedMessage = this.retryTemplate.execute(context ->
153+
this.transactionTemplate.execute(status ->
154+
Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId))
155+
.map(this::dispatch)
156+
)
157+
);
158+
}
159+
else {
160+
dispatchedMessage = Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId))
161+
.map(message ->
162+
this.retryTemplate.execute(context -> dispatch(message))
163+
);
164+
}
165+
166+
} while (dispatchedMessage.isPresent());
167+
}
168+
catch (Exception e) {
169+
LOGGER.error(e, "Exception during message dispatch");
170+
}
171+
});
172+
}
173+
174+
private Message<?> dispatch(Message<?> message) {
175+
this.dispatcher.dispatch(message);
176+
return message;
117177
}
118178

119179
@Override

0 commit comments

Comments
 (0)