diff --git a/shedlock-ydb/pom.xml b/shedlock-ydb/pom.xml index 82234b9..543d217 100644 --- a/shedlock-ydb/pom.xml +++ b/shedlock-ydb/pom.xml @@ -6,7 +6,7 @@ tech.ydb.dialects shedlock-ydb - 0.1.0 + 0.2.0 jar diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java index 352de67..d198486 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java @@ -1,15 +1,22 @@ package tech.ydb.lock.provider; +import java.nio.charset.StandardCharsets; import java.sql.SQLException; +import java.time.Instant; import java.util.Optional; import javax.annotation.PreDestroy; import net.javacrumbs.shedlock.core.LockConfiguration; import net.javacrumbs.shedlock.core.LockProvider; import net.javacrumbs.shedlock.core.SimpleLock; +import net.javacrumbs.shedlock.support.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import tech.ydb.common.retry.RetryForever; import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.CoordinationSession; import tech.ydb.coordination.SemaphoreLease; +import tech.ydb.coordination.settings.CoordinationSessionSettings; +import tech.ydb.core.Result; import tech.ydb.jdbc.YdbConnection; /** @@ -32,10 +39,6 @@ public void init() { for (int i = 0; i < ATTEMPT_CREATE_NODE; i++) { var status = coordinationClient.createNode(YDB_LOCK_NODE_NAME).join(); - if (status.isSuccess()) { - return; - } - if (i == ATTEMPT_CREATE_NODE - 1) { status.expectSuccess("Failed created coordination service node: " + YDB_LOCK_NODE_NAME); } @@ -44,30 +47,60 @@ public void init() { @Override public Optional lock(LockConfiguration lockConfiguration) { - var coordinationSession = coordinationClient.createSession(YDB_LOCK_NODE_NAME); + var now = Instant.now(); + + String instanceInfo = "Hostname=" + Utils.getHostname() + ", " + + "Current PID=" + ProcessHandle.current().pid() + ", " + + "CreatedAt=" + now; + + logger.info("Instance[{}] is trying to become a leader...", instanceInfo); - coordinationSession.connect().join() - .expectSuccess("Failed creating coordination node session"); + var coordinationSession = coordinationClient.createSession( + YDB_LOCK_NODE_NAME, CoordinationSessionSettings.newBuilder() + .withRetryPolicy(new RetryForever(500)) + .build() + ); - logger.debug("Created coordination node session"); + var statusCS = coordinationSession.connect().join(); - var semaphoreLease = coordinationSession.acquireEphemeralSemaphore(lockConfiguration.getName(), true, - lockConfiguration.getLockAtMostFor()).join(); + if (!statusCS.isSuccess()) { + logger.info("Failed creating coordination session [{}]", coordinationSession); + + return Optional.empty(); + } + + logger.info("Created coordination node session [{}]", coordinationSession); + + Result semaphoreLease = coordinationSession.acquireEphemeralSemaphore( + lockConfiguration.getName(), + true, + instanceInfo.getBytes(StandardCharsets.UTF_8), + lockConfiguration.getLockAtMostFor() + ).join(); + + logger.debug(coordinationSession.toString()); if (semaphoreLease.isSuccess()) { - logger.debug("Semaphore acquired"); + logger.info("Instance[{}] acquired semaphore[SemaphoreName={}]", instanceInfo, + semaphoreLease.getValue().getSemaphoreName()); - return Optional.of(new YdbSimpleLock(semaphoreLease.getValue())); + return Optional.of(new YdbSimpleLock(semaphoreLease.getValue(), instanceInfo, coordinationSession)); } else { - logger.debug("Semaphore is not acquired"); + logger.info("Instance[{}] did not acquire semaphore", instanceInfo); + return Optional.empty(); } } - private record YdbSimpleLock(SemaphoreLease semaphoreLease) implements SimpleLock { + private record YdbSimpleLock(SemaphoreLease semaphoreLease, String metaInfo, + CoordinationSession coordinationSession) implements SimpleLock { @Override public void unlock() { + logger.info("Instance[{}] released semaphore[SemaphoreName={}]", metaInfo, semaphoreLease.getSemaphoreName()); + semaphoreLease.release().join(); + + coordinationSession.close(); } } diff --git a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java index 2666768..556e781 100644 --- a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java +++ b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java @@ -2,11 +2,9 @@ import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import net.javacrumbs.shedlock.core.LockConfiguration; @@ -55,11 +53,9 @@ public void integrationTest() throws ExecutionException, InterruptedException { var executorServer = Executors.newFixedThreadPool(10); var atomicInt = new AtomicInteger(); var locked = new AtomicBoolean(); - var futures = new ArrayList>(); - for (int i = 0; i < 100; i++) { - final var ii = i; - futures.add(executorServer.submit(() -> { + for (int i = 0; i < 10; i++) { + executorServer.submit(() -> { Optional optinal = Optional.empty(); while (optinal.isEmpty()) { @@ -78,18 +74,14 @@ public void integrationTest() throws ExecutionException, InterruptedException { throw new RuntimeException(e); } - atomicInt.addAndGet(ii); + atomicInt.addAndGet(50); locked.set(false); simpleLock.unlock(); }); } - })); + }).get(); } - for (Future future : futures) { - future.get(); - } - - Assertions.assertEquals(4950, atomicInt.get()); + Assertions.assertEquals(500, atomicInt.get()); } } diff --git a/shedlock-ydb/src/test/resources/application.properties b/shedlock-ydb/src/test/resources/application.properties index 09763b8..f1c2a3d 100644 --- a/shedlock-ydb/src/test/resources/application.properties +++ b/shedlock-ydb/src/test/resources/application.properties @@ -1 +1,3 @@ -spring.datasource.driver-class-name=tech.ydb.jdbc.YdbDriver \ No newline at end of file +spring.datasource.driver-class-name=tech.ydb.jdbc.YdbDriver + +logging.level.tech.ydb.lock.provider=debug \ No newline at end of file