Skip to content

Commit 3dc5439

Browse files
committed
Dispatch python interface change suggestion
Still a WIP, but already provided here for early feedback. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent e6bb87f commit 3dc5439

File tree

4 files changed

+322
-98
lines changed

4 files changed

+322
-98
lines changed

src/frequenz/dispatch/_dispatch.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Dispatch type with support for next_run calculation."""
5+
6+
7+
import logging
8+
from dataclasses import dataclass
9+
from datetime import datetime, timezone
10+
from typing import Generator, cast
11+
12+
from dateutil import rrule
13+
from frequenz.client.dispatch.types import Dispatch as BaseDispatch
14+
from frequenz.client.dispatch.types import Frequency, Weekday
15+
16+
_logger = logging.getLogger(__name__)
17+
"""The logger for this module."""
18+
19+
_RRULE_FREQ_MAP = {
20+
Frequency.MINUTELY: rrule.MINUTELY,
21+
Frequency.HOURLY: rrule.HOURLY,
22+
Frequency.DAILY: rrule.DAILY,
23+
Frequency.WEEKLY: rrule.WEEKLY,
24+
Frequency.MONTHLY: rrule.MONTHLY,
25+
}
26+
"""To map from our Frequency enum to the dateutil library enum."""
27+
28+
_RRULE_WEEKDAY_MAP = {
29+
Weekday.MONDAY: rrule.MO,
30+
Weekday.TUESDAY: rrule.TU,
31+
Weekday.WEDNESDAY: rrule.WE,
32+
Weekday.THURSDAY: rrule.TH,
33+
Weekday.FRIDAY: rrule.FR,
34+
Weekday.SATURDAY: rrule.SA,
35+
Weekday.SUNDAY: rrule.SU,
36+
}
37+
"""To map from our Weekday enum to the dateutil library enum."""
38+
39+
40+
@dataclass
41+
class Dispatch(BaseDispatch):
42+
"""Dispatch type with extra functionality."""
43+
44+
deleted: bool = False
45+
"""Whether the dispatch is deleted."""
46+
47+
last_sent_running_state_change_notification: datetime | None = None
48+
"""The last time a message was sent about the running state change."""
49+
50+
def __init__(self, **kwargs):
51+
"""Initialize the dispatch.
52+
53+
Args:
54+
**kwargs: The dispatch attributes to set.
55+
"""
56+
super().__init__(**kwargs)
57+
58+
self.deleted = kwargs.get("deleted", False)
59+
60+
@property
61+
def running(self) -> bool:
62+
"""Check if the dispatch is currently supposed to be running.
63+
64+
Returns:
65+
True if the dispatch is currently meant to be running, False otherwise.
66+
"""
67+
if not self.active or self.deleted:
68+
return False
69+
70+
now = datetime.now(tz=timezone.utc)
71+
if until := self._until(now):
72+
return now < until
73+
74+
return False
75+
76+
@property
77+
def until(self) -> datetime | None:
78+
"""Time when the dispatch should end.
79+
80+
Returns the time that a running dispatch should end.
81+
If the dispatch is not running, None is returned.
82+
83+
Returns:
84+
The time when the dispatch should end or None if the dispatch is not running.
85+
"""
86+
if not self.active or self.deleted:
87+
return None
88+
89+
now = datetime.now(tz=timezone.utc)
90+
return self._until(now)
91+
92+
def missed_runs(self) -> Generator[datetime, None, None]:
93+
"""Yield all missed runs of a dispatch.
94+
95+
Yields all missed runs of a dispatch.
96+
97+
If a running state change notification was not sent in time
98+
due to connection issues, this method will yield all missed runs
99+
since the last sent notification.
100+
101+
Returns:
102+
A generator that yields all missed runs of a dispatch.
103+
"""
104+
if self.update_time == self.last_sent_running_state_change_notification:
105+
return
106+
107+
next_run = self.update_time
108+
now = datetime.now(tz=timezone.utc)
109+
110+
while (next_run := self.next_run(next_run)) and next_run < now:
111+
yield next_run
112+
113+
def next_run(self, _from: datetime) -> datetime | None:
114+
"""Calculate the next run of a dispatch.
115+
116+
Args:
117+
_from: The time to calculate the next run from.
118+
119+
Returns:
120+
The next run of the dispatch or None if the dispatch is finished.
121+
"""
122+
if (
123+
not self.recurrence.frequency
124+
or self.recurrence.frequency == Frequency.UNSPECIFIED
125+
):
126+
if _from > self.start_time:
127+
return None
128+
return self.start_time
129+
130+
# Make sure no weekday is UNSPECIFIED
131+
if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
132+
_logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id)
133+
return None
134+
135+
# No type information for rrule, so we need to cast
136+
return cast(datetime | None, self._prepare_rrule().after(_from, inc=True))
137+
138+
def _prepare_rrule(self) -> rrule.rrule:
139+
"""Prepare the rrule object.
140+
141+
Returns:
142+
The rrule object.
143+
"""
144+
count, until = (None, None)
145+
if end := self.recurrence.end_criteria:
146+
count = end.count
147+
until = end.until
148+
149+
rrule_obj = rrule.rrule(
150+
freq=_RRULE_FREQ_MAP[self.recurrence.frequency],
151+
dtstart=self.start_time,
152+
count=count,
153+
until=until,
154+
byminute=self.recurrence.byminutes,
155+
byhour=self.recurrence.byhours,
156+
byweekday=[
157+
_RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays
158+
],
159+
bymonthday=self.recurrence.bymonthdays,
160+
bymonth=self.recurrence.bymonths,
161+
interval=self.recurrence.interval,
162+
)
163+
164+
return rrule_obj
165+
166+
def _until(self, now: datetime) -> datetime | None:
167+
"""Calculate the time when the dispatch should end.
168+
169+
If no previous run is found, None is returned.
170+
171+
Args:
172+
now: The current time.
173+
174+
Returns:
175+
The time when the dispatch should end or None if the dispatch is not running.
176+
"""
177+
latest_past_start = self._prepare_rrule().before(now, inc=True)
178+
179+
if not latest_past_start:
180+
return None
181+
182+
return latest_past_start + self.duration

src/frequenz/dispatch/_dispatcher.py

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88

99
import grpc.aio
1010
from frequenz.channels import Broadcast, Receiver
11-
from frequenz.client.dispatch.types import Dispatch
1211

13-
from frequenz.dispatch._event import DispatchEvent
14-
from frequenz.dispatch.actor import DispatchingActor
12+
from ._dispatch import Dispatch
13+
from ._event import DispatchEvent
14+
from .actor import DispatchingActor
1515

1616
ReceivedT = TypeVar("ReceivedT")
1717
"""The type being received."""
@@ -48,7 +48,7 @@ class Dispatcher:
4848
4949
allows to receive new dispatches and ready dispatches.
5050
51-
Example: Processing ready-to-execute dispatches
51+
Example: Processing running state change dispatches
5252
```python
5353
import grpc.aio
5454
@@ -58,13 +58,18 @@ async def run():
5858
service_address = "localhost:50051"
5959
6060
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
61-
dispatcher.start() # this will start the actor
6261
63-
ready_receiver = dispatcher.ready_to_execute.new_receiver()
62+
changed_running_status_rx= dispatcher.running_status_change.new_receiver()
6463
65-
async for dispatch in ready_receiver:
64+
async for dispatch in changed_running_status_rx:
6665
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
67-
# execute the dispatch
66+
if dispatch.running:
67+
# this will start the actor
68+
# and run it for the duration of the dispatch
69+
dispatcher.start(duration=dispatch.duration, dry_run=dispatch.dry_run)
70+
else:
71+
dispatcher.stop() # this will stop the actor
72+
6873
```
6974
7075
Example: Getting notification about dispatch lifecycle events
@@ -107,14 +112,14 @@ def __init__(
107112
grpc_channel: The gRPC channel.
108113
svc_addr: The service address.
109114
"""
110-
self._ready_channel = Broadcast[Dispatch]("ready_dispatches")
111-
self._updated_channel = Broadcast[DispatchEvent]("new_dispatches")
115+
self._running_state_channel = Broadcast[Dispatch]("ready_dispatches")
116+
self._lifecycle_events_channel = Broadcast[DispatchEvent]("new_dispatches")
112117
self._actor = DispatchingActor(
113118
microgrid_id,
114119
grpc_channel,
115120
svc_addr,
116-
self._updated_channel.new_sender(),
117-
self._ready_channel.new_sender(),
121+
self._lifecycle_events_channel.new_sender(),
122+
self._running_state_channel.new_sender(),
118123
)
119124

