diff --git a/shedlock-ydb/pom.xml b/shedlock-ydb/pom.xml index 543d217..4810187 100644 --- a/shedlock-ydb/pom.xml +++ b/shedlock-ydb/pom.xml @@ -6,7 +6,7 @@ tech.ydb.dialects shedlock-ydb - 0.2.0 + 0.3.0 jar @@ -38,8 +38,8 @@ 5.9.3 2.17.2 - 2.2.6 - 2.2.2 + 2.3.6 + 2.3.5 3.2.3 5.15.0 @@ -53,13 +53,6 @@ - - tech.ydb - ydb-sdk-bom - ${ydb.sdk.version} - pom - import - org.springframework.boot spring-boot-dependencies @@ -71,10 +64,6 @@ - - tech.ydb - ydb-sdk-coordination - net.javacrumbs.shedlock shedlock-spring @@ -101,6 +90,7 @@ tech.ydb.test ydb-junit5-support + ${ydb.sdk.version} test diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java deleted file mode 100644 index 4ef66c6..0000000 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java +++ /dev/null @@ -1,127 +0,0 @@ -package tech.ydb.lock.provider; - -import java.nio.charset.StandardCharsets; -import java.sql.SQLException; -import java.time.Instant; -import java.util.Optional; -import javax.annotation.PreDestroy; -import net.javacrumbs.shedlock.core.LockConfiguration; -import net.javacrumbs.shedlock.core.LockProvider; -import net.javacrumbs.shedlock.core.SimpleLock; -import net.javacrumbs.shedlock.support.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import tech.ydb.common.retry.RetryForever; -import tech.ydb.coordination.CoordinationClient; -import tech.ydb.coordination.CoordinationSession; -import tech.ydb.coordination.SemaphoreLease; -import tech.ydb.coordination.settings.CoordinationSessionSettings; -import tech.ydb.coordination.settings.DescribeSemaphoreMode; -import tech.ydb.core.Result; -import tech.ydb.jdbc.YdbConnection; - -/** - * @author Kirill Kurdyukov - */ -public class YdbCoordinationServiceLockProvider implements LockProvider { - private static final Logger logger = LoggerFactory.getLogger(YdbCoordinationServiceLockProvider.class); - private static final String YDB_LOCK_NODE_NAME = "shared-lock-ydb"; - private static final int ATTEMPT_CREATE_NODE = 10; - - private final YdbConnection ydbConnection; - private final CoordinationClient coordinationClient; - - public YdbCoordinationServiceLockProvider(YdbConnection ydbConnection) { - this.ydbConnection = ydbConnection; - this.coordinationClient = CoordinationClient.newClient(ydbConnection.getCtx().getGrpcTransport()); - } - - public void init() { - for (int i = 0; i < ATTEMPT_CREATE_NODE; i++) { - var status = coordinationClient.createNode(YDB_LOCK_NODE_NAME).join(); - - if (i == ATTEMPT_CREATE_NODE - 1) { - status.expectSuccess("Failed created coordination service node: " + YDB_LOCK_NODE_NAME); - } - } - } - - @Override - public Optional lock(LockConfiguration lockConfiguration) { - var now = Instant.now(); - - String instanceInfo = "Hostname=" + Utils.getHostname() + ", " + - "Current PID=" + ProcessHandle.current().pid() + ", " + - "CreatedAt=" + now; - - logger.info("Instance[{}] is trying to become a leader...", instanceInfo); - - var coordinationSession = coordinationClient.createSession( - YDB_LOCK_NODE_NAME, CoordinationSessionSettings.newBuilder() - .withRetryPolicy(new RetryForever(500)) - .build() - ); - - var statusCS = coordinationSession.connect().join(); - - if (!statusCS.isSuccess()) { - logger.warn("Failed creating coordination session [{}]", coordinationSession); - - return Optional.empty(); - } - - logger.debug("Created coordination node session [{}]", coordinationSession); - - var describeResult = coordinationSession.describeSemaphore( - lockConfiguration.getName(), - DescribeSemaphoreMode.WITH_OWNERS - ).join(); - - if (describeResult.isSuccess()) { - var describe = describeResult.getValue(); - - if (!describe.getOwnersList().isEmpty()) { - var describePayload = new String(describe.getOwnersList().get(0).getData(), StandardCharsets.UTF_8); - - logger.info("Received DescribeSemaphore[Name={}, Data={}]", describe.getName(), describePayload); - } - } - - Result semaphoreLease = coordinationSession.acquireEphemeralSemaphore( - lockConfiguration.getName(), - true, - instanceInfo.getBytes(StandardCharsets.UTF_8), - lockConfiguration.getLockAtMostFor() - ).join(); - - logger.debug(coordinationSession.toString()); - - if (semaphoreLease.isSuccess()) { - logger.debug("Instance[{}] acquired semaphore [SemaphoreName={}]", instanceInfo, - semaphoreLease.getValue().getSemaphoreName()); - - return Optional.of(new YdbSimpleLock(semaphoreLease.getValue(), instanceInfo, coordinationSession)); - } else { - logger.debug("Instance[{}] did not acquire semaphore", instanceInfo); - - return Optional.empty(); - } - } - - private record YdbSimpleLock(SemaphoreLease semaphoreLease, String metaInfo, - CoordinationSession coordinationSession) implements SimpleLock { - @Override - public void unlock() { - logger.info("Instance[{}] released semaphore [SemaphoreName={}]", metaInfo, semaphoreLease.getSemaphoreName()); - - semaphoreLease.release().join(); - - coordinationSession.close(); - } - } - - @PreDestroy - private void close() throws SQLException { - ydbConnection.close(); - } -} diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java new file mode 100644 index 0000000..c15935e --- /dev/null +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java @@ -0,0 +1,111 @@ +package tech.ydb.lock.provider; + +import java.sql.SQLException; +import java.util.Optional; +import javax.sql.DataSource; +import net.javacrumbs.shedlock.core.LockConfiguration; +import net.javacrumbs.shedlock.core.LockProvider; +import net.javacrumbs.shedlock.core.SimpleLock; +import net.javacrumbs.shedlock.support.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Kirill Kurdyukov + */ +public class YdbJDBCLockProvider implements LockProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(YdbJDBCLockProvider.class); + private static final String LOCKED_BY = "Hostname=" + Utils.getHostname() + ", " + + "Current PID=" + ProcessHandle.current().pid(); + + private final DataSource dataSource; + + public YdbJDBCLockProvider(DataSource dataSource) { + this.dataSource = dataSource; + } + + @Override + public Optional lock(LockConfiguration lockConfiguration) { + try (var connection = dataSource.getConnection()) { + var autoCommit = connection.getAutoCommit(); + try { + connection.setAutoCommit(false); + + var selectPS = connection.prepareStatement("SELECT locked_by, lock_until FROM shedlock " + + "WHERE name = ? AND lock_until > CurrentUtcTimestamp()"); + + selectPS.setString(1, lockConfiguration.getName()); + + try (var rs = selectPS.executeQuery()) { + if (rs.next()) { + LOGGER.debug("Instance[{}] acquire lock is failed. Leader is {}, lock_until = {}", + LOCKED_BY, rs.getString(1), rs.getString(2)); + return Optional.empty(); + } + } + + var upsertPS = connection.prepareStatement("" + + "UPSERT INTO shedlock(name, lock_until, locked_at, locked_by) " + + "VALUES (?, Unwrap(CurrentUtcTimestamp() + ?), CurrentUtcTimestamp(), ?)" + ); + + upsertPS.setObject(1, lockConfiguration.getName()); + upsertPS.setObject(2, lockConfiguration.getLockAtMostFor()); + upsertPS.setObject(3, LOCKED_BY); + upsertPS.execute(); + + connection.commit(); + + LOGGER.debug("Instance[{}] is leader", LOCKED_BY); + + return Optional.of(new YdbJDBCLock(lockConfiguration.getName(), dataSource)); + } finally { + connection.setAutoCommit(autoCommit); + } + } catch (SQLException e) { + LOGGER.debug("Instance[{}] acquire lock is failed", LOCKED_BY); + + return Optional.empty(); + } + } + + private record YdbJDBCLock(String name, DataSource dataSource) implements SimpleLock { + private static final int ATTEMPT_RELEASE_LOCK = 10; + + @Override + public void unlock() { + for (int i = 0; i < ATTEMPT_RELEASE_LOCK; i++) { + try { + doUnlock(); + + return; + } catch (SQLException e) { + if (i == ATTEMPT_RELEASE_LOCK - 1) { + throw new RuntimeException(e); + } + } + } + } + + private void doUnlock() throws SQLException { + try (var connection = dataSource.getConnection()) { + var autoCommit = connection.getAutoCommit(); + + try { + connection.setAutoCommit(true); + var ps = connection.prepareStatement( + "UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ? and locked_by = ?"); + ps.setString(1, name); + ps.setString(2, LOCKED_BY); + ps.execute(); + } finally { + connection.setAutoCommit(autoCommit); + } + } catch (SQLException e) { + LOGGER.debug(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e); + + throw e; + } + } + } +} diff --git a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java index a3507bb..6a24fa5 100644 --- a/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java +++ b/shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbLockProviderConfiguration.java @@ -1,10 +1,9 @@ package tech.ydb.lock.provider; -import java.sql.SQLException; import javax.sql.DataSource; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import tech.ydb.jdbc.YdbConnection; /** * @author Kirill Kurdyukov @@ -13,11 +12,8 @@ @Configuration public class YdbLockProviderConfiguration { @Bean - public YdbCoordinationServiceLockProvider ydbLockProvider(DataSource dataSource) throws SQLException { - var provider = new YdbCoordinationServiceLockProvider(dataSource.getConnection().unwrap(YdbConnection.class)); - - provider.init(); - - return provider; + @ConditionalOnBean(DataSource.class) + public YdbJDBCLockProvider ydbLockProvider(DataSource dataSource) { + return new YdbJDBCLockProvider(dataSource); } } diff --git a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java index 556e781..99dca48 100644 --- a/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java +++ b/shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java @@ -1,17 +1,24 @@ package tech.ydb.lock.provider; +import java.sql.SQLException; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import javax.sql.DataSource; import net.javacrumbs.shedlock.core.LockConfiguration; import net.javacrumbs.shedlock.core.SimpleLock; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.DynamicPropertyRegistry; @@ -23,6 +30,7 @@ */ @SpringBootTest(classes = TestApp.class) public class YdbLockProviderTest { + private static final Logger logger = LoggerFactory.getLogger(YdbLockProviderTest.class); @RegisterExtension private static final YdbHelperExtension ydb = new YdbHelperExtension(); @@ -46,27 +54,44 @@ private static String jdbcUrl() { } @Autowired - private YdbCoordinationServiceLockProvider lockProvider; + private YdbJDBCLockProvider lockProvider; + + @Autowired + private DataSource dataSource; @Test - public void integrationTest() throws ExecutionException, InterruptedException { + public void integrationTest() throws ExecutionException, InterruptedException, SQLException { + try (var connection = dataSource.getConnection()) { + var statement = connection.createStatement(); + statement.execute( + "CREATE TABLE shedlock(" + + "name TEXT NOT NULL, " + + "lock_until TIMESTAMP NOT NULL," + + "locked_at TIMESTAMP NOT NULL," + + "locked_by TEXT NOT NULL, " + + "PRIMARY KEY (name)" + + ");"); + } + + var lockFutures = new ArrayList>(); var executorServer = Executors.newFixedThreadPool(10); var atomicInt = new AtomicInteger(); var locked = new AtomicBoolean(); - for (int i = 0; i < 10; i++) { - executorServer.submit(() -> { + for (int i = 0; i < 100; i++) { + lockFutures.add(executorServer.submit(() -> { Optional optinal = Optional.empty(); while (optinal.isEmpty()) { - optinal = lockProvider.lock( - new LockConfiguration(Instant.now(), "semaphore", Duration.ZERO, Duration.ZERO)); + optinal = lockProvider.lock(new LockConfiguration( + Instant.now(), "semaphore", Duration.ofSeconds(100), Duration.ZERO)); optinal.ifPresent(simpleLock -> { - if (locked.get()) { + if (!locked.compareAndSet(false, true)) { + logger.error("Failed test! System has two leaders"); + throw new RuntimeException(); } - locked.set(true); try { Thread.sleep(100); @@ -76,12 +101,23 @@ public void integrationTest() throws ExecutionException, InterruptedException { atomicInt.addAndGet(50); locked.set(false); + logger.info("Leader does UNLOCK!"); simpleLock.unlock(); }); + + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(3_000)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } - }).get(); + })); + } + + for (var future : lockFutures) { + future.get(); } - Assertions.assertEquals(500, atomicInt.get()); + Assertions.assertEquals(5000, atomicInt.get()); } }