Skip to content

GH-3733 Configure TxManager for DefLockRepository #3782

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,22 +21,25 @@

import javax.sql.DataSource;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

/**
* The default implementation of the {@link LockRepository} based on the
* table from the script presented in the {@code org/springframework/integration/jdbc/schema-*.sql}.
* <p>
* This repository can't be shared between different {@link JdbcLockRegistry} instances.
* Otherwise it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
* Otherwise, it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
* where {@link JdbcLockRegistry} uses non-shared {@link java.util.concurrent.locks.ReentrantLock}s
* for local synchronizations.
*
Expand All @@ -49,8 +52,8 @@
*
* @since 4.3
*/
@Repository
public class DefaultLockRepository implements LockRepository, InitializingBean {
public class DefaultLockRepository
implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton {

/**
* Default value for the table prefix property.
Expand Down Expand Up @@ -89,12 +92,17 @@ public class DefaultLockRepository implements LockRepository, InitializingBean {

private String renewQuery = "UPDATE %sLOCK SET CREATED_DATE=? WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?";

private ApplicationContext applicationContext;

private PlatformTransactionManager transactionManager;

private TransactionTemplate defaultTransactionTemplate;

/**
* Constructor that initializes the client id that will be associated for
* all the locks persisted by the store instance to a random {@link UUID}.
* @param dataSource the {@link DataSource} used to maintain the lock repository.
*/
@Autowired
public DefaultLockRepository(DataSource dataSource) {
this(dataSource, UUID.randomUUID().toString());
}
Expand Down Expand Up @@ -124,21 +132,37 @@ public void setRegion(String region) {
}

/**
* Specify the prefix for target data base table used from queries.
* @param prefix the prefix to set (default INT_).
* Specify the prefix for target database table used from queries.
* @param prefix the prefix to set (default {@code INT_}).
*/
public void setPrefix(String prefix) {
this.prefix = prefix;
}

/**
* Specify the time (in milliseconds) to expire dead locks.
* @param timeToLive the time to expire dead locks.
* Specify the time (in milliseconds) to expire dead-locks.
* @param timeToLive the time to expire dead-locks.
*/
public void setTimeToLive(int timeToLive) {
this.ttl = timeToLive;
}

/**
* Set a {@link PlatformTransactionManager} for operations.
* Otherwise, a primary {@link PlatformTransactionManager} bean is obtained
* from the application context.
* @param transactionManager the {@link PlatformTransactionManager} to use.
* @since 6.0
*/
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

@Override
public void afterPropertiesSet() {
this.deleteQuery = String.format(this.deleteQuery, this.prefix);
Expand All @@ -150,50 +174,72 @@ public void afterPropertiesSet() {
this.renewQuery = String.format(this.renewQuery, this.prefix);
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void afterSingletonsInstantiated() {
if (this.transactionManager == null) {
this.transactionManager = this.applicationContext.getBean(PlatformTransactionManager.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe try/catch with a friendly message to indicate if there is not one unique TM in the context, the registry needs one to be passed in?

}
this.defaultTransactionTemplate =
new TransactionTemplate(this.transactionManager,
new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW));
}

@Override
public void close() {
this.template.update(this.deleteAllQuery, this.region, this.id);
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus -> this.template.update(this.deleteAllQuery, this.region, this.id));
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void delete(String lock) {
this.template.update(this.deleteQuery, this.region, lock, this.id);
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id));
}

@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.SERIALIZABLE)
@Override
public boolean acquire(String lock) {
if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id,
new Date(System.currentTimeMillis() - this.ttl)) > 0) {
return true;
}
try {
return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;
}
catch (DuplicateKeyException e) {
return false;
}
DefaultTransactionDefinition transactionDefinition =
new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
transactionDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not a field?

return new TransactionTemplate(this.transactionManager, transactionDefinition)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we store this template in a field too?

.execute(transactionStatus -> {
if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id,
new Date(System.currentTimeMillis() - this.ttl)) > 0) {
return true;
}
try {
return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;
}
catch (DuplicateKeyException e) {
return false;
}
});
}

