Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Commit 4d29af1

Browse files
committed
Leader Election issue #434
changed file naming style consistent with the existing go client code Update example.py Changed file and folder names Rename LeaderElection.py to leaderelection.py Rename threadingWithException.py to threadingwithexception.py Rename ConfigMapLock.py to configmaplock.py LeaderElection to leaderelection Added boiler plate headers, updated variable and function names consistent with the guidelines, removed the ctypes dependency by using traces to kill threads, changed logic for leader now it gives up and doesn't re-join as a follower if it fails to update lease added correct boiler plate year Rename threadingWithTrace.py to threadingwithtrace.py Update leaderelection.py Update example.py Changes based on review - logging, OnStoppedLeading is not killed abruptly, OnStartedLeading is not run in a separate thread, adding README Update example.py updated comments set threads as daemon Update README.md Code made consistent with other clients. Update example.py Update leaderelection.py Error & exception handling for the annotation, reduced indentation Adding serializing functions for serializing & de-serializing locks, leader_election_record as a class Adding a test Adding boilerplate header Rename leaderelectiontest.py to leaderelection_test.py Updated boiler plates handling imports for pytest handling 'HTTP not found' compatibility with python 2 & 3, & handling relative imports Update leaderelection.py to check tests for tox assertEquals -> assertEqual Update leaderelection_test.py making Threading compatible for Python 2 changing datetime.timestamp for backward compatibility with Python 2.7 Adding comments for test_Leader_election_with_renew_deadline & making candidates run in parallel for test_leader_election remove redundant daemon = True reassignment common thread lock for MockResourceLock
1 parent 54d188f commit 4d29af1

9 files changed

+769
-0
lines changed

leaderelection/README.md

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
## Leader Election Example
2+
This example demonstrates how to use the leader election library.
3+
4+
## Running
5+
Run the following command in multiple separate terminals preferably an odd number.
6+
Each running process uses a unique identifier displayed when it starts to run.
7+
8+
- When a program runs, if a lock object already exists with the specified name,
9+
all candidates will start as followers.
10+
- If a lock object does not exist with the specified name then whichever candidate
11+
creates a lock object first will become the leader and the rest will be followers.
12+
- The user will be prompted about the status of the candidates and transitions.
13+
14+
### Command to run
15+
```python example.py```
16+
17+
Now kill the existing leader. You will see from the terminal outputs that one of the
18+
remaining running processes will be elected as the new leader.

leaderelection/__init__.py

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.

leaderelection/electionconfig.py

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import sys
16+
import logging
17+
logging.basicConfig(level=logging.INFO)
18+
19+
20+
class Config:
21+
# Validate config, exit if an error is detected
22+
def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading):
23+
self.jitter_factor = 1.2
24+
25+
if lock is None:
26+
sys.exit("lock cannot be None")
27+
self.lock = lock
28+
29+
if lease_duration <= renew_deadline:
30+
sys.exit("lease_duration must be greater than renew_deadline")
31+
32+
if renew_deadline <= self.jitter_factor * retry_period:
33+
sys.exit("renewDeadline must be greater than retry_period*jitter_factor")
34+
35+
if lease_duration < 1:
36+
sys.exit("lease_duration must be greater than one")
37+
38+
if renew_deadline < 1:
39+
sys.exit("renew_deadline must be greater than one")
40+
41+
if retry_period < 1:
42+
sys.exit("retry_period must be greater than one")
43+
44+
self.lease_duration = lease_duration
45+
self.renew_deadline = renew_deadline
46+
self.retry_period = retry_period
47+
48+
if onstarted_leading is None:
49+
sys.exit("callback onstarted_leading cannot be None")
50+
self.onstarted_leading = onstarted_leading
51+
52+
if onstopped_leading is None:
53+
self.onstopped_leading = self.on_stoppedleading_callback
54+
else:
55+
self.onstopped_leading = onstopped_leading
56+
57+
# Default callback for when the current candidate if a leader, stops leading
58+
def on_stoppedleading_callback(self):
59+
logging.info("stopped leading".format(self.lock.identity))

