Skip to content

Commit 04dd9f7

Browse files
committed
spring-projectsGH-3733 Configure TxManager for DefLockRepository
Fixes spring-projects#3733 The `@Transactional` resolves a primary `TransactionManager` bean from the application context which might not be sufficient for all the use-case. To make it work with the custom (or specific) `TransactionManager` we have to extend a `DefaultLockRepository` and override all those `@Transactional` method * Change the logic of the `DefaultLockRepository` from `@Transactional` to the `TransactionTemplate` and use provided `TransactionManager` or resolve one from the application context * Adjust tests to use explicit `TransactionManager` and call `afterSingletonsInstantiated()` to initialize a default `TransactionTemplate` * Mention the change in the docs
1 parent cd84f16 commit 04dd9f7

File tree

6 files changed

+122
-47
lines changed

6 files changed

+122
-47
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java

Lines changed: 82 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,22 +21,25 @@
2121

2222
import javax.sql.DataSource;
2323

24+
import org.springframework.beans.BeansException;
2425
import org.springframework.beans.factory.InitializingBean;
25-
import org.springframework.beans.factory.annotation.Autowired;
26+
import org.springframework.beans.factory.SmartInitializingSingleton;
27+
import org.springframework.context.ApplicationContext;
28+
import org.springframework.context.ApplicationContextAware;
2629
import org.springframework.dao.DuplicateKeyException;
2730
import org.springframework.jdbc.core.JdbcTemplate;
28-
import org.springframework.stereotype.Repository;
29-
import org.springframework.transaction.annotation.Isolation;
30-
import org.springframework.transaction.annotation.Propagation;
31-
import org.springframework.transaction.annotation.Transactional;
31+
import org.springframework.transaction.PlatformTransactionManager;
32+
import org.springframework.transaction.TransactionDefinition;
33+
import org.springframework.transaction.support.DefaultTransactionDefinition;
34+
import org.springframework.transaction.support.TransactionTemplate;
3235
import org.springframework.util.Assert;
3336

