2
2
3
3
import java .nio .charset .StandardCharsets ;
4
4
import java .sql .SQLException ;
5
+ import java .time .Instant ;
5
6
import java .util .Optional ;
6
7
import javax .annotation .PostConstruct ;
7
8
import javax .annotation .PreDestroy ;
14
15
import tech .ydb .coordination .CoordinationClient ;
15
16
import tech .ydb .coordination .CoordinationSession ;
16
17
import tech .ydb .coordination .SemaphoreLease ;
18
+ import tech .ydb .coordination .settings .DescribeSemaphoreMode ;
17
19
import tech .ydb .core .Result ;
18
20
import tech .ydb .jdbc .YdbConnection ;
19
21
@@ -25,7 +27,7 @@ public class YdbCoordinationServiceLockProvider implements LockProvider {
25
27
private static final String YDB_LOCK_NODE_NAME = "shared-lock-ydb" ;
26
28
private static final int ATTEMPT_CREATE_NODE = 10 ;
27
29
private static final String INSTANCE_INFO =
28
- "{ Hostname=" + Utils .getHostname () + ", " + "Current PID=" + ProcessHandle .current ().pid () + "}" ;
30
+ "Hostname=" + Utils .getHostname () + ", " + "Current PID=" + ProcessHandle .current ().pid ();
29
31
private static final byte [] INSTANCE_INFO_BYTES = INSTANCE_INFO .getBytes (StandardCharsets .UTF_8 );
30
32
31
33
private final YdbConnection ydbConnection ;
@@ -66,7 +68,36 @@ public void init() {
66
68
67
69
@ Override
68
70
public Optional <SimpleLock > lock (LockConfiguration lockConfiguration ) {
69
- logger .info ("Instance[{}] is trying to become a leader..." , INSTANCE_INFO );
71
+ var now = Instant .now ();
72
+
73
+ String instanceInfo = "Hostname=" + Utils .getHostname () + ", " +
74
+ "Current PID=" + ProcessHandle .current ().pid () + ", " +
75
+ "CreatedAt=" + now ;
76
+
77
+ logger .info ("Instance[{}] is trying to become a leader..." , instanceInfo );
78
+
79
+ var describeResult = coordinationSession .describeSemaphore (
80
+ lockConfiguration .getName (),
81
+ DescribeSemaphoreMode .WITH_OWNERS
82
+ ).join ();
83
+
84
+ if (describeResult .isSuccess ()) {
85
+ var describe = describeResult .getValue ();
86
+ var describePayload = new String (describe .getData (), StandardCharsets .UTF_8 );
87
+
88
+ logger .debug ("Received DescribeSemaphore[Name={}, Data={}]" , describe .getName (), describePayload );
89
+
90
+ Instant createdLeaderTimestampUTC = Instant .parse (describePayload .split ("," )[2 ].split ("=" )[1 ]);
91
+
92
+ if (now .isAfter (createdLeaderTimestampUTC .plus (lockConfiguration .getLockAtMostFor ()))) {
93
+ var deleteResult = coordinationSession .deleteSemaphore (describe .getName (), true ).join ();
94
+ logger .debug ("Delete semaphore[Name={}] result: {}" , describe .getName (), deleteResult );
95
+ }
96
+ } else {
97
+ // no success, ephemeral semaphore is not created
98
+
99
+ logger .debug ("Semaphore[Name={}] not found" , lockConfiguration .getName ());
100
+ }
70
101
71
102
Result <SemaphoreLease > semaphoreLease = coordinationSession .acquireEphemeralSemaphore (
72
103
lockConfiguration .getName (),
0 commit comments