1
1
package tech .ydb .lock .provider ;
2
2
3
+ import java .nio .charset .StandardCharsets ;
3
4
import java .sql .SQLException ;
4
5
import java .util .Optional ;
6
+ import javax .annotation .PostConstruct ;
5
7
import javax .annotation .PreDestroy ;
6
8
import net .javacrumbs .shedlock .core .LockConfiguration ;
7
9
import net .javacrumbs .shedlock .core .LockProvider ;
8
10
import net .javacrumbs .shedlock .core .SimpleLock ;
11
+ import net .javacrumbs .shedlock .support .Utils ;
9
12
import org .slf4j .Logger ;
10
13
import org .slf4j .LoggerFactory ;
11
14
import tech .ydb .coordination .CoordinationClient ;
15
+ import tech .ydb .coordination .CoordinationSession ;
12
16
import tech .ydb .coordination .SemaphoreLease ;
17
+ import tech .ydb .core .Result ;
13
18
import tech .ydb .jdbc .YdbConnection ;
14
19
15
20
/**
@@ -19,21 +24,38 @@ public class YdbCoordinationServiceLockProvider implements LockProvider {
19
24
private static final Logger logger = LoggerFactory .getLogger (YdbCoordinationServiceLockProvider .class );
20
25
private static final String YDB_LOCK_NODE_NAME = "shared-lock-ydb" ;
21
26
private static final int ATTEMPT_CREATE_NODE = 10 ;
27
+ private static final String INSTANCE_INFO =
28
+ "{Hostname=" + Utils .getHostname () + ", " + "Current PID=" + ProcessHandle .current ().pid () + "}" ;
29
+ private static final byte [] INSTANCE_INFO_BYTES = INSTANCE_INFO .getBytes (StandardCharsets .UTF_8 );
22
30
23
31
private final YdbConnection ydbConnection ;
24
32
private final CoordinationClient coordinationClient ;
25
33
34
+ private volatile CoordinationSession coordinationSession ;
35
+
26
36
public YdbCoordinationServiceLockProvider (YdbConnection ydbConnection ) {
27
37
this .ydbConnection = ydbConnection ;
28
38
this .coordinationClient = CoordinationClient .newClient (ydbConnection .getCtx ().getGrpcTransport ());
29
39
}
30
40
41
+ @ PostConstruct
31
42
public void init () {
32
43
for (int i = 0 ; i < ATTEMPT_CREATE_NODE ; i ++) {
33
44
var status = coordinationClient .createNode (YDB_LOCK_NODE_NAME ).join ();
34
45
35
46
if (status .isSuccess ()) {
36
- return ;
47
+ coordinationSession = coordinationClient .createSession (YDB_LOCK_NODE_NAME );
48
+
49
+ var statusCS = coordinationSession .connect ().join ();
50
+
51
+ if (statusCS .isSuccess ()) {
52
+ logger .info ("Created coordination node session [{}]" , coordinationSession );
53
+
54
+ return ;
55
+ }
56
+ if (i == ATTEMPT_CREATE_NODE - 1 ) {
57
+ statusCS .expectSuccess ("Failed creating coordination node session" );
58
+ }
37
59
}
38
60
39
61
if (i == ATTEMPT_CREATE_NODE - 1 ) {
@@ -44,35 +66,41 @@ public void init() {
44
66
45
67
@ Override
46
68
public Optional <SimpleLock > lock (LockConfiguration lockConfiguration ) {
47
- var coordinationSession = coordinationClient .createSession (YDB_LOCK_NODE_NAME );
48
-
49
- coordinationSession .connect ().join ()
50
- .expectSuccess ("Failed creating coordination node session" );
69
+ logger .info ("Instance[{}] is trying to become a leader..." , INSTANCE_INFO );
51
70
52
- logger .debug ("Created coordination node session" );
53
-
54
- var semaphoreLease = coordinationSession .acquireEphemeralSemaphore (lockConfiguration .getName (), true ,
55
- lockConfiguration .getLockAtMostFor ()).join ();
71
+ Result <SemaphoreLease > semaphoreLease = coordinationSession .acquireEphemeralSemaphore (
72
+ lockConfiguration .getName (),
73
+ true ,
74
+ INSTANCE_INFO_BYTES ,
75
+ lockConfiguration .getLockAtMostFor ()
76
+ ).join ();
56
77
57
78
if (semaphoreLease .isSuccess ()) {
58
- logger .debug ("Semaphore acquired" );
79
+ logger .info ("Instance[{}] acquired semaphore[SemaphoreName={}]" , INSTANCE_INFO ,
80
+ semaphoreLease .getValue ().getSemaphoreName ());
59
81
60
82
return Optional .of (new YdbSimpleLock (semaphoreLease .getValue ()));
61
83
} else {
62
- logger .debug ("Semaphore is not acquired" );
84
+ logger .info ("Instance[{}] did not acquire semaphore" , INSTANCE_INFO );
85
+
63
86
return Optional .empty ();
64
87
}
65
88
}
66
89
67
90
private record YdbSimpleLock (SemaphoreLease semaphoreLease ) implements SimpleLock {
68
91
@ Override
69
92
public void unlock () {
93
+ logger .info ("Instance[{}] released semaphore[SemaphoreName={}]" , INSTANCE_INFO , semaphoreLease .getSemaphoreName ());
94
+
70
95
semaphoreLease .release ().join ();
71
96
}
72
97
}
73
98
74
99
@ PreDestroy
75
100
private void close () throws SQLException {
101
+ // closing coordination session
102
+ coordinationSession .close ();
103
+
76
104
ydbConnection .close ();
77
105
}
78
106
}
0 commit comments