Skip to content

feat: shedlock over jdbc #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Nov 18, 2024
16 changes: 3 additions & 13 deletions shedlock-ydb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@

<junit5.version>5.9.3</junit5.version>
<log4j2.version>2.17.2</log4j2.version>
<ydb.sdk.version>2.2.6</ydb.sdk.version>
<ydb.jdbc.version>2.2.2</ydb.jdbc.version>
<ydb.sdk.version>2.3.6</ydb.sdk.version>
<ydb.jdbc.version>2.3.5</ydb.jdbc.version>
<spring.boot.version>3.2.3</spring.boot.version>
<shedlock-spring.version>5.15.0</shedlock-spring.version>
</properties>
Expand All @@ -53,13 +53,6 @@

<dependencyManagement>
<dependencies>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
<version>${ydb.sdk.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
Expand All @@ -71,10 +64,6 @@
</dependencyManagement>

<dependencies>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-coordination</artifactId>
</dependency>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-spring</artifactId>
Expand All @@ -101,6 +90,7 @@
<dependency>
<groupId>tech.ydb.test</groupId>
<artifactId>ydb-junit5-support</artifactId>
<version>${ydb.sdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package tech.ydb.lock.provider;

import java.sql.SQLException;
import java.util.Optional;
import javax.sql.DataSource;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import net.javacrumbs.shedlock.support.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Kirill Kurdyukov
*/
public class YdbJDBCLockProvider implements LockProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(YdbJDBCLockProvider.class);
private static final String LOCKED_BY = "Hostname=" + Utils.getHostname() + ", " +
"Current PID=" + ProcessHandle.current().pid();

private final DataSource dataSource;

public YdbJDBCLockProvider(DataSource dataSource) {
this.dataSource = dataSource;
}

@Override
public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
try (var connection = dataSource.getConnection()) {
var autoCommit = connection.getAutoCommit();
try {
connection.setAutoCommit(false);

var selectPS = connection.prepareStatement("SELECT lock_until, locked_by FROM shedlock " +
"WHERE name = ? AND lock_until > CurrentUtcTimestamp() + ?");

selectPS.setString(1, lockConfiguration.getName());
selectPS.setObject(2, lockConfiguration.getLockAtMostFor());

var haveLeader = false;
try (var rs = selectPS.executeQuery()) {
haveLeader = rs.next();
}

if (haveLeader) {
return Optional.empty();
}

var upsertPS = connection.prepareStatement("" +
"UPSERT INTO shedlock(name, lock_until, locked_at, locked_by) " +
"VALUES (?, Unwrap(CurrentUtcTimestamp() + ?), CurrentUtcTimestamp(), ?)"
);

upsertPS.setObject(1, lockConfiguration.getName());
upsertPS.setObject(2, lockConfiguration.getLockAtMostFor());
upsertPS.setObject(3, LOCKED_BY);
upsertPS.execute();

connection.commit();

LOGGER.debug("Instance[{}] is leader", LOCKED_BY);

return Optional.of(new YdbJDBCLock(lockConfiguration.getName(), dataSource));
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
LOGGER.debug(String.format("Instance[{%s}] acquire lock is failed", LOCKED_BY), e);

return Optional.empty();
}
}

private record YdbJDBCLock(String name, DataSource dataSource) implements SimpleLock {
@Override
public void unlock() {
try (var connection = dataSource.getConnection()) {
var ps = connection.prepareStatement(
"UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ?");
ps.setObject(1, name);
ps.execute();
} catch (SQLException e) {
LOGGER.error(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e);

throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package tech.ydb.lock.provider;

import java.sql.SQLException;
import javax.sql.DataSource;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import tech.ydb.jdbc.YdbConnection;

/**
* @author Kirill Kurdyukov
Expand All @@ -13,11 +12,8 @@
@Configuration
public class YdbLockProviderConfiguration {
@Bean
public YdbCoordinationServiceLockProvider ydbLockProvider(DataSource dataSource) throws SQLException {
var provider = new YdbCoordinationServiceLockProvider(dataSource.getConnection().unwrap(YdbConnection.class));

provider.init();

return provider;
@ConditionalOnBean(DataSource.class)
public YdbJDBCLockProvider ydbLockProvider(DataSource dataSource) {
return new YdbJDBCLockProvider(dataSource);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package tech.ydb.lock.provider;

import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.SimpleLock;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -46,10 +48,25 @@ private static String jdbcUrl() {
}

@Autowired
private YdbCoordinationServiceLockProvider lockProvider;
private YdbJDBCLockProvider lockProvider;

@Autowired
private DataSource dataSource;

@Test
public void integrationTest() throws ExecutionException, InterruptedException {
public void integrationTest() throws ExecutionException, InterruptedException, SQLException {
try (var connection = dataSource.getConnection()) {
var statement = connection.createStatement();
statement.execute(
"CREATE TABLE shedlock(" +
"name TEXT NOT NULL, " +
"lock_until TIMESTAMP NOT NULL," +
"locked_at TIMESTAMP NOT NULL," +
"locked_by TEXT NOT NULL, " +
"PRIMARY KEY (name)" +
");");
}

var executorServer = Executors.newFixedThreadPool(10);
var atomicInt = new AtomicInteger();
var locked = new AtomicBoolean();
Expand All @@ -59,8 +76,8 @@ public void integrationTest() throws ExecutionException, InterruptedException {
Optional<SimpleLock> optinal = Optional.empty();

while (optinal.isEmpty()) {
optinal = lockProvider.lock(
new LockConfiguration(Instant.now(), "semaphore", Duration.ZERO, Duration.ZERO));
optinal = lockProvider.lock(new LockConfiguration(
Instant.now(), "semaphore", Duration.ofSeconds(10), Duration.ZERO));

optinal.ifPresent(simpleLock -> {
if (locked.get()) {
Expand Down