Skip to content

Commit 4b57363

Browse files
GH-3733 Configure TxManager for DefLockRepository (#3782)
* GH-3733 Configure TxManager for DefLockRepository Fixes #3733 The `@Transactional` resolves a primary `TransactionManager` bean from the application context which might not be sufficient for all the use-case. To make it work with the custom (or specific) `TransactionManager` we have to extend a `DefaultLockRepository` and override all those `@Transactional` method * Change the logic of the `DefaultLockRepository` from `@Transactional` to the `TransactionTemplate` and use provided `TransactionManager` or resolve one from the application context * Adjust tests to use explicit `TransactionManager` and call `afterSingletonsInstantiated()` to initialize a default `TransactionTemplate` * Mention the change in the docs * * Extracted all the `TransactionTemplate`s to the properties for caching * Add `BeanInitializationException` for no-unique `PlatformTransactionManager` bean in the `afterSingletonsInstantiated()` * Fix language in the exception message Co-authored-by: Gary Russell <[email protected]> Co-authored-by: Gary Russell <[email protected]>
1 parent cd84f16 commit 4b57363

File tree

6 files changed

+139
-47
lines changed

6 files changed

+139
-47
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java

+99-36
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,22 +21,26 @@
2121

2222
import javax.sql.DataSource;
2323

24+
import org.springframework.beans.BeansException;
25+
import org.springframework.beans.factory.BeanInitializationException;
2426
import org.springframework.beans.factory.InitializingBean;
25-
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.beans.factory.SmartInitializingSingleton;
28+
import org.springframework.context.ApplicationContext;
29+
import org.springframework.context.ApplicationContextAware;
2630
import org.springframework.dao.DuplicateKeyException;
2731
import org.springframework.jdbc.core.JdbcTemplate;
28-
import org.springframework.stereotype.Repository;
29-
import org.springframework.transaction.annotation.Isolation;
30-
import org.springframework.transaction.annotation.Propagation;
31-
import org.springframework.transaction.annotation.Transactional;
32+
import org.springframework.transaction.PlatformTransactionManager;
33+
import org.springframework.transaction.TransactionDefinition;
34+
import org.springframework.transaction.support.DefaultTransactionDefinition;
35+
import org.springframework.transaction.support.TransactionTemplate;
3236
import org.springframework.util.Assert;
3337

3438
/**
3539
* The default implementation of the {@link LockRepository} based on the
3640
* table from the script presented in the {@code org/springframework/integration/jdbc/schema-*.sql}.
3741
* <p>
3842
* This repository can't be shared between different {@link JdbcLockRegistry} instances.
39-
* Otherwise it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
43+
* Otherwise, it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
4044
* where {@link JdbcLockRegistry} uses non-shared {@link java.util.concurrent.locks.ReentrantLock}s
4145
* for local synchronizations.
4246
*
@@ -49,8 +53,8 @@
4953
*
5054
* @since 4.3
5155
*/
52-
@Repository
53-
public class DefaultLockRepository implements LockRepository, InitializingBean {
56+
public class DefaultLockRepository
57+
implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton {
5458

5559
/**
5660
* Default value for the table prefix property.
@@ -89,12 +93,21 @@ public class DefaultLockRepository implements LockRepository, InitializingBean {
8993

9094
private String renewQuery = "UPDATE %sLOCK SET CREATED_DATE=? WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?";
9195

96+
private ApplicationContext applicationContext;
97+
98+
private PlatformTransactionManager transactionManager;
99+
100+
private TransactionTemplate defaultTransactionTemplate;
101+
102+
private TransactionTemplate readOnlyTransactionTemplate;
103+
104+
private TransactionTemplate serializableTransactionTemplate;
105+
92106
/**
93107
* Constructor that initializes the client id that will be associated for
94108
* all the locks persisted by the store instance to a random {@link UUID}.
95109
* @param dataSource the {@link DataSource} used to maintain the lock repository.
96110
*/
97-
@Autowired
98111
public DefaultLockRepository(DataSource dataSource) {
99112
this(dataSource, UUID.randomUUID().toString());
100113
}
@@ -124,21 +137,37 @@ public void setRegion(String region) {
124137
}
125138

126139
/**
127-
* Specify the prefix for target data base table used from queries.
128-
* @param prefix the prefix to set (default INT_).
140+
* Specify the prefix for target database table used from queries.
141+
* @param prefix the prefix to set (default {@code INT_}).
129142
*/
130143
public void setPrefix(String prefix) {
131144
this.prefix = prefix;
132145
}
133146

134147
/**
135-
* Specify the time (in milliseconds) to expire dead locks.
136-
* @param timeToLive the time to expire dead locks.
148+
* Specify the time (in milliseconds) to expire dead-locks.
149+
* @param timeToLive the time to expire dead-locks.
137150
*/
138151
public void setTimeToLive(int timeToLive) {
139152
this.ttl = timeToLive;
140153
}
141154

155+
/**
156+
* Set a {@link PlatformTransactionManager} for operations.
157+
* Otherwise, a primary {@link PlatformTransactionManager} bean is obtained
158+
* from the application context.
159+
* @param transactionManager the {@link PlatformTransactionManager} to use.
160+
* @since 6.0
161+
*/
162+
public void setTransactionManager(PlatformTransactionManager transactionManager) {
163+
this.transactionManager = transactionManager;
164+
}
165+
166+
@Override
167+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
168+
this.applicationContext = applicationContext;
169+
}
170+
142171
@Override
143172
public void afterPropertiesSet() {
144173
this.deleteQuery = String.format(this.deleteQuery, this.prefix);
@@ -150,50 +179,84 @@ public void afterPropertiesSet() {
150179
this.renewQuery = String.format(this.renewQuery, this.prefix);
151180
}
152181

153-
@Transactional(propagation = Propagation.REQUIRES_NEW)
182+
@Override
183+
public void afterSingletonsInstantiated() {
184+
if (this.transactionManager == null) {
185+
try {
186+
this.transactionManager = this.applicationContext.getBean(PlatformTransactionManager.class);
187+
}
188+
catch (BeansException ex) {
189+
throw new BeanInitializationException(
190+
"A unique or primary 'PlatformTransactionManager' bean " +
191+
"must be present in the application context.", ex);
192+
}
193+
}
194+
195+
DefaultTransactionDefinition transactionDefinition =
196+
new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
197+
198+
this.defaultTransactionTemplate =
199+
new TransactionTemplate(this.transactionManager, transactionDefinition);
200+
201+
// It is safe to reuse the transactionDefinition - the TransactionTemplate makes copy of its properties.
202+
transactionDefinition.setReadOnly(true);
203+
204+
this.readOnlyTransactionTemplate = new TransactionTemplate(this.transactionManager, transactionDefinition);
205+
206+
transactionDefinition.setReadOnly(false);
207+
transactionDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
208+
209+
this.serializableTransactionTemplate = new TransactionTemplate(this.transactionManager, transactionDefinition);
210+
}
211+
154212
@Override
155213
public void close() {
156-
this.template.update(this.deleteAllQuery, this.region, this.id);
214+
this.defaultTransactionTemplate.executeWithoutResult(
215+
transactionStatus -> this.template.update(this.deleteAllQuery, this.region, this.id));
157216
}
158217

159-
@Transactional(propagation = Propagation.REQUIRES_NEW)
160218
@Override
161219
public void delete(String lock) {
162-
this.template.update(this.deleteQuery, this.region, lock, this.id);
220+
this.defaultTransactionTemplate.executeWithoutResult(
221+
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id));
163222
}
164223

165-
@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.SERIALIZABLE)
166224
@Override
167225
public boolean acquire(String lock) {
168-
if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id,
169-
new Date(System.currentTimeMillis() - this.ttl)) > 0) {
170-
return true;
171-
}
172-
try {
173-
return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;
174-
}
175-
catch (DuplicateKeyException e) {
176-
return false;
177-
}
226+
return this.serializableTransactionTemplate.execute(transactionStatus -> {
227+
if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id,
228+
new Date(System.currentTimeMillis() - this.ttl)) > 0) {
229+
return true;
230+
}
231+
try {
232+
return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;
233+
}
234+
catch (DuplicateKeyException e) {
235+
return false;
236+
}
237+
});
178238
}
179239