@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
@Override
public boolean isAcquired(String lock) {
return this.template.queryForObject(this.countQuery, Integer.class, // NOSONAR query never returns null
this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl)) == 1;
DefaultTransactionDefinition transactionDefinition =
new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
transactionDefinition.setReadOnly(true);
return new TransactionTemplate(this.transactionManager, transactionDefinition)
.execute(transactionStatus ->
this.template.queryForObject(this.countQuery, // NOSONAR query never returns null
Integer.class, this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl))
== 1);
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void deleteExpired() {
this.template.update(this.deleteExpiredQuery, this.region, new Date(System.currentTimeMillis() - this.ttl));
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus ->
this.template.update(this.deleteExpiredQuery, this.region,
new Date(System.currentTimeMillis() - this.ttl)));
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public boolean renew(String lock) {
return this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0;
return this.defaultTransactionTemplate.execute(
transactionStatus -> this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,17 +23,18 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.leader.Context;
import org.springframework.integration.leader.DefaultCandidate;
import org.springframework.integration.leader.event.LeaderEventPublisher;
import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
Expand All @@ -49,7 +50,7 @@ public class JdbcLockRegistryLeaderInitiatorTests {

public static EmbeddedDatabase dataSource;

@BeforeClass
@BeforeAll
public static void init() {
dataSource = new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
Expand All @@ -58,7 +59,7 @@ public static void init() {
.build();
}

@AfterClass
@AfterAll
public static void destroy() {
dataSource.shutdown();
}
Expand All @@ -70,7 +71,9 @@ public void testDistributedLeaderElection() throws Exception {
List<LockRegistryLeaderInitiator> initiators = new ArrayList<>();
for (int i = 0; i < 2; i++) {
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
lockRepository.setTransactionManager(new DataSourceTransactionManager(dataSource));
lockRepository.afterPropertiesSet();
lockRepository.afterSingletonsInstantiated();
LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(
new JdbcLockRegistry(lockRepository),
new DefaultCandidate("foo", "bar"));
Expand Down Expand Up @@ -170,7 +173,7 @@ public void publishOnRevoked(Object source, Context context, String role) {
}

@Test
@Ignore("Looks like an embedded DBd is not fully cleared if we don't close application context")
@Disabled("Looks like an embedded DBd is not fully cleared if we don't close application context")
public void testLostConnection() throws InterruptedException {
CountDownLatch granted = new CountDownLatch(1);
CountingPublisher countingPublisher = new CountingPublisher(granted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void clear() {
this.registry.expireUnusedOlderThan(0);
this.client.close();
this.child = new AnnotationConfigApplicationContext();
this.child.register(DefaultLockRepository.class);
this.child.registerBean("childLockRepository", DefaultLockRepository.class, this.dataSource);
this.child.setParent(this.context);
this.child.refresh();
}
Expand Down Expand Up @@ -195,7 +195,9 @@ public void testOnlyOneLock() throws Exception {
for (int j = 0; j < 20; j++) {
Callable<Boolean> task = () -> {
DefaultLockRepository client = new DefaultLockRepository(this.dataSource);
client.setApplicationContext(this.context);
client.afterPropertiesSet();
client.afterSingletonsInstantiated();
Lock lock = new JdbcLockRegistry(client).obtain("foo");
try {
if (locked.isEmpty() && lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS)) {
Expand Down Expand Up @@ -231,9 +233,13 @@ public void testOnlyOneLock() throws Exception {
@Test
public void testExclusiveAccess() throws Exception {
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
client1.setApplicationContext(this.context);
client1.afterPropertiesSet();
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
client1.afterSingletonsInstantiated();
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
client2.setApplicationContext(this.context);
client2.afterPropertiesSet();
client2.afterSingletonsInstantiated();
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
final CountDownLatch latch1 = new CountDownLatch(1);
Expand Down Expand Up @@ -278,10 +284,14 @@ public void testExclusiveAccess() throws Exception {
public void testOutOfDateLockTaken() throws Exception {
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
client1.setTimeToLive(100);
client1.setApplicationContext(this.context);
client1.afterPropertiesSet();
client1.afterSingletonsInstantiated();
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
client2.setTimeToLive(100);
client2.setApplicationContext(this.context);
client2.afterPropertiesSet();
client2.afterSingletonsInstantiated();
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -316,10 +326,14 @@ public void testOutOfDateLockTaken() throws Exception {
public void testRenewLock() throws Exception {
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
client1.setTimeToLive(500);
client1.setApplicationContext(this.context);
client1.afterPropertiesSet();
client1.afterSingletonsInstantiated();
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
client2.setTimeToLive(500);
client2.setApplicationContext(this.context);
client2.afterPropertiesSet();
client2.afterSingletonsInstantiated();
JdbcLockRegistry registry = new JdbcLockRegistry(client1);
Lock lock1 = registry.obtain("foo");
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.test.util.TestUtils;
Expand Down Expand Up @@ -67,6 +68,9 @@ public class JdbcLockRegistryTests {
@Autowired
private DataSource dataSource;

@Autowired
private ApplicationContext context;

@BeforeEach
public void clear() {
this.registry.expireUnusedOlderThan(0);
Expand Down Expand Up @@ -143,7 +147,9 @@ public void testReentrantLockInterruptibly() throws Exception {
public void testReentrantLockAfterExpiration() throws Exception {
DefaultLockRepository client = new DefaultLockRepository(dataSource);
client.setTimeToLive(1);
client.setApplicationContext(this.context);
client.afterPropertiesSet();
client.afterSingletonsInstantiated();
JdbcLockRegistry registry = new JdbcLockRegistry(client);
Lock lock1 = registry.obtain("foo");
assertThat(lock1.tryLock()).isTrue();
Expand Down
2 changes: 2 additions & 0 deletions src/reference/asciidoc/jdbc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,8 @@ NOTE: The lock renewal can be done only if the lock is held by the current threa
String with version 5.5.6, the `JdbcLockRegistry` is support automatically clean up cache for JdbcLock in `JdbcLockRegistry.locks` via `JdbcLockRegistry.setCacheCapacity()`.
See its JavaDocs for more information.

String with version 6.0, the `DefaultLockRepository` can be supplied with a `PlatformTransactionManager` instead of relying on the primary bean from the application context.

[[jdbc-metadata-store]]
=== JDBC Metadata Store

Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,7 @@ When providing a `RetryTemplate` on the inbound gateway or message-driven channe
In addition, the new `KafkaErrorMessageSendingRecoverer` is provided; this can be used with a `DefaultErrorHandler` to avoid issues with long aggregated retry delays causing partitions rebalances.

See <<./kafka.adoc#kafka,Spring for Apache Kafka Support>> for more information.

=== JDBC Changes

The `DefaultLockRepository` can now be supplied with a `PlatformTransactionManager` instead of relying on the primary bean from the application context.