Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Commit e9ed4de

Browse files
committed
Adding serializing functions for serializing & de-serializing locks, leader_election_record as a class
1 parent 40f0485 commit e9ed4de

File tree

4 files changed

+124
-73
lines changed

4 files changed

+124
-73
lines changed

leaderelection/electionconfig.py

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import logging
1717
logging.basicConfig(level=logging.INFO)
1818

19+
1920
class Config:
2021
# Validate config, exit if an error is detected
2122
def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading):

leaderelection/leaderelection.py

+41-55
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
import datetime
1616
import sys
1717
import time
18-
import ast
18+
import json
1919
import threading
20+
from leaderelectionrecord import LeaderElectionRecord
2021
import logging
22+
2123
logging.basicConfig(level=logging.INFO)
2224

2325

@@ -35,16 +37,6 @@ def __init__(self, election_config):
3537
# Latest update time of the lock
3638
self.observed_time_milliseconds = 0
3739

38-
39-
# Annotation used in the lock object
40-
def leaderelector_record(self, election_config, now):
41-
return {
42-
"holderIdentity": str(election_config.lock.identity),
43-
"leaseDurationSeconds": str(election_config.lease_duration),
44-
"acquireTime": str(now),
45-
"renewTime": str(now)
46-
}
47-
4840
# Point of entry to Leader election
4941
def run(self):
5042
# Try to create/ acquire a lock
@@ -77,13 +69,13 @@ def renew_loop(self):
7769
logging.info("Leader has entered renew loop and will try to update lease continuously")
7870

7971
retry_period = self.election_config.retry_period
80-
renew_deadline = self.election_config.renew_deadline*1000
72+
renew_deadline = self.election_config.renew_deadline * 1000
8173

8274
while True:
83-
timeout = int(time.time()*1000) + renew_deadline
75+
timeout = int(time.time() * 1000) + renew_deadline
8476
succeeded = False
8577

86-
while int(time.time()*1000) < timeout:
78+
while int(time.time() * 1000) < timeout:
8779
succeeded = self.try_acquire_or_renew()
8880

8981
if succeeded:
@@ -101,82 +93,76 @@ def try_acquire_or_renew(self):
10193
now = datetime.datetime.now()
10294

10395
# Check if lock is created
104-
lock_status, lock_response, lock_record = self.election_config.lock.get(self.election_config.lock.name,
105-
self.election_config.lock.namespace)
96+
lock_status, old_election_record = self.election_config.lock.get(self.election_config.lock.name,
97+
self.election_config.lock.namespace)
10698

10799
# create a default Election record for this candidate
108-
leader_election_record = self.leaderelector_record(self.election_config, now)
100+
leader_election_record = LeaderElectionRecord(self.election_config.lock.identity,
101+
str(self.election_config.lease_duration), str(now), str(now))
109102

110103
# A lock is not created with that name, try to create one
111104
if not lock_status:
112-
if ast.literal_eval(lock_response.body)['code'] != 404:
113-
logging.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name, lock_response.reason))
105+
if json.loads(old_election_record.body)['code'] != 404:
106+
logging.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name,
107+
old_election_record.reason))
114108
return False
115109

116-
117-
logging.info("{} is trying to create a lock".format(leader_election_record['holderIdentity']))
118-
create_status, create_response = self.election_config.lock.create(name=self.election_config.lock.name,
119-
namespace=self.election_config.lock.namespace,
120-
election_record=leader_election_record)
110+
logging.info("{} is trying to create a lock".format(leader_election_record.holder_identity))
111+
create_status = self.election_config.lock.create(name=self.election_config.lock.name,
112+
namespace=self.election_config.lock.namespace,
113+
election_record=leader_election_record)
121114

122115
if create_status is False:
116+
logging.info("{} Failed to create lock".format(leader_election_record.holder_identity))
123117
return False
124118

125119
self.observed_record = leader_election_record
126120
self.observed_time_milliseconds = int(time.time() * 1000)
127121
return True
128122

129123
# A lock exists with that name
130-
# Validate lock_record
131-
if lock_record is None:
124+
# Validate old_election_record
125+
if old_election_record is None:
132126
# try to update lock with proper annotation and election record
133-
return self.update_lock(lock_response, leader_election_record)
134-
135-
# check for any key, value errors in the record
136-
try:
137-
old_election_record = ast.literal_eval(lock_record)
138-
if (old_election_record['holderIdentity'] == '' or old_election_record['leaseDurationSeconds'] == ''
139-
or old_election_record['acquireTime'] == '' or old_election_record['renewTime'] == ''):
140-
# try to update lock with proper annotation and election record
141-
return self.update_lock(lock_response, leader_election_record)
142-
except:
143-
# try to update lock with proper annotation and election record
144-
return self.update_lock(lock_response, leader_election_record)
127+
return self.update_lock(leader_election_record)
145128

