Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Commit 4bf72d7

Browse files
authored
Merge pull request #206 from Invictus17/master
Leader Election issue #434
2 parents b002110 + 4d29af1 commit 4bf72d7

9 files changed

+769
-0
lines changed

leaderelection/README.md

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
## Leader Election Example
2+
This example demonstrates how to use the leader election library.
3+
4+
## Running
5+
Run the following command in multiple separate terminals, preferably an odd number of them.
6+
Each running process uses a unique identifier displayed when it starts to run.
7+
8+
- When a program runs, if a lock object already exists with the specified name,
9+
all candidates will start as followers.
10+
- If a lock object does not exist with the specified name then whichever candidate
11+
creates a lock object first will become the leader and the rest will be followers.
12+
- The status of each candidate and any leadership transitions are logged to the terminal.
13+
14+
### Command to run
15+
```python example.py```
16+
17+
Now kill the existing leader. You will see from the terminal outputs that one of the
18+
remaining running processes will be elected as the new leader.

leaderelection/__init__.py

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.

leaderelection/electionconfig.py

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import sys
16+
import logging
17+
logging.basicConfig(level=logging.INFO)
18+
19+
20+
class Config:
    """Validated settings for a single leader-election candidate.

    The constructor checks every argument and terminates the process with
    ``sys.exit(<message>)`` (i.e. raises ``SystemExit``) on the first invalid
    value it finds.

    Args:
        lock: Resource lock object used for the election; must not be None.
            The default stop callback reads ``lock.identity``.
        lease_duration: Lease length; must be greater than ``renew_deadline``
            and not below 1.
        renew_deadline: Renewal deadline; must be greater than
            ``retry_period * jitter_factor`` and not below 1.
        retry_period: Wait between attempts; must not be below 1.
        onstarted_leading: Callback invoked when this candidate becomes the
            leader; must not be None.
        onstopped_leading: Callback invoked when this candidate stops
            leading. When None, ``on_stoppedleading_callback`` is used.

    The three timing values are used as seconds by ``LeaderElection``
    (passed to ``time.sleep`` or multiplied by 1000 to get milliseconds).
    """

    # Validate config, exit if an error is detected
    def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading):
        self.jitter_factor = 1.2

        if lock is None:
            sys.exit("lock cannot be None")
        self.lock = lock

        if lease_duration <= renew_deadline:
            sys.exit("lease_duration must be greater than renew_deadline")

        if renew_deadline <= self.jitter_factor * retry_period:
            sys.exit("renew_deadline must be greater than retry_period*jitter_factor")

        # NOTE: the three checks below reject values below 1, so a value of
        # exactly 1 is accepted even though the messages say "greater than one".
        if lease_duration < 1:
            sys.exit("lease_duration must be greater than one")

        if renew_deadline < 1:
            sys.exit("renew_deadline must be greater than one")

        if retry_period < 1:
            sys.exit("retry_period must be greater than one")

        self.lease_duration = lease_duration
        self.renew_deadline = renew_deadline
        self.retry_period = retry_period

        if onstarted_leading is None:
            sys.exit("callback onstarted_leading cannot be None")
        self.onstarted_leading = onstarted_leading

        if onstopped_leading is None:
            self.onstopped_leading = self.on_stoppedleading_callback
        else:
            self.onstopped_leading = onstopped_leading

    # Default callback, used when the current candidate was the leader and
    # has stopped leading
    def on_stoppedleading_callback(self):
        """Log that this candidate (identified by ``lock.identity``) stopped leading."""
        # The format string needs a placeholder, otherwise the identity
        # passed to format() is silently discarded.
        logging.info("{} stopped leading".format(self.lock.identity))

leaderelection/example.py

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import uuid
16+
from kubernetes import client, config
17+
from leaderelection import leaderelection
18+
from leaderelection.resourcelock.configmaplock import ConfigMapLock
19+
from leaderelection import electionconfig
20+
21+
22+
# Authenticate using config file
# NOTE(review): the empty path looks like a placeholder -- presumably it has
# to be replaced with the path of a real kubeconfig file before running.
config.load_kube_config(config_file=r"")

# Parameters required from the user

# A unique identifier for this candidate
candidate_id = uuid.uuid4()

# Name of the lock object to be created
lock_name = "examplepython"

