Skip to content

Commit 96c14dd

Browse files
committed
Dispatch python interface changes
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 020e625 commit 96c14dd

File tree

7 files changed

+421
-127
lines changed

7 files changed

+421
-127
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
* `Dispatcher.ready_to_execute()` was renamed to `Dispatcher.running_status_change()`
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* Introduced new class `Dispatch` (based on the client class) that contains useful functions and extended information about the received dispatch.
1414

1515
## Bug Fixes
1616

src/frequenz/dispatch/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66
from frequenz.dispatch._dispatcher import Dispatcher, ReceiverFetcher
77
from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated
88

9+
from ._dispatch import Dispatch
10+
911
__all__ = [
1012
"Created",
1113
"Deleted",
1214
"DispatchEvent",
1315
"Dispatcher",
1416
"ReceiverFetcher",
1517
"Updated",
18+
"Dispatch",
1619
]

src/frequenz/dispatch/_dispatch.py

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Dispatch type with support for next_run calculation."""
5+
6+
7+
import logging
8+
from dataclasses import dataclass
9+
from datetime import datetime, timezone
10+
from typing import Iterator, cast
11+
12+
from dateutil import rrule
13+
from frequenz.client.dispatch.types import Dispatch as BaseDispatch
14+
from frequenz.client.dispatch.types import Frequency, Weekday
15+
16+
_logger = logging.getLogger(__name__)
17+
"""The logger for this module."""
18+
19+
_RRULE_FREQ_MAP = {
20+
Frequency.MINUTELY: rrule.MINUTELY,
21+
Frequency.HOURLY: rrule.HOURLY,
22+
Frequency.DAILY: rrule.DAILY,
23+
Frequency.WEEKLY: rrule.WEEKLY,
24+
Frequency.MONTHLY: rrule.MONTHLY,
25+
}
26+
"""To map from our Frequency enum to the dateutil library enum."""
27+
28+
_RRULE_WEEKDAY_MAP = {
29+
Weekday.MONDAY: rrule.MO,
30+
Weekday.TUESDAY: rrule.TU,
31+
Weekday.WEDNESDAY: rrule.WE,
32+
Weekday.THURSDAY: rrule.TH,
33+
Weekday.FRIDAY: rrule.FR,
34+
Weekday.SATURDAY: rrule.SA,
35+
Weekday.SUNDAY: rrule.SU,
36+
}
37+
"""To map from our Weekday enum to the dateutil library enum."""
38+
39+
40+
@dataclass(frozen=True)
41+
class Dispatch(BaseDispatch):
42+
"""Dispatch type with extra functionality."""
43+
44+
deleted: bool = False
45+
"""Whether the dispatch is deleted."""
46+
47+
running_state_change_synced: datetime | None = None
48+
"""The last time a message was sent about the running state change."""
49+
50+
def __init__(
51+
self,
52+
client_dispatch: BaseDispatch,
53+
deleted: bool = False,
54+
running_state_change_synced: datetime | None = None,
55+
):
56+
"""Initialize the dispatch.
57+
58+
Args:
59+
client_dispatch: The client dispatch.
60+
deleted: Whether the dispatch is deleted.
61+
running_state_change_synced: Timestamp of the last running state change message.
62+
"""
63+
super().__init__(**client_dispatch.__dict__)
64+
# Work around frozen to set deleted
65+
object.__setattr__(self, "deleted", deleted)
66+
object.__setattr__(
67+
self,
68+
"running_state_change_synced",
69+
running_state_change_synced,
70+
)
71+
72+
def set_deleted(self) -> None:
73+
"""Mark the dispatch as deleted."""
74+
object.__setattr__(self, "deleted", True)
75+
76+
def running_status_notified(self) -> bool:
77+
"""Check that the latest running state change notification was sent.
78+
79+
Returns:
80+
True if the latest running state change notification was sent, False otherwise.
81+
"""
82+
return self.running_state_change_synced == self.update_time
83+
84+
def set_running_status_notified(self) -> None:
85+
"""Mark the latest running state change notification as sent."""
86+
object.__setattr__(self, "running_state_change_synced", self.update_time)
87+
88+
def running(self, type_: str) -> bool:
89+
"""Check if the dispatch is currently supposed to be running.
90+
91+
Args:
92+
type_: The type of the dispatch that should be running.
93+
94+
Returns:
95+
True if the dispatch is currently meant to be running, False otherwise.
96+
"""
97+
if self.type != type_:
98+
return False
99+
100+
if not self.active or self.deleted:
101+
return False
102+
103+
now = datetime.now(tz=timezone.utc)
104+
if until := self._until(now):
105+
return now < until
106+
107+
return False
108+
109+
@property
110+
def until(self) -> datetime | None:
111+
"""Time when the dispatch should end.
112+
113+
Returns the time that a running dispatch should end.
114+
If the dispatch is not running, None is returned.
115+
116+
Returns:
117+
The time when the dispatch should end or None if the dispatch is not running.
118+
"""
119+
if not self.active or self.deleted:
120+
return None
121+
122+
now = datetime.now(tz=timezone.utc)
123+
return self._until(now)
124+
125+
@property
126+
def missed_runs(self) -> Iterator[datetime]: # noqa: DOC405
127+
"""Yield all missed runs of a dispatch.
128+
129+
Yields all missed runs of a dispatch.
130+
131+
If a running state change notification was not sent in time
132+
due to connection issues, this method will yield all missed runs
133+
since the last sent notification.
134+
135+
Returns:
136+
A generator that yields all missed runs of a dispatch.
137+
"""
138+
if self.update_time == self.running_state_change_synced:
139+
return
140+
141+
from_time = self.update_time
142+
now = datetime.now(tz=timezone.utc)
143+
144+
while (next_run := self.next_run_after(from_time)) and next_run < now:
145+
yield next_run
146+
from_time = next_run
147+
148+
@property
149+
def next_run(self) -> datetime | None:
150+
"""Calculate the next run of a dispatch.
151+
152+
Returns:
153+
The next run of the dispatch or None if the dispatch is finished.
154+
"""
155+
return self.next_run_after(datetime.now(tz=timezone.utc))
156+
157+
def next_run_after(self, after: datetime) -> datetime | None:
158+
"""Calculate the next run of a dispatch.
159+
160+
Args:
161+
after: The time to calculate the next run from.
162+
163+
Returns:
164+
The next run of the dispatch or None if the dispatch is finished.
165+
"""
166+
if (
167+
not self.recurrence.frequency
168+
or self.recurrence.frequency == Frequency.UNSPECIFIED
169+
):
170+
if after > self.start_time:
171+
return None
172+
return self.start_time
173+
174+
# Make sure no weekday is UNSPECIFIED
175+
if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
176+
_logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id)
177+
return None
178+
179+
# No type information for rrule, so we need to cast
180+
return cast(datetime | None, self._prepare_rrule().after(after, inc=True))
181+
182+
def _prepare_rrule(self) -> rrule.rrule:
183+
"""Prepare the rrule object.
184+
185+
Returns:
186+
The rrule object.
187+
"""
188+
count, until = (None, None)
189+
if end := self.recurrence.end_criteria:
190+
count = end.count
191+
until = end.until
192+
193+
rrule_obj = rrule.rrule(
194+
freq=_RRULE_FREQ_MAP[self.recurrence.frequency],
195+
dtstart=self.start_time,
196+
count=count,
197+
until=until,
198+
byminute=self.recurrence.byminutes,
199+
byhour=self.recurrence.byhours,
200+
byweekday=[
201+
_RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays
202+
],
203+
bymonthday=self.recurrence.bymonthdays,
204+
bymonth=self.recurrence.bymonths,
205+
interval=self.recurrence.interval,
206+
)
207+
208+
return rrule_obj
209+
210+
def _until(self, now: datetime) -> datetime | None:
211+
"""Calculate the time when the dispatch should end.
212+
213+
If no previous run is found, None is returned.
214+
215+
Args:
216+
now: The current time.
217+
218+
Returns:
219+
The time when the dispatch should end or None if the dispatch is not running.
220+
"""
221+
latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True)
222+
223+
if not latest_past_start:
224+
return None
225+
226+
return latest_past_start + self.duration

src/frequenz/dispatch/_dispatcher.py

Lines changed: 63 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88

99
import grpc.aio
1010
from frequenz.channels import Broadcast, Receiver
11-
from frequenz.client.dispatch.types import Dispatch
1211

13-
from frequenz.dispatch._event import DispatchEvent
14-
from frequenz.dispatch.actor import DispatchingActor
12+
from ._dispatch import Dispatch
13+
from ._event import DispatchEvent
14+
from .actor import DispatchingActor
1515

1616
ReceivedT_co = TypeVar("ReceivedT_co", covariant=True)
1717
"""The type being received."""
@@ -44,27 +44,39 @@ class Dispatcher:
4444
One that sends a dispatch event message whenever a dispatch is created, updated or deleted.
4545
4646
The other sends a dispatch message whenever a dispatch is ready to be
47-
executed according to the schedule.
47+
executed according to the schedule or the running status of the dispatch
48+
changed in a way that could potentially require the actor to start, stop or
49+
reconfigure itself.
4850
49-
allows to receive new dispatches and ready dispatches.
50-
51-
Example: Processing ready-to-execute dispatches
51+
Example: Processing running state change dispatches
5252
```python
5353
import grpc.aio
54+
from unittest.mock import MagicMock
5455
5556
async def run():
5657
grpc_channel = grpc.aio.insecure_channel("localhost:50051")
5758
microgrid_id = 1
5859
service_address = "localhost:50051"
5960
6061
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
61-
dispatcher.start() # this will start the actor
62+
actor = MagicMock() # replace with your actor
6263
63-
ready_receiver = dispatcher.ready_to_execute.new_receiver()
64+
changed_running_status_rx = dispatcher.running_status_change.new_receiver()
6465
65-
async for dispatch in ready_receiver:
66+
async for dispatch in changed_running_status_rx:
6667
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
67-
# execute the dispatch
68+
if dispatch.running:
69+
if actor.is_running:
70+
actor.reconfigure(
71+
components=dispatch.selector,
72+
run_parameters=dispatch.payload
73+
) # this will reconfigure the actor
74+
else:
75+
# this will start the actor
76+
# and run it for the duration of the dispatch
77+
actor.start(duration=dispatch.duration, dry_run=dispatch.dry_run)
78+
else:
79+
actor.stop() # this will stop the actor
6880
```
6981
7082
Example: Getting notification about dispatch lifecycle events
@@ -107,14 +119,16 @@ def __init__(
107119
grpc_channel: The gRPC channel.
108120
svc_addr: The service address.
109121
"""
110-
self._ready_channel = Broadcast[Dispatch](name="ready_dispatches")
111-
self._updated_channel = Broadcast[DispatchEvent](name="new_dispatches")
122+
self._running_state_channel = Broadcast[Dispatch](name="running_state_change")
123+
self._lifecycle_events_channel = Broadcast[DispatchEvent](
124+
name="lifecycle_events"
125+
)
112126
self._actor = DispatchingActor(
113127
microgrid_id,
114128
grpc_channel,
115129
svc_addr,
116-
self._updated_channel.new_sender(),
117-
self._ready_channel.new_sender(),
130+
self._lifecycle_events_channel.new_sender(),
131+
self._running_state_channel.new_sender(),
118132
)
119133

