11
11
import net .javacrumbs .shedlock .support .Utils ;
12
12
import org .slf4j .Logger ;
13
13
import org .slf4j .LoggerFactory ;
14
+ import tech .ydb .common .retry .RetryForever ;
14
15
import tech .ydb .coordination .CoordinationClient ;
15
16
import tech .ydb .coordination .CoordinationSession ;
16
17
import tech .ydb .coordination .SemaphoreLease ;
18
+ import tech .ydb .coordination .settings .CoordinationSessionSettings ;
17
19
import tech .ydb .core .Result ;
18
20
import tech .ydb .jdbc .YdbConnection ;
19
21
@@ -28,8 +30,6 @@ public class YdbCoordinationServiceLockProvider implements LockProvider {
28
30
private final YdbConnection ydbConnection ;
29
31
private final CoordinationClient coordinationClient ;
30
32
31
- private volatile CoordinationSession coordinationSession ;
32
-
33
33
public YdbCoordinationServiceLockProvider (YdbConnection ydbConnection ) {
34
34
this .ydbConnection = ydbConnection ;
35
35
this .coordinationClient = CoordinationClient .newClient (ydbConnection .getCtx ().getGrpcTransport ());
@@ -39,21 +39,6 @@ public void init() {
39
39
for (int i = 0 ; i < ATTEMPT_CREATE_NODE ; i ++) {
40
40
var status = coordinationClient .createNode (YDB_LOCK_NODE_NAME ).join ();
41
41
42
- if (status .isSuccess ()) {
43
- coordinationSession = coordinationClient .createSession (YDB_LOCK_NODE_NAME );
44
-
45
- var statusCS = coordinationSession .connect ().join ();
46
-
47
- if (statusCS .isSuccess ()) {
48
- logger .info ("Created coordination node session [{}]" , coordinationSession );
49
-
50
- return ;
51
- }
52
- if (i == ATTEMPT_CREATE_NODE - 1 ) {
53
- statusCS .expectSuccess ("Failed creating coordination node session" );
54
- }
55
- }
56
-
57
42
if (i == ATTEMPT_CREATE_NODE - 1 ) {
58
43
status .expectSuccess ("Failed created coordination service node: " + YDB_LOCK_NODE_NAME );
59
44
}
@@ -70,39 +55,57 @@ public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
70
55
71
56
logger .info ("Instance[{}] is trying to become a leader..." , instanceInfo );
72
57
58
+ var coordinationSession = coordinationClient .createSession (
59
+ YDB_LOCK_NODE_NAME , CoordinationSessionSettings .newBuilder ()
60
+ .withRetryPolicy (new RetryForever (500 ))
61
+ .build ()
62
+ );
63
+
64
+ var statusCS = coordinationSession .connect ().join ();
65
+
66
+ if (!statusCS .isSuccess ()) {
67
+ logger .info ("Failed creating coordination session [{}]" , coordinationSession );
68
+
69
+ return Optional .empty ();
70
+ }
71
+
72
+ logger .info ("Created coordination node session [{}]" , coordinationSession );
73
+
73
74
Result <SemaphoreLease > semaphoreLease = coordinationSession .acquireEphemeralSemaphore (
74
75
lockConfiguration .getName (),
75
76
true ,
76
77
instanceInfo .getBytes (StandardCharsets .UTF_8 ),
77
78
lockConfiguration .getLockAtMostFor ()
78
79
).join ();
79
80
81
+ logger .debug (coordinationSession .toString ());
82
+
80
83
if (semaphoreLease .isSuccess ()) {
81
84
logger .info ("Instance[{}] acquired semaphore[SemaphoreName={}]" , instanceInfo ,
82
85
semaphoreLease .getValue ().getSemaphoreName ());
83
86
84
- return Optional .of (new YdbSimpleLock (semaphoreLease .getValue (), instanceInfo ));
87
+ return Optional .of (new YdbSimpleLock (semaphoreLease .getValue (), instanceInfo , coordinationSession ));
85
88
} else {
86
89
logger .info ("Instance[{}] did not acquire semaphore" , instanceInfo );
87
90
88
91
return Optional .empty ();
89
92
}
90
93
}
91
94
92
- private record YdbSimpleLock (SemaphoreLease semaphoreLease , String metaInfo ) implements SimpleLock {
95
+ private record YdbSimpleLock (SemaphoreLease semaphoreLease , String metaInfo ,
96
+ CoordinationSession coordinationSession ) implements SimpleLock {
93
97
@ Override
94
98
public void unlock () {
95
99
logger .info ("Instance[{}] released semaphore[SemaphoreName={}]" , metaInfo , semaphoreLease .getSemaphoreName ());
96
100
97
101
semaphoreLease .release ().join ();
102
+
103
+ coordinationSession .close ();
98
104
}
99
105
}
100
106
101
107
@ PreDestroy
102
108
private void close () throws SQLException {
103
- // closing coordination session
104
- coordinationSession .close ();
105
-
106
109
ydbConnection .close ();
107
110
}
108
111
}
0 commit comments