Skip to content

Commit e6cd80f

Browse files
feat: added leaderinfo in semaphore (#166)
1 parent bbbaefe commit e6cd80f

File tree

4 files changed

+56
-29
lines changed

4 files changed

+56
-29
lines changed

shedlock-ydb/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>tech.ydb.dialects</groupId>
88
<artifactId>shedlock-ydb</artifactId>
9-
<version>0.1.0</version>
9+
<version>0.2.0</version>
1010

1111
<packaging>jar</packaging>
1212

shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java

+47-14
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
package tech.ydb.lock.provider;
22

3+
import java.nio.charset.StandardCharsets;
34
import java.sql.SQLException;
5+
import java.time.Instant;
46
import java.util.Optional;
57
import javax.annotation.PreDestroy;
68
import net.javacrumbs.shedlock.core.LockConfiguration;
79
import net.javacrumbs.shedlock.core.LockProvider;
810
import net.javacrumbs.shedlock.core.SimpleLock;
11+
import net.javacrumbs.shedlock.support.Utils;
912
import org.slf4j.Logger;
1013
import org.slf4j.LoggerFactory;
14+
import tech.ydb.common.retry.RetryForever;
1115
import tech.ydb.coordination.CoordinationClient;
16+
import tech.ydb.coordination.CoordinationSession;
1217
import tech.ydb.coordination.SemaphoreLease;
18+
import tech.ydb.coordination.settings.CoordinationSessionSettings;
19+
import tech.ydb.core.Result;
1320
import tech.ydb.jdbc.YdbConnection;
1421

1522
/**
@@ -32,10 +39,6 @@ public void init() {
3239
for (int i = 0; i < ATTEMPT_CREATE_NODE; i++) {
3340
var status = coordinationClient.createNode(YDB_LOCK_NODE_NAME).join();
3441

35-
if (status.isSuccess()) {
36-
return;
37-
}
38-
3942
if (i == ATTEMPT_CREATE_NODE - 1) {
4043
status.expectSuccess("Failed created coordination service node: " + YDB_LOCK_NODE_NAME);
4144
}
@@ -44,30 +47,60 @@ public void init() {
4447

4548
@Override
4649
public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
47-
var coordinationSession = coordinationClient.createSession(YDB_LOCK_NODE_NAME);
50+
var now = Instant.now();
51+
52+
String instanceInfo = "Hostname=" + Utils.getHostname() + ", " +
53+
"Current PID=" + ProcessHandle.current().pid() + ", " +
54+
"CreatedAt=" + now;
55+
56+
logger.info("Instance[{}] is trying to become a leader...", instanceInfo);
4857

49-
coordinationSession.connect().join()
50-
.expectSuccess("Failed creating coordination node session");
58+
var coordinationSession = coordinationClient.createSession(
59+
YDB_LOCK_NODE_NAME, CoordinationSessionSettings.newBuilder()
60+
.withRetryPolicy(new RetryForever(500))
61+
.build()
62+
);
5163

52-
logger.debug("Created coordination node session");
64+
var statusCS = coordinationSession.connect().join();
5365

54-
var semaphoreLease = coordinationSession.acquireEphemeralSemaphore(lockConfiguration.getName(), true,
55-
lockConfiguration.getLockAtMostFor()).join();
66+
if (!statusCS.isSuccess()) {
67+
logger.info("Failed creating coordination session [{}]", coordinationSession);
68+
69+
return Optional.empty();
70+
}
71+
72+
logger.info("Created coordination node session [{}]", coordinationSession);
73+
74+
Result<SemaphoreLease> semaphoreLease = coordinationSession.acquireEphemeralSemaphore(
75+
lockConfiguration.getName(),
76+
true,
77+
instanceInfo.getBytes(StandardCharsets.UTF_8),
78+
lockConfiguration.getLockAtMostFor()
79+
).join();
80+
81+
logger.debug(coordinationSession.toString());
5682

5783
if (semaphoreLease.isSuccess()) {
58-
logger.debug("Semaphore acquired");
84+
logger.info("Instance[{}] acquired semaphore[SemaphoreName={}]", instanceInfo,
85+
semaphoreLease.getValue().getSemaphoreName());
5986

60-
return Optional.of(new YdbSimpleLock(semaphoreLease.getValue()));
87+
return Optional.of(new YdbSimpleLock(semaphoreLease.getValue(), instanceInfo, coordinationSession));
6188
} else {
62-
logger.debug("Semaphore is not acquired");
89+
logger.info("Instance[{}] did not acquire semaphore", instanceInfo);
90+
6391
return Optional.empty();
6492
}
6593
}
6694

67-
private record YdbSimpleLock(SemaphoreLease semaphoreLease) implements SimpleLock {
95+
private record YdbSimpleLock(SemaphoreLease semaphoreLease, String metaInfo,
96+
CoordinationSession coordinationSession) implements SimpleLock {
6897
@Override
6998
public void unlock() {
99+
logger.info("Instance[{}] released semaphore[SemaphoreName={}]", metaInfo, semaphoreLease.getSemaphoreName());
100+
70101
semaphoreLease.release().join();
102+
103+
coordinationSession.close();
71104
}
72105
}
73106

shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java

+5-13
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22

33
import java.time.Duration;
44
import java.time.Instant;
5-
import java.util.ArrayList;
65
import java.util.Optional;
76
import java.util.concurrent.ExecutionException;
87
import java.util.concurrent.Executors;
9-
import java.util.concurrent.Future;
108
import java.util.concurrent.atomic.AtomicBoolean;
119
import java.util.concurrent.atomic.AtomicInteger;
1210
import net.javacrumbs.shedlock.core.LockConfiguration;
@@ -55,11 +53,9 @@ public void integrationTest() throws ExecutionException, InterruptedException {
5553
var executorServer = Executors.newFixedThreadPool(10);
5654
var atomicInt = new AtomicInteger();
5755
var locked = new AtomicBoolean();
58-
var futures = new ArrayList<Future<?>>();
5956

60-
for (int i = 0; i < 100; i++) {
61-
final var ii = i;
62-
futures.add(executorServer.submit(() -> {
57+
for (int i = 0; i < 10; i++) {
58+
executorServer.submit(() -> {
6359
Optional<SimpleLock> optinal = Optional.empty();
6460

6561
while (optinal.isEmpty()) {
@@ -78,18 +74,14 @@ public void integrationTest() throws ExecutionException, InterruptedException {
7874
throw new RuntimeException(e);
7975
}
8076

81-
atomicInt.addAndGet(ii);
77+
atomicInt.addAndGet(50);
8278
locked.set(false);
8379
simpleLock.unlock();
8480
});
8581
}
86-
}));
82+
}).get();
8783
}
8884

89-
for (Future<?> future : futures) {
90-
future.get();
91-
}
92-
93-
Assertions.assertEquals(4950, atomicInt.get());
85+
Assertions.assertEquals(500, atomicInt.get());
9486
}
9587
}
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
spring.datasource.driver-class-name=tech.ydb.jdbc.YdbDriver
1+
spring.datasource.driver-class-name=tech.ydb.jdbc.YdbDriver
2+
3+
logging.level.tech.ydb.lock.provider=debug

0 commit comments

Comments
 (0)