Skip to content

Commit 69a72ec

Browse files
authored
Make ready_to_execute channel more general (#22)
* Rename `ready_to_execute` channel to `running_status_change` reflecting it's broader purpose. * Small re-designs and changes about the interfaces in general * Add own `Dispatch` type with useful functions on top of the original base class
2 parents 64dea91 + addb81d commit 69a72ec

File tree

8 files changed

+604
-184
lines changed

8 files changed

+604
-184
lines changed

README.md

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,38 +7,61 @@
77
## Introduction
88

99
A highlevel interface for the dispatch API.
10-
The interface is made of the dispatch actor which should run once per microgrid.
11-
It provides two channels for clients:
12-
- "new_dispatches" for newly created dispatches
13-
- "ready_dispatches" for dispatches that are ready to be executed
1410

15-
## Example Usage
11+
See [the documentation](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch) for more information.
12+
13+
## Usage
14+
15+
The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher), the main entry point for the API, provides two channels:
16+
17+
* [Lifecycle events](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher.lifecycle_events): A channel that sends a message whenever a [Dispatch][frequenz.dispatch.Dispatch] is created, updated or deleted.
18+
* [Running status change](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher.running_status_change): Sends a dispatch message whenever a dispatch is ready to be executed according to the schedule or the running status of the dispatch changed in a way that could potentially require the actor to start, stop or reconfigure itself.
19+
20+
### Example using the running status change channel
1621

1722
```python
18-
async def run():
19-
# dispatch helper sends out dispatches when they are due
20-
dispatch_arrived = dispatch_helper.updated_dispatches().new_receiver()
21-
dispatch_ready = dispatch_helper.ready_dispatches().new_receiver()
22-
23-
async for selected in select(dispatch_ready, dispatch_arrived):
24-
if selected_from(selected, dispatch_ready):
25-
dispatch = selected.value
26-
match dispatch.type:
27-
case DISPATCH_TYPE_BATTERY_CHARGE:
28-
battery_pool = microgrid.battery_pool(dispatch.battery_set, task_id)
29-
battery_pool.set_charge(dispatch.power)
30-
...
31-
if selected_from(selected, dispatch_arrived):
32-
match selected.value:
33-
case Created(dispatch):
34-
log.info("New dispatch arrived %s", dispatch)
35-
...
36-
case Updated(dispatch):
37-
log.info("Dispatch updated %s", dispatch)
38-
...
39-
case Deleted(dispatch):
40-
log.info("Dispatch deleted %s", dispatch)
41-
...
23+
import os
24+
import grpc.aio
25+
from unittest.mock import MagicMock
26+
27+
async def run():
28+
host = os.getenv("DISPATCH_API_HOST", "localhost")
29+
port = os.getenv("DISPATCH_API_PORT", "50051")
30+
31+
service_address = f"{host}:{port}"
32+
grpc_channel = grpc.aio.insecure_channel(service_address)
33+
microgrid_id = 1
34+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
35+
await dispatcher.start()
36+
37+
actor = MagicMock() # replace with your actor
38+
39+
changed_running_status_rx = dispatcher.running_status_change.new_receiver()
40+
41+
async for dispatch in changed_running_status_rx:
42+
match dispatch.running("DEMO_TYPE"):
43+
case RunningState.RUNNING:
44+
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
45+
if actor.is_running:
46+
actor.reconfigure(
47+
components=dispatch.selector,
48+
run_parameters=dispatch.payload, # custom actor parameters
49+
dry_run=dispatch.dry_run,
50+
until=dispatch.until,
51+
) # this will reconfigure the actor
52+
else:
53+
# this will start a new actor with the given components
54+
# and run it for the duration of the dispatch
55+
actor.start(
56+
components=dispatch.selector,
57+
run_parameters=dispatch.payload, # custom actor parameters
58+
dry_run=dispatch.dry_run,
59+
until=dispatch.until,
60+
)
61+
case RunningState.STOPPED:
62+
actor.stop() # this will stop the actor
63+
case RunningState.DIFFERENT_TYPE:
64+
pass # dispatch not for this type
4265
```
4366

4467
## Supported Platforms

RELEASE_NOTES.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
* `Dispatcher.ready_to_execute()` was renamed to `Dispatcher.running_status_change()`
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* Introduced new class `Dispatch` (based on the client class) that contains useful functions and extended information about the received dispatch.
14+
* `Dispatcher.client` was added to provide an easy access to the client for updating, deleting and creating dispatches
1415

1516
## Bug Fixes
1617

src/frequenz/dispatch/__init__.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,21 @@
11
# License: MIT
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