120125
async def start(self) -> None:
@@ -123,18 +128,46 @@ async def start(self) -> None:
123128

124129
@property
125130
def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]:
126-
"""Return new, updated or deleted dispatches receiver.
131+
"""Return new, updated or deleted dispatches receiver fetcher.
127132
128133
Returns:
129134
A new receiver for new dispatches.
130135
"""
131-
return self._updated_channel
136+
return self._lifecycle_events_channel
132137

133138
@property
134-
def ready_to_execute(self) -> ReceiverFetcher[Dispatch]:
135-
"""Return ready dispatches receiver.
139+
def running_status_change(self) -> ReceiverFetcher[Dispatch]:
140+
"""Return running status change receiver fetcher.
141+
142+
This receiver will receive a message whenever the current running
143+
status of a dispatch changes.
144+
145+
Usually, one message per scheduled run is to be expected.
146+
However, things get complicated when a dispatch was modified:
147+
148+
If it was currently running and the modification now says
149+
it should not be running or running with different parameters,
150+
then a message will be sent.
151+
152+
In other words: Any change that is expected to make an actor start, stop
153+
or reconfigure itself with new parameters causes a message to be
154+
sent.
155+
156+
A non-exhaustive list of possible changes that will cause a message to be sent:
157+
- The normal scheduled start_time has been reached
158+
- The duration of the dispatch has been modified
159+
- The start_time has been modified to be in the future
160+
- The component selection changed
161+
- The active status changed
162+
- The dry_run status changed
163+
- The payload changed
164+
- The dispatch was deleted
165+
166+
Note: Reaching the end time (start_time + duration) will not
167+
send a message, except when it was reached by modifying the duration.
168+
136169
137170
Returns:
138-
A new receiver for ready dispatches.
171+
A new receiver for dispatches whose running status changed.
139172
"""
140-
return self._ready_channel
173+
return self._running_state_channel

0 commit comments

Comments
 (0)