Skip to content

Commit c9c1181

Browse files
authored
Support multiple target actors for one type of dispatch (#104)
- **WIP: Support dipatches refering to different actors** developed together with frequenz-io/frequenz-app-edge#59
2 parents 8f91403 + 6c34157 commit c9c1181

File tree

9 files changed

+264
-67
lines changed

9 files changed

+264
-67
lines changed

RELEASE_NOTES.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,19 @@ This release introduces a more flexible and powerful mechanism for managing disp
66

77
## Upgrading
88

9+
* `Dispatcher.start` is no longer `async`. Remove `await` when calling it.
910
* Two properties have been replaced by methods that require a type as parameter.
1011
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
11-
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`.
12+
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy)`.
1213
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function.
1314
* The `DispatchManagingActor` class has been renamed to `DispatchActorsService`.
1415
* It's interface has been simplified and now only requires an actor factory and a running status receiver.
15-
* It only supports a single actor at a time now.
16+
* It only starts/stops a single actor at a time now instead of a set of actors.
1617
* Refer to the updated [usage example](https://frequenz-floss.github.io/frequenz-dispatch-python/latest/reference/frequenz/dispatch/#frequenz.dispatch.DispatchActorsService) for more information.
1718
* `DispatchUpdate` was renamed to `DispatchInfo`.
1819

1920
## New Features
2021

21-
* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
22-
22+
* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
2323
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
24-
24+
* Actor management with dispatches has been simplified. Calling `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` will begin managing your actor for the given type and merge strategy. All you need provide is an actor factory. To stop dispatching for your type, call `Dispatcher.stop_dispatching(dispatch_type)`.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ dependencies = [
3939
# mkdocs.yml file when changing the version here (look for the config key
4040
# plugins.mkdocstrings.handlers.python.import)
4141
"frequenz-sdk >= 1.0.0-rc1302, < 1.0.0-rc1600",
42-
"frequenz-channels >= 1.3.0, < 2.0.0",
42+
"frequenz-channels >= 1.6.1, < 2.0.0",
4343
"frequenz-client-dispatch >= 0.8.4, < 0.9.0",
4444
]
4545
dynamic = ["version"]

src/frequenz/dispatch/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from ._dispatch import Dispatch
2121
from ._dispatcher import Dispatcher
2222
from ._event import Created, Deleted, DispatchEvent, Updated
23-
from ._merge_strategies import MergeByIdentity, MergeByType, MergeByTypeTarget
23+
from ._merge_strategies import MergeByType, MergeByTypeTarget
2424

2525
__all__ = [
2626
"Created",
@@ -32,7 +32,6 @@
3232
"ActorDispatcher",
3333
"DispatchInfo",
3434
"MergeStrategy",
35-
"MergeByIdentity",
3635
"MergeByType",
3736
"MergeByTypeTarget",
3837
]

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class ActorDispatcher(BackgroundService):
4141
import os
4242
import asyncio
4343
from typing import override
44-
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchInfo
44+
from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo
4545
from frequenz.client.dispatch.types import TargetComponents
4646
from frequenz.client.common.microgrid.components import ComponentCategory
4747
from frequenz.channels import Receiver, Broadcast, select, selected_from
@@ -125,7 +125,7 @@ async def main():
125125
126126
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
127127
128-
managing_actor = DispatchManagingActor(
128+
managing_actor = ActorDispatcher(
129129
actor_factory=MyActor.new_with_dispatch,
130130
running_status_receiver=status_receiver,
131131
)
@@ -138,18 +138,25 @@ def __init__(
138138
self,
139139
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
140140
running_status_receiver: Receiver[Dispatch],
141+
dispatch_identity: Callable[[Dispatch], int] | None = None,
141142
) -> None:
142143
"""Initialize the dispatch handler.
143144
144145
Args:
145146
actor_factory: A callable that creates an actor with some initial dispatch
146147
information.
147148
running_status_receiver: The receiver for dispatch running status changes.
149+
dispatch_identity: A function to identify to which actor a dispatch refers.
150+
By default, it uses the dispatch ID.
148151
"""
149152
super().__init__()
153+
self._dispatch_identity: Callable[[Dispatch], int] = (
154+
dispatch_identity if dispatch_identity else lambda d: d.id
155+
)
156+
150157
self._dispatch_rx = running_status_receiver
151158
self._actor_factory = actor_factory
152-
self._actor: Actor | None = None
159+
self._actors: dict[int, Actor] = {}
153160
self._updates_channel = Broadcast[DispatchInfo](
154161
name="dispatch_updates_channel", resend_latest=True
155162
)
@@ -167,22 +174,36 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
167174
options=dispatch.payload,
168175
)
169176

170-
if self._actor:
177+
actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch))
178+
179+
if actor:
171180
sent_str = ""
172181
if self._updates_sender is not None:
173182
sent_str = ", sent a dispatch update instead of creating a new actor"
174183
await self._updates_sender.send(dispatch_update)
175-
_logger.warning(
184+
_logger.info(
176185
"Actor for dispatch type %r is already running%s",
177186
dispatch.type,
178187
sent_str,
179188
)
180189
else:
181-
_logger.info("Starting actor for dispatch type %r", dispatch.type)
182-
self._actor = self._actor_factory(
183-
dispatch_update, self._updates_channel.new_receiver()
184-
)
185-
self._actor.start()
190+
try:
191+
_logger.info("Starting actor for dispatch type %r", dispatch.type)
192+
actor = self._actor_factory(
193+
dispatch_update,
194+
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
195+
)
196+
self._actors[self._dispatch_identity(dispatch)] = actor
197+
198+
actor.start()
199+
200+
except Exception as e: # pylint: disable=broad-except
201+
_logger.error(
202+
"Failed to start actor for dispatch type %r: %s",
203+
dispatch.type,
204+
e,
205+
exc_info=True,
206+
)
186207

187208
async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
188209
"""Stop all actors.
@@ -191,13 +212,12 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
191212
stopping_dispatch: The dispatch that is stopping the actor.
192213
msg: The message to be passed to the actors being stopped.
193214
"""
194-
if self._actor is None:
215+
if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None):
216+
await actor.stop(msg)
217+
else:
195218
_logger.warning(
196219
"Actor for dispatch type %r is not running", stopping_dispatch.type
197220
)
198-
else:
199-
await self._actor.stop(msg)
200-
self._actor = None
201221

