Skip to content

Commit ef01ee6

Browse files
committed
Switch callbacks to coroutines
Instead of passing Callable[...,Coroutine] functions as callbacks, pass Coroutines directly. This allows the caller to supply arbitrary context associated with the Coroutine, such as the ApiClient used to establish the leader election.
1 parent 214d8e3 commit ef01ee6

File tree

4 files changed

+21
-22
lines changed

4 files changed

+21
-22
lines changed

examples/leaderelection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ async def example_end_func():
6262
lease_duration=17,
6363
renew_deadline=15,
6464
retry_period=5,
65-
onstarted_leading=example_start_func,
66-
onstopped_leading=example_end_func,
65+
onstarted_leading=example_start_func(),
66+
onstopped_leading=example_end_func(),
6767
)
6868

6969
# Enter leader election

kubernetes_asyncio/leaderelection/electionconfig.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,25 @@
1313
# limitations under the License.
1414

1515
import logging
16+
from collections.abc import Coroutine # noqa:F401
1617

1718
logging.basicConfig(level=logging.INFO)
1819

1920

2021
class Config:
2122
# Validate config, exit if an error is detected
23+
24+
# onstarted_leading and onstopped_leading are defined as coroutines rather
25+
# than callables in order to faciliate passing context. For example, this
26+
# allows the ApiClient used by the leader election to be shared and reused.
2227
def __init__(
2328
self,
2429
lock,
2530
lease_duration,
2631
renew_deadline,
2732
retry_period,
28-
onstarted_leading,
29-
onstopped_leading,
33+
onstarted_leading, # type: Coroutine
34+
onstopped_leading=None, # type: Coroutine | None
3035
):
3136
self.jitter_factor = 1.2
3237

@@ -59,11 +64,4 @@ def __init__(
5964
raise ValueError("callback onstarted_leading cannot be None")
6065
self.onstarted_leading = onstarted_leading
6166

62-
if onstopped_leading is None:
63-
self.onstopped_leading = self.on_stoppedleading_callback
64-
else:
65-
self.onstopped_leading = onstopped_leading
66-
67-
# Default callback for when the current candidate if a leader, stops leading
68-
def on_stoppedleading_callback(self):
69-
logging.info("{} stopped leading".format(self.lock.identity))
67+
self.onstopped_leading = onstopped_leading

kubernetes_asyncio/leaderelection/leaderelection.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ async def run(self):
5959
)
6060
)
6161

62-
task = asyncio.create_task(self.election_config.onstarted_leading())
62+
task = asyncio.create_task(self.election_config.onstarted_leading)
6363

6464
await self.renew_loop()
6565

@@ -71,7 +71,8 @@ async def run(self):
7171
# Failed to update lease, run onstopped_leading callback. This is
7272
# preserved in order to continue to provide an interface similar to
7373
# the one provided by `kubernetes-client/python`.
74-
await self.election_config.onstopped_leading()
74+
if self.election_config.onstopped_leading is not None:
75+
await self.election_config.onstopped_leading
7576

7677
async def acquire(self):
7778
# Follower

kubernetes_asyncio/leaderelection/leaderelection_test.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ async def on_stopped_leading():
6262
lease_duration=2.5,
6363
renew_deadline=2,
6464
retry_period=1.5,
65-
onstarted_leading=on_started_leading,
66-
onstopped_leading=on_stopped_leading,
65+
onstarted_leading=on_started_leading(),
66+
onstopped_leading=on_stopped_leading(),
6767
)
6868

6969
# Enter leader election
@@ -116,8 +116,8 @@ async def on_stopped_leading_A():
116116
lease_duration=2.5,
117117
renew_deadline=2,
118118
retry_period=1.5,
119-
onstarted_leading=on_started_leading_A,
120-
onstopped_leading=on_stopped_leading_A,
119+
onstarted_leading=on_started_leading_A(),
120+
onstopped_leading=on_stopped_leading_A(),
121121
)
122122

123123
def on_create_B():
@@ -153,8 +153,8 @@ async def on_stopped_leading_B():
153153
lease_duration=2.5,
154154
renew_deadline=2,
155155
retry_period=1.5,
156-
onstarted_leading=on_started_leading_B,
157-
onstopped_leading=on_stopped_leading_B,
156+
onstarted_leading=on_started_leading_B(),
157+
onstopped_leading=on_stopped_leading_B(),
158158
)
159159

160160
mock_lock_B.leader_record = mock_lock_A.leader_record
@@ -246,8 +246,8 @@ async def on_stopped_leading():
246246
lease_duration=2.5,
247247
renew_deadline=2,
248248
retry_period=1.5,
249-
onstarted_leading=on_started_leading,
250-
onstopped_leading=on_stopped_leading,
249+
onstarted_leading=on_started_leading(),
250+
onstopped_leading=on_stopped_leading(),
251251
)
252252

253253
# Enter leader election

0 commit comments

Comments
 (0)