diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java index ed69a487d2c..632130358ec 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,14 +21,18 @@ import javax.sql.DataSource; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.dao.DuplicateKeyException; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Repository; -import org.springframework.transaction.annotation.Isolation; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.support.DefaultTransactionDefinition; +import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.Assert; /** @@ -36,7 +40,7 @@ * table from the script presented in the {@code org/springframework/integration/jdbc/schema-*.sql}. *

* This repository can't be shared between different {@link JdbcLockRegistry} instances. - * Otherwise it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract, + * Otherwise, it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract, * where {@link JdbcLockRegistry} uses non-shared {@link java.util.concurrent.locks.ReentrantLock}s * for local synchronizations. * @@ -49,8 +53,8 @@ * * @since 4.3 */ -@Repository -public class DefaultLockRepository implements LockRepository, InitializingBean { +public class DefaultLockRepository + implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton { /** * Default value for the table prefix property. @@ -89,12 +93,21 @@ public class DefaultLockRepository implements LockRepository, InitializingBean { private String renewQuery = "UPDATE %sLOCK SET CREATED_DATE=? WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?"; + private ApplicationContext applicationContext; + + private PlatformTransactionManager transactionManager; + + private TransactionTemplate defaultTransactionTemplate; + + private TransactionTemplate readOnlyTransactionTemplate; + + private TransactionTemplate serializableTransactionTemplate; + /** * Constructor that initializes the client id that will be associated for * all the locks persisted by the store instance to a random {@link UUID}. * @param dataSource the {@link DataSource} used to maintain the lock repository. */ - @Autowired public DefaultLockRepository(DataSource dataSource) { this(dataSource, UUID.randomUUID().toString()); } @@ -124,21 +137,37 @@ public void setRegion(String region) { } /** - * Specify the prefix for target data base table used from queries. - * @param prefix the prefix to set (default INT_). + * Specify the prefix for target database table used from queries. + * @param prefix the prefix to set (default {@code INT_}). */ public void setPrefix(String prefix) { this.prefix = prefix; } /** - * Specify the time (in milliseconds) to expire dead locks. - * @param timeToLive the time to expire dead locks. + * Specify the time (in milliseconds) to expire dead-locks. + * @param timeToLive the time to expire dead-locks. */ public void setTimeToLive(int timeToLive) { this.ttl = timeToLive; } + /** + * Set a {@link PlatformTransactionManager} for operations. + * Otherwise, a primary {@link PlatformTransactionManager} bean is obtained + * from the application context. + * @param transactionManager the {@link PlatformTransactionManager} to use. + * @since 6.0 + */ + public void setTransactionManager(PlatformTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + @Override public void afterPropertiesSet() { this.deleteQuery = String.format(this.deleteQuery, this.prefix); @@ -150,50 +179,84 @@ public void afterPropertiesSet() { this.renewQuery = String.format(this.renewQuery, this.prefix); } - @Transactional(propagation = Propagation.REQUIRES_NEW) + @Override + public void afterSingletonsInstantiated() { + if (this.transactionManager == null) { + try { + this.transactionManager = this.applicationContext.getBean(PlatformTransactionManager.class); + } + catch (BeansException ex) { + throw new BeanInitializationException( + "A unique or primary 'PlatformTransactionManager' bean " + + "must be present in the application context.", ex); + } + } + + DefaultTransactionDefinition transactionDefinition = + new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + + this.defaultTransactionTemplate = + new TransactionTemplate(this.transactionManager, transactionDefinition); + + // It is safe to reuse the transactionDefinition - the TransactionTemplate makes copy of its properties. + transactionDefinition.setReadOnly(true); + + this.readOnlyTransactionTemplate = new TransactionTemplate(this.transactionManager, transactionDefinition); + + transactionDefinition.setReadOnly(false); + transactionDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE); + + this.serializableTransactionTemplate = new TransactionTemplate(this.transactionManager, transactionDefinition); + } + @Override public void close() { - this.template.update(this.deleteAllQuery, this.region, this.id); + this.defaultTransactionTemplate.executeWithoutResult( + transactionStatus -> this.template.update(this.deleteAllQuery, this.region, this.id)); } - @Transactional(propagation = Propagation.REQUIRES_NEW) @Override public void delete(String lock) { - this.template.update(this.deleteQuery, this.region, lock, this.id); + this.defaultTransactionTemplate.executeWithoutResult( + transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id)); } - @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.SERIALIZABLE) @Override public boolean acquire(String lock) { - if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id, - new Date(System.currentTimeMillis() - this.ttl)) > 0) { - return true; - } - try { - return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0; - } - catch (DuplicateKeyException e) { - return false; - } + return this.serializableTransactionTemplate.execute(transactionStatus -> { + if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id, + new Date(System.currentTimeMillis() - this.ttl)) > 0) { + return true; + } + try { + return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0; + } + catch (DuplicateKeyException e) { + return false; + } + }); } - @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true) @Override public boolean isAcquired(String lock) { - return this.template.queryForObject(this.countQuery, Integer.class, // NOSONAR query never returns null - this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl)) == 1; + return this.readOnlyTransactionTemplate.execute(transactionStatus -> + this.template.queryForObject(this.countQuery, // NOSONAR query never returns null + Integer.class, this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl)) + == 1); } - @Transactional(propagation = Propagation.REQUIRES_NEW) @Override public void deleteExpired() { - this.template.update(this.deleteExpiredQuery, this.region, new Date(System.currentTimeMillis() - this.ttl)); + this.defaultTransactionTemplate.executeWithoutResult( + transactionStatus -> + this.template.update(this.deleteExpiredQuery, this.region, + new Date(System.currentTimeMillis() - this.ttl))); } - @Transactional(propagation = Propagation.REQUIRES_NEW) @Override public boolean renew(String lock) { - return this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0; + return this.defaultTransactionTemplate.execute( + transactionStatus -> this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0); } } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java index 1fc10a33ce9..c544b57d19c 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,10 +23,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; import org.springframework.integration.jdbc.lock.DefaultLockRepository; import org.springframework.integration.jdbc.lock.JdbcLockRegistry; @@ -34,6 +34,7 @@ import org.springframework.integration.leader.DefaultCandidate; import org.springframework.integration.leader.event.LeaderEventPublisher; import org.springframework.integration.support.leader.LockRegistryLeaderInitiator; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; @@ -49,7 +50,7 @@ public class JdbcLockRegistryLeaderInitiatorTests { public static EmbeddedDatabase dataSource; - @BeforeClass + @BeforeAll public static void init() { dataSource = new EmbeddedDatabaseBuilder() .setType(EmbeddedDatabaseType.H2) @@ -58,7 +59,7 @@ public static void init() { .build(); } - @AfterClass + @AfterAll public static void destroy() { dataSource.shutdown(); } @@ -70,7 +71,9 @@ public void testDistributedLeaderElection() throws Exception { List initiators = new ArrayList<>(); for (int i = 0; i < 2; i++) { DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource); + lockRepository.setTransactionManager(new DataSourceTransactionManager(dataSource)); lockRepository.afterPropertiesSet(); + lockRepository.afterSingletonsInstantiated(); LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator( new JdbcLockRegistry(lockRepository), new DefaultCandidate("foo", "bar")); @@ -170,7 +173,7 @@ public void publishOnRevoked(Object source, Context context, String role) { } @Test - @Ignore("Looks like an embedded DBd is not fully cleared if we don't close application context") + @Disabled("Looks like an embedded DBd is not fully cleared if we don't close application context") public void testLostConnection() throws InterruptedException { CountDownLatch granted = new CountDownLatch(1); CountingPublisher countingPublisher = new CountingPublisher(granted); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java index 70b771ee437..5c44ba09d11 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java @@ -79,7 +79,7 @@ public void clear() { this.registry.expireUnusedOlderThan(0); this.client.close(); this.child = new AnnotationConfigApplicationContext(); - this.child.register(DefaultLockRepository.class); + this.child.registerBean("childLockRepository", DefaultLockRepository.class, this.dataSource); this.child.setParent(this.context); this.child.refresh(); } @@ -195,7 +195,9 @@ public void testOnlyOneLock() throws Exception { for (int j = 0; j < 20; j++) { Callable task = () -> { DefaultLockRepository client = new DefaultLockRepository(this.dataSource); + client.setApplicationContext(this.context); client.afterPropertiesSet(); + client.afterSingletonsInstantiated(); Lock lock = new JdbcLockRegistry(client).obtain("foo"); try { if (locked.isEmpty() && lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS)) { @@ -231,9 +233,13 @@ public void testOnlyOneLock() throws Exception { @Test public void testExclusiveAccess() throws Exception { DefaultLockRepository client1 = new DefaultLockRepository(dataSource); + client1.setApplicationContext(this.context); client1.afterPropertiesSet(); - final DefaultLockRepository client2 = new DefaultLockRepository(dataSource); + client1.afterSingletonsInstantiated(); + DefaultLockRepository client2 = new DefaultLockRepository(dataSource); + client2.setApplicationContext(this.context); client2.afterPropertiesSet(); + client2.afterSingletonsInstantiated(); Lock lock1 = new JdbcLockRegistry(client1).obtain("foo"); final BlockingQueue data = new LinkedBlockingQueue<>(); final CountDownLatch latch1 = new CountDownLatch(1); @@ -278,10 +284,14 @@ public void testExclusiveAccess() throws Exception { public void testOutOfDateLockTaken() throws Exception { DefaultLockRepository client1 = new DefaultLockRepository(dataSource); client1.setTimeToLive(100); + client1.setApplicationContext(this.context); client1.afterPropertiesSet(); + client1.afterSingletonsInstantiated(); DefaultLockRepository client2 = new DefaultLockRepository(dataSource); client2.setTimeToLive(100); + client2.setApplicationContext(this.context); client2.afterPropertiesSet(); + client2.afterSingletonsInstantiated(); Lock lock1 = new JdbcLockRegistry(client1).obtain("foo"); final BlockingQueue data = new LinkedBlockingQueue<>(); final CountDownLatch latch = new CountDownLatch(1); @@ -316,10 +326,14 @@ public void testOutOfDateLockTaken() throws Exception { public void testRenewLock() throws Exception { DefaultLockRepository client1 = new DefaultLockRepository(dataSource); client1.setTimeToLive(500); + client1.setApplicationContext(this.context); client1.afterPropertiesSet(); + client1.afterSingletonsInstantiated(); DefaultLockRepository client2 = new DefaultLockRepository(dataSource); client2.setTimeToLive(500); + client2.setApplicationContext(this.context); client2.afterPropertiesSet(); + client2.afterSingletonsInstantiated(); JdbcLockRegistry registry = new JdbcLockRegistry(client1); Lock lock1 = registry.obtain("foo"); final BlockingQueue data = new LinkedBlockingQueue<>(); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java index a1ea3cffc2e..40b79128d83 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.integration.test.util.TestUtils; @@ -67,6 +68,9 @@ public class JdbcLockRegistryTests { @Autowired private DataSource dataSource; + @Autowired + private ApplicationContext context; + @BeforeEach public void clear() { this.registry.expireUnusedOlderThan(0); @@ -143,7 +147,9 @@ public void testReentrantLockInterruptibly() throws Exception { public void testReentrantLockAfterExpiration() throws Exception { DefaultLockRepository client = new DefaultLockRepository(dataSource); client.setTimeToLive(1); + client.setApplicationContext(this.context); client.afterPropertiesSet(); + client.afterSingletonsInstantiated(); JdbcLockRegistry registry = new JdbcLockRegistry(client); Lock lock1 = registry.obtain("foo"); assertThat(lock1.tryLock()).isTrue(); diff --git a/src/reference/asciidoc/jdbc.adoc b/src/reference/asciidoc/jdbc.adoc index 0237c2e7dd8..dd3f0bae51b 100644 --- a/src/reference/asciidoc/jdbc.adoc +++ b/src/reference/asciidoc/jdbc.adoc @@ -1092,6 +1092,8 @@ NOTE: The lock renewal can be done only if the lock is held by the current threa String with version 5.5.6, the `JdbcLockRegistry` is support automatically clean up cache for JdbcLock in `JdbcLockRegistry.locks` via `JdbcLockRegistry.setCacheCapacity()`. See its JavaDocs for more information. +String with version 6.0, the `DefaultLockRepository` can be supplied with a `PlatformTransactionManager` instead of relying on the primary bean from the application context. + [[jdbc-metadata-store]] === JDBC Metadata Store diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 57628155758..46b678cd256 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -48,3 +48,7 @@ When providing a `RetryTemplate` on the inbound gateway or message-driven channe In addition, the new `KafkaErrorMessageSendingRecoverer` is provided; this can be used with a `DefaultErrorHandler` to avoid issues with long aggregated retry delays causing partitions rebalances. See <<./kafka.adoc#kafka,Spring for Apache Kafka Support>> for more information. + +=== JDBC Changes + +The `DefaultLockRepository` can now be supplied with a `PlatformTransactionManager` instead of relying on the primary bean from the application context. \ No newline at end of file