Skip to content

Commit 214d8e3

Browse files
committed
Add leaselock
Add leaselock, derived from [1]. [1]: https://github.com/kubernetes-client/python/pull/2314/files
1 parent cce9f8b commit 214d8e3

File tree

4 files changed

+153
-9
lines changed

4 files changed

+153
-9
lines changed

examples/leaderelection.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
from kubernetes_asyncio import config
2020
from kubernetes_asyncio.client import api_client
2121
from kubernetes_asyncio.leaderelection import electionconfig, leaderelection
22-
from kubernetes_asyncio.leaderelection.resourcelock.configmaplock import (
23-
ConfigMapLock,
24-
)
22+
from kubernetes_asyncio.leaderelection.resourcelock.leaselock import LeaseLock
2523

2624

2725
async def main():
@@ -59,7 +57,8 @@ async def example_end_func():
5957
async with api_client.ApiClient() as apic:
6058
# Create config
6159
leader_election_config = electionconfig.Config(
62-
ConfigMapLock(lock_name, lock_namespace, candidate_id, apic),
60+
# A legacy ConfigMapLock is also available
61+
LeaseLock(lock_name, lock_namespace, candidate_id, apic),
6362
lease_duration=17,
6463
renew_deadline=15,
6564
retry_period=5,

kubernetes_asyncio/leaderelection/leaderelection.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
the leader lock is obtained and then lost.
3131
3232
At first all candidates are considered followers. The one to create a lock or
33-
update an existing lock first becomes the leader and remains so until it keeps
34-
renewing its lease.
33+
update an existing lock first becomes the leader and remains so until it fails
34+
to renew its lease.
3535
"""
3636

3737

@@ -166,7 +166,7 @@ async def try_acquire_or_renew(self):
166166
# A lock exists with that name
167167
# Validate old_election_record
168168
if old_election_record is None:
169-
# try to update lock with proper annotation and election record
169+
# try to update lock with proper election record
170170
return await self.update_lock(leader_election_record)
171171

172172
if (
@@ -175,7 +175,7 @@ async def try_acquire_or_renew(self):
175175
or old_election_record.acquire_time is None
176176
or old_election_record.renew_time is None
177177
):
178-
# try to update lock with proper annotation and election record
178+
# try to update lock with proper election record
179179
return await self.update_lock(leader_election_record)
180180

181181
# Report transitions

kubernetes_asyncio/leaderelection/leaderelectionrecord.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
class LeaderElectionRecord:
17-
# Annotation used in the lock object
17+
# Leader election details, used in the lock object
1818
def __init__(self, holder_identity, lease_duration, acquire_time, renew_time):
1919
self.holder_identity = holder_identity
2020
self.lease_duration = lease_duration
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
from datetime import datetime
17+
18+
from kubernetes_asyncio import client
19+
from kubernetes_asyncio.client.rest import ApiException
20+
21+
from ..leaderelectionrecord import LeaderElectionRecord
22+
23+
logging.basicConfig(level=logging.INFO)
24+
25+
26+
class LeaseLock:
27+
def __init__(self, name, namespace, identity, api_client):
28+
"""
29+
:param name: name of the lock
30+
:param namespace: namespace
31+
:param identity: A unique identifier that the candidate is using
32+
"""
33+
self.api_instance = client.CoordinationV1Api(api_client=api_client)
34+
35+
# lease resource identity and reference
36+
self.name = name
37+
self.namespace = namespace
38+
self.lease_reference = None
39+
40+
# identity of this candidate
41+
self.identity = str(identity)
42+
43+
# get returns the election record from a Lease Annotation
44+
async def get(self, name, namespace):
45+
"""
46+
:param name: Name of the lease object information to get
47+
:param namespace: Namespace in which the lease object is to be searched
48+
:return: 'True, election record' if object found else 'False, exception response'
49+
"""
50+
try:
51+
lease = await self.api_instance.read_namespaced_lease(name, namespace)
52+
except ApiException as e:
53+
return False, e
54+
else:
55+
self.lease_reference = lease
56+
return True, self.election_record(lease)
57+
58+
async def create(self, name, namespace, election_record):
59+
"""
60+
:param electionRecord: Annotation string
61+
:param name: Name of the lease object to be created
62+
:param namespace: Namespace in which the lease object is to be created
63+
:return: 'True' if object is created else 'False' if failed
64+
"""
65+
body = client.V1Lease(
66+
metadata={"name": name}, spec=self.update_lease(election_record)
67+
)
68+
69+
try:
70+
await self.api_instance.create_namespaced_lease(
71+
namespace, body, pretty=True
72+
)
73+
return True
74+
except ApiException as e:
75+
logging.info("Failed to create lock as {}".format(e))
76+
return False
77+
78+
async def update(self, name, namespace, updated_record):
79+
"""
80+
:param name: name of the lock to be updated
81+
:param namespace: namespace the lock is in
82+
:param updated_record: the updated election record
83+
:return: True if update is successful False if it fails
84+
"""
85+
try:
86+
# update the Lease from the updated record
87+
self.lease_reference.spec = self.update_lease(
88+
updated_record, self.lease_reference.spec
89+
)
90+
91+
await self.api_instance.replace_namespaced_lease(
92+
name=name, namespace=namespace, body=self.lease_reference
93+
)
94+
return True
95+
except ApiException as e:
96+
logging.info("Failed to update lock as {}".format(e))
97+
return False
98+
99+
def update_lease(self, leader_election_record, current_spec=None):
100+
# existing or new lease?
101+
spec = current_spec if current_spec else client.V1LeaseSpec()
102+
103+
# lease configuration
104+
spec.holder_identity = leader_election_record.holder_identity
105+
spec.lease_duration_seconds = int(leader_election_record.lease_duration)
106+
spec.acquire_time = self.time_str_to_iso(leader_election_record.acquire_time)
107+
spec.renew_time = self.time_str_to_iso(leader_election_record.renew_time)
108+
109+
return spec
110+
111+
def election_record(self, lease):
112+
"""
113+
Get leader election record from Lease spec.
114+
"""
115+
leader_election_record = LeaderElectionRecord(None, None, None, None)
116+
117+
if not lease.spec:
118+
return leader_election_record
119+
120+
if lease.spec.holder_identity:
121+
leader_election_record.holder_identity = lease.spec.holder_identity
122+
if lease.spec.lease_duration_seconds:
123+
leader_election_record.lease_duration = str(
124+
lease.spec.lease_duration_seconds
125+
)
126+
if lease.spec.acquire_time:
127+
leader_election_record.acquire_time = str(
128+
datetime.replace(lease.spec.acquire_time, tzinfo=None)
129+
)
130+
if lease.spec.renew_time:
131+
leader_election_record.renew_time = str(
132+
datetime.replace(lease.spec.renew_time, tzinfo=None)
133+
)
134+
135+
return leader_election_record
136+
137+
# conversion between kubernetes ISO formatted time and elector record time
138+
def time_str_to_iso(self, str_time):
139+
formats = ["%Y-%m-%d %H:%M:%S.%f%z", "%Y-%m-%d %H:%M:%S.%f"]
140+
for fmt in formats:
141+
try:
142+
return datetime.strptime(str_time, fmt).isoformat() + "Z"
143+
except ValueError:
144+
pass
145+
logging.error("Failed to parse time string: {}".format(str_time))

0 commit comments

Comments
 (0)