3437
/**
3538
* The default implementation of the {@link LockRepository} based on the
3639
* table from the script presented in the {@code org/springframework/integration/jdbc/schema-*.sql}.
3740
* <p>
3841
* This repository can't be shared between different {@link JdbcLockRegistry} instances.
39-
* Otherwise it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
42+
* Otherwise, it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
4043
* where {@link JdbcLockRegistry} uses non-shared {@link java.util.concurrent.locks.ReentrantLock}s
4144
* for local synchronizations.
4245
*
@@ -49,8 +52,8 @@
4952
*
5053
* @since 4.3
5154
*/
52-
@Repository
53-
public class DefaultLockRepository implements LockRepository, InitializingBean {
55+
public class DefaultLockRepository
56+
implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton {
5457

5558
/**
5659
* Default value for the table prefix property.
@@ -89,12 +92,17 @@ public class DefaultLockRepository implements LockRepository, InitializingBean {
8992

9093
private String renewQuery = "UPDATE %sLOCK SET CREATED_DATE=? WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?";
9194

95+
private ApplicationContext applicationContext;
96+
97+
private PlatformTransactionManager transactionManager;
98+
99+
private TransactionTemplate defaultTransactionTemplate;
100+
92101
/**
93102
* Constructor that initializes the client id that will be associated for
94103
* all the locks persisted by the store instance to a random {@link UUID}.
95104
* @param dataSource the {@link DataSource} used to maintain the lock repository.
96105
*/
97-
@Autowired
98106
public DefaultLockRepository(DataSource dataSource) {
99107
this(dataSource, UUID.randomUUID().toString());
100108
}
@@ -124,21 +132,37 @@ public void setRegion(String region) {
124132
}
125133

126134
/**
127-
* Specify the prefix for target data base table used from queries.
128-
* @param prefix the prefix to set (default INT_).
135+
* Specify the prefix for target database table used from queries.
136+
* @param prefix the prefix to set (default {@code INT_}).
129137
*/
130138
public void setPrefix(String prefix) {
131139
this.prefix = prefix;
132140
}
133141

134142
/**
135-
* Specify the time (in milliseconds) to expire dead locks.
136-
* @param timeToLive the time to expire dead locks.
143+
* Specify the time (in milliseconds) to expire dead-locks.
144+
* @param timeToLive the time to expire dead-locks.
137145
*/
138146
public void setTimeToLive(int timeToLive) {
139147
this.ttl = timeToLive;
140148
}
141149

150+
/**
151+
* Set a {@link PlatformTransactionManager} for operations.
152+
* Otherwise, a primary {@link PlatformTransactionManager} bean is obtained
153+
* from the application context.
154+
* @param transactionManager the {@link PlatformTransactionManager} to use.
155+
* @since 6.0
156+
*/
157+
public void setTransactionManager(PlatformTransactionManager transactionManager) {
158+
this.transactionManager = transactionManager;
159+
}
160+
161+
@Override
162+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
163+
this.applicationContext = applicationContext;
164+
}
165+
142166
@Override
143167
public void afterPropertiesSet() {
144168
this.deleteQuery = String.format(this.deleteQuery, this.prefix);
@@ -150,50 +174,72 @@ public void afterPropertiesSet() {
150174
this.renewQuery = String.format(this.renewQuery, this.prefix);
151175
}
152176

153-
@Transactional(propagation = Propagation.REQUIRES_NEW)
177+
@Override
178+
public void afterSingletonsInstantiated() {
179+
if (this.transactionManager == null) {
180+
this.transactionManager = this.applicationContext.getBean(PlatformTransactionManager.class);
181+
}
182+
this.defaultTransactionTemplate =
183+
new TransactionTemplate(this.transactionManager,
184+
new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW));
185+
}
186+
154187
@Override
155188
public void close() {
156-
this.template.update(this.deleteAllQuery, this.region, this.id);
189+
this.defaultTransactionTemplate.executeWithoutResult(
190+
transactionStatus -> this.template.update(this.deleteAllQuery, this.region, this.id));
157191
}
158192

159-
@Transactional(propagation = Propagation.REQUIRES_NEW)
160193
@Override
161194
public void delete(String lock) {
162-
this.template.update(this.deleteQuery, this.region, lock, this.id);
195+
this.defaultTransactionTemplate.executeWithoutResult(
196+
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id));
163197
}
164198

165-
@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.SERIALIZABLE)
166199
@Override
167200
public boolean acquire(String lock) {
168-
if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id,
169-
new Date(System.currentTimeMillis() - this.ttl)) > 0) {
170-
return true;
171-
}
172-
try {
173-
return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;
174-
}
175-
catch (DuplicateKeyException e) {
176-
return false;
177-
}
201+
DefaultTransactionDefinition transactionDefinition =
202+
new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
203+
transactionDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
204+
return new TransactionTemplate(this.transactionManager, transactionDefinition)
205+
.execute(transactionStatus -> {
206+
if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id,
207+
new Date(System.currentTimeMillis() - this.ttl)) > 0) {
208+
return true;
209+
}
210+
try {
211+
return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;
212+
}
213+
catch (DuplicateKeyException e) {
214+
return false;
215+
}
216+
});
178217
}
179218

180-
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
181219
@Override
182220
public boolean isAcquired(String lock) {
183-
return this.template.queryForObject(this.countQuery, Integer.class, // NOSONAR query never returns null
184-
this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl)) == 1;
221+
DefaultTransactionDefinition transactionDefinition =
222+
new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
223+
transactionDefinition.setReadOnly(true);
224+
return new TransactionTemplate(this.transactionManager, transactionDefinition)
225+
.execute(transactionStatus ->
226+
this.template.queryForObject(this.countQuery, // NOSONAR query never returns null
227+
Integer.class, this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl))
228+
== 1);
185229
}
186230

187-
@Transactional(propagation = Propagation.REQUIRES_NEW)
188231
@Override
189232
public void deleteExpired() {
190-
this.template.update(this.deleteExpiredQuery, this.region, new Date(System.currentTimeMillis() - this.ttl));
233+
this.defaultTransactionTemplate.executeWithoutResult(
234+
transactionStatus ->
235+
this.template.update(this.deleteExpiredQuery, this.region,
236+
new Date(System.currentTimeMillis() - this.ttl)));
191237
}
192238

