From f2434ca24b3581e4572708f4fe73b431afea4c59 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Fri, 8 Nov 2024 17:29:18 +0300 Subject: [PATCH 1/6] feat: prepare hibernate release --- hibernate-dialect/CHANGELOG.md | 4 ++++ hibernate-dialect/pom.xml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/hibernate-dialect/CHANGELOG.md b/hibernate-dialect/CHANGELOG.md index 87c003e..e7ec4e7 100644 --- a/hibernate-dialect/CHANGELOG.md +++ b/hibernate-dialect/CHANGELOG.md @@ -1,3 +1,7 @@ +## 1.2.0 ## + +- Added custom decimal jdbc codes `DECIMAL_31_9`, `DECIMAL_35_0`, `DECIMAL_35_9` + ## 1.1.0 ## - Added hint for scan queries diff --git a/hibernate-dialect/pom.xml b/hibernate-dialect/pom.xml index 3c8e9d7..9d39fba 100644 --- a/hibernate-dialect/pom.xml +++ b/hibernate-dialect/pom.xml @@ -6,7 +6,7 @@ tech.ydb.dialects hibernate-ydb-dialect - 1.1.0 + 1.2.0 jar From 50aba4b75ba20bf0f9162ac3e0f6af2c01fc3d87 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Thu, 14 Nov 2024 13:30:55 +0300 Subject: [PATCH 2/6] feat: added leader info in semaphore data --- shedlock-ydb/pom.xml | 2 +- .../YdbCoordinationServiceLockProvider.java | 50 +++++++++++++++---- .../lock/provider/YdbLockProviderTest.java | 18 ++----- 3 files changed, 45 insertions(+), 25 deletions(-) diff --git a/shedlock-ydb/pom.xml b/shedlock-ydb/pom.xml index 82234b9..543d217 100644 --- a/shedlock-ydb/pom.xml +++ b/shedlock-ydb/pom.xml @@ -6,7 +6,7 @@ tech.ydb.dialects shedlock-ydb - 0.1.0 + 0.2.0 jar diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java index 352de67..a7d6cb5 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java @@ -1,15 +1,20 @@ package tech.ydb.lock.provider; +import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.util.Optional; +import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import net.javacrumbs.shedlock.core.LockConfiguration; import net.javacrumbs.shedlock.core.LockProvider; import net.javacrumbs.shedlock.core.SimpleLock; +import net.javacrumbs.shedlock.support.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.CoordinationSession; import tech.ydb.coordination.SemaphoreLease; +import tech.ydb.core.Result; import tech.ydb.jdbc.YdbConnection; /** @@ -19,21 +24,38 @@ public class YdbCoordinationServiceLockProvider implements LockProvider { private static final Logger logger = LoggerFactory.getLogger(YdbCoordinationServiceLockProvider.class); private static final String YDB_LOCK_NODE_NAME = "shared-lock-ydb"; private static final int ATTEMPT_CREATE_NODE = 10; + private static final String INSTANCE_INFO = + "{Hostname=" + Utils.getHostname() + ", " + "Current PID=" + ProcessHandle.current().pid() + "}"; + private static final byte[] INSTANCE_INFO_BYTES = INSTANCE_INFO.getBytes(StandardCharsets.UTF_8); private final YdbConnection ydbConnection; private final CoordinationClient coordinationClient; + private volatile CoordinationSession coordinationSession; + public YdbCoordinationServiceLockProvider(YdbConnection ydbConnection) { this.ydbConnection = ydbConnection; this.coordinationClient = CoordinationClient.newClient(ydbConnection.getCtx().getGrpcTransport()); } + @PostConstruct public void init() { for (int i = 0; i < ATTEMPT_CREATE_NODE; i++) { var status = coordinationClient.createNode(YDB_LOCK_NODE_NAME).join(); if (status.isSuccess()) { - return; + coordinationSession = coordinationClient.createSession(YDB_LOCK_NODE_NAME); + + var statusCS = coordinationSession.connect().join(); + + if (statusCS.isSuccess()) { + logger.info("Created coordination node session [{}]", coordinationSession); + + return; + } + if (i == ATTEMPT_CREATE_NODE - 1) { + statusCS.expectSuccess("Failed creating coordination node session"); + } } if (i == ATTEMPT_CREATE_NODE - 1) { @@ -44,22 +66,23 @@ public void init() { @Override public Optional lock(LockConfiguration lockConfiguration) { - var coordinationSession = coordinationClient.createSession(YDB_LOCK_NODE_NAME); - - coordinationSession.connect().join() - .expectSuccess("Failed creating coordination node session"); + logger.info("Instance[{}] is trying to become a leader...", INSTANCE_INFO); - logger.debug("Created coordination node session"); - - var semaphoreLease = coordinationSession.acquireEphemeralSemaphore(lockConfiguration.getName(), true, - lockConfiguration.getLockAtMostFor()).join(); + Result semaphoreLease = coordinationSession.acquireEphemeralSemaphore( + lockConfiguration.getName(), + true, + INSTANCE_INFO_BYTES, + lockConfiguration.getLockAtMostFor() + ).join(); if (semaphoreLease.isSuccess()) { - logger.debug("Semaphore acquired"); + logger.info("Instance[{}] acquired semaphore[SemaphoreName={}]", INSTANCE_INFO, + semaphoreLease.getValue().getSemaphoreName()); return Optional.of(new YdbSimpleLock(semaphoreLease.getValue())); } else { - logger.debug("Semaphore is not acquired"); + logger.info("Instance[{}] did not acquire semaphore", INSTANCE_INFO); + return Optional.empty(); } } @@ -67,12 +90,17 @@ public Optional lock(LockConfiguration lockConfiguration) { private record YdbSimpleLock(SemaphoreLease semaphoreLease) implements SimpleLock { @Override public void unlock() { + logger.info("Instance[{}] released semaphore[SemaphoreName={}]", INSTANCE_INFO, semaphoreLease.getSemaphoreName()); + semaphoreLease.release().join(); } } @PreDestroy private void close() throws SQLException { + // closing coordination session + coordinationSession.close(); + ydbConnection.close(); } } diff --git a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java index 2666768..556e781 100644 --- a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java +++ b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java @@ -2,11 +2,9 @@ import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import net.javacrumbs.shedlock.core.LockConfiguration; @@ -55,11 +53,9 @@ public void integrationTest() throws ExecutionException, InterruptedException { var executorServer = Executors.newFixedThreadPool(10); var atomicInt = new AtomicInteger(); var locked = new AtomicBoolean(); - var futures = new ArrayList>(); - for (int i = 0; i < 100; i++) { - final var ii = i; - futures.add(executorServer.submit(() -> { + for (int i = 0; i < 10; i++) { + executorServer.submit(() -> { Optional optinal = Optional.empty(); while (optinal.isEmpty()) { @@ -78,18 +74,14 @@ public void integrationTest() throws ExecutionException, InterruptedException { throw new RuntimeException(e); } - atomicInt.addAndGet(ii); + atomicInt.addAndGet(50); locked.set(false); simpleLock.unlock(); }); } - })); + }).get(); } - for (Future future : futures) { - future.get(); - } - - Assertions.assertEquals(4950, atomicInt.get()); + Assertions.assertEquals(500, atomicInt.get()); } } From 4385fe6f43593fb4d7a6f8981522e8e9831667b7 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Thu, 14 Nov 2024 14:28:10 +0300 Subject: [PATCH 3/6] feat: added leader info in semaphore data --- .../YdbCoordinationServiceLockProvider.java | 35 +++++++++++++++++-- .../src/test/resources/application.properties | 4 ++- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java index a7d6cb5..9f85e24 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java @@ -2,6 +2,7 @@ import java.nio.charset.StandardCharsets; import java.sql.SQLException; +import java.time.Instant; import java.util.Optional; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -14,6 +15,7 @@ import tech.ydb.coordination.CoordinationClient; import tech.ydb.coordination.CoordinationSession; import tech.ydb.coordination.SemaphoreLease; +import tech.ydb.coordination.settings.DescribeSemaphoreMode; import tech.ydb.core.Result; import tech.ydb.jdbc.YdbConnection; @@ -25,7 +27,7 @@ public class YdbCoordinationServiceLockProvider implements LockProvider { private static final String YDB_LOCK_NODE_NAME = "shared-lock-ydb"; private static final int ATTEMPT_CREATE_NODE = 10; private static final String INSTANCE_INFO = - "{Hostname=" + Utils.getHostname() + ", " + "Current PID=" + ProcessHandle.current().pid() + "}"; + "Hostname=" + Utils.getHostname() + ", " + "Current PID=" + ProcessHandle.current().pid(); private static final byte[] INSTANCE_INFO_BYTES = INSTANCE_INFO.getBytes(StandardCharsets.UTF_8); private final YdbConnection ydbConnection; @@ -66,7 +68,36 @@ public void init() { @Override public Optional lock(LockConfiguration lockConfiguration) { - logger.info("Instance[{}] is trying to become a leader...", INSTANCE_INFO); + var now = Instant.now(); + + String instanceInfo = "Hostname=" + Utils.getHostname() + ", " + + "Current PID=" + ProcessHandle.current().pid() + ", " + + "CreatedAt=" + now; + + logger.info("Instance[{}] is trying to become a leader...", instanceInfo); + + var describeResult = coordinationSession.describeSemaphore( + lockConfiguration.getName(), + DescribeSemaphoreMode.WITH_OWNERS + ).join(); + + if (describeResult.isSuccess()) { + var describe = describeResult.getValue(); + var describePayload = new String(describe.getData(), StandardCharsets.UTF_8); + + logger.debug("Received DescribeSemaphore[Name={}, Data={}]", describe.getName(), describePayload); + + Instant createdLeaderTimestampUTC = Instant.parse(describePayload.split(",")[2].split("=")[1]); + + if (now.isAfter(createdLeaderTimestampUTC.plus(lockConfiguration.getLockAtMostFor()))) { + var deleteResult = coordinationSession.deleteSemaphore(describe.getName(), true).join(); + logger.debug("Delete semaphore[Name={}] result: {}", describe.getName(), deleteResult); + } + } else { + // no success, ephemeral semaphore is not created + + logger.debug("Semaphore[Name={}] not found", lockConfiguration.getName()); + } Result semaphoreLease = coordinationSession.acquireEphemeralSemaphore( lockConfiguration.getName(), diff --git a/shedlock-ydb/src/test/resources/application.properties b/shedlock-ydb/src/test/resources/application.properties index 09763b8..f1c2a3d 100644 --- a/shedlock-ydb/src/test/resources/application.properties +++ b/shedlock-ydb/src/test/resources/application.properties @@ -1 +1,3 @@ -spring.datasource.driver-class-name=tech.ydb.jdbc.YdbDriver \ No newline at end of file +spring.datasource.driver-class-name=tech.ydb.jdbc.YdbDriver + +logging.level.tech.ydb.lock.provider=debug \ No newline at end of file From eb6106c32ffde8b974962d5bc026b1263d0810f5 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Thu, 14 Nov 2024 14:30:34 +0300 Subject: [PATCH 4/6] feat: micro fix --- .../YdbCoordinationServiceLockProvider.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java index 9f85e24..5a9e012 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java @@ -26,9 +26,6 @@ public class YdbCoordinationServiceLockProvider implements LockProvider { private static final Logger logger = LoggerFactory.getLogger(YdbCoordinationServiceLockProvider.class); private static final String YDB_LOCK_NODE_NAME = "shared-lock-ydb"; private static final int ATTEMPT_CREATE_NODE = 10; - private static final String INSTANCE_INFO = - "Hostname=" + Utils.getHostname() + ", " + "Current PID=" + ProcessHandle.current().pid(); - private static final byte[] INSTANCE_INFO_BYTES = INSTANCE_INFO.getBytes(StandardCharsets.UTF_8); private final YdbConnection ydbConnection; private final CoordinationClient coordinationClient; @@ -102,26 +99,26 @@ public Optional lock(LockConfiguration lockConfiguration) { Result semaphoreLease = coordinationSession.acquireEphemeralSemaphore( lockConfiguration.getName(), true, - INSTANCE_INFO_BYTES, + instanceInfo.getBytes(StandardCharsets.UTF_8), lockConfiguration.getLockAtMostFor() ).join(); if (semaphoreLease.isSuccess()) { - logger.info("Instance[{}] acquired semaphore[SemaphoreName={}]", INSTANCE_INFO, + logger.info("Instance[{}] acquired semaphore[SemaphoreName={}]", instanceInfo, semaphoreLease.getValue().getSemaphoreName()); - return Optional.of(new YdbSimpleLock(semaphoreLease.getValue())); + return Optional.of(new YdbSimpleLock(semaphoreLease.getValue(), instanceInfo)); } else { - logger.info("Instance[{}] did not acquire semaphore", INSTANCE_INFO); + logger.info("Instance[{}] did not acquire semaphore", instanceInfo); return Optional.empty(); } } - private record YdbSimpleLock(SemaphoreLease semaphoreLease) implements SimpleLock { + private record YdbSimpleLock(SemaphoreLease semaphoreLease, String metaInfo) implements SimpleLock { @Override public void unlock() { - logger.info("Instance[{}] released semaphore[SemaphoreName={}]", INSTANCE_INFO, semaphoreLease.getSemaphoreName()); + logger.info("Instance[{}] released semaphore[SemaphoreName={}]", metaInfo, semaphoreLease.getSemaphoreName()); semaphoreLease.release().join(); } From 51459dbde2281d5e6452fa19c5cf7177d490e694 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Thu, 14 Nov 2024 18:10:27 +0300 Subject: [PATCH 5/6] feat: micro fix --- .../YdbCoordinationServiceLockProvider.java | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java index 5a9e012..695964e 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java @@ -4,7 +4,6 @@ import java.sql.SQLException; import java.time.Instant; import java.util.Optional; -import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import net.javacrumbs.shedlock.core.LockConfiguration; import net.javacrumbs.shedlock.core.LockProvider; @@ -15,7 +14,6 @@ import tech.ydb.coordination.CoordinationClient; import tech.ydb.coordination.CoordinationSession; import tech.ydb.coordination.SemaphoreLease; -import tech.ydb.coordination.settings.DescribeSemaphoreMode; import tech.ydb.core.Result; import tech.ydb.jdbc.YdbConnection; @@ -37,7 +35,6 @@ public YdbCoordinationServiceLockProvider(YdbConnection ydbConnection) { this.coordinationClient = CoordinationClient.newClient(ydbConnection.getCtx().getGrpcTransport()); } - @PostConstruct public void init() { for (int i = 0; i < ATTEMPT_CREATE_NODE; i++) { var status = coordinationClient.createNode(YDB_LOCK_NODE_NAME).join(); @@ -73,29 +70,6 @@ public Optional lock(LockConfiguration lockConfiguration) { logger.info("Instance[{}] is trying to become a leader...", instanceInfo); - var describeResult = coordinationSession.describeSemaphore( - lockConfiguration.getName(), - DescribeSemaphoreMode.WITH_OWNERS - ).join(); - - if (describeResult.isSuccess()) { - var describe = describeResult.getValue(); - var describePayload = new String(describe.getData(), StandardCharsets.UTF_8); - - logger.debug("Received DescribeSemaphore[Name={}, Data={}]", describe.getName(), describePayload); - - Instant createdLeaderTimestampUTC = Instant.parse(describePayload.split(",")[2].split("=")[1]); - - if (now.isAfter(createdLeaderTimestampUTC.plus(lockConfiguration.getLockAtMostFor()))) { - var deleteResult = coordinationSession.deleteSemaphore(describe.getName(), true).join(); - logger.debug("Delete semaphore[Name={}] result: {}", describe.getName(), deleteResult); - } - } else { - // no success, ephemeral semaphore is not created - - logger.debug("Semaphore[Name={}] not found", lockConfiguration.getName()); - } - Result semaphoreLease = coordinationSession.acquireEphemeralSemaphore( lockConfiguration.getName(), true, From 1c4e5ceb82b4c4953cfa64d4c8e15db762b3b445 Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Thu, 14 Nov 2024 19:37:52 +0300 Subject: [PATCH 6/6] feat: micro fix --- .../YdbCoordinationServiceLockProvider.java | 47 ++++++++++--------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java index 695964e..d198486 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java @@ -11,9 +11,11 @@ import net.javacrumbs.shedlock.support.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import tech.ydb.common.retry.RetryForever; import tech.ydb.coordination.CoordinationClient; import tech.ydb.coordination.CoordinationSession; import tech.ydb.coordination.SemaphoreLease; +import tech.ydb.coordination.settings.CoordinationSessionSettings; import tech.ydb.core.Result; import tech.ydb.jdbc.YdbConnection; @@ -28,8 +30,6 @@ public class YdbCoordinationServiceLockProvider implements LockProvider { private final YdbConnection ydbConnection; private final CoordinationClient coordinationClient; - private volatile CoordinationSession coordinationSession; - public YdbCoordinationServiceLockProvider(YdbConnection ydbConnection) { this.ydbConnection = ydbConnection; this.coordinationClient = CoordinationClient.newClient(ydbConnection.getCtx().getGrpcTransport()); @@ -39,21 +39,6 @@ public void init() { for (int i = 0; i < ATTEMPT_CREATE_NODE; i++) { var status = coordinationClient.createNode(YDB_LOCK_NODE_NAME).join(); - if (status.isSuccess()) { - coordinationSession = coordinationClient.createSession(YDB_LOCK_NODE_NAME); - - var statusCS = coordinationSession.connect().join(); - - if (statusCS.isSuccess()) { - logger.info("Created coordination node session [{}]", coordinationSession); - - return; - } - if (i == ATTEMPT_CREATE_NODE - 1) { - statusCS.expectSuccess("Failed creating coordination node session"); - } - } - if (i == ATTEMPT_CREATE_NODE - 1) { status.expectSuccess("Failed created coordination service node: " + YDB_LOCK_NODE_NAME); } @@ -70,6 +55,22 @@ public Optional lock(LockConfiguration lockConfiguration) { logger.info("Instance[{}] is trying to become a leader...", instanceInfo); + var coordinationSession = coordinationClient.createSession( + YDB_LOCK_NODE_NAME, CoordinationSessionSettings.newBuilder() + .withRetryPolicy(new RetryForever(500)) + .build() + ); + + var statusCS = coordinationSession.connect().join(); + + if (!statusCS.isSuccess()) { + logger.info("Failed creating coordination session [{}]", coordinationSession); + + return Optional.empty(); + } + + logger.info("Created coordination node session [{}]", coordinationSession); + Result semaphoreLease = coordinationSession.acquireEphemeralSemaphore( lockConfiguration.getName(), true, @@ -77,11 +78,13 @@ public Optional lock(LockConfiguration lockConfiguration) { lockConfiguration.getLockAtMostFor() ).join(); + logger.debug(coordinationSession.toString()); + if (semaphoreLease.isSuccess()) { logger.info("Instance[{}] acquired semaphore[SemaphoreName={}]", instanceInfo, semaphoreLease.getValue().getSemaphoreName()); - return Optional.of(new YdbSimpleLock(semaphoreLease.getValue(), instanceInfo)); + return Optional.of(new YdbSimpleLock(semaphoreLease.getValue(), instanceInfo, coordinationSession)); } else { logger.info("Instance[{}] did not acquire semaphore", instanceInfo); @@ -89,20 +92,20 @@ public Optional lock(LockConfiguration lockConfiguration) { } } - private record YdbSimpleLock(SemaphoreLease semaphoreLease, String metaInfo) implements SimpleLock { + private record YdbSimpleLock(SemaphoreLease semaphoreLease, String metaInfo, + CoordinationSession coordinationSession) implements SimpleLock { @Override public void unlock() { logger.info("Instance[{}] released semaphore[SemaphoreName={}]", metaInfo, semaphoreLease.getSemaphoreName()); semaphoreLease.release().join(); + + coordinationSession.close(); } } @PreDestroy private void close() throws SQLException { - // closing coordination session - coordinationSession.close(); - ydbConnection.close(); } }