180-
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
181240
@Override
182241
public boolean isAcquired(String lock) {
183-
return this.template.queryForObject(this.countQuery, Integer.class, // NOSONAR query never returns null
184-
this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl)) == 1;
242+
return this.readOnlyTransactionTemplate.execute(transactionStatus ->
243+
this.template.queryForObject(this.countQuery, // NOSONAR query never returns null
244+
Integer.class, this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl))
245+
== 1);
185246
}
186247

187-
@Transactional(propagation = Propagation.REQUIRES_NEW)
188248
@Override
189249
public void deleteExpired() {
190-
this.template.update(this.deleteExpiredQuery, this.region, new Date(System.currentTimeMillis() - this.ttl));
250+
this.defaultTransactionTemplate.executeWithoutResult(
251+
transactionStatus ->
252+
this.template.update(this.deleteExpiredQuery, this.region,
253+
new Date(System.currentTimeMillis() - this.ttl)));
191254
}
192255

193-
@Transactional(propagation = Propagation.REQUIRES_NEW)
194256
@Override
195257
public boolean renew(String lock) {
196-
return this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0;
258+
return this.defaultTransactionTemplate.execute(
259+
transactionStatus -> this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0);
197260
}
198261

199262
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,17 +23,18 @@
2323
import java.util.concurrent.CountDownLatch;
2424
import java.util.concurrent.TimeUnit;
2525