120134
async def start(self) -> None:
@@ -123,18 +137,46 @@ async def start(self) -> None:
123137

124138
@property
125139
def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]:
126-
"""Return new, updated or deleted dispatches receiver.
140+
"""Return new, updated or deleted dispatches receiver fetcher.
127141
128142
Returns:
129143
A new receiver for new dispatches.
130144
"""
131-
return self._updated_channel
145+
return self._lifecycle_events_channel
132146

133147
@property
134-
def ready_to_execute(self) -> ReceiverFetcher[Dispatch]:
135-
"""Return ready dispatches receiver.
148+
def running_status_change(self) -> ReceiverFetcher[Dispatch]:
149+
"""Return running status change receiver fetcher.
150+
151+
This receiver will receive a message whenever the current running
152+
status of a dispatch changes.
153+
154+
Usually, one message per scheduled run is to be expected.
155+
However, things get complicated when a dispatch was modified:
156+
157+
If it was currently running and the modification now says
158+
it should not be running or running with different parameters,
159+
then a message will be sent.
160+
161+
In other words: Any change that is expected to make an actor start, stop
162+
or reconfigure itself with new parameters causes a message to be
163+
sent.
164+
165+
A non-exhaustive list of possible changes that will cause a message to be sent:
166+
- The normal scheduled start_time has been reached
167+
- The duration of the dispatch has been modified
168+
- The start_time has been modified to be in the future
169+
- The component selection changed
170+
- The active status changed
171+
- The dry_run status changed
172+
- The payload changed
173+
- The dispatch was deleted
174+
175+
Note: Reaching the end time (start_time + duration) will not
176+
send a message, except when it was reached by modifying the duration.
177+
136178
137179
Returns:
138-
A new receiver for ready dispatches.
180+
A new receiver for dispatches whose running status changed.
139181
"""
140-
return self._ready_channel
182+
return self._running_state_channel

0 commit comments

Comments
 (0)