129+
if (old_election_record.holder_identity is None or old_election_record.lease_duration is None
130+
or old_election_record.acquire_time is None or old_election_record.renew_time is None):
131+
# try to update lock with proper annotation and election record
132+
return self.update_lock(leader_election_record)
146133

147134
# Report transitions
148-
if self.observed_record and self.observed_record['holderIdentity'] != old_election_record['holderIdentity']:
149-
logging.info("Leader has switched to {}".format(old_election_record['holderIdentity']))
135+
if self.observed_record and self.observed_record.holder_identity != old_election_record.holder_identity:
136+
logging.info("Leader has switched to {}".format(old_election_record.holder_identity))
150137

151-
if old_election_record != self.observed_record:
138+
if self.observed_record is None or old_election_record.__dict__ != self.observed_record.__dict__:
152139
self.observed_record = old_election_record
153140
self.observed_time_milliseconds = int(time.time() * 1000)
154141

155142
# If This candidate is not the leader and lease duration is yet to finish
156-
if (str(self.election_config.lock.identity) != self.observed_record['holderIdentity']
157-
and self.observed_time_milliseconds + self.election_config.lease_duration*1000 > int(now.timestamp()*1000)):
158-
logging.info("yet to finish lease_duration, lease held by {} and has not expired".format(old_election_record['holderIdentity']))
143+
if (self.election_config.lock.identity != self.observed_record.holder_identity
144+
and self.observed_time_milliseconds + self.election_config.lease_duration * 1000 > int(now.timestamp() * 1000)):
145+
logging.info("yet to finish lease_duration, lease held by {} and has not expired".format(old_election_record.holder_identity))
159146
return False
160147

161148
# If this candidate is the Leader
162-
if str(self.election_config.lock.identity) == self.observed_record['holderIdentity']:
163-
# Leader sets acquireTime
164-
leader_election_record['acquireTime'] = self.observed_record['acquireTime']
149+
if self.election_config.lock.identity == self.observed_record.holder_identity:
150+
# Leader updates renewTime, but keeps acquire_time unchanged
151+
leader_election_record.acquire_time = self.observed_record.acquire_time
165152

166-
return self.update_lock(lock_response, leader_election_record)
153+
return self.update_lock(leader_election_record)
167154

168-
def update_lock(self, lock_response, leader_election_record):
155+
def update_lock(self, leader_election_record):
169156
# Update object with latest election record
170-
update_status, update_response = self.election_config.lock.update(self.election_config.lock.name,
171-
self.election_config.lock.namespace,
172-
lock_response, leader_election_record)
157+
update_status = self.election_config.lock.update(self.election_config.lock.name,
158+
self.election_config.lock.namespace,
159+
leader_election_record)
173160

174161
if update_status is False:
175-
logging.info("{} failed to acquire lease".format(leader_election_record['holderIdentity']))
162+
logging.info("{} failed to acquire lease".format(leader_election_record.holder_identity))
176163
return False
177164

178165
self.observed_record = leader_election_record
179166
self.observed_time_milliseconds = int(time.time() * 1000)
180-
logging.info("leader {} has successfully acquired lease".format(leader_election_record['holderIdentity']))
167+
logging.info("leader {} has successfully acquired lease".format(leader_election_record.holder_identity))
181168
return True
182-
+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Copyright 2020 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
class LeaderElectionRecord:
17+
# Annotation used in the lock object
18+
def __init__(self, holder_identity, lease_duration, acquire_time, renew_time):
19+
self.holder_identity = holder_identity
20+
self.lease_duration = lease_duration
21+
self.acquire_time = acquire_time
22+
self.renew_time = renew_time

leaderelection/resourcelock/configmaplock.py

+60-18
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
from kubernetes.client.rest import ApiException
1616
from kubernetes import client, config
1717
from kubernetes.client.api_client import ApiClient
18+
from leaderelectionrecord import LeaderElectionRecord
19+
import json
20+
import logging
21+
logging.basicConfig(level=logging.INFO)
22+
1823

1924
class ConfigMapLock:
2025
def __init__(self, name, namespace, identity):
@@ -27,61 +32,98 @@ def __init__(self, name, namespace, identity):
2732
self.leader_electionrecord_annotationkey = 'control-plane.alpha.kubernetes.io/leader'
2833
self.name = name
2934
self.namespace = namespace
30-
self.identity = identity
35+
self.identity = str(identity)
36+
self.configmap_reference = None
37+
self.lock_record = {
38+
'holderIdentity': None,
39+
'leaseDurationSeconds': None,
40+
'acquireTime': None,
41+
'renewTime': None
42+
}
3143