26-
import org.junit.AfterClass;
27-
import org.junit.BeforeClass;
28-
import org.junit.Ignore;
29-
import org.junit.Test;
26+
import org.junit.jupiter.api.AfterAll;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.Disabled;
29+
import org.junit.jupiter.api.Test;
3030

3131
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
3232
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
3333
import org.springframework.integration.leader.Context;
3434
import org.springframework.integration.leader.DefaultCandidate;
3535
import org.springframework.integration.leader.event.LeaderEventPublisher;
3636
import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;
37+
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
3738
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
3839
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
3940
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@@ -49,7 +50,7 @@ public class JdbcLockRegistryLeaderInitiatorTests {
4950

5051
public static EmbeddedDatabase dataSource;
5152

52-
@BeforeClass
53+
@BeforeAll
5354
public static void init() {
5455
dataSource = new EmbeddedDatabaseBuilder()
5556
.setType(EmbeddedDatabaseType.H2)
@@ -58,7 +59,7 @@ public static void init() {
5859
.build();
5960
}
6061

61-
@AfterClass
62+
@AfterAll
6263
public static void destroy() {
6364
dataSource.shutdown();
6465
}
@@ -70,7 +71,9 @@ public void testDistributedLeaderElection() throws Exception {
7071
List<LockRegistryLeaderInitiator> initiators = new ArrayList<>();
7172
for (int i = 0; i < 2; i++) {
7273
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
74+
lockRepository.setTransactionManager(new DataSourceTransactionManager(dataSource));
7375
lockRepository.afterPropertiesSet();
76+
lockRepository.afterSingletonsInstantiated();
7477
LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(
7578
new JdbcLockRegistry(lockRepository),
7679
new DefaultCandidate("foo", "bar"));
@@ -170,7 +173,7 @@ public void publishOnRevoked(Object source, Context context, String role) {
170173
}
171174

172175
@Test
173-
@Ignore("Looks like an embedded DBd is not fully cleared if we don't close application context")
176+
@Disabled("Looks like an embedded DBd is not fully cleared if we don't close application context")
174177
public void testLostConnection() throws InterruptedException {
175178
CountDownLatch granted = new CountDownLatch(1);
176179
CountingPublisher countingPublisher = new CountingPublisher(granted);

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void clear() {
7979
this.registry.expireUnusedOlderThan(0);
8080
this.client.close();
8181
this.child = new AnnotationConfigApplicationContext();
82-
this.child.register(DefaultLockRepository.class);
82+
this.child.registerBean("childLockRepository", DefaultLockRepository.class, this.dataSource);
8383
this.child.setParent(this.context);
8484
this.child.refresh();
8585
}
@@ -195,7 +195,9 @@ public void testOnlyOneLock() throws Exception {
195195
for (int j = 0; j < 20; j++) {
196196
Callable<Boolean> task = () -> {
197197
DefaultLockRepository client = new DefaultLockRepository(this.dataSource);
198+
client.setApplicationContext(this.context);
198199
client.afterPropertiesSet();
200+
client.afterSingletonsInstantiated();
199201
Lock lock = new JdbcLockRegistry(client).obtain("foo");
200202
try {
201203
if (locked.isEmpty() && lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS)) {
@@ -231,9 +233,13 @@ public void testOnlyOneLock() throws Exception {
231233
@Test
232234
public void testExclusiveAccess() throws Exception {
233235
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
236+
client1.setApplicationContext(this.context);
234237
client1.afterPropertiesSet();
235-
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
238+
client1.afterSingletonsInstantiated();
239+
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
240+
client2.setApplicationContext(this.context);
236241
client2.afterPropertiesSet();
242+
client2.afterSingletonsInstantiated();
237243
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
238244
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
239245
final CountDownLatch latch1 = new CountDownLatch(1);
@@ -278,10 +284,14 @@ public void testExclusiveAccess() throws Exception {
278284
public void testOutOfDateLockTaken() throws Exception {
279285
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
280286
client1.setTimeToLive(100);
287+
client1.setApplicationContext(this.context);
281288
client1.afterPropertiesSet();
289+
client1.afterSingletonsInstantiated();
282290
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
283291
client2.setTimeToLive(100);
292+
client2.setApplicationContext(this.context);
284293
client2.afterPropertiesSet();
294+
client2.afterSingletonsInstantiated();
285295
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
286296
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
287297
final CountDownLatch latch = new CountDownLatch(1);
@@ -316,10 +326,14 @@ public void testOutOfDateLockTaken() throws Exception {
316326
public void testRenewLock() throws Exception {
317327
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
318328
client1.setTimeToLive(500);
329+
client1.setApplicationContext(this.context);
319330
client1.afterPropertiesSet();
331+
client1.afterSingletonsInstantiated();
320332
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
321333
client2.setTimeToLive(500);
334+
client2.setApplicationContext(this.context);
322335
client2.afterPropertiesSet();
336+
client2.afterSingletonsInstantiated();
323337
JdbcLockRegistry registry = new JdbcLockRegistry(client1);
324338
Lock lock1 = registry.obtain("foo");
325339
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
3636
import org.junit.jupiter.api.Test;
3737

3838
import org.springframework.beans.factory.annotation.Autowired;
39+
import org.springframework.context.ApplicationContext;
3940
import org.springframework.core.task.AsyncTaskExecutor;
4041
import org.springframework.core.task.SimpleAsyncTaskExecutor;
4142
import org.springframework.integration.test.util.TestUtils;
@@ -67,6 +68,9 @@ public class JdbcLockRegistryTests {
6768
@Autowired
6869
private DataSource dataSource;
6970

71+
@Autowired
72+
private ApplicationContext context;
73+
7074
@BeforeEach
7175
public void clear() {
7276
this.registry.expireUnusedOlderThan(0);
@@ -143,7 +147,9 @@ public void testReentrantLockInterruptibly() throws Exception {
143147
public void testReentrantLockAfterExpiration() throws Exception {
144148
DefaultLockRepository client = new DefaultLockRepository(dataSource);
145149
client.setTimeToLive(1);
150+
client.setApplicationContext(this.context);
146151
client.afterPropertiesSet();
152+
client.afterSingletonsInstantiated();
147153
JdbcLockRegistry registry = new JdbcLockRegistry(client);
148154
Lock lock1 = registry.obtain("foo");
149155
assertThat(lock1.tryLock()).isTrue();

src/reference/asciidoc/jdbc.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -1092,6 +1092,8 @@ NOTE: The lock renewal can be done only if the lock is held by the current threa
10921092
String with version 5.5.6, the `JdbcLockRegistry` is support automatically clean up cache for JdbcLock in `JdbcLockRegistry.locks` via `JdbcLockRegistry.setCacheCapacity()`.
10931093
See its JavaDocs for more information.
10941094

1095+
String with version 6.0, the `DefaultLockRepository` can be supplied with a `PlatformTransactionManager` instead of relying on the primary bean from the application context.
1096+
10951097
[[jdbc-metadata-store]]
10961098
=== JDBC Metadata Store
10971099

0 commit comments

Comments
 (0)