diff --git a/shedlock-ydb/pom.xml b/shedlock-ydb/pom.xml
index 82234b9..543d217 100644
--- a/shedlock-ydb/pom.xml
+++ b/shedlock-ydb/pom.xml
@@ -6,7 +6,7 @@
tech.ydb.dialects
shedlock-ydb
- 0.1.0
+ 0.2.0
jar
diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java
index 352de67..d198486 100644
--- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java
+++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java
@@ -1,15 +1,22 @@
package tech.ydb.lock.provider;
+import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
+import java.time.Instant;
import java.util.Optional;
import javax.annotation.PreDestroy;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
+import net.javacrumbs.shedlock.support.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import tech.ydb.common.retry.RetryForever;
import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.coordination.CoordinationSession;
import tech.ydb.coordination.SemaphoreLease;
+import tech.ydb.coordination.settings.CoordinationSessionSettings;
+import tech.ydb.core.Result;
import tech.ydb.jdbc.YdbConnection;
/**
@@ -32,10 +39,6 @@ public void init() {
for (int i = 0; i < ATTEMPT_CREATE_NODE; i++) {
var status = coordinationClient.createNode(YDB_LOCK_NODE_NAME).join();
- if (status.isSuccess()) {
- return;
- }
-
if (i == ATTEMPT_CREATE_NODE - 1) {
status.expectSuccess("Failed created coordination service node: " + YDB_LOCK_NODE_NAME);
}
@@ -44,30 +47,60 @@ public void init() {
@Override
public Optional lock(LockConfiguration lockConfiguration) {
- var coordinationSession = coordinationClient.createSession(YDB_LOCK_NODE_NAME);
+ var now = Instant.now();
+
+ String instanceInfo = "Hostname=" + Utils.getHostname() + ", " +
+ "Current PID=" + ProcessHandle.current().pid() + ", " +
+ "CreatedAt=" + now;
+
+ logger.info("Instance[{}] is trying to become a leader...", instanceInfo);
- coordinationSession.connect().join()
- .expectSuccess("Failed creating coordination node session");
+ var coordinationSession = coordinationClient.createSession(
+ YDB_LOCK_NODE_NAME, CoordinationSessionSettings.newBuilder()
+ .withRetryPolicy(new RetryForever(500))
+ .build()
+ );
- logger.debug("Created coordination node session");
+ var statusCS = coordinationSession.connect().join();
- var semaphoreLease = coordinationSession.acquireEphemeralSemaphore(lockConfiguration.getName(), true,
- lockConfiguration.getLockAtMostFor()).join();
+ if (!statusCS.isSuccess()) {
+ logger.info("Failed creating coordination session [{}]", coordinationSession);
+
+ return Optional.empty();
+ }
+
+ logger.info("Created coordination node session [{}]", coordinationSession);
+
+ Result semaphoreLease = coordinationSession.acquireEphemeralSemaphore(
+ lockConfiguration.getName(),
+ true,
+ instanceInfo.getBytes(StandardCharsets.UTF_8),
+ lockConfiguration.getLockAtMostFor()
+ ).join();
+
+ logger.debug(coordinationSession.toString());
if (semaphoreLease.isSuccess()) {
- logger.debug("Semaphore acquired");
+ logger.info("Instance[{}] acquired semaphore[SemaphoreName={}]", instanceInfo,
+ semaphoreLease.getValue().getSemaphoreName());
- return Optional.of(new YdbSimpleLock(semaphoreLease.getValue()));
+ return Optional.of(new YdbSimpleLock(semaphoreLease.getValue(), instanceInfo, coordinationSession));
} else {
- logger.debug("Semaphore is not acquired");
+ logger.info("Instance[{}] did not acquire semaphore", instanceInfo);
+
return Optional.empty();
}
}
- private record YdbSimpleLock(SemaphoreLease semaphoreLease) implements SimpleLock {
+ private record YdbSimpleLock(SemaphoreLease semaphoreLease, String metaInfo,
+ CoordinationSession coordinationSession) implements SimpleLock {
@Override
public void unlock() {
+ logger.info("Instance[{}] released semaphore[SemaphoreName={}]", metaInfo, semaphoreLease.getSemaphoreName());
+
semaphoreLease.release().join();
+
+ coordinationSession.close();
}
}
diff --git a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java
index 2666768..556e781 100644
--- a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java
+++ b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java
@@ -2,11 +2,9 @@
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import net.javacrumbs.shedlock.core.LockConfiguration;
@@ -55,11 +53,9 @@ public void integrationTest() throws ExecutionException, InterruptedException {
var executorServer = Executors.newFixedThreadPool(10);
var atomicInt = new AtomicInteger();
var locked = new AtomicBoolean();
- var futures = new ArrayList>();
- for (int i = 0; i < 100; i++) {
- final var ii = i;
- futures.add(executorServer.submit(() -> {
+ for (int i = 0; i < 10; i++) {
+ executorServer.submit(() -> {
Optional optinal = Optional.empty();
while (optinal.isEmpty()) {
@@ -78,18 +74,14 @@ public void integrationTest() throws ExecutionException, InterruptedException {
throw new RuntimeException(e);
}
- atomicInt.addAndGet(ii);
+ atomicInt.addAndGet(50);
locked.set(false);
simpleLock.unlock();
});
}
- }));
+ }).get();
}
- for (Future> future : futures) {
- future.get();
- }
-
- Assertions.assertEquals(4950, atomicInt.get());
+ Assertions.assertEquals(500, atomicInt.get());
}
}
diff --git a/shedlock-ydb/src/test/resources/application.properties b/shedlock-ydb/src/test/resources/application.properties
index 09763b8..f1c2a3d 100644
--- a/shedlock-ydb/src/test/resources/application.properties
+++ b/shedlock-ydb/src/test/resources/application.properties
@@ -1 +1,3 @@
-spring.datasource.driver-class-name=tech.ydb.jdbc.YdbDriver
\ No newline at end of file
+spring.datasource.driver-class-name=tech.ydb.jdbc.YdbDriver
+
+logging.level.tech.ydb.lock.provider=debug
\ No newline at end of file