202222
async def _run(self) -> None:
203223
"""Wait for dispatches and handle them."""

src/frequenz/dispatch/_bg_service.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131
class MergeStrategy(ABC):
3232
"""Base class for strategies to merge running intervals."""
3333

34+
@abstractmethod
35+
def identity(self, dispatch: Dispatch) -> int:
36+
"""Identity function for the merge criteria."""
37+
3438
@abstractmethod
3539
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
3640
"""Filter dispatches based on the strategy.
@@ -154,12 +158,9 @@ async def new_running_state_event_receiver(
154158
dispatches of the same type and target
155159
* `None` — no merging, just send all events
156160
157-
You can make your own strategy by subclassing:
158-
159-
* [`MergeByIdentity`][frequenz.dispatch.MergeByIdentity] — Merges
160-
dispatches based on a user defined identity function
161-
* [`MergeStrategy`][frequenz.dispatch.MergeStrategy] — Merges based
162-
on a user defined filter function
161+
You can make your own identity-based strategy by subclassing `MergeByType` and overriding
162+
the `identity()` method. If you require a more complex strategy, you can subclass
163+
`MergeStrategy` directly and implement both the `identity()` and `filter()` methods.
163164
164165
Running intervals from multiple dispatches will be merged, according to
165166
the chosen strategy.

src/frequenz/dispatch/_dispatcher.py

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,27 @@
33

44
"""A highlevel interface for the dispatch API."""
55

6+
from __future__ import annotations
7+
8+
import asyncio
9+
import logging
10+
from asyncio import Event
11+
from typing import Callable
612

713
from frequenz.channels import Receiver
814
from frequenz.client.dispatch import Client
15+
from frequenz.sdk.actor import Actor, BackgroundService
16+
from typing_extensions import override
917

18+
from ._actor_dispatcher import ActorDispatcher, DispatchInfo
1019
from ._bg_service import DispatchScheduler, MergeStrategy
1120
from ._dispatch import Dispatch
1221
from ._event import DispatchEvent
1322

23+
_logger = logging.getLogger(__name__)
24+
1425

15-
class Dispatcher:
26+
class Dispatcher(BackgroundService):
1627
"""A highlevel interface for the dispatch API.
1728
1829
This class provides a highlevel interface to the dispatch API.
@@ -173,16 +184,103 @@ def __init__(
173184
server_url: The URL of the dispatch service.
174185
key: The key to access the service.
175186
"""
187+
super().__init__()
188+
176189
self._client = Client(server_url=server_url, key=key)
177190
self._bg_service = DispatchScheduler(
178191
microgrid_id,
179192
self._client,
180193
)
194+
self._actor_dispatchers: dict[str, ActorDispatcher] = {}
195+
self._empty_event = Event()
196+
self._empty_event.set()
181197

182-
async def start(self) -> None:
198+
@override
199+
def start(self) -> None:
183200
"""Start the local dispatch service."""
184201
self._bg_service.start()
185202

203+
@property
204+
@override
205+
def is_running(self) -> bool:
206+
"""Whether the local dispatch service is running."""
207+
return self._bg_service.is_running
208+
209+
@override
210+
async def wait(self) -> None:
211+
"""Wait until all actor dispatches are stopped."""
212+
await asyncio.gather(self._bg_service.wait(), self._empty_event.wait())
213+
214+
self._actor_dispatchers.clear()
215+
216+
@override
217+
def cancel(self, msg: str | None = None) -> None:
218+
"""Stop the local dispatch service."""
219+
self._bg_service.cancel(msg)
220+
221+
for instance in self._actor_dispatchers.values():
222+
instance.cancel()
223+
224+
async def start_dispatching(
225+
self,
226+
dispatch_type: str,
227+
*,
228+
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
229+
merge_strategy: MergeStrategy | None = None,
230+
) -> None:
231+
"""Manage actors for a given dispatch type.
232+
233+
Creates and manages an ActorDispatcher for the given type that will
234+
start, stop and reconfigure actors based on received dispatches.
235+
236+
You can await the `Dispatcher` instance to block until all types
237+
registered with `start_dispatching()` are stopped using
238+
`stop_dispatching()`
239+
240+
Args:
241+
dispatch_type: The type of the dispatch to manage.
242+
actor_factory: The factory to create actors.
243+
merge_strategy: The strategy to merge running intervals.
244+
"""
245+
dispatcher = self._actor_dispatchers.get(dispatch_type)
246+
247+
if dispatcher is not None:
248+
_logger.debug(
249+
"Ignoring duplicate actor dispatcher request for %r", dispatch_type
250+
)
251+
return
252+
253+
self._empty_event.clear()
254+
255+
def id_identity(dispatch: Dispatch) -> int:
256+
return dispatch.id
257+
258+
dispatcher = ActorDispatcher(
259+
actor_factory=actor_factory,
260+
running_status_receiver=await self.new_running_state_event_receiver(
261+
dispatch_type, merge_strategy=merge_strategy
262+
),
263+
dispatch_identity=(
264+
id_identity if merge_strategy is None else merge_strategy.identity
265+
),
266+
)
267+
268+
self._actor_dispatchers[dispatch_type] = dispatcher
269+
dispatcher.start()
270+
271+
async def stop_dispatching(self, dispatch_type: str) -> None:
272+
"""Stop managing actors for a given dispatch type.
273+
274+
Args:
275+
dispatch_type: The type of the dispatch to stop managing.
276+
"""
277+
dispatcher = self._actor_dispatchers.pop(dispatch_type, None)
278+
if dispatcher is not None:
279+
await dispatcher.stop()
280+
281+
if not self._actor_dispatchers:
282+
self._empty_event.set()
283+
186284
@property
187285
def client(self) -> Client:
188286
"""Return the client."""

src/frequenz/dispatch/_merge_strategies.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
"""Different merge strategies for dispatch running state events."""
55

66
import logging
7-
from abc import abstractmethod
87
from collections.abc import Mapping
98

109
from typing_extensions import override
@@ -13,12 +12,13 @@
1312
from ._dispatch import Dispatch
1413

1514

16-
class MergeByIdentity(MergeStrategy):
17-
"""Merge running intervals based on a dispatch configuration."""
15+
class MergeByType(MergeStrategy):
16+
"""Merge running intervals based on the dispatch type."""
1817

19-
@abstractmethod
18+
@override
2019
def identity(self, dispatch: Dispatch) -> int:
2120
"""Identity function for the merge criteria."""
21+
return hash(dispatch.type)
2222

2323
@override
2424
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
@@ -49,19 +49,10 @@ def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool
4949
return not other_dispatches_running
5050

5151

52-
class MergeByType(MergeByIdentity):
53-
"""Merge running intervals based on the dispatch type."""
54-
55-
@override
56-
def identity(self, dispatch: Dispatch) -> int:
57-
"""Identity function for the merge criteria."""
58-
return hash(dispatch.type)
59-
60-
6152
class MergeByTypeTarget(MergeByType):
6253
"""Merge running intervals based on the dispatch type and target."""
6354

6455
@override
6556
def identity(self, dispatch: Dispatch) -> int:
6657
"""Identity function for the merge criteria."""
67-
return hash((dispatch.type, dispatch.target))
58+
return hash((dispatch.type, tuple(dispatch.target)))

tests/test_frequenz_dispatch.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,8 @@ async def test_at_least_one_running_filter(
701701
recurrence=RecurrenceRule(),
702702
type="TEST_TYPE",
703703
)
704+
_ = merge_strategy.identity(Dispatch(dispatch))
705+
704706
lifecycle = service.new_lifecycle_events_receiver("TEST_TYPE")
705707
await client.create(**to_create_params(microgrid_id, dispatch))
706708
await lifecycle.receive()

0 commit comments

Comments
 (0)