Skip to content

Commit 8f91403

Browse files
authored
Implement merge strategy based on TYPE+TARGET (#98)
2 parents d990e0a + 93683e2 commit 8f91403

File tree

6 files changed

+206
-73
lines changed

6 files changed

+206
-73
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,7 @@ This release introduces a more flexible and powerful mechanism for managing disp
1818

1919
## New Features
2020

21+
* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
22+
2123
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
2224

src/frequenz/dispatch/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
"""
1717

1818
from ._actor_dispatcher import ActorDispatcher, DispatchInfo
19+
from ._bg_service import MergeStrategy
1920
from ._dispatch import Dispatch
2021
from ._dispatcher import Dispatcher
2122
from ._event import Created, Deleted, DispatchEvent, Updated
23+
from ._merge_strategies import MergeByIdentity, MergeByType, MergeByTypeTarget
2224

2325
__all__ = [
2426
"Created",
@@ -29,4 +31,8 @@
2931
"Dispatch",
3032
"ActorDispatcher",
3133
"DispatchInfo",
34+
"MergeStrategy",
35+
"MergeByIdentity",
36+
"MergeByType",
37+
"MergeByTypeTarget",
3238
]

src/frequenz/dispatch/_bg_service.py

Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,13 @@
33

44
"""The dispatch background service."""
55

6+
from __future__ import annotations
7+
68
import asyncio
9+
import functools
710
import logging
11+
from abc import ABC, abstractmethod
12+
from collections.abc import Mapping
813
from dataclasses import dataclass, field
914
from datetime import datetime, timedelta, timezone
1015
from heapq import heappop, heappush
@@ -23,6 +28,22 @@
2328
"""The logger for this module."""
2429

2530

31+
class MergeStrategy(ABC):
32+
"""Base class for strategies to merge running intervals."""
33+
34+
@abstractmethod
35+
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
36+
"""Filter dispatches based on the strategy.
37+
38+
Args:
39+
dispatches: All dispatches, available as context.
40+
dispatch: The dispatch to filter.
41+
42+
Returns:
43+
True if the dispatch should be included, False otherwise.
44+
"""
45+
46+
2647
# pylint: disable=too-many-instance-attributes
2748
class DispatchScheduler(BackgroundService):
2849
"""Dispatch background service.
@@ -119,19 +140,36 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
119140
)
120141

121142
async def new_running_state_event_receiver(
122-
self, type: str, *, unify_running_intervals: bool = True
143+
self, type: str, *, merge_strategy: MergeStrategy | None = None
123144
) -> Receiver[Dispatch]:
124145
"""Create a new receiver for running state events of the specified type.
125146
126-
If `unify_running_intervals` is True, running intervals from multiple
127-
dispatches of the same type are considered as one continuous running
128-
period. In this mode, any stop events are ignored as long as at least
129-
one dispatch remains active.
147+
`merge_strategy` is an instance of a class derived from
148+
[`MergeStrategy`][frequenz.dispatch.MergeStrategy]. Available strategies
149+
are:
150+
151+
* [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
152+
of the same type
153+
* [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
154+
dispatches of the same type and target
155+
* `None` — no merging, just send all events
156+
157+
You can make your own strategy by subclassing:
158+
159+
* [`MergeByIdentity`][frequenz.dispatch.MergeByIdentity] — Merges
160+
dispatches based on a user defined identity function
161+
* [`MergeStrategy`][frequenz.dispatch.MergeStrategy] — Merges based
162+
on a user defined filter function
163+
164+
Running intervals from multiple dispatches will be merged, according to
165+
the chosen strategy.
166+
167+
While merging, stop events are ignored as long as at least one
168+
merge-criteria-matching dispatch remains active.
130169
131170
Args:
132171
type: The type of events to receive.
133-
unify_running_intervals: Whether to unify running intervals.
134-
172+
merge_strategy: The merge strategy to use.
135173
Returns:
136174
A new receiver for running state status.
137175
"""
@@ -140,33 +178,21 @@ async def new_running_state_event_receiver(
140178
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
141179
]
142180

