20
20
import logging
21
21
logging .basicConfig (level = logging .INFO )
22
22
23
+
23
24
class LeaderElection :
24
25
def __init__ (self , election_config ):
25
26
if election_config is None :
@@ -36,12 +37,12 @@ def __init__(self, election_config):
36
37
37
38
38
39
# Annotation used in the lock object
39
- def leaderelector_record (self , election_config ):
40
+ def leaderelector_record (self , election_config , now ):
40
41
return {
41
42
"holderIdentity" : str (election_config .lock .identity ),
42
43
"leaseDurationSeconds" : str (election_config .lease_duration ),
43
- "acquireTime" : str (datetime . datetime . now () ),
44
- "renewTime" : str (datetime . datetime . now () )
44
+ "acquireTime" : str (now ),
45
+ "renewTime" : str (now )
45
46
}
46
47
47
48
# Point of entry to Leader election
@@ -97,51 +98,22 @@ def renew_loop(self):
97
98
return
98
99
99
100
def try_acquire_or_renew (self ):
101
+ now = datetime .datetime .now ()
102
+
100
103
# Check if lock is created
101
- lock_status , lock_response = self .election_config .lock .get (self .election_config .lock .name , self .election_config .lock .namespace )
104
+ lock_status , lock_response , lock_record = self .election_config .lock .get (self .election_config .lock .name ,
105
+ self .election_config .lock .namespace )
102
106
103
107
# create a default Election record for this candidate
104
- leader_election_record = self .leaderelector_record (self .election_config )
105
-
106
- # If a lock is already created with that name
107
- if lock_status :
108
- old_election_record = ast .literal_eval (lock_response .metadata .annotations [self .election_config .lock .leader_electionrecord_annotationkey ])
109
-
110
- # Report transitions
111
- if self .observed_record and self .observed_record ['holderIdentity' ] != old_election_record ['holderIdentity' ]:
112
- logging .info ("Leader has switched to {}" .format (old_election_record ['holderIdentity' ]))
113
-
114
-
115
- if old_election_record != self .observed_record :
116
- self .observed_record = old_election_record
117
- self .observed_time_milliseconds = int (time .time ()* 1000 )
118
-
119
-
120
- # If This candidate is not the leader and lease duration is yet to finish
121
- if str (self .election_config .lock .identity ) != self .observed_record ['holderIdentity' ] and self .observed_time_milliseconds + self .election_config .lease_duration * 1000 > int (time .time ()* 1000 ):
122
- logging .info ("yet to finish lease_duration, lease held by {} and has not expired" .format (old_election_record ['holderIdentity' ]))
123
- return False
108
+ leader_election_record = self .leaderelector_record (self .election_config , now )
124
109
125
- # If this candidate is the Leader
126
- if str (self .election_config .lock .identity ) == self .observed_record ['holderIdentity' ]:
127
- # Leader sets acquireTime
128
- leader_election_record ['acquireTime' ] = self .observed_record ['acquireTime' ]
129
-
130
- # Update object with latest election record
131
- lock_response .metadata .annotations [self .election_config .lock .leader_electionrecord_annotationkey ] = str (leader_election_record )
132
- update_status , update_response = self .election_config .lock .update (self .election_config .lock .name , self .election_config .lock .namespace , lock_response )
133
-
134
- if update_status is False :
135
- logging .info ("{} failed to acquire lease" .format (leader_election_record ['holderIdentity' ]))
110
+ # A lock is not created with that name, try to create one
111
+ if not lock_status :
112
+ if ast .literal_eval (lock_response .body )['code' ] != 404 :
113
+ logging .info ("Error retrieving resource lock {} as {}" .format (self .election_config .lock .name , lock_response .reason ))
136
114
return False
137
115
138
- self .observed_record = leader_election_record
139
- self .observed_time_milliseconds = int (time .time ()* 1000 )
140
- logging .info ("leader {} has successfully acquired lease" .format (leader_election_record ['holderIdentity' ]))
141
- return True
142
116
143
- # A lock is not created with that name, try to create one
144
- else :
145
117
logging .info ("{} is trying to create a lock" .format (leader_election_record ['holderIdentity' ]))
146
118
create_status , create_response = self .election_config .lock .create (name = self .election_config .lock .name ,
147
119
namespace = self .election_config .lock .namespace ,
@@ -151,6 +123,60 @@ def try_acquire_or_renew(self):
151
123
return False
152
124
153
125
self .observed_record = leader_election_record
154
- self .observed_time_milliseconds = int (time .time ()* 1000 )
126
+ self .observed_time_milliseconds = int (time .time () * 1000 )
155
127
return True
156
128
129
+ # A lock exists with that name
130
+ # Validate lock_record
131
+ if lock_record is None :
132
+ # try to update lock with proper annotation and election record
133
+ return self .update_lock (lock_response , leader_election_record )
134
+
135
+ # check for any key, value errors in the record
136
+ try :
137
+ old_election_record = ast .literal_eval (lock_record )
138
+ if (old_election_record ['holderIdentity' ] == '' or old_election_record ['leaseDurationSeconds' ] == ''
139
+ or old_election_record ['acquireTime' ] == '' or old_election_record ['renewTime' ] == '' ):
140
+ # try to update lock with proper annotation and election record
141
+ return self .update_lock (lock_response , leader_election_record )
142
+ except :
143
+ # try to update lock with proper annotation and election record
144
+ return self .update_lock (lock_response , leader_election_record )
145
+
146
+
147
+ # Report transitions
148
+ if self .observed_record and self .observed_record ['holderIdentity' ] != old_election_record ['holderIdentity' ]:
149
+ logging .info ("Leader has switched to {}" .format (old_election_record ['holderIdentity' ]))
150
+
151
+ if old_election_record != self .observed_record :
152
+ self .observed_record = old_election_record
153
+ self .observed_time_milliseconds = int (time .time () * 1000 )
154
+
155
+ # If This candidate is not the leader and lease duration is yet to finish
156
+ if (str (self .election_config .lock .identity ) != self .observed_record ['holderIdentity' ]
157
+ and self .observed_time_milliseconds + self .election_config .lease_duration * 1000 > int (now .timestamp ()* 1000 )):
158
+ logging .info ("yet to finish lease_duration, lease held by {} and has not expired" .format (old_election_record ['holderIdentity' ]))
159
+ return False
160
+
161
+ # If this candidate is the Leader
162
+ if str (self .election_config .lock .identity ) == self .observed_record ['holderIdentity' ]:
163
+ # Leader sets acquireTime
164
+ leader_election_record ['acquireTime' ] = self .observed_record ['acquireTime' ]
165
+
166
+ return self .update_lock (lock_response , leader_election_record )
167
+
168
+ def update_lock (self , lock_response , leader_election_record ):
169
+ # Update object with latest election record
170
+ update_status , update_response = self .election_config .lock .update (self .election_config .lock .name ,
171
+ self .election_config .lock .namespace ,
172
+ lock_response , leader_election_record )
173
+
174
+ if update_status is False :
175
+ logging .info ("{} failed to acquire lease" .format (leader_election_record ['holderIdentity' ]))
176
+ return False
177
+
178
+ self .observed_record = leader_election_record
179
+ self .observed_time_milliseconds = int (time .time () * 1000 )
180
+ logging .info ("leader {} has successfully acquired lease" .format (leader_election_record ['holderIdentity' ]))
181
+ return True
182
+
0 commit comments