From f2434ca24b3581e4572708f4fe73b431afea4c59 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Fri, 8 Nov 2024 17:29:18 +0300 Subject: [PATCH 1/8] feat: prepare hibernate release --- hibernate-dialect/CHANGELOG.md | 4 ++++ hibernate-dialect/pom.xml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/hibernate-dialect/CHANGELOG.md b/hibernate-dialect/CHANGELOG.md index 87c003e..e7ec4e7 100644 --- a/hibernate-dialect/CHANGELOG.md +++ b/hibernate-dialect/CHANGELOG.md @@ -1,3 +1,7 @@ +## 1.2.0 ## + +- Added custom decimal jdbc codes `DECIMAL_31_9`, `DECIMAL_35_0`, `DECIMAL_35_9` + ## 1.1.0 ## - Added hint for scan queries diff --git a/hibernate-dialect/pom.xml b/hibernate-dialect/pom.xml index 3c8e9d7..9d39fba 100644 --- a/hibernate-dialect/pom.xml +++ b/hibernate-dialect/pom.xml @@ -6,7 +6,7 @@ tech.ydb.dialects hibernate-ydb-dialect - 1.1.0 + 1.2.0 jar From 7682a1d1a092f9b7db3bae32b0aaf9afe2b141e8 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Thu, 14 Nov 2024 19:59:28 +0300 Subject: [PATCH 2/8] feat: micro fix --- .../YdbCoordinationServiceLockProvider.java | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java index d198486..4ef66c6 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java @@ -16,6 +16,7 @@ import tech.ydb.coordination.CoordinationSession; import tech.ydb.coordination.SemaphoreLease; import tech.ydb.coordination.settings.CoordinationSessionSettings; +import tech.ydb.coordination.settings.DescribeSemaphoreMode; import tech.ydb.core.Result; import tech.ydb.jdbc.YdbConnection; @@ -64,12 +65,27 @@ public Optional lock(LockConfiguration lockConfiguration) { var statusCS = coordinationSession.connect().join(); if (!statusCS.isSuccess()) { - logger.info("Failed creating coordination session [{}]", coordinationSession); + logger.warn("Failed creating coordination session [{}]", coordinationSession); return Optional.empty(); } - logger.info("Created coordination node session [{}]", coordinationSession); + logger.debug("Created coordination node session [{}]", coordinationSession); + + var describeResult = coordinationSession.describeSemaphore( + lockConfiguration.getName(), + DescribeSemaphoreMode.WITH_OWNERS + ).join(); + + if (describeResult.isSuccess()) { + var describe = describeResult.getValue(); + + if (!describe.getOwnersList().isEmpty()) { + var describePayload = new String(describe.getOwnersList().get(0).getData(), StandardCharsets.UTF_8); + + logger.info("Received DescribeSemaphore[Name={}, Data={}]", describe.getName(), describePayload); + } + } Result semaphoreLease = coordinationSession.acquireEphemeralSemaphore( lockConfiguration.getName(), @@ -81,12 +97,12 @@ public Optional lock(LockConfiguration lockConfiguration) { logger.debug(coordinationSession.toString()); if (semaphoreLease.isSuccess()) { - logger.info("Instance[{}] acquired semaphore[SemaphoreName={}]", instanceInfo, + logger.debug("Instance[{}] acquired semaphore [SemaphoreName={}]", instanceInfo, semaphoreLease.getValue().getSemaphoreName()); return Optional.of(new YdbSimpleLock(semaphoreLease.getValue(), instanceInfo, coordinationSession)); } else { - logger.info("Instance[{}] did not acquire semaphore", instanceInfo); + logger.debug("Instance[{}] did not acquire semaphore", instanceInfo); return Optional.empty(); } @@ -96,7 +112,7 @@ private record YdbSimpleLock(SemaphoreLease semaphoreLease, String metaInfo, CoordinationSession coordinationSession) implements SimpleLock { @Override public void unlock() { - logger.info("Instance[{}] released semaphore[SemaphoreName={}]", metaInfo, semaphoreLease.getSemaphoreName()); + logger.info("Instance[{}] released semaphore [SemaphoreName={}]", metaInfo, semaphoreLease.getSemaphoreName()); semaphoreLease.release().join(); From 9ec55bbd3c633ea8541bb5ed10383b949bbe96f9 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Mon, 18 Nov 2024 15:05:54 +0300 Subject: [PATCH 3/8] feat: shedlock over jdbc --- shedlock-ydb/pom.xml | 16 +-- .../YdbCoordinationServiceLockProvider.java | 127 ------------------ .../lock/provider/YdbJDBCLockProvider.java | 89 ++++++++++++ .../YdbLockProviderConfiguration.java | 12 +- .../lock/provider/YdbLockProviderTest.java | 25 +++- 5 files changed, 117 insertions(+), 152 deletions(-) delete mode 100644 shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java create mode 100644 shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java diff --git a/shedlock-ydb/pom.xml b/shedlock-ydb/pom.xml index 543d217..3699fe0 100644 --- a/shedlock-ydb/pom.xml +++ b/shedlock-ydb/pom.xml @@ -38,8 +38,8 @@ 5.9.3 2.17.2 - 2.2.6 - 2.2.2 + 2.3.6 + 2.3.5 3.2.3 5.15.0 @@ -53,13 +53,6 @@ - - tech.ydb - ydb-sdk-bom - ${ydb.sdk.version} - pom - import - org.springframework.boot spring-boot-dependencies @@ -71,10 +64,6 @@ - - tech.ydb - ydb-sdk-coordination - net.javacrumbs.shedlock shedlock-spring @@ -101,6 +90,7 @@ tech.ydb.test ydb-junit5-support + ${ydb.sdk.version} test diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java deleted file mode 100644 index 4ef66c6..0000000 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java +++ /dev/null @@ -1,127 +0,0 @@ -package tech.ydb.lock.provider; - -import java.nio.charset.StandardCharsets; -import java.sql.SQLException; -import java.time.Instant; -import java.util.Optional; -import javax.annotation.PreDestroy; -import net.javacrumbs.shedlock.core.LockConfiguration; -import net.javacrumbs.shedlock.core.LockProvider; -import net.javacrumbs.shedlock.core.SimpleLock; -import net.javacrumbs.shedlock.support.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import tech.ydb.common.retry.RetryForever; -import tech.ydb.coordination.CoordinationClient; -import tech.ydb.coordination.CoordinationSession; -import tech.ydb.coordination.SemaphoreLease; -import tech.ydb.coordination.settings.CoordinationSessionSettings; -import tech.ydb.coordination.settings.DescribeSemaphoreMode; -import tech.ydb.core.Result; -import tech.ydb.jdbc.YdbConnection; - -/** - * @author Kirill Kurdyukov - */ -public class YdbCoordinationServiceLockProvider implements LockProvider { - private static final Logger logger = LoggerFactory.getLogger(YdbCoordinationServiceLockProvider.class); - private static final String YDB_LOCK_NODE_NAME = "shared-lock-ydb"; - private static final int ATTEMPT_CREATE_NODE = 10; - - private final YdbConnection ydbConnection; - private final CoordinationClient coordinationClient; - - public YdbCoordinationServiceLockProvider(YdbConnection ydbConnection) { - this.ydbConnection = ydbConnection; - this.coordinationClient = CoordinationClient.newClient(ydbConnection.getCtx().getGrpcTransport()); - } - - public void init() { - for (int i = 0; i < ATTEMPT_CREATE_NODE; i++) { - var status = coordinationClient.createNode(YDB_LOCK_NODE_NAME).join(); - - if (i == ATTEMPT_CREATE_NODE - 1) { - status.expectSuccess("Failed created coordination service node: " + YDB_LOCK_NODE_NAME); - } - } - } - - @Override - public Optional lock(LockConfiguration lockConfiguration) { - var now = Instant.now(); - - String instanceInfo = "Hostname=" + Utils.getHostname() + ", " + - "Current PID=" + ProcessHandle.current().pid() + ", " + - "CreatedAt=" + now; - - logger.info("Instance[{}] is trying to become a leader...", instanceInfo); - - var coordinationSession = coordinationClient.createSession( - YDB_LOCK_NODE_NAME, CoordinationSessionSettings.newBuilder() - .withRetryPolicy(new RetryForever(500)) - .build() - ); - - var statusCS = coordinationSession.connect().join(); - - if (!statusCS.isSuccess()) { - logger.warn("Failed creating coordination session [{}]", coordinationSession); - - return Optional.empty(); - } - - logger.debug("Created coordination node session [{}]", coordinationSession); - - var describeResult = coordinationSession.describeSemaphore( - lockConfiguration.getName(), - DescribeSemaphoreMode.WITH_OWNERS - ).join(); - - if (describeResult.isSuccess()) { - var describe = describeResult.getValue(); - - if (!describe.getOwnersList().isEmpty()) { - var describePayload = new String(describe.getOwnersList().get(0).getData(), StandardCharsets.UTF_8); - - logger.info("Received DescribeSemaphore[Name={}, Data={}]", describe.getName(), describePayload); - } - } - - Result semaphoreLease = coordinationSession.acquireEphemeralSemaphore( - lockConfiguration.getName(), - true, - instanceInfo.getBytes(StandardCharsets.UTF_8), - lockConfiguration.getLockAtMostFor() - ).join(); - - logger.debug(coordinationSession.toString()); - - if (semaphoreLease.isSuccess()) { - logger.debug("Instance[{}] acquired semaphore [SemaphoreName={}]", instanceInfo, - semaphoreLease.getValue().getSemaphoreName()); - - return Optional.of(new YdbSimpleLock(semaphoreLease.getValue(), instanceInfo, coordinationSession)); - } else { - logger.debug("Instance[{}] did not acquire semaphore", instanceInfo); - - return Optional.empty(); - } - } - - private record YdbSimpleLock(SemaphoreLease semaphoreLease, String metaInfo, - CoordinationSession coordinationSession) implements SimpleLock { - @Override - public void unlock() { - logger.info("Instance[{}] released semaphore [SemaphoreName={}]", metaInfo, semaphoreLease.getSemaphoreName()); - - semaphoreLease.release().join(); - - coordinationSession.close(); - } - } - - @PreDestroy - private void close() throws SQLException { - ydbConnection.close(); - } -} diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java new file mode 100644 index 0000000..decf714 --- /dev/null +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java @@ -0,0 +1,89 @@ +package tech.ydb.lock.provider; + +import java.sql.SQLException; +import java.util.Optional; +import javax.sql.DataSource; +import net.javacrumbs.shedlock.core.LockConfiguration; +import net.javacrumbs.shedlock.core.LockProvider; +import net.javacrumbs.shedlock.core.SimpleLock; +import net.javacrumbs.shedlock.support.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Kirill Kurdyukov + */ +public class YdbJDBCLockProvider implements LockProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(YdbJDBCLockProvider.class); + private static final String LOCKED_BY = "Hostname=" + Utils.getHostname() + ", " + + "Current PID=" + ProcessHandle.current().pid(); + + private final DataSource dataSource; + + public YdbJDBCLockProvider(DataSource dataSource) { + this.dataSource = dataSource; + } + + @Override + public Optional lock(LockConfiguration lockConfiguration) { + try (var connection = dataSource.getConnection()) { + var autoCommit = connection.getAutoCommit(); + try { + connection.setAutoCommit(false); + + var selectPS = connection.prepareStatement("SELECT lock_until, locked_by FROM shedlock " + + "WHERE name = ? AND lock_until > CurrentUtcTimestamp() + ?"); + + selectPS.setString(1, lockConfiguration.getName()); + selectPS.setObject(2, lockConfiguration.getLockAtMostFor()); + + var haveLeader = false; + try (var rs = selectPS.executeQuery()) { + haveLeader = rs.next(); + } + + if (haveLeader) { + return Optional.empty(); + } + + var upsertPS = connection.prepareStatement("" + + "UPSERT INTO shedlock(name, lock_until, locked_at, locked_by) " + + "VALUES (?, Unwrap(CurrentUtcTimestamp() + ?), CurrentUtcTimestamp(), ?)" + ); + + upsertPS.setObject(1, lockConfiguration.getName()); + upsertPS.setObject(2, lockConfiguration.getLockAtMostFor()); + upsertPS.setObject(3, LOCKED_BY); + upsertPS.execute(); + + connection.commit(); + + LOGGER.debug("Instance[{}] is leader", LOCKED_BY); + + return Optional.of(new YdbJDBCLock(lockConfiguration.getName(), dataSource)); + } finally { + connection.setAutoCommit(autoCommit); + } + } catch (SQLException e) { + LOGGER.debug(String.format("Instance[{%s}] acquire lock is failed", LOCKED_BY), e); + + return Optional.empty(); + } + } + + private record YdbJDBCLock(String name, DataSource dataSource) implements SimpleLock { + @Override + public void unlock() { + try (var connection = dataSource.getConnection()) { + var ps = connection.prepareStatement( + "UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ?"); + ps.setObject(1, name); + ps.execute(); + } catch (SQLException e) { + LOGGER.error(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e); + + throw new RuntimeException(e); + } + } + } +} diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java index a3507bb..6a24fa5 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java @@ -1,10 +1,9 @@ package tech.ydb.lock.provider; -import java.sql.SQLException; import javax.sql.DataSource; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import tech.ydb.jdbc.YdbConnection; /** * @author Kirill Kurdyukov @@ -13,11 +12,8 @@ @Configuration public class YdbLockProviderConfiguration { @Bean - public YdbCoordinationServiceLockProvider ydbLockProvider(DataSource dataSource) throws SQLException { - var provider = new YdbCoordinationServiceLockProvider(dataSource.getConnection().unwrap(YdbConnection.class)); - - provider.init(); - - return provider; + @ConditionalOnBean(DataSource.class) + public YdbJDBCLockProvider ydbLockProvider(DataSource dataSource) { + return new YdbJDBCLockProvider(dataSource); } } diff --git a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java index 556e781..0ca237b 100644 --- a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java +++ b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java @@ -1,5 +1,6 @@ package tech.ydb.lock.provider; +import java.sql.SQLException; import java.time.Duration; import java.time.Instant; import java.util.Optional; @@ -7,6 +8,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import javax.sql.DataSource; import net.javacrumbs.shedlock.core.LockConfiguration; import net.javacrumbs.shedlock.core.SimpleLock; import org.junit.jupiter.api.Assertions; @@ -46,10 +48,25 @@ private static String jdbcUrl() { } @Autowired - private YdbCoordinationServiceLockProvider lockProvider; + private YdbJDBCLockProvider lockProvider; + + @Autowired + private DataSource dataSource; @Test - public void integrationTest() throws ExecutionException, InterruptedException { + public void integrationTest() throws ExecutionException, InterruptedException, SQLException { + try (var connection = dataSource.getConnection()) { + var statement = connection.createStatement(); + statement.execute( + "CREATE TABLE shedlock(" + + "name TEXT NOT NULL, " + + "lock_until TIMESTAMP NOT NULL," + + "locked_at TIMESTAMP NOT NULL," + + "locked_by TEXT NOT NULL, " + + "PRIMARY KEY (name)" + + ");"); + } + var executorServer = Executors.newFixedThreadPool(10); var atomicInt = new AtomicInteger(); var locked = new AtomicBoolean(); @@ -59,8 +76,8 @@ public void integrationTest() throws ExecutionException, InterruptedException { Optional optinal = Optional.empty(); while (optinal.isEmpty()) { - optinal = lockProvider.lock( - new LockConfiguration(Instant.now(), "semaphore", Duration.ZERO, Duration.ZERO)); + optinal = lockProvider.lock(new LockConfiguration( + Instant.now(), "semaphore", Duration.ofSeconds(10), Duration.ZERO)); optinal.ifPresent(simpleLock -> { if (locked.get()) { From 8604d5c1f31db79ef397463edb87442da02313e2 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Mon, 18 Nov 2024 16:35:12 +0300 Subject: [PATCH 4/8] fix --- shedlock-ydb/pom.xml | 2 +- .../ydb/lock/provider/YdbJDBCLockProvider.java | 15 +++++++++++---- .../ydb/lock/provider/YdbLockProviderTest.java | 13 ++++++++++--- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/shedlock-ydb/pom.xml b/shedlock-ydb/pom.xml index 3699fe0..4810187 100644 --- a/shedlock-ydb/pom.xml +++ b/shedlock-ydb/pom.xml @@ -6,7 +6,7 @@ tech.ydb.dialects shedlock-ydb - 0.2.0 + 0.3.0 jar diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java index decf714..6f0b3b8 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java @@ -75,10 +75,17 @@ private record YdbJDBCLock(String name, DataSource dataSource) implements Simple @Override public void unlock() { try (var connection = dataSource.getConnection()) { - var ps = connection.prepareStatement( - "UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ?"); - ps.setObject(1, name); - ps.execute(); + var autoCommit = connection.getAutoCommit(); + + try { + connection.setAutoCommit(true); + var ps = connection.prepareStatement( + "UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ?"); + ps.setObject(1, name); + ps.execute(); + } finally { + connection.setAutoCommit(autoCommit); + } } catch (SQLException e) { LOGGER.error(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e); diff --git a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java index 0ca237b..9797e73 100644 --- a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java +++ b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java @@ -3,9 +3,11 @@ import java.sql.SQLException; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.sql.DataSource; @@ -67,12 +69,13 @@ public void integrationTest() throws ExecutionException, InterruptedException, S ");"); } + var lockFutures = new ArrayList>(); var executorServer = Executors.newFixedThreadPool(10); var atomicInt = new AtomicInteger(); var locked = new AtomicBoolean(); for (int i = 0; i < 10; i++) { - executorServer.submit(() -> { + lockFutures.add(executorServer.submit(() -> { Optional optinal = Optional.empty(); while (optinal.isEmpty()) { @@ -96,9 +99,13 @@ public void integrationTest() throws ExecutionException, InterruptedException, S simpleLock.unlock(); }); } - }).get(); + })); } - Assertions.assertEquals(500, atomicInt.get()); + for (var future : lockFutures) { + future.get(); + } + + Assertions.assertEquals(4950, atomicInt.get()); } } From 669f25e54fe4dfdcb20b835b04ee7a0ed436c43b Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Mon, 18 Nov 2024 17:44:35 +0300 Subject: [PATCH 5/8] feat update --- .../lock/provider/YdbJDBCLockProvider.java | 45 +++++++++++++------ .../lock/provider/YdbLockProviderTest.java | 24 +++++++--- 2 files changed, 50 insertions(+), 19 deletions(-) diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java index 6f0b3b8..0934d88 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java @@ -31,19 +31,17 @@ public Optional lock(LockConfiguration lockConfiguration) { try { connection.setAutoCommit(false); - var selectPS = connection.prepareStatement("SELECT lock_until, locked_by FROM shedlock " + - "WHERE name = ? AND lock_until > CurrentUtcTimestamp() + ?"); + var selectPS = connection.prepareStatement("SELECT locked_by, lock_until FROM shedlock " + + "WHERE name = ? AND lock_until > CurrentUtcTimestamp()"); selectPS.setString(1, lockConfiguration.getName()); - selectPS.setObject(2, lockConfiguration.getLockAtMostFor()); - var haveLeader = false; try (var rs = selectPS.executeQuery()) { - haveLeader = rs.next(); - } - - if (haveLeader) { - return Optional.empty(); + if (rs.next()) { + LOGGER.debug("Instance[{}] acquire lock is failed. Leader is {}, lock_until = {}", + LOCKED_BY, rs.getString(1), rs.getString(2)); + return Optional.empty(); + } } var upsertPS = connection.prepareStatement("" + @@ -65,31 +63,50 @@ public Optional lock(LockConfiguration lockConfiguration) { connection.setAutoCommit(autoCommit); } } catch (SQLException e) { - LOGGER.debug(String.format("Instance[{%s}] acquire lock is failed", LOCKED_BY), e); + LOGGER.debug("Instance[{}] acquire lock is failed", LOCKED_BY); return Optional.empty(); } } private record YdbJDBCLock(String name, DataSource dataSource) implements SimpleLock { + private static final int ATTEMPT_RELEASE_LOCK = 10; + @Override public void unlock() { + for (int i = 0; i < ATTEMPT_RELEASE_LOCK; i++) { + try { + LOGGER.debug("Instance[{}] trying unlock..", LOCKED_BY); + + doUnlock(); + + return; + } catch (SQLException e) { + if (i == ATTEMPT_RELEASE_LOCK - 1) { + throw new RuntimeException(e); + } + } + } + } + + private void doUnlock() throws SQLException { try (var connection = dataSource.getConnection()) { var autoCommit = connection.getAutoCommit(); try { connection.setAutoCommit(true); var ps = connection.prepareStatement( - "UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ?"); - ps.setObject(1, name); + "UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ? and locked_by = ?"); + ps.setString(1, name); + ps.setString(2, LOCKED_BY); ps.execute(); } finally { connection.setAutoCommit(autoCommit); } } catch (SQLException e) { - LOGGER.error(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e); + LOGGER.debug(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e); - throw new RuntimeException(e); + throw e; } } } diff --git a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java index 9797e73..1ff4f00 100644 --- a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java +++ b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java @@ -8,6 +8,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.sql.DataSource; @@ -16,6 +17,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.DynamicPropertyRegistry; @@ -27,6 +30,7 @@ */ @SpringBootTest(classes = TestApp.class) public class YdbLockProviderTest { + private static final Logger logger = LoggerFactory.getLogger(YdbLockProviderTest.class); @RegisterExtension private static final YdbHelperExtension ydb = new YdbHelperExtension(); @@ -74,19 +78,20 @@ public void integrationTest() throws ExecutionException, InterruptedException, S var atomicInt = new AtomicInteger(); var locked = new AtomicBoolean(); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 100; i++) { lockFutures.add(executorServer.submit(() -> { Optional optinal = Optional.empty(); while (optinal.isEmpty()) { optinal = lockProvider.lock(new LockConfiguration( - Instant.now(), "semaphore", Duration.ofSeconds(10), Duration.ZERO)); + Instant.now(), "semaphore", Duration.ofSeconds(100), Duration.ZERO)); optinal.ifPresent(simpleLock -> { - if (locked.get()) { + if (locked.compareAndExchange(false, true)) { + logger.debug("Failed test! System has two leaders"); + throw new RuntimeException(); } - locked.set(true); try { Thread.sleep(100); @@ -96,8 +101,17 @@ public void integrationTest() throws ExecutionException, InterruptedException, S atomicInt.addAndGet(50); locked.set(false); + + logger.info("Leader does UNLOCK!"); + simpleLock.unlock(); }); + + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(3_000)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } })); } @@ -106,6 +120,6 @@ public void integrationTest() throws ExecutionException, InterruptedException, S future.get(); } - Assertions.assertEquals(4950, atomicInt.get()); + Assertions.assertEquals(5000, atomicInt.get()); } } From f6a07c953cf72a1c470fe0deb423e0071fadfc89 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Mon, 18 Nov 2024 18:07:08 +0300 Subject: [PATCH 6/8] feat update --- .../main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java index 0934d88..c15935e 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java @@ -76,8 +76,6 @@ private record YdbJDBCLock(String name, DataSource dataSource) implements Simple public void unlock() { for (int i = 0; i < ATTEMPT_RELEASE_LOCK; i++) { try { - LOGGER.debug("Instance[{}] trying unlock..", LOCKED_BY); - doUnlock(); return; From 4f3723acb8ccc7c4954b8afca700990a216273b6 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Mon, 18 Nov 2024 18:16:32 +0300 Subject: [PATCH 7/8] fix test --- .../java/tech/ydb/lock/provider/YdbLockProviderTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java index 1ff4f00..6e38753 100644 --- a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java +++ b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java @@ -87,7 +87,7 @@ public void integrationTest() throws ExecutionException, InterruptedException, S Instant.now(), "semaphore", Duration.ofSeconds(100), Duration.ZERO)); optinal.ifPresent(simpleLock -> { - if (locked.compareAndExchange(false, true)) { + if (locked.compareAndSet(false, true)) { logger.debug("Failed test! System has two leaders"); throw new RuntimeException(); @@ -100,11 +100,10 @@ public void integrationTest() throws ExecutionException, InterruptedException, S } atomicInt.addAndGet(50); - locked.set(false); - logger.info("Leader does UNLOCK!"); simpleLock.unlock(); + locked.set(false); }); try { From 1eed680e9bd8d6fa594411a9ea7e130d72a84b5b Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Mon, 18 Nov 2024 18:44:11 +0300 Subject: [PATCH 8/8] fix test --- .../java/tech/ydb/lock/provider/YdbLockProviderTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java index 6e38753..99dca48 100644 --- a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java +++ b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java @@ -87,8 +87,8 @@ public void integrationTest() throws ExecutionException, InterruptedException, S Instant.now(), "semaphore", Duration.ofSeconds(100), Duration.ZERO)); optinal.ifPresent(simpleLock -> { - if (locked.compareAndSet(false, true)) { - logger.debug("Failed test! System has two leaders"); + if (!locked.compareAndSet(false, true)) { + logger.error("Failed test! System has two leaders"); throw new RuntimeException(); } @@ -100,10 +100,9 @@ public void integrationTest() throws ExecutionException, InterruptedException, S } atomicInt.addAndGet(50); + locked.set(false); logger.info("Leader does UNLOCK!"); - simpleLock.unlock(); - locked.set(false); }); try {