4-
"""A highlevel interface for the dispatch API."""
4+
"""A highlevel interface for the dispatch API.
55
6-
from frequenz.dispatch._dispatcher import Dispatcher, ReceiverFetcher
7-
from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated
6+
A small overview of the most important classes in this module:
7+
8+
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
9+
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
10+
* [Created][frequenz.dispatch.Created],
11+
[Updated][frequenz.dispatch.Updated],
12+
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
13+
14+
"""
15+
16+
from ._dispatch import Dispatch, RunningState
17+
from ._dispatcher import Dispatcher, ReceiverFetcher
18+
from ._event import Created, Deleted, DispatchEvent, Updated
819

920
__all__ = [
1021
"Created",
@@ -13,4 +24,6 @@
1324
"Dispatcher",
1425
"ReceiverFetcher",
1526
"Updated",
27+
"Dispatch",
28+
"RunningState",
1629
]

src/frequenz/dispatch/_dispatch.py

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Dispatch type with support for next_run calculation."""
5+
6+
7+
import logging
8+
from dataclasses import dataclass
9+
from datetime import datetime, timezone
10+
from enum import Enum
11+
from typing import Iterator, cast
12+
13+
from dateutil import rrule
14+
from frequenz.client.dispatch.types import Dispatch as BaseDispatch
15+
from frequenz.client.dispatch.types import Frequency, Weekday
16+
17+
_logger = logging.getLogger(__name__)
18+
"""The logger for this module."""
19+
20+
_RRULE_FREQ_MAP = {
21+
Frequency.MINUTELY: rrule.MINUTELY,
22+
Frequency.HOURLY: rrule.HOURLY,
23+
Frequency.DAILY: rrule.DAILY,
24+
Frequency.WEEKLY: rrule.WEEKLY,
25+
Frequency.MONTHLY: rrule.MONTHLY,
26+
}
27+
"""To map from our Frequency enum to the dateutil library enum."""
28+
29+
_RRULE_WEEKDAY_MAP = {
30+
Weekday.MONDAY: rrule.MO,
31+
Weekday.TUESDAY: rrule.TU,
32+
Weekday.WEDNESDAY: rrule.WE,
33+
Weekday.THURSDAY: rrule.TH,
34+
Weekday.FRIDAY: rrule.FR,
35+
Weekday.SATURDAY: rrule.SA,
36+
Weekday.SUNDAY: rrule.SU,
37+
}
38+
"""To map from our Weekday enum to the dateutil library enum."""
39+
40+
41+
class RunningState(Enum):
42+
"""The running state of a dispatch."""
43+
44+
RUNNING = "RUNNING"
45+
"""The dispatch is running."""
46+
47+
STOPPED = "STOPPED"
48+
"""The dispatch is stopped."""
49+
50+
DIFFERENT_TYPE = "DIFFERENT_TYPE"
51+
"""The dispatch is for a different type."""
52+
53+
54+
@dataclass(frozen=True)
55+
class Dispatch(BaseDispatch):
56+
"""Dispatch type with extra functionality."""
57+
58+
deleted: bool = False
59+
"""Whether the dispatch is deleted."""
60+
61+
running_state_change_synced: datetime | None = None
62+
"""The last time a message was sent about the running state change."""
63+
64+
def __init__(
65+
self,
66+
client_dispatch: BaseDispatch,
67+
deleted: bool = False,
68+
running_state_change_synced: datetime | None = None,
69+
):
70+
"""Initialize the dispatch.
71+
72+
Args:
73+
client_dispatch: The client dispatch.
74+
deleted: Whether the dispatch is deleted.
75+
running_state_change_synced: Timestamp of the last running state change message.
76+
"""
77+
super().__init__(**client_dispatch.__dict__)
78+
# Work around frozen to set deleted
79+
object.__setattr__(self, "deleted", deleted)
80+
object.__setattr__(
81+
self,
82+
"running_state_change_synced",
83+
running_state_change_synced,
84+
)
85+
86+
def _set_deleted(self) -> None:
87+
"""Mark the dispatch as deleted."""
88+
object.__setattr__(self, "deleted", True)
89+
90+
@property
91+
def _running_status_notified(self) -> bool:
92+
"""Check that the latest running state change notification was sent.
93+
94+
Returns:
95+
True if the latest running state change notification was sent, False otherwise.
96+
"""
97+
return self.running_state_change_synced == self.update_time
98+
99+
def _set_running_status_notified(self) -> None:
100+
"""Mark the latest running state change notification as sent."""
101+
object.__setattr__(self, "running_state_change_synced", self.update_time)
102+
103+
def running(self, type_: str) -> RunningState:
104+
"""Check if the dispatch is currently supposed to be running.
105+
106+
Args:
107+
type_: The type of the dispatch that should be running.
108+
109+
Returns:
110+
RUNNING if the dispatch is running,
111+
STOPPED if it is stopped,
112+
DIFFERENT_TYPE if it is for a different type.
113+
"""
114+
if self.type != type_:
115+
return RunningState.DIFFERENT_TYPE
116+
117+
if not self.active or self.deleted:
118+
return RunningState.STOPPED
119+
120+
now = datetime.now(tz=timezone.utc)
121+
if until := self._until(now):
122+
return RunningState.RUNNING if now < until else RunningState.STOPPED
123+
124+
return RunningState.STOPPED
125+
126+
@property
127+
def until(self) -> datetime | None:
128+
"""Time when the dispatch should end.
129+
130+
Returns the time that a running dispatch should end.
131+
If the dispatch is not running, None is returned.
132+
133+
Returns:
134+
The time when the dispatch should end or None if the dispatch is not running.
135+
"""
136+
if not self.active or self.deleted:
137+
return None
138+
139+
now = datetime.now(tz=timezone.utc)
140+
return self._until(now)
141+
142+
@property
143+
# noqa is needed because of a bug in pydoclint that makes it think a `return` without a return
144+
# value needs documenting
145+
def missed_runs(self) -> Iterator[datetime]: # noqa: DOC405
146+
"""Yield all missed runs of a dispatch.
147+
148+
Yields all missed runs of a dispatch.
149+
150+
If a running state change notification was not sent in time
151+
due to connection issues, this method will yield all missed runs
152+
since the last sent notification.
153+
154+
Returns:
155+
A generator that yields all missed runs of a dispatch.
156+
"""
157+
if self.update_time == self.running_state_change_synced:
158+
return
159+
160+
from_time = self.update_time
161+
now = datetime.now(tz=timezone.utc)
162+
163+
while (next_run := self.next_run_after(from_time)) and next_run < now:
164+
yield next_run
165+
from_time = next_run
166+
167+
@property
168+
def next_run(self) -> datetime | None:
169+
"""Calculate the next run of a dispatch.
170+
171+
Returns:
172+
The next run of the dispatch or None if the dispatch is finished.
173+
"""
174+
return self.next_run_after(datetime.now(tz=timezone.utc))
175+
176+
def next_run_after(self, after: datetime) -> datetime | None:
177+
"""Calculate the next run of a dispatch.
178+
179+
Args:
180+
after: The time to calculate the next run from.
181+
182+
Returns:
183+
The next run of the dispatch or None if the dispatch is finished.
184+
"""
185+
if (
186+
not self.recurrence.frequency
187+
or self.recurrence.frequency == Frequency.UNSPECIFIED
188+
):
189+
if after > self.start_time:
190+
return None
191+
return self.start_time
192+
193+
# Make sure no weekday is UNSPECIFIED
194+
if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
195+
_logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id)
196+
return None
197+
198+
# No type information for rrule, so we need to cast
199+
return cast(datetime | None, self._prepare_rrule().after(after, inc=True))
200+
201+
def _prepare_rrule(self) -> rrule.rrule:
202+
"""Prepare the rrule object.
203+
204+
Returns:
205+
The rrule object.
206+
"""
207+
count, until = (None, None)
208+
if end := self.recurrence.end_criteria:
209+
count = end.count
210+
until = end.until
211+
212+
rrule_obj = rrule.rrule(
213+
freq=_RRULE_FREQ_MAP[self.recurrence.frequency],
214+
dtstart=self.start_time,
215+
count=count,
216+
until=until,
217+
byminute=self.recurrence.byminutes,
218+
byhour=self.recurrence.byhours,
219+
byweekday=[
220+
_RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays
221+
],
222+
bymonthday=self.recurrence.bymonthdays,
223+
bymonth=self.recurrence.bymonths,
224+
interval=self.recurrence.interval,
225+
)
226+
227+
return rrule_obj
228+
229+
def _until(self, now: datetime) -> datetime | None:
230+
"""Calculate the time when the dispatch should end.
231+
232+
If no previous run is found, None is returned.
233+
234+
Args:
235+
now: The current time.
236+
237+
Returns:
238+
The time when the dispatch should end or None if the dispatch is not running.
239+
"""
240+
if (
241+
not self.recurrence.frequency
242+
or self.recurrence.frequency == Frequency.UNSPECIFIED
243+
):
244+
return self.start_time + self.duration
245+
246+
latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True)
247+
248+
if not latest_past_start:
249+
return None
250+
251+
return latest_past_start + self.duration

0 commit comments

Comments
 (0)