Skip to content

Commit 1988d99

Browse files
committed
Dispatch python interface change suggestion
Still a WIP, but already provided here for early feedback. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent e6bb87f commit 1988d99

File tree

4 files changed

+72
-29
lines changed

4 files changed

+72
-29
lines changed

src/frequenz/dispatch/_dispatcher.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from frequenz.channels import Broadcast, Receiver
1111
from frequenz.client.dispatch.types import Dispatch
1212

13-
from frequenz.dispatch._event import DispatchEvent
13+
from frequenz.dispatch._event import DispatchCommandRequest, DispatchEvent
1414
from frequenz.dispatch.actor import DispatchingActor
1515

1616
ReceivedT = TypeVar("ReceivedT")
@@ -107,7 +107,7 @@ def __init__(
107107
grpc_channel: The gRPC channel.
108108
svc_addr: The service address.
109109
"""
110-
self._ready_channel = Broadcast[Dispatch]("ready_dispatches")
110+
self._ready_channel = Broadcast[DispatchCommandRequest]("ready_dispatches")
111111
self._updated_channel = Broadcast[DispatchEvent]("new_dispatches")
112112
self._actor = DispatchingActor(
113113
microgrid_id,

src/frequenz/dispatch/_event.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,57 @@ class Deleted:
3838
This type is used to send dispatches that were created, updated or deleted
3939
over the channel.
4040
"""
41+
42+
43+
@dataclass(frozen=True)
44+
class Start:
45+
"""A request to start a dispatch.
46+
47+
Sent when a dispatch starts as scheduled or was
48+
modified to be active after.
49+
50+
Possible modifications that could lead to this:
51+
* Change of start_time to one now() - 5 seconds
52+
* Change of active to True
53+
* Change of duration to be longer
54+
"""
55+
56+
dispatch: Dispatch
57+
"""The dispatch that should start."""
58+
59+
60+
@dataclass(frozen=False)
61+
class Stop:
62+
"""A request to stop a running dispatch.
63+
64+
Sent when a dispatch naturally ends according to its duration,
65+
but also when it was deleted or modified to be no longer active.
66+
"""
67+
68+
dispatch: Dispatch
69+
"""The dispatch that is being stopped."""
70+
71+
72+
@dataclass(frozen=True)
73+
class DryRunStart:
74+
"""A request to start a dispatch in dry run mode."""
75+
76+
dispatch: Dispatch
77+
"""The dispatch that should start in dry run mode."""
78+
79+
80+
@dataclass(frozen=True)
81+
class DryRunStop:
82+
"""A request to stop a running dispatch in dry run mode."""
83+
84+
dispatch: Dispatch
85+
"""The dispatch that is being stopped in dry run mode."""
86+
87+
88+
DispatchCommandRequest = Start | Stop | DryRunStart | DryRunStop
89+
"""Type that is sent over the channel for dispatch command requests.
90+
91+
This is sent whenever a dispatch is ready to run or finished running.
92+
Other reasons include the deletion of dispatches as well as modification
93+
updates to the start time, duration or active flag, among others.
94+
"""

src/frequenz/dispatch/actor.py

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,17 @@
1616
from frequenz.client.dispatch.types import Dispatch, Frequency, Weekday
1717
from frequenz.sdk.actor import Actor
1818

19-
from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated
19+
from frequenz.dispatch._event import (
20+
Created,
21+
Deleted,
22+
DispatchCommandRequest,
23+
DispatchEvent,
24+
DryRunStart,
25+
DryRunStop,
26+
Start,
27+
Stop,
28+
Updated,
29+
)
2030

2131
_MAX_AHEAD_SCHEDULE = timedelta(hours=5)
2232
"""The maximum time ahead to schedule a dispatch.
@@ -36,27 +46,6 @@
3646
"""The logger for this module."""
3747

3848

39-
_RRULE_FREQ_MAP = {
40-
Frequency.MINUTELY: rrule.MINUTELY,
41-
Frequency.HOURLY: rrule.HOURLY,
42-
Frequency.DAILY: rrule.DAILY,
43-
Frequency.WEEKLY: rrule.WEEKLY,
44-
Frequency.MONTHLY: rrule.MONTHLY,
45-
}
46-
"""To map from our Frequency enum to the dateutil library enum."""
47-
48-
_RRULE_WEEKDAY_MAP = {
49-
Weekday.MONDAY: rrule.MO,
50-
Weekday.TUESDAY: rrule.TU,
51-
Weekday.WEDNESDAY: rrule.WE,
52-
Weekday.THURSDAY: rrule.TH,
53-
Weekday.FRIDAY: rrule.FR,
54-
Weekday.SATURDAY: rrule.SA,
55-
Weekday.SUNDAY: rrule.SU,
56-
}
57-
"""To map from our Weekday enum to the dateutil library enum."""
58-
59-
6049
class DispatchingActor(Actor):
6150
"""Dispatch actor.
6251
@@ -73,7 +62,7 @@ def __init__(
7362
grpc_channel: grpc.aio.Channel,
7463
svc_addr: str,
7564
updated_dispatch_sender: Sender[DispatchEvent],
76-
ready_dispatch_sender: Sender[Dispatch],
65+
dispatch_cmd_sender: Sender[DispatchCommandRequest],
7766
poll_interval: timedelta = _DEFAULT_POLL_INTERVAL,
7867
) -> None:
7968
"""Initialize the actor.
@@ -83,7 +72,7 @@ def __init__(
8372
grpc_channel: The gRPC channel to use for communication with the API.
8473
svc_addr: Address of the service to connect to.
8574
updated_dispatch_sender: A sender for new or updated dispatches.
86-
ready_dispatch_sender: A sender for ready dispatches.
75+
dispatch_cmd_sender: A sender for start/stop dispatch requests
8776
poll_interval: The interval to poll the API for dispatche changes.
8877
"""
8978
super().__init__(name="dispatch")
@@ -93,7 +82,7 @@ def __init__(
9382
self._scheduled: dict[int, asyncio.Task[None]] = {}
9483
self._microgrid_id = microgrid_id
9584
self._updated_dispatch_sender = updated_dispatch_sender
96-
self._ready_dispatch_sender = ready_dispatch_sender
85+
self._cmd_sender = dispatch_cmd_sender
9786
self._poll_timer = Timer.timeout(poll_interval)
9887

9988
async def _run(self) -> None:
@@ -196,7 +185,7 @@ def next_run_info() -> tuple[datetime, datetime] | None:
196185
await asyncio.sleep((next_time - now).total_seconds())
197186

198187
_logger.info("Dispatch ready: %s", dispatch)
199-
await self._ready_dispatch_sender.send(dispatch)
188+
await self._cmd_sender.send(Start(dispatch))
200189

201190
_logger.info("Dispatch finished: %s", dispatch)
202191
self._scheduled.pop(dispatch.id)

tests/test_frequenz_dispatch.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ async def send(self, msg: T) -> None:
8888
grpc_channel=MagicMock(),
8989
svc_addr="localhost",
9090
updated_dispatch_sender=YieldingSender(updated_dispatches),
91-
ready_dispatch_sender=YieldingSender(ready_dispatches),
91+
dispatch_cmd_sender=YieldingSender(ready_dispatches),
9292
)
9393

9494
client = FakeClient()

0 commit comments

Comments
 (0)