Skip to content

feat: shedlock over jdbc #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Nov 18, 2024
18 changes: 4 additions & 14 deletions shedlock-ydb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>tech.ydb.dialects</groupId>
<artifactId>shedlock-ydb</artifactId>
<version>0.2.0</version>
<version>0.3.0</version>

<packaging>jar</packaging>

Expand Down Expand Up @@ -38,8 +38,8 @@

<junit5.version>5.9.3</junit5.version>
<log4j2.version>2.17.2</log4j2.version>
<ydb.sdk.version>2.2.6</ydb.sdk.version>
<ydb.jdbc.version>2.2.2</ydb.jdbc.version>
<ydb.sdk.version>2.3.6</ydb.sdk.version>
<ydb.jdbc.version>2.3.5</ydb.jdbc.version>
<spring.boot.version>3.2.3</spring.boot.version>
<shedlock-spring.version>5.15.0</shedlock-spring.version>
</properties>
Expand All @@ -53,13 +53,6 @@

<dependencyManagement>
<dependencies>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
<version>${ydb.sdk.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
Expand All @@ -71,10 +64,6 @@
</dependencyManagement>

<dependencies>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-coordination</artifactId>
</dependency>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-spring</artifactId>
Expand All @@ -101,6 +90,7 @@
<dependency>
<groupId>tech.ydb.test</groupId>
<artifactId>ydb-junit5-support</artifactId>
<version>${ydb.sdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package tech.ydb.lock.provider;

import java.sql.SQLException;
import java.util.Optional;
import javax.sql.DataSource;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import net.javacrumbs.shedlock.support.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Kirill Kurdyukov
*/
public class YdbJDBCLockProvider implements LockProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(YdbJDBCLockProvider.class);
private static final String LOCKED_BY = "Hostname=" + Utils.getHostname() + ", " +
"Current PID=" + ProcessHandle.current().pid();

private final DataSource dataSource;

public YdbJDBCLockProvider(DataSource dataSource) {
this.dataSource = dataSource;
}

@Override
public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
try (var connection = dataSource.getConnection()) {
var autoCommit = connection.getAutoCommit();
try {
connection.setAutoCommit(false);

var selectPS = connection.prepareStatement("SELECT locked_by, lock_until FROM shedlock " +
"WHERE name = ? AND lock_until > CurrentUtcTimestamp()");

selectPS.setString(1, lockConfiguration.getName());

try (var rs = selectPS.executeQuery()) {
if (rs.next()) {
LOGGER.debug("Instance[{}] acquire lock is failed. Leader is {}, lock_until = {}",
LOCKED_BY, rs.getString(1), rs.getString(2));
return Optional.empty();
}
}

var upsertPS = connection.prepareStatement("" +
"UPSERT INTO shedlock(name, lock_until, locked_at, locked_by) " +
"VALUES (?, Unwrap(CurrentUtcTimestamp() + ?), CurrentUtcTimestamp(), ?)"
);

upsertPS.setObject(1, lockConfiguration.getName());
upsertPS.setObject(2, lockConfiguration.getLockAtMostFor());
upsertPS.setObject(3, LOCKED_BY);
upsertPS.execute();

connection.commit();

LOGGER.debug("Instance[{}] is leader", LOCKED_BY);

return Optional.of(new YdbJDBCLock(lockConfiguration.getName(), dataSource));
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
LOGGER.debug("Instance[{}] acquire lock is failed", LOCKED_BY);

return Optional.empty();
}
}

private record YdbJDBCLock(String name, DataSource dataSource) implements SimpleLock {
private static final int ATTEMPT_RELEASE_LOCK = 10;

@Override
public void unlock() {
for (int i = 0; i < ATTEMPT_RELEASE_LOCK; i++) {
try {
doUnlock();

return;
} catch (SQLException e) {
if (i == ATTEMPT_RELEASE_LOCK - 1) {
throw new RuntimeException(e);
}
}
}
}

private void doUnlock() throws SQLException {
try (var connection = dataSource.getConnection()) {
var autoCommit = connection.getAutoCommit();

try {
connection.setAutoCommit(true);
var ps = connection.prepareStatement(
"UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ? and locked_by = ?");
ps.setString(1, name);
ps.setString(2, LOCKED_BY);
ps.execute();
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
LOGGER.debug(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e);

throw e;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package tech.ydb.lock.provider;

import java.sql.SQLException;
import javax.sql.DataSource;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import tech.ydb.jdbc.YdbConnection;

/**
* @author Kirill Kurdyukov
Expand All @@ -13,11 +12,8 @@
@Configuration
public class YdbLockProviderConfiguration {
@Bean
public YdbCoordinationServiceLockProvider ydbLockProvider(DataSource dataSource) throws SQLException {
var provider = new YdbCoordinationServiceLockProvider(dataSource.getConnection().unwrap(YdbConnection.class));

provider.init();

return provider;
@ConditionalOnBean(DataSource.class)
public YdbJDBCLockProvider ydbLockProvider(DataSource dataSource) {
return new YdbJDBCLockProvider(dataSource);
}
}
Loading