diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java new file mode 100644 index 00000000000..7185e2b3d07 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java @@ -0,0 +1,35 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.locks; + +/** + * A {@link LockRegistry} implementing this interface supports the renewal of the time to live of a lock + * + * @author Alexandre Strubel + * + * @since 5.4 + */ +public interface RenewableLockRegistry extends LockRegistry { + + /** + * Renew the time to live of the lock is associated with the parameter object. + * The lock must be held by the current thread + * @param lockKey The object with which the lock is associated. + */ + void renewLock(Object lockKey); + +} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java index 60a8be51397..f12cf1106b7 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ * @author Artem Bilan * @author Glenn Renfro * @author Gary Russell + * @author Alexandre Strubel * * @since 4.3 */ @@ -179,4 +180,9 @@ private void deleteExpired(String lock) { new Date(System.currentTimeMillis() - this.ttl)); } + @Override + public boolean renew(String lock) { + return this.template.update(this.updateQuery, new Date(), this.region, lock, this.id) > 0; + } + } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java index 658ed302e5f..9bede5f10c2 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java @@ -30,6 +30,7 @@ import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.dao.TransientDataAccessException; import org.springframework.integration.support.locks.ExpirableLockRegistry; +import org.springframework.integration.support.locks.RenewableLockRegistry; import org.springframework.integration.util.UUIDConverter; import org.springframework.transaction.TransactionTimedOutException; import org.springframework.util.Assert; @@ -49,10 +50,11 @@ * @author Bartosz Rempuszewski * @author Gary Russell * @author Alexandre Strubel + * @author Stefan Vassilev * * @since 4.3 */ -public class JdbcLockRegistry implements ExpirableLockRegistry { +public class JdbcLockRegistry implements ExpirableLockRegistry, RenewableLockRegistry { private static final int DEFAULT_IDLE = 100; @@ -101,6 +103,19 @@ public void expireUnusedOlderThan(long age) { } } + @Override + public void renewLock(Object lockKey) { + Assert.isInstanceOf(String.class, lockKey); + String path = pathFor((String) lockKey); + JdbcLock jdbcLock = this.locks.get(path); + if (jdbcLock == null) { + throw new IllegalStateException("Could not found mutex at " + path); + } + if (!jdbcLock.renew()) { + throw new IllegalStateException("Could not renew mutex at " + path); + } + } + private static final class JdbcLock implements Lock { private final LockRepository mutex; @@ -232,7 +247,7 @@ private boolean doLock() { @Override public void unlock() { if (!this.delegate.isHeldByCurrentThread()) { - throw new IllegalMonitorStateException("You do not own mutex at " + this.path); + throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path); } if (this.delegate.getHoldCount() > 1) { this.delegate.unlock(); @@ -243,7 +258,7 @@ public void unlock() { this.mutex.delete(this.path); return; } - catch (TransientDataAccessException e) { + catch (TransientDataAccessException | TransactionTimedOutException e) { // try again } catch (Exception e) { @@ -264,6 +279,27 @@ public boolean isAcquiredInThisProcess() { return this.mutex.isAcquired(this.path); } + public boolean renew() { + if (!this.delegate.isHeldByCurrentThread()) { + throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path); + } + while (true) { + try { + boolean renewed = this.mutex.renew(this.path); + if (renewed) { + this.lastUsed = System.currentTimeMillis(); + } + return renewed; + } + catch (TransientDataAccessException | TransactionTimedOutException e) { + // try again + } + catch (Exception e) { + throw new DataAccessResourceFailureException("Failed to renew mutex at " + this.path, e); + } + } + } + } } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java index cbe2aed50a1..efe26d09f8f 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,8 @@ * has to be declared as a bean. * * @author Dave Syer + * @author Alexandre Strubel + * * @since 4.3 */ public interface LockRepository extends Closeable { @@ -34,6 +36,8 @@ public interface LockRepository extends Closeable { boolean acquire(String lock); + boolean renew(String lock); + @Override void close(); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java index e37b7e78901..dc694cc485e 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,6 +52,7 @@ * @author Dave Syer * @author Artem Bilan * @author Glenn Renfro + * @author Alexandre Strubel * * @since 4.3 */ @@ -276,4 +277,110 @@ public void testExclusiveAccess() throws Exception { } } + @Test + public void testOutOfDateLockTaken() throws Exception { + DefaultLockRepository client1 = new DefaultLockRepository(dataSource); + client1.setTimeToLive(500); + client1.afterPropertiesSet(); + final DefaultLockRepository client2 = new DefaultLockRepository(dataSource); + client2.afterPropertiesSet(); + Lock lock1 = new JdbcLockRegistry(client1).obtain("foo"); + final BlockingQueue data = new LinkedBlockingQueue<>(); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + lock1.lockInterruptibly(); + Thread.sleep(500); + new SimpleAsyncTaskExecutor() + .execute(() -> { + Lock lock2 = new JdbcLockRegistry(client2).obtain("foo"); + try { + latch1.countDown(); + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + lock2.lockInterruptibly(); + stopWatch.stop(); + data.add(1); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + lock2.unlock(); + } + latch2.countDown(); + }); + assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue(); + data.add(2); + lock1.unlock(); + for (int i = 0; i < 2; i++) { + Integer integer = data.poll(10, TimeUnit.SECONDS); + assertThat(integer).isNotNull(); + assertThat(integer.intValue()).isEqualTo(i + 1); + } + } + + @Test + public void testRenewLock() throws Exception { + DefaultLockRepository client1 = new DefaultLockRepository(dataSource); + client1.setTimeToLive(500); + client1.afterPropertiesSet(); + final DefaultLockRepository client2 = new DefaultLockRepository(dataSource); + client2.afterPropertiesSet(); + JdbcLockRegistry registry = new JdbcLockRegistry(client1); + Lock lock1 = registry.obtain("foo"); + final BlockingQueue data = new LinkedBlockingQueue<>(); + final CountDownLatch latch1 = new CountDownLatch(2); + final CountDownLatch latch2 = new CountDownLatch(1); + lock1.lockInterruptibly(); + new SimpleAsyncTaskExecutor() + .execute(() -> { + Lock lock2 = new JdbcLockRegistry(client2).obtain("foo"); + try { + latch1.countDown(); + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + lock2.lockInterruptibly(); + stopWatch.stop(); + data.add(4); + Thread.sleep(10); + data.add(5); + Thread.sleep(10); + data.add(6); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + lock2.unlock(); + } + }); + new SimpleAsyncTaskExecutor() + .execute(() -> { + try { + latch1.countDown(); + Thread.sleep(1000); + data.add(1); + Thread.sleep(100); + data.add(2); + Thread.sleep(100); + data.add(3); + latch2.countDown(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue(); + while (latch2.getCount() > 0) { + Thread.sleep(100); + registry.renewLock("foo"); + } + lock1.unlock(); + for (int i = 0; i < 6; i++) { + Integer integer = data.poll(10, TimeUnit.SECONDS); + assertThat(integer).isNotNull(); + assertThat(integer.intValue()).isEqualTo(i + 1); + } + } } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java index 44d1fefd389..7f170f3e046 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import org.junit.Before; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -40,6 +41,7 @@ /** * @author Dave Syer * @author Artem Bilan + * @author Stefan Vassilev * * @since 4.3 */ @@ -168,7 +170,7 @@ public void testTwoThreadsSecondFailsToGetLock() throws Exception { lock1.unlock(); Object ise = result.get(10, TimeUnit.SECONDS); assertThat(ise).isInstanceOf(IllegalMonitorStateException.class); - assertThat(((Exception) ise).getMessage()).contains("You do not own"); + assertThat(((Exception) ise).getMessage()).contains("own"); } @Test @@ -263,7 +265,25 @@ public void testTwoThreadsWrongOneUnlocks() throws Exception { lock.unlock(); Object imse = result.get(10, TimeUnit.SECONDS); assertThat(imse).isInstanceOf(IllegalMonitorStateException.class); - assertThat(((Exception) imse).getMessage()).contains("You do not own"); + assertThat(((Exception) imse).getMessage()).contains("own"); } + @Test + public void testLockRenew() { + final Lock lock = this.registry.obtain("foo"); + + assertThat(lock.tryLock()).isTrue(); + try { + registry.renewLock("foo"); + } finally { + lock.unlock(); + } + } + + @Test + public void testLockRenewLockNotOwned() { + this.registry.obtain("foo"); + + Assertions.assertThrows(IllegalMonitorStateException.class, () -> registry.renewLock("foo")); + } } diff --git a/src/reference/asciidoc/jdbc.adoc b/src/reference/asciidoc/jdbc.adoc index 30390a75632..81279807eb5 100644 --- a/src/reference/asciidoc/jdbc.adoc +++ b/src/reference/asciidoc/jdbc.adoc @@ -1085,6 +1085,11 @@ If so, you can specify the `id` to be associated with the `DefaultLockRepository Starting with version 5.1.8, the `JdbcLockRegistry` can be configured with the `idleBetweenTries` - a `Duration` to sleep between lock record insert/update executions. By default it is `100` milliseconds and in some environments non-leaders pollute connections with data source too often. +Starting with version 5.4, the `RenewableLockRegistry` interface has been introduced and added to `JdbcLockRegistry`. +The `renewLock()` method must be called during locked process in case of the locked process would be longer than time to live of the lock. +So the time to live can be highly reduce and deployments can retake a lost lock quickly. +NB. The lock renewal can be done only if the lock is held by the current thread, so the locked process has to be executed in another thread. + [[jdbc-metadata-store]] === JDBC Metadata Store diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index be4ec644be9..82e01e8f33d 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -30,6 +30,11 @@ See <<./r2dbc.adoc#r2dbc,R2DBC Support>> for more information. The Channel Adapters for Redis Stream support have been introduced. See <<./redis.adoc#redis-stream-outbound,Redis Stream Outbound Channel Adapter>> for more information. +==== Renewable Lock Registry + +A Renewable lock registry has been introduced to allow renew lease of a distributed lock. +See <<./jdbc.adoc#jdbc-lock-registry,JDBC implementation>> for more information. + [[x5.4-general]] === General Changes