# Kubernetes namespace
lock_namespace = "default"


# The function that a user wants to run once a candidate is elected as a leader
def example_func():
    """Announce on stdout that this candidate has become the leader."""
    print("I am leader")


# A user can choose not to provide any callbacks for what to do when a candidate fails to lead - onStoppedLeading()
# In that case, a default callback function will be used

# Create config
# Bound to its own name so that the imported kubernetes ``config`` module is
# not shadowed by the election settings object.
election_config = electionconfig.Config(ConfigMapLock(lock_name, lock_namespace, candidate_id), lease_duration=17,
                                        renew_deadline=15, retry_period=5, onstarted_leading=example_func,
                                        onstopped_leading=None)

# Enter leader election
leaderelection.LeaderElection(election_config).run()

# User can choose to do another round of election or simply exit
print("Exited leader election")

leaderelection/leaderelection.py

+191
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import datetime
16+
import sys
17+
import time
18+
import json
19+
import threading
20+
from .leaderelectionrecord import LeaderElectionRecord
21+
import logging
22+
# if condition to be removed when support for python2 will be removed
23+
if sys.version_info > (3, 0):
24+
from http import HTTPStatus
25+
else:
26+
import httplib
27+
logging.basicConfig(level=logging.INFO)
28+
29+
"""
30+
This package implements leader election using an annotation in a Kubernetes object.
31+
The onstarted_leading function is run in a thread; if it ever returns,
32+
it might not be safe to run it again in a process.
33+
34+
At first all candidates are considered followers. The one to create a lock or update
35+
an existing lock first becomes the leader and remains so for as long as it keeps renewing its
36+
lease.
37+
"""
38+
39+
40+
class LeaderElection:
    """Run one candidate through acquiring and then renewing a leader lock.

    The candidate starts as a follower and polls the lock until it manages to
    create or take it over. It then starts the ``onstarted_leading`` callback
    in a daemon thread and keeps renewing the lease. When a renewal round
    fails, the ``onstopped_leading`` callback is invoked and ``run`` returns.

    Args:
        election_config: Settings object providing ``lock``,
            ``lease_duration``, ``renew_deadline``, ``retry_period``,
            ``onstarted_leading`` and ``onstopped_leading``. Passing None
            terminates the process through ``sys.exit``.
    """

    def __init__(self, election_config):
        if election_config is None:
            sys.exit("argument config not passed")

        # Latest record observed in the created lock object
        self.observed_record = None

        # The configuration set for this candidate
        self.election_config = election_config

        # Latest update time of the lock
        self.observed_time_milliseconds = 0

    # Point of entry to Leader election
    def run(self):
        """Block until the lock is acquired, lead, and return once renewal fails."""
        # Try to create/ acquire a lock
        if self.acquire():
            logging.info("{} successfully acquired lease".format(self.election_config.lock.identity))

            # Start leading and call OnStartedLeading().
            # The daemon flag has to be set on the Thread object itself;
            # assigning ``threading.daemon`` only adds an unused attribute to
            # the module and leaves the thread non-daemonic.
            leading_thread = threading.Thread(target=self.election_config.onstarted_leading)
            leading_thread.daemon = True
            leading_thread.start()

            self.renew_loop()

            # Failed to update lease, run OnStoppedLeading callback
            self.election_config.onstopped_leading()

    def acquire(self):
        """Poll as a follower until the lock is acquired.

        Returns:
            True once ``try_acquire_or_renew`` succeeds. The method never
            returns False; it sleeps ``retry_period`` seconds between tries.
        """
        # Follower
        logging.info("{} is a follower".format(self.election_config.lock.identity))
        retry_period = self.election_config.retry_period

        while True:
            succeeded = self.try_acquire_or_renew()

            if succeeded:
                return True

            time.sleep(retry_period)

    def renew_loop(self):
        """Keep renewing the lease; return when one renewal round fails.

        Each round retries ``try_acquire_or_renew`` every ``retry_period``
        seconds until it succeeds or ``renew_deadline`` seconds have passed.
        """
        # Leader
        logging.info("Leader has entered renew loop and will try to update lease continuously")

        retry_period = self.election_config.retry_period
        # Deadline is configured in seconds, the loop works in milliseconds
        renew_deadline = self.election_config.renew_deadline * 1000

        while True:
            timeout = int(time.time() * 1000) + renew_deadline
            succeeded = False

            while int(time.time() * 1000) < timeout:
                succeeded = self.try_acquire_or_renew()

                if succeeded:
                    break
                time.sleep(retry_period)

            if succeeded:
                time.sleep(retry_period)
                continue

            # failed to renew, return
            return

    def try_acquire_or_renew(self):
        """Make a single attempt to create, take over or renew the lock.

        Returns:
            True when the lock now carries this candidate's record, False
            when the lock could not be read, created or updated, or when
            another holder's lease has not yet expired.
        """
        now_timestamp = time.time()
        now = datetime.datetime.fromtimestamp(now_timestamp)

        # Check if lock is created
        lock_status, old_election_record = self.election_config.lock.get(self.election_config.lock.name,
                                                                         self.election_config.lock.namespace)

        # create a default Election record for this candidate
        leader_election_record = LeaderElectionRecord(self.election_config.lock.identity,
                                                      str(self.election_config.lease_duration), str(now), str(now))

        # A lock is not created with that name, try to create one
        if not lock_status:
            # On failure the second value returned by lock.get() is used as
            # an error object exposing a JSON ``body`` and a ``reason``.
            # To be removed when support for python2 will be removed
            not_found = HTTPStatus.NOT_FOUND if sys.version_info > (3, 0) else httplib.NOT_FOUND
            if json.loads(old_election_record.body)['code'] != not_found:
                logging.info("Error retrieving resource lock {} as {}".format(self.election_config.lock.name,
                                                                              old_election_record.reason))
                return False

            logging.info("{} is trying to create a lock".format(leader_election_record.holder_identity))
            create_status = self.election_config.lock.create(name=self.election_config.lock.name,
                                                             namespace=self.election_config.lock.namespace,
                                                             election_record=leader_election_record)

            if create_status is False:
                logging.info("{} Failed to create lock".format(leader_election_record.holder_identity))
                return False

            self.observed_record = leader_election_record
            self.observed_time_milliseconds = int(time.time() * 1000)
            return True

        # A lock exists with that name
        # Validate old_election_record
        if old_election_record is None:
            # try to update lock with proper annotation and election record
            return self.update_lock(leader_election_record)

        if (old_election_record.holder_identity is None or old_election_record.lease_duration is None
                or old_election_record.acquire_time is None or old_election_record.renew_time is None):
            # try to update lock with proper annotation and election record
            return self.update_lock(leader_election_record)

        # Report transitions
        if self.observed_record and self.observed_record.holder_identity != old_election_record.holder_identity:
            logging.info("Leader has switched to {}".format(old_election_record.holder_identity))

        # Restart the local lease clock whenever the stored record changes
        if self.observed_record is None or old_election_record.__dict__ != self.observed_record.__dict__:
            self.observed_record = old_election_record
            self.observed_time_milliseconds = int(time.time() * 1000)

        # If This candidate is not the leader and lease duration is yet to finish
        if (self.election_config.lock.identity != self.observed_record.holder_identity
                and self.observed_time_milliseconds + self.election_config.lease_duration * 1000
                > int(now_timestamp * 1000)):
            logging.info("yet to finish lease_duration, lease held by {} and has not expired".format(
                old_election_record.holder_identity))
            return False

        # If this candidate is the Leader
        if self.election_config.lock.identity == self.observed_record.holder_identity:
            # Leader updates renewTime, but keeps acquire_time unchanged
            leader_election_record.acquire_time = self.observed_record.acquire_time

        return self.update_lock(leader_election_record)

    def update_lock(self, leader_election_record):
        """Write ``leader_election_record`` to the lock.

        Returns:
            False when the lock's ``update`` call returns False; otherwise
            True, after remembering the record and the time it was written.
        """
        # Update object with latest election record
        update_status = self.election_config.lock.update(self.election_config.lock.name,
                                                         self.election_config.lock.namespace,
                                                         leader_election_record)

        if update_status is False:
            logging.info("{} failed to acquire lease".format(leader_election_record.holder_identity))
            return False

        self.observed_record = leader_election_record
        self.observed_time_milliseconds = int(time.time() * 1000)
        logging.info("leader {} has successfully acquired lease".format(leader_election_record.holder_identity))
        return True

0 commit comments

Comments
 (0)