Skip to content

Commit 9faa88b

Browse files
committed
Dispatches that failed to start will now be retried after a delay.
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 596ee48 commit 9faa88b

File tree

4 files changed

+142
-12
lines changed

4 files changed

+142
-12
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ Further changes:
6262
* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
6363
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1800.
6464
* Actor management with dispatches has been simplified:
65-
* `Dispatcher.start_managing(dispatch_type, actor_factory, merge_strategy)` to manage your actor for the given type and merge strategy. All you need provide is an actor factory.
65+
* `Dispatcher.start_managing(dispatch_type, actor_factory, merge_strategy, retry_interval)` to manage your actor for the given type and merge strategy. All you need provide is an actor factory.
6666
* `Dispatcher.stop_managing(dispatch_type)` to stop dispatching for the given type.
6767
* `Dispatcher.is_managed(dispatch_type)` to check if dispatching is active for the given type.
68+
* Dispatches that failed to start will now be retried after a delay.
6869
* A new method `Dispatcher.wait_for_initialization()` has been added to wait for all actors to be initialized.
6970
* When using `async with Dispatcher(..) as dispatcher`, the dispatcher will first wait for the dispatch service to be initialized before entering the block.

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 88 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
import logging
88
from collections.abc import Callable
99
from dataclasses import dataclass
10+
from datetime import timedelta
1011
from typing import Any, Awaitable
1112

12-
from frequenz.channels import Broadcast, Receiver
13+
from frequenz.channels import Broadcast, Receiver, select
1314
from frequenz.client.dispatch.types import TargetComponents
1415
from frequenz.sdk.actor import Actor, BackgroundService
1516

