diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java index ed69a487d2c..632130358ec 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,14 +21,18 @@ import javax.sql.DataSource; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.dao.DuplicateKeyException; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; -import org.springframework.transaction.annotation.Isolation; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.support.DefaultTransactionDefinition; +import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.Assert; /** @@ -36,7 +40,7 @@ * table from the script presented in the {@code org/springframework/integration/jdbc/schema-*.sql}. *
* This repository can't be shared between different {@link JdbcLockRegistry} instances.
- * Otherwise it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
+ * Otherwise, it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
* where {@link JdbcLockRegistry} uses non-shared {@link java.util.concurrent.locks.ReentrantLock}s
* for local synchronizations.
*
@@ -49,8 +53,8 @@
*
* @since 4.3
*/
-@Repository
-public class DefaultLockRepository implements LockRepository, InitializingBean {
+public class DefaultLockRepository
+ implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton {
/**
* Default value for the table prefix property.
@@ -89,12 +93,21 @@ public class DefaultLockRepository implements LockRepository, InitializingBean {
private String renewQuery = "UPDATE %sLOCK SET CREATED_DATE=? WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?";
+ private ApplicationContext applicationContext;
+
+ private PlatformTransactionManager transactionManager;
+
+ private TransactionTemplate defaultTransactionTemplate;
+
+ private TransactionTemplate readOnlyTransactionTemplate;
+
+ private TransactionTemplate serializableTransactionTemplate;
+
/**
* Constructor that initializes the client id that will be associated for
* all the locks persisted by the store instance to a random {@link UUID}.
* @param dataSource the {@link DataSource} used to maintain the lock repository.
*/
- @Autowired
public DefaultLockRepository(DataSource dataSource) {
this(dataSource, UUID.randomUUID().toString());
}
@@ -124,21 +137,37 @@ public void setRegion(String region) {
}
/**
- * Specify the prefix for target data base table used from queries.
- * @param prefix the prefix to set (default INT_).
+ * Specify the prefix for target database table used from queries.
+ * @param prefix the prefix to set (default {@code INT_}).
*/
public void setPrefix(String prefix) {
this.prefix = prefix;
}
/**
- * Specify the time (in milliseconds) to expire dead locks.
- * @param timeToLive the time to expire dead locks.
+ * Specify the time (in milliseconds) to expire dead-locks.
+ * @param timeToLive the time to expire dead-locks.
*/
public void setTimeToLive(int timeToLive) {
this.ttl = timeToLive;
}
+ /**
+ * Set a {@link PlatformTransactionManager} for operations.
+ * Otherwise, a primary {@link PlatformTransactionManager} bean is obtained
+ * from the application context.
+ * @param transactionManager the {@link PlatformTransactionManager} to use.
+ * @since 6.0
+ */
+ public void setTransactionManager(PlatformTransactionManager transactionManager) {
+ this.transactionManager = transactionManager;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
@Override
public void afterPropertiesSet() {
this.deleteQuery = String.format(this.deleteQuery, this.prefix);
@@ -150,50 +179,84 @@ public void afterPropertiesSet() {
this.renewQuery = String.format(this.renewQuery, this.prefix);
}
- @Transactional(propagation = Propagation.REQUIRES_NEW)
+ @Override
+ public void afterSingletonsInstantiated() {
+ if (this.transactionManager == null) {
+ try {
+ this.transactionManager = this.applicationContext.getBean(PlatformTransactionManager.class);
+ }
+ catch (BeansException ex) {
+ throw new BeanInitializationException(
+ "A unique or primary 'PlatformTransactionManager' bean " +
+ "must be present in the application context.", ex);
+ }
+ }
+
+ DefaultTransactionDefinition transactionDefinition =
+ new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
+
+ this.defaultTransactionTemplate =
+ new TransactionTemplate(this.transactionManager, transactionDefinition);
+
+ // It is safe to reuse the transactionDefinition - the TransactionTemplate makes copy of its properties.
+ transactionDefinition.setReadOnly(true);
+
+ this.readOnlyTransactionTemplate = new TransactionTemplate(this.transactionManager, transactionDefinition);
+
+ transactionDefinition.setReadOnly(false);
+ transactionDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
+
+ this.serializableTransactionTemplate = new TransactionTemplate(this.transactionManager, transactionDefinition);
+ }
+
@Override
public void close() {
- this.template.update(this.deleteAllQuery, this.region, this.id);
+ this.defaultTransactionTemplate.executeWithoutResult(
+ transactionStatus -> this.template.update(this.deleteAllQuery, this.region, this.id));
}
- @Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void delete(String lock) {
- this.template.update(this.deleteQuery, this.region, lock, this.id);
+ this.defaultTransactionTemplate.executeWithoutResult(
+ transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id));
}
- @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.SERIALIZABLE)
@Override
public boolean acquire(String lock) {
- if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id,
- new Date(System.currentTimeMillis() - this.ttl)) > 0) {
- return true;
- }
- try {
- return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;
- }
- catch (DuplicateKeyException e) {
- return false;
- }
+ return this.serializableTransactionTemplate.execute(transactionStatus -> {
+ if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id,
+ new Date(System.currentTimeMillis() - this.ttl)) > 0) {
+ return true;
+ }
+ try {
+ return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;
+ }
+ catch (DuplicateKeyException e) {
+ return false;
+ }
+ });
}
- @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
@Override
public boolean isAcquired(String lock) {
- return this.template.queryForObject(this.countQuery, Integer.class, // NOSONAR query never returns null
- this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl)) == 1;
+ return this.readOnlyTransactionTemplate.execute(transactionStatus ->
+ this.template.queryForObject(this.countQuery, // NOSONAR query never returns null
+ Integer.class, this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl))
+ == 1);
}
- @Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void deleteExpired() {
- this.template.update(this.deleteExpiredQuery, this.region, new Date(System.currentTimeMillis() - this.ttl));
+ this.defaultTransactionTemplate.executeWithoutResult(
+ transactionStatus ->
+ this.template.update(this.deleteExpiredQuery, this.region,
+ new Date(System.currentTimeMillis() - this.ttl)));
}
- @Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public boolean renew(String lock) {
- return this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0;
+ return this.defaultTransactionTemplate.execute(
+ transactionStatus -> this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0);
}
}
diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java
index 1fc10a33ce9..c544b57d19c 100644
--- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java
+++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 the original author or authors.
+ * Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,10 +23,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
@@ -34,6 +34,7 @@
import org.springframework.integration.leader.DefaultCandidate;
import org.springframework.integration.leader.event.LeaderEventPublisher;
import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@@ -49,7 +50,7 @@ public class JdbcLockRegistryLeaderInitiatorTests {
public static EmbeddedDatabase dataSource;
- @BeforeClass
+ @BeforeAll
public static void init() {
dataSource = new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
@@ -58,7 +59,7 @@ public static void init() {
.build();
}
- @AfterClass
+ @AfterAll
public static void destroy() {
dataSource.shutdown();
}
@@ -70,7 +71,9 @@ public void testDistributedLeaderElection() throws Exception {
List