diff --git a/shedlock-ydb/pom.xml b/shedlock-ydb/pom.xml
index 543d217..4810187 100644
--- a/shedlock-ydb/pom.xml
+++ b/shedlock-ydb/pom.xml
@@ -6,7 +6,7 @@
tech.ydb.dialects
shedlock-ydb
- 0.2.0
+ 0.3.0
jar
@@ -38,8 +38,8 @@
5.9.3
2.17.2
- 2.2.6
- 2.2.2
+ 2.3.6
+ 2.3.5
3.2.3
5.15.0
@@ -53,13 +53,6 @@
-
- tech.ydb
- ydb-sdk-bom
- ${ydb.sdk.version}
- pom
- import
-
org.springframework.boot
spring-boot-dependencies
@@ -71,10 +64,6 @@
-
- tech.ydb
- ydb-sdk-coordination
-
net.javacrumbs.shedlock
shedlock-spring
@@ -101,6 +90,7 @@
tech.ydb.test
ydb-junit5-support
+ ${ydb.sdk.version}
test
diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java
deleted file mode 100644
index 4ef66c6..0000000
--- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package tech.ydb.lock.provider;
-
-import java.nio.charset.StandardCharsets;
-import java.sql.SQLException;
-import java.time.Instant;
-import java.util.Optional;
-import javax.annotation.PreDestroy;
-import net.javacrumbs.shedlock.core.LockConfiguration;
-import net.javacrumbs.shedlock.core.LockProvider;
-import net.javacrumbs.shedlock.core.SimpleLock;
-import net.javacrumbs.shedlock.support.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import tech.ydb.common.retry.RetryForever;
-import tech.ydb.coordination.CoordinationClient;
-import tech.ydb.coordination.CoordinationSession;
-import tech.ydb.coordination.SemaphoreLease;
-import tech.ydb.coordination.settings.CoordinationSessionSettings;
-import tech.ydb.coordination.settings.DescribeSemaphoreMode;
-import tech.ydb.core.Result;
-import tech.ydb.jdbc.YdbConnection;
-
-/**
- * @author Kirill Kurdyukov
- */
-public class YdbCoordinationServiceLockProvider implements LockProvider {
- private static final Logger logger = LoggerFactory.getLogger(YdbCoordinationServiceLockProvider.class);
- private static final String YDB_LOCK_NODE_NAME = "shared-lock-ydb";
- private static final int ATTEMPT_CREATE_NODE = 10;
-
- private final YdbConnection ydbConnection;
- private final CoordinationClient coordinationClient;
-
- public YdbCoordinationServiceLockProvider(YdbConnection ydbConnection) {
- this.ydbConnection = ydbConnection;
- this.coordinationClient = CoordinationClient.newClient(ydbConnection.getCtx().getGrpcTransport());
- }
-
- public void init() {
- for (int i = 0; i < ATTEMPT_CREATE_NODE; i++) {
- var status = coordinationClient.createNode(YDB_LOCK_NODE_NAME).join();
-
- if (i == ATTEMPT_CREATE_NODE - 1) {
- status.expectSuccess("Failed created coordination service node: " + YDB_LOCK_NODE_NAME);
- }
- }
- }
-
- @Override
- public Optional lock(LockConfiguration lockConfiguration) {
- var now = Instant.now();
-
- String instanceInfo = "Hostname=" + Utils.getHostname() + ", " +
- "Current PID=" + ProcessHandle.current().pid() + ", " +
- "CreatedAt=" + now;
-
- logger.info("Instance[{}] is trying to become a leader...", instanceInfo);
-
- var coordinationSession = coordinationClient.createSession(
- YDB_LOCK_NODE_NAME, CoordinationSessionSettings.newBuilder()
- .withRetryPolicy(new RetryForever(500))
- .build()
- );
-
- var statusCS = coordinationSession.connect().join();
-
- if (!statusCS.isSuccess()) {
- logger.warn("Failed creating coordination session [{}]", coordinationSession);
-
- return Optional.empty();
- }
-
- logger.debug("Created coordination node session [{}]", coordinationSession);
-
- var describeResult = coordinationSession.describeSemaphore(
- lockConfiguration.getName(),
- DescribeSemaphoreMode.WITH_OWNERS
- ).join();
-
- if (describeResult.isSuccess()) {
- var describe = describeResult.getValue();
-
- if (!describe.getOwnersList().isEmpty()) {
- var describePayload = new String(describe.getOwnersList().get(0).getData(), StandardCharsets.UTF_8);
-
- logger.info("Received DescribeSemaphore[Name={}, Data={}]", describe.getName(), describePayload);
- }
- }
-
- Result semaphoreLease = coordinationSession.acquireEphemeralSemaphore(
- lockConfiguration.getName(),
- true,
- instanceInfo.getBytes(StandardCharsets.UTF_8),
- lockConfiguration.getLockAtMostFor()
- ).join();
-
- logger.debug(coordinationSession.toString());
-
- if (semaphoreLease.isSuccess()) {
- logger.debug("Instance[{}] acquired semaphore [SemaphoreName={}]", instanceInfo,
- semaphoreLease.getValue().getSemaphoreName());
-
- return Optional.of(new YdbSimpleLock(semaphoreLease.getValue(), instanceInfo, coordinationSession));
- } else {
- logger.debug("Instance[{}] did not acquire semaphore", instanceInfo);
-
- return Optional.empty();
- }
- }
-
- private record YdbSimpleLock(SemaphoreLease semaphoreLease, String metaInfo,
- CoordinationSession coordinationSession) implements SimpleLock {
- @Override
- public void unlock() {
- logger.info("Instance[{}] released semaphore [SemaphoreName={}]", metaInfo, semaphoreLease.getSemaphoreName());
-
- semaphoreLease.release().join();
-
- coordinationSession.close();
- }
- }
-
- @PreDestroy
- private void close() throws SQLException {
- ydbConnection.close();
- }
-}
diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java
new file mode 100644
index 0000000..c15935e
--- /dev/null
+++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java
@@ -0,0 +1,111 @@
+package tech.ydb.lock.provider;
+
+import java.sql.SQLException;
+import java.util.Optional;
+import javax.sql.DataSource;
+import net.javacrumbs.shedlock.core.LockConfiguration;
+import net.javacrumbs.shedlock.core.LockProvider;
+import net.javacrumbs.shedlock.core.SimpleLock;
+import net.javacrumbs.shedlock.support.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Kirill Kurdyukov
+ */
+public class YdbJDBCLockProvider implements LockProvider {
+ private static final Logger LOGGER = LoggerFactory.getLogger(YdbJDBCLockProvider.class);
+ private static final String LOCKED_BY = "Hostname=" + Utils.getHostname() + ", " +
+ "Current PID=" + ProcessHandle.current().pid();
+
+ private final DataSource dataSource;
+
+ public YdbJDBCLockProvider(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ @Override
+ public Optional lock(LockConfiguration lockConfiguration) {
+ try (var connection = dataSource.getConnection()) {
+ var autoCommit = connection.getAutoCommit();
+ try {
+ connection.setAutoCommit(false);
+
+ var selectPS = connection.prepareStatement("SELECT locked_by, lock_until FROM shedlock " +
+ "WHERE name = ? AND lock_until > CurrentUtcTimestamp()");
+
+ selectPS.setString(1, lockConfiguration.getName());
+
+ try (var rs = selectPS.executeQuery()) {
+ if (rs.next()) {
+ LOGGER.debug("Instance[{}] acquire lock is failed. Leader is {}, lock_until = {}",
+ LOCKED_BY, rs.getString(1), rs.getString(2));
+ return Optional.empty();
+ }
+ }
+
+ var upsertPS = connection.prepareStatement("" +
+ "UPSERT INTO shedlock(name, lock_until, locked_at, locked_by) " +
+ "VALUES (?, Unwrap(CurrentUtcTimestamp() + ?), CurrentUtcTimestamp(), ?)"
+ );
+
+ upsertPS.setObject(1, lockConfiguration.getName());
+ upsertPS.setObject(2, lockConfiguration.getLockAtMostFor());
+ upsertPS.setObject(3, LOCKED_BY);
+ upsertPS.execute();
+
+ connection.commit();
+
+ LOGGER.debug("Instance[{}] is leader", LOCKED_BY);
+
+ return Optional.of(new YdbJDBCLock(lockConfiguration.getName(), dataSource));
+ } finally {
+ connection.setAutoCommit(autoCommit);
+ }
+ } catch (SQLException e) {
+ LOGGER.debug("Instance[{}] acquire lock is failed", LOCKED_BY);
+
+ return Optional.empty();
+ }
+ }
+
+ private record YdbJDBCLock(String name, DataSource dataSource) implements SimpleLock {
+ private static final int ATTEMPT_RELEASE_LOCK = 10;
+
+ @Override
+ public void unlock() {
+ for (int i = 0; i < ATTEMPT_RELEASE_LOCK; i++) {
+ try {
+ doUnlock();
+
+ return;
+ } catch (SQLException e) {
+ if (i == ATTEMPT_RELEASE_LOCK - 1) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ private void doUnlock() throws SQLException {
+ try (var connection = dataSource.getConnection()) {
+ var autoCommit = connection.getAutoCommit();
+
+ try {
+ connection.setAutoCommit(true);
+ var ps = connection.prepareStatement(
+ "UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ? and locked_by = ?");
+ ps.setString(1, name);
+ ps.setString(2, LOCKED_BY);
+ ps.execute();
+ } finally {
+ connection.setAutoCommit(autoCommit);
+ }
+ } catch (SQLException e) {
+ LOGGER.debug(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e);
+
+ throw e;
+ }
+ }
+ }
+}
diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java
index a3507bb..6a24fa5 100644
--- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java
+++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java
@@ -1,10 +1,9 @@
package tech.ydb.lock.provider;
-import java.sql.SQLException;
import javax.sql.DataSource;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import tech.ydb.jdbc.YdbConnection;
/**
* @author Kirill Kurdyukov
@@ -13,11 +12,8 @@
@Configuration
public class YdbLockProviderConfiguration {
@Bean
- public YdbCoordinationServiceLockProvider ydbLockProvider(DataSource dataSource) throws SQLException {
- var provider = new YdbCoordinationServiceLockProvider(dataSource.getConnection().unwrap(YdbConnection.class));
-
- provider.init();
-
- return provider;
+ @ConditionalOnBean(DataSource.class)
+ public YdbJDBCLockProvider ydbLockProvider(DataSource dataSource) {
+ return new YdbJDBCLockProvider(dataSource);
}
}
diff --git a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java
index 556e781..99dca48 100644
--- a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java
+++ b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java
@@ -1,17 +1,24 @@
package tech.ydb.lock.provider;
+import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.sql.DataSource;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.SimpleLock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
@@ -23,6 +30,7 @@
*/
@SpringBootTest(classes = TestApp.class)
public class YdbLockProviderTest {
+ private static final Logger logger = LoggerFactory.getLogger(YdbLockProviderTest.class);
@RegisterExtension
private static final YdbHelperExtension ydb = new YdbHelperExtension();
@@ -46,27 +54,44 @@ private static String jdbcUrl() {
}
@Autowired
- private YdbCoordinationServiceLockProvider lockProvider;
+ private YdbJDBCLockProvider lockProvider;
+
+ @Autowired
+ private DataSource dataSource;
@Test
- public void integrationTest() throws ExecutionException, InterruptedException {
+ public void integrationTest() throws ExecutionException, InterruptedException, SQLException {
+ try (var connection = dataSource.getConnection()) {
+ var statement = connection.createStatement();
+ statement.execute(
+ "CREATE TABLE shedlock(" +
+ "name TEXT NOT NULL, " +
+ "lock_until TIMESTAMP NOT NULL," +
+ "locked_at TIMESTAMP NOT NULL," +
+ "locked_by TEXT NOT NULL, " +
+ "PRIMARY KEY (name)" +
+ ");");
+ }
+
+ var lockFutures = new ArrayList>();
var executorServer = Executors.newFixedThreadPool(10);
var atomicInt = new AtomicInteger();
var locked = new AtomicBoolean();
- for (int i = 0; i < 10; i++) {
- executorServer.submit(() -> {
+ for (int i = 0; i < 100; i++) {
+ lockFutures.add(executorServer.submit(() -> {
Optional optinal = Optional.empty();
while (optinal.isEmpty()) {
- optinal = lockProvider.lock(
- new LockConfiguration(Instant.now(), "semaphore", Duration.ZERO, Duration.ZERO));
+ optinal = lockProvider.lock(new LockConfiguration(
+ Instant.now(), "semaphore", Duration.ofSeconds(100), Duration.ZERO));
optinal.ifPresent(simpleLock -> {
- if (locked.get()) {
+ if (!locked.compareAndSet(false, true)) {
+ logger.error("Failed test! System has two leaders");
+
throw new RuntimeException();
}
- locked.set(true);
try {
Thread.sleep(100);
@@ -76,12 +101,23 @@ public void integrationTest() throws ExecutionException, InterruptedException {
atomicInt.addAndGet(50);
locked.set(false);
+ logger.info("Leader does UNLOCK!");
simpleLock.unlock();
});
+
+ try {
+ Thread.sleep(ThreadLocalRandom.current().nextInt(3_000));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
- }).get();
+ }));
+ }
+
+ for (var future : lockFutures) {
+ future.get();
}
- Assertions.assertEquals(500, atomicInt.get());
+ Assertions.assertEquals(5000, atomicInt.get());
}
}