leaderelection/example.py

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import uuid
16+
from kubernetes import client, config
17+
from leaderelection import leaderelection
18+
from leaderelection.resourcelock.configmaplock import ConfigMapLock
19+
from leaderelection import electionconfig
20+
21+
22+
# Authenticate using config file
23+
config.load_kube_config(config_file=r"")
24+
25+
# Parameters required from the user
26+
27+
# A unique identifier for this candidate
28+
candidate_id = uuid.uuid4()
29+
30+
# Name of the lock object to be created
31+
lock_name = "examplepython"
32+
33+
# Kubernetes namespace
34+
lock_namespace = "default"
35+
36+
37+
# The function that a user wants to run once a candidate is elected as a leader
38+
def example_func():
39+
print("I am leader")
40+
41+
42+
# A user can choose not to provide any callbacks for what to do when a candidate fails to lead - onStoppedLeading()
43+
# In that case, a default callback function will be used
44+
45+
# Create config
46+
config = electionconfig.Config(ConfigMapLock(lock_name, lock_namespace, candidate_id), lease_duration=17,
47+
renew_deadline=15, retry_period=5, onstarted_leading=example_func,
48+
onstopped_leading=None)
49+
50+
# Enter leader election
51+
leaderelection.LeaderElection(config).run()
52+
53+
# User can choose to do another round of election or simply exit
54+
print("Exited leader election")

leaderelection/leaderelection.py

