Skip to content

GH-3272: Support lease renewal for distributed locks #3317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.locks;

/**
* A {@link LockRegistry} implementing this interface supports the renewal of the time to live of a lock
*
* @author Alexandre Strubel
*
* @since 5.4
*/
public interface RenewableLockRegistry extends LockRegistry {

/**
* Renew the time to live of the lock is associated with the parameter object.
* The lock must be held by the current thread
* @param lockKey The object with which the lock is associated.
*/
void renewLock(Object lockKey);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,6 +43,7 @@
* @author Artem Bilan
* @author Glenn Renfro
* @author Gary Russell
* @author Alexandre Strubel
*
* @since 4.3
*/
Expand Down Expand Up @@ -179,4 +180,9 @@ private void deleteExpired(String lock) {
new Date(System.currentTimeMillis() - this.ttl));
}

@Override
public boolean renew(String lock) {
return this.template.update(this.updateQuery, new Date(), this.region, lock, this.id) > 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.integration.support.locks.ExpirableLockRegistry;
import org.springframework.integration.support.locks.RenewableLockRegistry;
import org.springframework.integration.util.UUIDConverter;
import org.springframework.transaction.TransactionTimedOutException;
import org.springframework.util.Assert;
Expand All @@ -49,10 +50,11 @@
* @author Bartosz Rempuszewski
* @author Gary Russell
* @author Alexandre Strubel
* @author Stefan Vassilev
*
* @since 4.3
*/
public class JdbcLockRegistry implements ExpirableLockRegistry {
public class JdbcLockRegistry implements ExpirableLockRegistry, RenewableLockRegistry {

private static final int DEFAULT_IDLE = 100;

Expand Down Expand Up @@ -101,6 +103,19 @@ public void expireUnusedOlderThan(long age) {
}
}

@Override
public void renewLock(Object lockKey) {
Assert.isInstanceOf(String.class, lockKey);
String path = pathFor((String) lockKey);
JdbcLock jdbcLock = this.locks.get(path);
if (jdbcLock == null) {
throw new IllegalStateException("Could not found mutex at " + path);
}
if (!jdbcLock.renew()) {
throw new IllegalStateException("Could not renew mutex at " + path);
}
}

private static final class JdbcLock implements Lock {

private final LockRepository mutex;
Expand Down Expand Up @@ -232,7 +247,7 @@ private boolean doLock() {
@Override
public void unlock() {
if (!this.delegate.isHeldByCurrentThread()) {
throw new IllegalMonitorStateException("You do not own mutex at " + this.path);
throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path);
}
if (this.delegate.getHoldCount() > 1) {
this.delegate.unlock();
Expand All @@ -243,7 +258,7 @@ public void unlock() {
this.mutex.delete(this.path);
return;
}
catch (TransientDataAccessException e) {
catch (TransientDataAccessException | TransactionTimedOutException e) {
// try again
}
catch (Exception e) {
Expand All @@ -264,6 +279,27 @@ public boolean isAcquiredInThisProcess() {
return this.mutex.isAcquired(this.path);
}

public boolean renew() {
if (!this.delegate.isHeldByCurrentThread()) {
throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path);
}
while (true) {
try {
boolean renewed = this.mutex.renew(this.path);
if (renewed) {
this.lastUsed = System.currentTimeMillis();
}
return renewed;
}
catch (TransientDataAccessException | TransactionTimedOutException e) {
// try again
}
catch (Exception e) {
throw new DataAccessResourceFailureException("Failed to renew mutex at " + this.path, e);
}
}
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@
* has to be declared as a bean.
*
* @author Dave Syer
* @author Alexandre Strubel
*
* @since 4.3
*/
public interface LockRepository extends Closeable {
Expand All @@ -34,6 +36,8 @@ public interface LockRepository extends Closeable {

boolean acquire(String lock);

boolean renew(String lock);

@Override
void close();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,6 +52,7 @@
* @author Dave Syer
* @author Artem Bilan
* @author Glenn Renfro
* @author Alexandre Strubel
*
* @since 4.3
*/
Expand Down Expand Up @@ -276,4 +277,110 @@ public void testExclusiveAccess() throws Exception {
}
}

@Test
public void testOutOfDateLockTaken() throws Exception {
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
client1.setTimeToLive(500);
client1.afterPropertiesSet();
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
client2.afterPropertiesSet();
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
lock1.lockInterruptibly();
Thread.sleep(500);
new SimpleAsyncTaskExecutor()
.execute(() -> {
Lock lock2 = new JdbcLockRegistry(client2).obtain("foo");
try {
latch1.countDown();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
lock2.lockInterruptibly();
stopWatch.stop();
data.add(1);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
finally {
lock2.unlock();
}
latch2.countDown();
});
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
data.add(2);
lock1.unlock();
for (int i = 0; i < 2; i++) {
Integer integer = data.poll(10, TimeUnit.SECONDS);
assertThat(integer).isNotNull();
assertThat(integer.intValue()).isEqualTo(i + 1);
}
}

@Test
public void testRenewLock() throws Exception {
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
client1.setTimeToLive(500);
client1.afterPropertiesSet();
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
client2.afterPropertiesSet();
JdbcLockRegistry registry = new JdbcLockRegistry(client1);
Lock lock1 = registry.obtain("foo");
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
final CountDownLatch latch1 = new CountDownLatch(2);
final CountDownLatch latch2 = new CountDownLatch(1);
lock1.lockInterruptibly();
new SimpleAsyncTaskExecutor()
.execute(() -> {
Lock lock2 = new JdbcLockRegistry(client2).obtain("foo");
try {
latch1.countDown();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
lock2.lockInterruptibly();
stopWatch.stop();
data.add(4);
Thread.sleep(10);
data.add(5);
Thread.sleep(10);
data.add(6);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
finally {
lock2.unlock();
}
});
new SimpleAsyncTaskExecutor()
.execute(() -> {
try {
latch1.countDown();
Thread.sleep(1000);
data.add(1);
Thread.sleep(100);
data.add(2);
Thread.sleep(100);
data.add(3);
latch2.countDown();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
while (latch2.getCount() > 0) {
Thread.sleep(100);
registry.renewLock("foo");
}
lock1.unlock();
for (int i = 0; i < 6; i++) {
Integer integer = data.poll(10, TimeUnit.SECONDS);
assertThat(integer).isNotNull();
assertThat(integer.intValue()).isEqualTo(i + 1);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@

import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -40,6 +41,7 @@
/**
* @author Dave Syer
* @author Artem Bilan
* @author Stefan Vassilev
*
* @since 4.3
*/
Expand Down Expand Up @@ -168,7 +170,7 @@ public void testTwoThreadsSecondFailsToGetLock() throws Exception {
lock1.unlock();
Object ise = result.get(10, TimeUnit.SECONDS);
assertThat(ise).isInstanceOf(IllegalMonitorStateException.class);
assertThat(((Exception) ise).getMessage()).contains("You do not own");
assertThat(((Exception) ise).getMessage()).contains("own");
}

@Test
Expand Down Expand Up @@ -263,7 +265,25 @@ public void testTwoThreadsWrongOneUnlocks() throws Exception {
lock.unlock();
Object imse = result.get(10, TimeUnit.SECONDS);
assertThat(imse).isInstanceOf(IllegalMonitorStateException.class);
assertThat(((Exception) imse).getMessage()).contains("You do not own");
assertThat(((Exception) imse).getMessage()).contains("own");
}

@Test
public void testLockRenew() {
final Lock lock = this.registry.obtain("foo");

assertThat(lock.tryLock()).isTrue();
try {
registry.renewLock("foo");
} finally {
lock.unlock();
}
}

@Test
public void testLockRenewLockNotOwned() {
this.registry.obtain("foo");

Assertions.assertThrows(IllegalMonitorStateException.class, () -> registry.renewLock("foo"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We prefer AssertJ assertions instead: assertThatExceptionOfType()

}
}
5 changes: 5 additions & 0 deletions src/reference/asciidoc/jdbc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,11 @@ If so, you can specify the `id` to be associated with the `DefaultLockRepository
Starting with version 5.1.8, the `JdbcLockRegistry` can be configured with the `idleBetweenTries` - a `Duration` to sleep between lock record insert/update executions.
By default it is `100` milliseconds and in some environments non-leaders pollute connections with data source too often.

Starting with version 5.4, the `RenewableLockRegistry` interface has been introduced and added to `JdbcLockRegistry`.
The `renewLock()` method must be called during locked process in case of the locked process would be longer than time to live of the lock.
So the time to live can be highly reduce and deployments can retake a lost lock quickly.
NB. The lock renewal can be done only if the lock is held by the current thread, so the locked process has to be executed in another thread.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what is this NB about.
Can we have a more official language in the docs, please?

Copy link
Contributor

@garyrussell garyrussell Jul 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB (latin Nota Bene - "note well") is commonly used in real (British) English - it is not common in the USA.

Use AsciiDoctor NOTE: instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right! Didn't think through. This sentence is really NOTE:.

Thank you, Gary!

OK. So, I'm pulling this locally and fixing the latest simple concerns on merge.


[[jdbc-metadata-store]]
=== JDBC Metadata Store

Expand Down
5 changes: 5 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ See <<./r2dbc.adoc#r2dbc,R2DBC Support>> for more information.
The Channel Adapters for Redis Stream support have been introduced.
See <<./redis.adoc#redis-stream-outbound,Redis Stream Outbound Channel Adapter>> for more information.

==== Renewable Lock Registry

A Renewable lock registry has been introduced to allow renew lease of a distributed lock.
See <<./jdbc.adoc#jdbc-lock-registry,JDBC implementation>> for more information.

[[x5.4-general]]
=== General Changes

Expand Down