@@ -132,13 +133,61 @@ async def main():
132133
```
133134
"""
134135

135-
def __init__(
136+
class RetryFailedDispatches:
137+
"""Manages the retry of failed dispatches."""
138+
139+
def __init__(self, retry_interval: timedelta) -> None:
140+
"""Initialize the retry manager.
141+
142+
Args:
143+
retry_interval: The interval between retries.
144+
"""
145+
self._retry_interval = retry_interval
146+
self._channel = Broadcast[Dispatch](name="retry_channel")
147+
self._sender = self._channel.new_sender()
148+
self._tasks: set[asyncio.Task[None]] = set()
149+
150+
def new_receiver(self) -> Receiver[Dispatch]:
151+
"""Create a new receiver for dispatches to retry.
152+
153+
Returns:
154+
The receiver.
155+
"""
156+
return self._channel.new_receiver()
157+
158+
def retry(self, dispatch: Dispatch) -> None:
159+
"""Retry a dispatch.
160+
161+
Args:
162+
dispatch: The dispatch information to retry.
163+
"""
164+
task = asyncio.create_task(self._retry_after_delay(dispatch))
165+
self._tasks.add(task)
166+
task.add_done_callback(self._tasks.remove)
167+
168+
async def _retry_after_delay(self, dispatch: Dispatch) -> None:
169+
"""Retry a dispatch after a delay.
170+
171+
Args:
172+
dispatch: The dispatch information to retry.
173+
"""
174+
_logger.info(
175+
"Will retry dispatch %s after %s",
176+
dispatch.id,
177+
self._retry_interval,
178+
)
179+
await asyncio.sleep(self._retry_interval.total_seconds())
180+
_logger.info("Retrying dispatch %s now", dispatch.id)
181+
await self._sender.send(dispatch)
182+
183+
def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments
136184
self,
137185
actor_factory: Callable[
138186
[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
139187
],
140188
running_status_receiver: Receiver[Dispatch],
141189
dispatch_identity: Callable[[Dispatch], int] | None = None,
190+
retry_interval: timedelta | None = timedelta(seconds=60),
142191
) -> None:
143192
"""Initialize the dispatch handler.
144193
@@ -148,6 +197,7 @@ def __init__(
148197
running_status_receiver: The receiver for dispatch running status changes.
149198
dispatch_identity: A function to identify to which actor a dispatch refers.
150199
By default, it uses the dispatch ID.
200+
retry_interval: The interval between retries. If `None`, retries are disabled.
151201
"""
152202
super().__init__()
153203
self._dispatch_identity: Callable[[Dispatch], int] = (
@@ -161,6 +211,11 @@ def __init__(
161211
name="dispatch_updates_channel", resend_latest=True
162212
)
163213
self._updates_sender = self._updates_channel.new_sender()
214+
self._retrier = (
215+
ActorDispatcher.RetryFailedDispatches(retry_interval)
216+
if retry_interval
217+
else None
218+
)
164219

165220
def start(self) -> None:
166221
"""Start the background service."""
@@ -174,7 +229,8 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
174229
options=dispatch.payload,
175230
)
176231

177-
actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch))
232+
identity = self._dispatch_identity(dispatch)
233+
actor: Actor | None = self._actors.get(identity)
178234

179235
if actor:
180236
sent_str = ""
@@ -193,17 +249,24 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
193249
dispatch_update,
194250
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
195251
)
196-
self._actors[self._dispatch_identity(dispatch)] = actor
197252

198253
actor.start()
199254

200255
except Exception as e: # pylint: disable=broad-except
201256
_logger.error(
202-
"Failed to start actor for dispatch type %r: %s",
257+
"Failed to start actor for dispatch type %r",
203258
dispatch.type,
204-
e,
205-
exc_info=True,
259+
exc_info=e,
206260
)
261+
if self._retrier:
262+
self._retrier.retry(dispatch)
263+
else:
264+
_logger.error(
265+
"No retry mechanism enabled, dispatch %r failed", dispatch
266+
)
267+
else:
268+
# No exception occurred, so we can add the actor to the list
269+
self._actors[identity] = actor
207270

208271
async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
209272
"""Stop all actors.
@@ -212,17 +275,31 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
212275
stopping_dispatch: The dispatch that is stopping the actor.
213276
msg: The message to be passed to the actors being stopped.
214277
"""
215-
if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None):
278+
actor: Actor | None = None
279+
identity = self._dispatch_identity(stopping_dispatch)
280+
281+
actor = self._actors.get(identity)
282+
283+
if actor:
216284
await actor.stop(msg)
217285
else:
218286
_logger.warning(
219287
"Actor for dispatch type %r is not running", stopping_dispatch.type
220288
)
221289

222290
async def _run(self) -> None:
223-
"""Wait for dispatches and handle them."""
224-
async for dispatch in self._dispatch_rx:
225-
await self._handle_dispatch(dispatch=dispatch)
291+
"""Run the background service."""
292+
if not self._retrier:
293+
async for dispatch in self._dispatch_rx:
294+
await self._handle_dispatch(dispatch)
295+
else:
296+
retry_recv = self._retrier.new_receiver()
297+
298+
async for selected in select(retry_recv, self._dispatch_rx):
299+
if retry_recv.triggered(selected):
300+
self._retrier.retry(selected.message)
301+
elif self._dispatch_rx.triggered(selected):
302+
await self._handle_dispatch(selected.message)
226303

227304
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
228305
"""Handle a dispatch.

src/frequenz/dispatch/_dispatcher.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import asyncio
99
import logging
1010
from asyncio import Event
11+
from datetime import timedelta
1112
from typing import Awaitable, Callable, Self
1213

1314
from frequenz.channels import Receiver
@@ -270,6 +271,7 @@ async def start_managing(
270271
[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
271272
],
272273
merge_strategy: MergeStrategy | None = None,
274+
retry_interval: timedelta = timedelta(seconds=60),
273275
) -> None:
274276
"""Manage actors for a given dispatch type.
275277
@@ -298,6 +300,7 @@ async def start_managing(
298300
dispatch_type: The type of the dispatch to manage.
299301
actor_factory: The factory to create actors.
300302
merge_strategy: The strategy to merge running intervals.
303+
retry_interval: Retry interval for when actor creation fails.
301304
"""
302305
dispatcher = self._actor_dispatchers.get(dispatch_type)
303306

@@ -321,6 +324,7 @@ def id_identity(dispatch: Dispatch) -> int:
321324
dispatch_identity=(
322325
id_identity if merge_strategy is None else merge_strategy.identity
323326
),
327+
retry_interval=retry_interval,
324328
)
325329

326330
self._actor_dispatchers[dispatch_type] = dispatcher

tests/test_mananging_actor.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ async def create(
8484
actor = cls(initial_dispatch, receiver)
8585
return actor
8686

87+
@classmethod
88+
async def create_fail(
89+
cls, __: DispatchInfo, _: Receiver[DispatchInfo]
90+
) -> "MockActor":
91+
"""Create a new actor."""
92+
raise ValueError("Failed to create actor")
93+
8794

8895
@dataclass
8996
class TestEnv:
@@ -170,6 +177,47 @@ async def test_simple_start_stop(
170177
# pylint: enable=protected-access
171178

172179

180+
async def test_start_failed(
181+
test_env: TestEnv, fake_time: time_machine.Coordinates
182+
) -> None:
183+
"""Test auto-retry after 60 seconds."""
184+
# pylint: disable=protected-access
185+
test_env.actors_service._actor_factory = MockActor.create_fail
186+
187+
now = _now()
188+
duration = timedelta(minutes=10)
189+
dispatch = test_env.generator.generate_dispatch()
190+
dispatch = replace(
191+
dispatch,
192+
id=1,
193+
active=True,
194+
dry_run=False,
195+
duration=duration,
196+
start_time=now,
197+
payload={"test": True},
198+
type="UNIT_TEST",
199+
recurrence=replace(
200+
dispatch.recurrence,
201+
frequency=Frequency.UNSPECIFIED,
202+
),
203+
)
204+
205+
# Send status update to start actor, expect no DispatchInfo for the start
206+
await test_env.running_status_sender.send(Dispatch(dispatch))
207+
fake_time.shift(timedelta(seconds=1))
208+
209+
# Replace failing mock actor factory with a working one
210+
test_env.actors_service._actor_factory = MockActor.create
211+
212+
# Give retry task time to start
213+
await asyncio.sleep(1)
214+
215+
fake_time.shift(timedelta(seconds=65))
216+
await asyncio.sleep(65)
217+
218+
assert test_env.actor(1).is_running is True
219+
220+
173221
def test_heapq_dispatch_compare(test_env: TestEnv) -> None:
174222
"""Test that the heapq compare function works."""
175223
dispatch1 = test_env.generator.generate_dispatch()

0 commit comments

Comments
 (0)