+191
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import datetime
16+
import sys
17+
import time
18+
import json
19+
import threading
20+
from .leaderelectionrecord import LeaderElectionRecord
21+
import logging
22+
# if condition to be removed when support for python2 will be removed
23+
if sys.version_info > (3, 0):
24+
from http import HTTPStatus
25+
else:
26+
import httplib
27+
logging.basicConfig(level=logging.INFO)
28+
29+
"""
30+
This package implements leader election using an annotation in a Kubernetes object.
31+
The onstarted_leading function is run in a thread and when it returns, if it does
32+
it might not be safe to run it again in a process.
33+
34+
At first all candidates are considered followers. The one to create a lock or update
35+
an existing lock first becomes the leader and remains so until it keeps renewing its
36+
lease.
37+
"""
38+
39+
40+
class LeaderElection:
41+
def __init__(self, election_config):
42+
if election_config is None:
43+
sys.exit("argument config not passed")
44+
45+
# Latest record observed in the created lock object
46+
self.observed_record = None
47+
48+
# The configuration set for this candidate
49+
self.election_config = election_config
50+
51+
# Latest update time of the lock
52+
self.observed_time_milliseconds = 0
53+
54+
# Point of entry to Leader election
55+
def run(self):
56+
# Try to create/ acquire a lock
57+
if self.acquire():
58+
logging.info("{} successfully acquired lease".format(self.election_config.lock.identity))
59+
60+
# Start leading and call OnStartedLeading()
61+
threading.daemon = True
62+
threading.Thread(target=self.election_config.onstarted_leading).start()
63+
64+
self.renew_loop()
65+
66+
# Failed to update lease, run OnStoppedLeading callback
67+
self.election_config.onstopped_leading()
68+
69+
def acquire(self):
70+
# Follower
71+
logging.info("{} is a follower".format(self.election_config.lock.identity))
72+
retry_period = self.election_config.retry_period
73+
74+
while True:
75+
succeeded = self.try_acquire_or_renew()
76+
77+
if succeeded:
78+
return True
79+
80+
time.sleep(retry_period)
81+
82+
def renew_loop(self):
83+
# Leader
84+
logging.info("Leader has entered renew loop and will try to update lease continuously")
85+
86+
retry_period = self.election_config.retry_period
87+
renew_deadline = self.election_config.renew_deadline * 1000
88+
89+
while True:
90+
timeout = int(time.time() * 1000) + renew_deadline
91+
succeeded = False
92+
93+
while int(time.time() * 1000) < timeout:
94+
succeeded = self.try_acquire_or_renew()
95+
96+
if succeeded:
97+
break
98+
time.sleep(retry_period)
99+
100+
if succeeded:
101+
time.sleep(retry_period)
102+
continue
103+
104+
# failed to renew, return
105+
return
106+
107+
def try_acquire_or_renew(self):
108+
now_timestamp = time.time()
109+
now = datetime.datetime.fromtimestamp(now_timestamp)
110+
111+
# Check if lock is created
112+
lock_status, old_election_record = self.election_config.lock.get(self.election_config.lock.name,
113+
self.election_config.lock.namespace)
114+
115+
# create a default Election record for this candidate
116+
leader_election_record = LeaderElectionRecord(self.election_config.lock.identity,
117+
str(self.election_config.lease_duration), str(now), str(now))
118+
119+
# A lock is not created with that name, try to create one
120+
if not lock_status:
121+
# To be removed when support for python2 will be removed
122+
if sys.version_info > (3, 0):
123+
if json.loads(old_election_record.body)['code'] != HTTPStatus.NOT_FOUND:
124+
logging.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name,
125+
old_election_record.reason))
126+
return False
127+
else:
128+
if json.loads(old_election_record.body)['code'] != httplib.NOT_FOUND:
129+
logging.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name,
130+
old_election_record.reason))
131+
return False
132+
133+
logging.info("{} is trying to create a lock".format(leader_election_record.holder_identity))
134+
create_status = self.election_config.lock.create(name=self.election_config.lock.name,
135+
namespace=self.election_config.lock.namespace,
136+
election_record=leader_election_record)
137+
138+
if create_status is False:
139+
logging.info("{} Failed to create lock".format(leader_election_record.holder_identity))
140+
return False
141+
142+
self.observed_record = leader_election_record
143+
self.observed_time_milliseconds = int(time.time() * 1000)
144+
return True
145+
146+
# A lock exists with that name
147+
# Validate old_election_record
148+
if old_election_record is None:
149+
# try to update lock with proper annotation and election record
150+
return self.update_lock(leader_election_record)
151+
152+
if (old_election_record.holder_identity is None or old_election_record.lease_duration is None
153+
or old_election_record.acquire_time is None or old_election_record.renew_time is None):
154+
# try to update lock with proper annotation and election record
155+
return self.update_lock(leader_election_record)
156+
157+
# Report transitions
158+
if self.observed_record and self.observed_record.holder_identity != old_election_record.holder_identity:
159+
logging.info("Leader has switched to {}".format(old_election_record.holder_identity))
160+
161+
if self.observed_record is None or old_election_record.__dict__ != self.observed_record.__dict__:
162+
self.observed_record = old_election_record
163+
self.observed_time_milliseconds = int(time.time() * 1000)
164+
165+
# If This candidate is not the leader and lease duration is yet to finish
166+
if (self.election_config.lock.identity != self.observed_record.holder_identity
167+
and self.observed_time_milliseconds + self.election_config.lease_duration * 1000 > int(now_timestamp * 1000)):
168+
logging.info("yet to finish lease_duration, lease held by {} and has not expired".format(old_election_record.holder_identity))
169+
return False
170+
171+
# If this candidate is the Leader
172+
if self.election_config.lock.identity == self.observed_record.holder_identity:
173+
# Leader updates renewTime, but keeps acquire_time unchanged
174+
leader_election_record.acquire_time = self.observed_record.acquire_time
175+
176+
return self.update_lock(leader_election_record)
177+
178+
def update_lock(self, leader_election_record):
179+
# Update object with latest election record
180+
update_status = self.election_config.lock.update(self.election_config.lock.name,
181+
self.election_config.lock.namespace,
182+
leader_election_record)
183+
184+
if update_status is False:
185+
logging.info("{} failed to acquire lease".format(leader_election_record.holder_identity))
186+
return False
187+
188+
self.observed_record = leader_election_record
189+
self.observed_time_milliseconds = int(time.time() * 1000)
190+
logging.info("leader {} has successfully acquired lease".format(leader_election_record.holder_identity))
191+
return True

0 commit comments

Comments
 (0)