143-
# Create receiver with enough capacity to hold all matching dispatches
181+
# Create a new receiver with at least 30 slots, but more if there are
182+
# more dispatches.
183+
# That way we can send all dispatches initially and don't have to worry
184+
# about the receiver being full.
185+
# If there are no initial dispatches, we still want to have some slots
186+
# available for future dispatches, so we set the limit to 30.
144187
receiver = self._running_state_status_channel.new_receiver(
145-
limit=max(1, len(dispatches))
188+
limit=max(30, len(dispatches))
146189
).filter(lambda dispatch: dispatch.type == type)
147190

148-
if unify_running_intervals:
149-
150-
def _is_type_still_running(new_dispatch: Dispatch) -> bool:
151-
"""Merge time windows of running dispatches.
152-
153-
Any event that would cause a stop is filtered if at least one
154-
dispatch of the same type is running.
155-
"""
156-
if new_dispatch.started:
157-
return True
158-
159-
other_dispatches_running = any(
160-
dispatch.started
161-
for dispatch in self._dispatches.values()
162-
if dispatch.type == type
163-
)
164-
# If no other dispatches are running, we can allow the stop event
165-
return not other_dispatches_running
166-
167-
receiver = receiver.filter(_is_type_still_running)
191+
if merge_strategy:
192+
receiver = receiver.filter(
193+
functools.partial(merge_strategy.filter, self._dispatches)
194+
)
168195

169-
# Send all matching dispatches to the receiver
170196
for dispatch in dispatches:
171197
await self._send_running_state_change(dispatch)
172198

