Skip to content

Commit e39449b

Browse files
igorlovichartembilan
authored andcommitted
GH-8582: Add TX support for PostgresSubChannel
Fixes #8582 * Introduce a `PostgresSubscribableChannel.setTransactionManager()` to wrap a message polling and dispatching operation into a transaction * In addition add a `RetryTemplate` support around transaction attempts **Cherry-pick to `6.0.x`**
1 parent b326225 commit e39449b

File tree

4 files changed

+215
-50
lines changed

4 files changed

+215
-50
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -59,6 +59,7 @@
5959
*
6060
* @author Rafael Winterhalter
6161
* @author Artem Bilan
62+
* @author Igor Lovich
6263
*
6364
* @since 6.0
6465
*/
@@ -149,6 +150,8 @@ public synchronized void start() {
149150
this.executor = executorToUse;
150151
}
151152
this.latch = new CountDownLatch(1);
153+
154+
CountDownLatch startingLatch = new CountDownLatch(1);
152155
this.future = executorToUse.submit(() -> {
153156
try {
154157
while (isActive()) {
@@ -171,6 +174,8 @@ public synchronized void start() {
171174
try {
172175
this.connection = conn;
173176
while (isActive()) {
177+
startingLatch.countDown();
178+
174179
PGNotification[] notifications = conn.getNotifications(0);
175180
// Unfortunately, there is no good way of interrupting a notification
176181
// poll but by closing its connection.
@@ -208,6 +213,16 @@ public synchronized void start() {
208213
this.latch.countDown();
209214
}
210215
});
216+
217+
try {
218+
if (!startingLatch.await(5, TimeUnit.SECONDS)) {
219+
throw new IllegalStateException("Failed to start " + this);
220+
}
221+
}
222+
catch (InterruptedException ex) {
223+
Thread.currentThread().interrupt();
224+
throw new IllegalStateException("Failed to start " + this, ex);
225+
}
211226
}
212227

213228
private boolean isActive() {
@@ -234,8 +249,7 @@ public synchronized void stop() {
234249
}
235250
try {
236251
if (!this.latch.await(5, TimeUnit.SECONDS)) {
237-
throw new IllegalStateException("Failed to stop "
238-
+ PostgresChannelMessageTableSubscriber.class.getName());
252+
throw new IllegalStateException("Failed to stop " + this);
239253
}
240254
}
241255
catch (InterruptedException ignored) {

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,15 +16,20 @@
1616

1717
package org.springframework.integration.jdbc.channel;
1818

19+
import java.util.Optional;
1920
import java.util.concurrent.Executor;
2021

22+
import org.springframework.core.log.LogAccessor;
2123
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2224
import org.springframework.integration.channel.AbstractSubscribableChannel;
2325
import org.springframework.integration.dispatcher.MessageDispatcher;
2426
import org.springframework.integration.dispatcher.UnicastingDispatcher;
2527
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
2628
import org.springframework.messaging.Message;
2729
import org.springframework.messaging.MessageHandler;
30+
import org.springframework.retry.support.RetryTemplate;
31+
import org.springframework.transaction.PlatformTransactionManager;
32+
import org.springframework.transaction.support.TransactionTemplate;
2833
import org.springframework.util.Assert;
2934

3035
/**
@@ -39,19 +44,28 @@
3944
*
4045
* @author Rafael Winterhalter
4146
* @author Artem Bilan
47+
* @author Igor Lovich
4248
*
4349
* @since 6.0
4450
*/
4551
public class PostgresSubscribableChannel extends AbstractSubscribableChannel
4652
implements PostgresChannelMessageTableSubscriber.Subscription {
4753

54+
private static final LogAccessor LOGGER = new LogAccessor(PostgresSubscribableChannel.class);
55+
4856
private final JdbcChannelMessageStore jdbcChannelMessageStore;
4957

5058
private final Object groupId;
5159

5260
private final PostgresChannelMessageTableSubscriber messageTableSubscriber;
5361

54-
private UnicastingDispatcher dispatcher = new UnicastingDispatcher(new SimpleAsyncTaskExecutor());
62+
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
63+
64+
private TransactionTemplate transactionTemplate;
65+
66+
private RetryTemplate retryTemplate = RetryTemplate.builder().maxAttempts(1).build();
67+
68+
private Executor executor = new SimpleAsyncTaskExecutor();
5569

5670
/**
5771
* Create a subscribable channel for a Postgres database.
@@ -75,7 +89,30 @@ public PostgresSubscribableChannel(JdbcChannelMessageStore jdbcChannelMessageSto
7589
*/
7690
public void setDispatcherExecutor(Executor executor) {
7791
Assert.notNull(executor, "An executor must be provided.");
78-
this.dispatcher = new UnicastingDispatcher(executor);
92+
this.executor = executor;
93+
}
94+
95+
/**
96+
* Set the transaction manager to use for message processing. Each message will be processed in a
97+
* separate transaction
98+
* @param transactionManager The transaction manager to use
99+
* @since 6.0.5
100+
* @see PlatformTransactionManager
101+
*/
102+
public void setTransactionManager(PlatformTransactionManager transactionManager) {
103+
Assert.notNull(transactionManager, "A platform transaction manager must be provided.");
104+
this.transactionTemplate = new TransactionTemplate(transactionManager);
105+
}
106+
107+
/**
108+
* Set the retry template to use for retries in case of exception in downstream processing
109+
* @param retryTemplate The retry template to use
110+
* @since 6.0.5
111+
* @see RetryTemplate
112+
*/
113+
public void setRetryTemplate(RetryTemplate retryTemplate) {
114+
Assert.notNull(retryTemplate, "A retry template must be provided.");
115+
this.retryTemplate = retryTemplate;
79116
}
80117

81118
@Override
@@ -110,10 +147,37 @@ protected boolean doSend(Message<?> message, long timeout) {
110147

111148
@Override
112149
public void notifyUpdate() {
113-
Message<?> message;
114-
while ((message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)) != null) {
115-
this.dispatcher.dispatch(message);
116-
}
150+
this.executor.execute(() -> {
151+
try {
152+
Optional<Message<?>> dispatchedMessage;
153+
do {
154+
if (this.transactionTemplate != null) {
155+
dispatchedMessage =
156+
this.retryTemplate.execute(context ->
157+
this.transactionTemplate.execute(status ->
158+
pollMessage()
159+
.map(this::dispatch)));
160+
}
161+
else {
162+
dispatchedMessage =
163+
pollMessage()
164+
.map(message -> this.retryTemplate.execute(context -> dispatch(message)));
165+
}
166+
} while (dispatchedMessage.isPresent());
167+
}
168+
catch (Exception ex) {
169+
LOGGER.error(ex, "Exception during message dispatch");
170+
}
171+
});
172+
}
173+
174+
private Optional<Message<?>> pollMessage() {
175+
return Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId));
176+
}
177+
178+
private Message<?> dispatch(Message<?> message) {
179+
this.dispatcher.dispatch(message);
180+
return message;
117181
}
118182

119183
@Override

0 commit comments

Comments
 (0)