193-
@Transactional(propagation = Propagation.REQUIRES_NEW)
194239
@Override
195240
public boolean renew(String lock) {
196-
return this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0;
241+
return this.defaultTransactionTemplate.execute(
242+
transactionStatus -> this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0);
197243
}
198244

199245
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,17 +23,18 @@
2323
import java.util.concurrent.CountDownLatch;
2424
import java.util.concurrent.TimeUnit;
2525

26-
import org.junit.AfterClass;
27-
import org.junit.BeforeClass;
28-
import org.junit.Ignore;
29-
import org.junit.Test;
26+
import org.junit.jupiter.api.AfterAll;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.Disabled;
29+
import org.junit.jupiter.api.Test;
3030

3131
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
3232
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
3333
import org.springframework.integration.leader.Context;
3434
import org.springframework.integration.leader.DefaultCandidate;
3535
import org.springframework.integration.leader.event.LeaderEventPublisher;
3636
import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;
37+
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
3738
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
3839
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
3940
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@@ -49,7 +50,7 @@ public class JdbcLockRegistryLeaderInitiatorTests {
4950

5051
public static EmbeddedDatabase dataSource;
5152

52-
@BeforeClass
53+
@BeforeAll
5354
public static void init() {
5455
dataSource = new EmbeddedDatabaseBuilder()
5556
.setType(EmbeddedDatabaseType.H2)
@@ -58,7 +59,7 @@ public static void init() {
5859
.build();
5960
}
6061

61-
@AfterClass
62+
@AfterAll
6263
public static void destroy() {
6364
dataSource.shutdown();
6465
}
@@ -70,7 +71,9 @@ public void testDistributedLeaderElection() throws Exception {
7071
List<LockRegistryLeaderInitiator> initiators = new ArrayList<>();
7172
for (int i = 0; i < 2; i++) {
7273
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
74+
lockRepository.setTransactionManager(new DataSourceTransactionManager(dataSource));
7375
lockRepository.afterPropertiesSet();
76+
lockRepository.afterSingletonsInstantiated();
7477
LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(
7578
new JdbcLockRegistry(lockRepository),
7679
new DefaultCandidate("foo", "bar"));
@@ -170,7 +173,7 @@ public void publishOnRevoked(Object source, Context context, String role) {
170173
}
171174

172175
@Test
173-
@Ignore("Looks like an embedded DBd is not fully cleared if we don't close application context")
176+
@Disabled("Looks like an embedded DBd is not fully cleared if we don't close application context")
174177
public void testLostConnection() throws InterruptedException {
175178
CountDownLatch granted = new CountDownLatch(1);
176179
CountingPublisher countingPublisher = new CountingPublisher(granted);

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void clear() {
7979
this.registry.expireUnusedOlderThan(0);
8080
this.client.close();
8181
this.child = new AnnotationConfigApplicationContext();
82-
this.child.register(DefaultLockRepository.class);
82+
this.child.registerBean("childLockRepository", DefaultLockRepository.class, this.dataSource);
8383
this.child.setParent(this.context);
8484
this.child.refresh();
8585
}
@@ -195,7 +195,9 @@ public void testOnlyOneLock() throws Exception {
195195
for (int j = 0; j < 20; j++) {
196196
Callable<Boolean> task = () -> {
197197
DefaultLockRepository client = new DefaultLockRepository(this.dataSource);
198+
client.setApplicationContext(this.context);
198199
client.afterPropertiesSet();
200+
client.afterSingletonsInstantiated();
199201
Lock lock = new JdbcLockRegistry(client).obtain("foo");
200202
try {
201203
if (locked.isEmpty() && lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS)) {
@@ -231,9 +233,13 @@ public void testOnlyOneLock() throws Exception {
231233
@Test
232234
public void testExclusiveAccess() throws Exception {
233235
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
236+
client1.setApplicationContext(this.context);
234237
client1.afterPropertiesSet();
235-
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
238+
client1.afterSingletonsInstantiated();
239+
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
240+
client2.setApplicationContext(this.context);
236241
client2.afterPropertiesSet();
242+
client2.afterSingletonsInstantiated();
237243
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
238244
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
239245
final CountDownLatch latch1 = new CountDownLatch(1);
@@ -278,10 +284,14 @@ public void testExclusiveAccess() throws Exception {
278284
public void testOutOfDateLockTaken() throws Exception {
279285
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
280286
client1.setTimeToLive(100);
287+
client1.setApplicationContext(this.context);
281288
client1.afterPropertiesSet();
289+
client1.afterSingletonsInstantiated();
282290
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
283291
client2.setTimeToLive(100);
292+
client2.setApplicationContext(this.context);
284293
client2.afterPropertiesSet();
294+
client2.afterSingletonsInstantiated();
285295
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
286296
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
287297
final CountDownLatch latch = new CountDownLatch(1);
@@ -316,10 +326,14 @@ public void testOutOfDateLockTaken() throws Exception {
316326
public void testRenewLock() throws Exception {
317327
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
318328
client1.setTimeToLive(500);
329+
client1.setApplicationContext(this.context);
319330
client1.afterPropertiesSet();
331+
client1.afterSingletonsInstantiated();
320332
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
321333
client2.setTimeToLive(500);
334+
client2.setApplicationContext(this.context);
322335
client2.afterPropertiesSet();
336+
client2.afterSingletonsInstantiated();
323337
JdbcLockRegistry registry = new JdbcLockRegistry(client1);
324338
Lock lock1 = registry.obtain("foo");
325339
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
3636
import org.junit.jupiter.api.Test;
3737

3838
import org.springframework.beans.factory.annotation.Autowired;
39+
import org.springframework.context.ApplicationContext;
3940
import org.springframework.core.task.AsyncTaskExecutor;
4041
import org.springframework.core.task.SimpleAsyncTaskExecutor;
4142
import org.springframework.integration.test.util.TestUtils;
@@ -67,6 +68,9 @@ public class JdbcLockRegistryTests {
6768
@Autowired
6869
private DataSource dataSource;
6970

71+
@Autowired
72+
private ApplicationContext context;
73+
7074
@BeforeEach
7175
public void clear() {
7276
this.registry.expireUnusedOlderThan(0);
@@ -143,7 +147,9 @@ public void testReentrantLockInterruptibly() throws Exception {
143147
public void testReentrantLockAfterExpiration() throws Exception {
144148
DefaultLockRepository client = new DefaultLockRepository(dataSource);
145149
client.setTimeToLive(1);
150+
client.setApplicationContext(this.context);
146151
client.afterPropertiesSet();
152+
client.afterSingletonsInstantiated();
147153
JdbcLockRegistry registry = new JdbcLockRegistry(client);
148154
Lock lock1 = registry.obtain("foo");
149155
assertThat(lock1.tryLock()).isTrue();

src/reference/asciidoc/jdbc.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,6 +1092,8 @@ NOTE: The lock renewal can be done only if the lock is held by the current threa
10921092
String with version 5.5.6, the `JdbcLockRegistry` is support automatically clean up cache for JdbcLock in `JdbcLockRegistry.locks` via `JdbcLockRegistry.setCacheCapacity()`.
10931093
See its JavaDocs for more information.
10941094

1095+
String with version 6.0, the `DefaultLockRepository` can be supplied with a `PlatformTransactionManager` instead of relying on the primary bean from the application context.
1096+
10951097
[[jdbc-metadata-store]]
10961098
=== JDBC Metadata Store
10971099

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,7 @@ When providing a `RetryTemplate` on the inbound gateway or message-driven channe
4848
In addition, the new `KafkaErrorMessageSendingRecoverer` is provided; this can be used with a `DefaultErrorHandler` to avoid issues with long aggregated retry delays causing partitions rebalances.
4949

5050
See <<./kafka.adoc#kafka,Spring for Apache Kafka Support>> for more information.
51+
52+
=== JDBC Changes
53+
54+
The `DefaultLockRepository` can now be supplied with a `PlatformTransactionManager` instead of relying on the primary bean from the application context.

0 commit comments

Comments
 (0)