@@ -195,9 +221,6 @@ async def _run(self) -> None:
195221
if selected_from(selected, self._next_event_timer):
196222
if not self._scheduled_events:
197223
continue
198-
_logger.debug(
199-
"Executing scheduled event: %s", self._scheduled_events[0].dispatch
200-
)
201224
await self._execute_scheduled_event(
202225
heappop(self._scheduled_events).dispatch
203226
)
@@ -227,6 +250,7 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
227250
Args:
228251
dispatch: The dispatch to execute.
229252
"""
253+
_logger.debug("Executing scheduled event: %s (%s)", dispatch, dispatch.started)
230254
await self._send_running_state_change(dispatch)
231255

232256
# The timer is always a tiny bit delayed, so we need to check if the
@@ -256,7 +280,7 @@ async def _fetch(self) -> None:
256280
for client_dispatch in page:
257281
dispatch = Dispatch(client_dispatch)
258282

259-
self._dispatches[dispatch.id] = Dispatch(client_dispatch)
283+
self._dispatches[dispatch.id] = dispatch
260284
old_dispatch = old_dispatches.pop(dispatch.id, None)
261285
if not old_dispatch:
262286
_logger.debug("New dispatch: %s", dispatch)
@@ -310,7 +334,7 @@ async def _update_dispatch_schedule_and_notify(
310334
self._remove_scheduled(old_dispatch)
311335

312336
was_running = old_dispatch.started
313-
old_dispatch._set_deleted() # pylint: disable=protected-access)
337+
old_dispatch._set_deleted() # pylint: disable=protected-access
314338

315339
# If the dispatch was running, we need to notify
316340
if was_running:

src/frequenz/dispatch/_dispatcher.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from frequenz.channels import Receiver
88
from frequenz.client.dispatch import Client
99

10-
from ._bg_service import DispatchScheduler
10+
from ._bg_service import DispatchScheduler, MergeStrategy
1111
from ._dispatch import Dispatch
1212
from ._event import DispatchEvent
1313

@@ -16,17 +16,19 @@ class Dispatcher:
1616
"""A highlevel interface for the dispatch API.
1717
1818
This class provides a highlevel interface to the dispatch API.
19-
It provides two channels:
19+
It provides two receiver functions:
2020
21-
Lifecycle events:
22-
A channel that sends a dispatch event message whenever a dispatch
23-
is created, updated or deleted.
21+
* [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]:
22+
Receives an event whenever a dispatch is created, updated or deleted.
23+
* [Running status change
24+
receiver][frequenz.dispatch.Dispatcher.new_running_state_event_receiver]:
25+
Receives an event whenever the running status of a dispatch changes.
26+
The running status of a dispatch can change due to a variety of reasons,
27+
such as but not limited to the dispatch being started, stopped, modified
28+
or deleted or reaching its scheduled start or end time.
2429
25-
Running status change:
26-
Sends a dispatch message whenever a dispatch is ready
27-
to be executed according to the schedule or the running status of the
28-
dispatch changed in a way that could potentially require the consumer to start,
29-
stop or reconfigure itself.
30+
Any change that could potentially require the consumer to start, stop or
31+
reconfigure itself will cause a message to be sent.
3032
3133
Example: Processing running state change dispatches
3234
```python
@@ -200,7 +202,10 @@ def new_lifecycle_events_receiver(
200202
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
201203

202204
async def new_running_state_event_receiver(
203-
self, dispatch_type: str, *, unify_running_intervals: bool = True
205+
self,
206+
dispatch_type: str,
207+
*,
208+
merge_strategy: MergeStrategy | None = None,
204209
) -> Receiver[Dispatch]:
205210
"""Return running state event receiver.
206211
@@ -228,18 +233,29 @@ async def new_running_state_event_receiver(
228233
- The payload changed
229234
- The dispatch was deleted
230235
231-
If `unify_running_intervals` is True, running intervals from multiple
232-
dispatches of the same type are considered as one continuous running
233-
period. In this mode, any stop events are ignored as long as at least
234-
one dispatch remains active.
236+
`merge_strategy` is an instance of a class derived from
237+
[`MergeStrategy`][frequenz.dispatch.MergeStrategy] Available strategies
238+
are:
239+
240+
* [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
241+
of the same type
242+
* [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
243+
dispatches of the same type and target
244+
* `None` — no merging, just send all events (default)
245+
246+
Running intervals from multiple dispatches will be merged, according to
247+
the chosen strategy.
248+
249+
While merging, stop events are ignored as long as at least one
250+
merge-criteria-matching dispatch remains active.
235251
236252
Args:
237253
dispatch_type: The type of the dispatch to listen for.
238-
unify_running_intervals: Whether to unify running intervals.
254+
merge_strategy: The type of the strategy to merge running intervals.
239255
240256
Returns:
241257
A new receiver for dispatches whose running status changed.
242258
"""
243259
return await self._bg_service.new_running_state_event_receiver(
244-
dispatch_type, unify_running_intervals=unify_running_intervals
260+
dispatch_type, merge_strategy=merge_strategy
245261
)
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Different merge strategies for dispatch running state events."""
5+
6+
import logging
7+
from abc import abstractmethod
8+
from collections.abc import Mapping
9+
10+
from typing_extensions import override
11+
12+
from ._bg_service import MergeStrategy
13+
from ._dispatch import Dispatch
14+
15+
16+
class MergeByIdentity(MergeStrategy):
17+
"""Merge running intervals based on a dispatch configuration."""
18+
19+
@abstractmethod
20+
def identity(self, dispatch: Dispatch) -> int:
21+
"""Identity function for the merge criteria."""
22+
23+
@override
24+
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
25+
"""Filter dispatches based on the merge strategy.
26+
27+
Keeps start events.
28+
Keeps stop events only if no other dispatches matching the
29+
strategy's criteria are running.
30+
"""
31+
if dispatch.started:
32+
logging.debug("Keeping start event %s", dispatch.id)
33+
return True
34+
35+
other_dispatches_running = any(
36+
existing_dispatch.started
37+
for existing_dispatch in dispatches.values()
38+
if (
39+
self.identity(existing_dispatch) == self.identity(dispatch)
40+
and existing_dispatch.id != dispatch.id
41+
)
42+
)
43+
44+
logging.debug(
45+
"stop event %s because other_dispatches_running=%s",
46+
dispatch.id,
47+
other_dispatches_running,
48+
)
49+
return not other_dispatches_running
50+
51+
52+
class MergeByType(MergeByIdentity):
53+
"""Merge running intervals based on the dispatch type."""
54+
55+
@override
56+
def identity(self, dispatch: Dispatch) -> int:
57+
"""Identity function for the merge criteria."""
58+
return hash(dispatch.type)
59+
60+
61+
class MergeByTypeTarget(MergeByType):
62+
"""Merge running intervals based on the dispatch type and target."""
63+
64+
@override
65+
def identity(self, dispatch: Dispatch) -> int:
66+
"""Identity function for the merge criteria."""
67+
return hash((dispatch.type, dispatch.target))

0 commit comments

Comments
 (0)