3244
# get returns the election record from a ConfigMap Annotation
3345
def get(self, name, namespace):
3446
"""
3547
:param name: Name of the configmap object information to get
3648
:param namespace: Namespace in which the configmap object is to be searched
37-
:return: 'True, response, record' if object found else 'False, exception, None'
49+
:return: 'True, election record' if object found else 'False, exception response'
3850
"""
3951
try:
4052
api_response = self.api_instance.read_namespaced_config_map(name, namespace)
4153

4254
# If an annotation does not exist - add the leader_electionrecord_annotationkey
43-
if api_response.metadata.annotations is None or api_response.metadata.annotations == '':
55+
annotations = api_response.metadata.annotations
56+
if annotations is None or annotations == '':
4457
api_response.metadata.annotations = {self.leader_electionrecord_annotationkey: ''}
45-
return True, api_response, None
58+
self.configmap_reference = api_response
59+
return True, None
4660

4761
# If an annotation exists but, the leader_electionrecord_annotationkey does not then add it as a key
48-
if not api_response.metadata.annotations.get(self.leader_electionrecord_annotationkey):
62+
if not annotations.get(self.leader_electionrecord_annotationkey):
4963
api_response.metadata.annotations = {self.leader_electionrecord_annotationkey: ''}
50-
return True, api_response, None
64+
self.configmap_reference = api_response
65+
return True, None
66+
67+
lock_record = self.get_lock_object(json.loads(annotations[self.leader_electionrecord_annotationkey]))
5168

52-
return True, api_response, api_response.metadata.annotations[self.leader_electionrecord_annotationkey]
69+
self.configmap_reference = api_response
70+
return True, lock_record
5371
except ApiException as e:
54-
return False, e, None
72+
return False, e
5573

5674
def create(self, name, namespace, election_record):
5775
"""
5876
:param electionRecord: Annotation string
5977
:param name: Name of the configmap object to be created
6078
:param namespace: Namespace in which the configmap object is to be created
61-
:return: 'True, None' if object is created else 'False, error' if failed
79+
:return: 'True' if object is created else 'False' if failed
6280
"""
6381
body = client.V1ConfigMap(
64-
metadata={"name": name, "annotations": {self.leader_electionrecord_annotationkey: str(election_record)}})
82+
metadata={"name": name,
83+
"annotations": {self.leader_electionrecord_annotationkey: json.dumps(self.get_lock_dict(election_record))}})
6584

6685
try:
6786
api_response = self.api_instance.create_namespaced_config_map(namespace, body, pretty=True)
68-
return True, api_response
87+
return True
6988
except ApiException as e:
70-
return False, e
89+
logging.info("Failed to create lock as {}".format(e))
90+
return False
7191

72-
def update(self, name, namespace, update_object, updated_record):
92+
def update(self, name, namespace, updated_record):
7393
"""
7494
:param name: name of the lock to be updated
7595
:param namespace: namespace the lock is in
76-
:param update_object: the object used to update the lock
7796
:param updated_record: the updated election record
7897
:return: True if update is succesful False if it fails
7998
"""
8099
try:
81100
# Set the updated record
82-
update_object.metadata.annotations[self.leader_electionrecord_annotationkey] = str(updated_record)
83-
api_response = self.api_instance.replace_namespaced_config_map(name=name, namespace=namespace, body=update_object)
84-
return True, api_response
101+
self.configmap_reference.metadata.annotations[self.leader_electionrecord_annotationkey] = json.dumps(self.get_lock_dict(updated_record))
102+
api_response = self.api_instance.replace_namespaced_config_map(name=name, namespace=namespace,
103+
body=self.configmap_reference)
104+
return True
85105
except ApiException as e:
86-
return False, e
106+
logging.info("Failed to update lock as {}".format(e))
107+
return False
108+
109+
def get_lock_object(self, lock_record):
110+
leader_election_record = LeaderElectionRecord(None, None, None, None)
111+
112+
if lock_record.get('holderIdentity'):
113+
leader_election_record.holder_identity = lock_record['holderIdentity']
114+
if lock_record.get('leaseDurationSeconds'):
115+
leader_election_record.lease_duration = lock_record['leaseDurationSeconds']
116+
if lock_record.get('acquireTime'):
117+
leader_election_record.acquire_time = lock_record['acquireTime']
118+
if lock_record.get('renewTime'):
119+
leader_election_record.renew_time = lock_record['renewTime']
120+
121+
return leader_election_record
87122

123+
def get_lock_dict(self, leader_election_record):
124+
self.lock_record['holderIdentity'] = leader_election_record.holder_identity
125+
self.lock_record['leaseDurationSeconds'] = leader_election_record.lease_duration
126+
self.lock_record['acquireTime'] = leader_election_record.acquire_time
127+
self.lock_record['renewTime'] = leader_election_record.renew_time
128+
129+
return self.lock_record

0 commit comments

Comments
 (0)