diff --git a/README.md b/README.md index f8cb1b6..c50cb3f 100644 --- a/README.md +++ b/README.md @@ -7,38 +7,61 @@ ## Introduction A highlevel interface for the dispatch API. -The interface is made of the dispatch actor which should run once per microgrid. -It provides two channels for clients: -- "new_dispatches" for newly created dispatches -- "ready_dispatches" for dispatches that are ready to be executed -## Example Usage +See [the documentation](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch) for more information. + +## Usage + +The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher), the main entry point for the API, provides two channels: + +* [Lifecycle events](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher.lifecycle_events): A channel that sends a message whenever a [Dispatch][frequenz.dispatch.Dispatch] is created, updated or deleted. +* [Running status change](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher.running_status_change): Sends a dispatch message whenever a dispatch is ready to be executed according to the schedule or the running status of the dispatch changed in a way that could potentially require the actor to start, stop or reconfigure itself. + +### Example using the running status change channel ```python - async def run(): - # dispatch helper sends out dispatches when they are due - dispatch_arrived = dispatch_helper.updated_dispatches().new_receiver() - dispatch_ready = dispatch_helper.ready_dispatches().new_receiver() - - async for selected in select(dispatch_ready, dispatch_arrived): - if selected_from(selected, dispatch_ready): - dispatch = selected.value - match dispatch.type: - case DISPATCH_TYPE_BATTERY_CHARGE: - battery_pool = microgrid.battery_pool(dispatch.battery_set, task_id) - battery_pool.set_charge(dispatch.power) - ... - if selected_from(selected, dispatch_arrived): - match selected.value: - case Created(dispatch): - log.info("New dispatch arrived %s", dispatch) - ... - case Updated(dispatch): - log.info("Dispatch updated %s", dispatch) - ... - case Deleted(dispatch): - log.info("Dispatch deleted %s", dispatch) - ... +import os +import grpc.aio +from unittest.mock import MagicMock + +async def run(): + host = os.getenv("DISPATCH_API_HOST", "localhost") + port = os.getenv("DISPATCH_API_PORT", "50051") + + service_address = f"{host}:{port}" + grpc_channel = grpc.aio.insecure_channel(service_address) + microgrid_id = 1 + dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) + await dispatcher.start() + + actor = MagicMock() # replace with your actor + + changed_running_status_rx = dispatcher.running_status_change.new_receiver() + + async for dispatch in changed_running_status_rx: + match dispatch.running("DEMO_TYPE"): + case RunningState.RUNNING: + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") + if actor.is_running: + actor.reconfigure( + components=dispatch.selector, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) # this will reconfigure the actor + else: + # this will start a new actor with the given components + # and run it for the duration of the dispatch + actor.start( + components=dispatch.selector, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) + case RunningState.STOPPED: + actor.stop() # this will stop the actor + case RunningState.DIFFERENT_TYPE: + pass # dispatch not for this type ``` ## Supported Platforms diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d4546f4..48e894a 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,11 +6,12 @@ ## Upgrading - +* `Dispatcher.ready_to_execute()` was renamed to `Dispatcher.running_status_change()` ## New Features - +* Introduced new class `Dispatch` (based on the client class) that contains useful functions and extended information about the received dispatch. +* `Dispatcher.client` was added to provide an easy access to the client for updating, deleting and creating dispatches ## Bug Fixes diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index 6435da9..16df1b5 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -1,10 +1,21 @@ # License: MIT # Copyright © 2024 Frequenz Energy-as-a-Service GmbH -"""A highlevel interface for the dispatch API.""" +"""A highlevel interface for the dispatch API. -from frequenz.dispatch._dispatcher import Dispatcher, ReceiverFetcher -from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated +A small overview of the most important classes in this module: + +* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API. +* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality. +* [Created][frequenz.dispatch.Created], + [Updated][frequenz.dispatch.Updated], + [Deleted][frequenz.dispatch.Deleted]: Dispatch event types. + +""" + +from ._dispatch import Dispatch, RunningState +from ._dispatcher import Dispatcher, ReceiverFetcher +from ._event import Created, Deleted, DispatchEvent, Updated __all__ = [ "Created", @@ -13,4 +24,6 @@ "Dispatcher", "ReceiverFetcher", "Updated", + "Dispatch", + "RunningState", ] diff --git a/src/frequenz/dispatch/_dispatch.py b/src/frequenz/dispatch/_dispatch.py new file mode 100644 index 0000000..cd67bd1 --- /dev/null +++ b/src/frequenz/dispatch/_dispatch.py @@ -0,0 +1,251 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Dispatch type with support for next_run calculation.""" + + +import logging +from dataclasses import dataclass +from datetime import datetime, timezone +from enum import Enum +from typing import Iterator, cast + +from dateutil import rrule +from frequenz.client.dispatch.types import Dispatch as BaseDispatch +from frequenz.client.dispatch.types import Frequency, Weekday + +_logger = logging.getLogger(__name__) +"""The logger for this module.""" + +_RRULE_FREQ_MAP = { + Frequency.MINUTELY: rrule.MINUTELY, + Frequency.HOURLY: rrule.HOURLY, + Frequency.DAILY: rrule.DAILY, + Frequency.WEEKLY: rrule.WEEKLY, + Frequency.MONTHLY: rrule.MONTHLY, +} +"""To map from our Frequency enum to the dateutil library enum.""" + +_RRULE_WEEKDAY_MAP = { + Weekday.MONDAY: rrule.MO, + Weekday.TUESDAY: rrule.TU, + Weekday.WEDNESDAY: rrule.WE, + Weekday.THURSDAY: rrule.TH, + Weekday.FRIDAY: rrule.FR, + Weekday.SATURDAY: rrule.SA, + Weekday.SUNDAY: rrule.SU, +} +"""To map from our Weekday enum to the dateutil library enum.""" + + +class RunningState(Enum): + """The running state of a dispatch.""" + + RUNNING = "RUNNING" + """The dispatch is running.""" + + STOPPED = "STOPPED" + """The dispatch is stopped.""" + + DIFFERENT_TYPE = "DIFFERENT_TYPE" + """The dispatch is for a different type.""" + + +@dataclass(frozen=True) +class Dispatch(BaseDispatch): + """Dispatch type with extra functionality.""" + + deleted: bool = False + """Whether the dispatch is deleted.""" + + running_state_change_synced: datetime | None = None + """The last time a message was sent about the running state change.""" + + def __init__( + self, + client_dispatch: BaseDispatch, + deleted: bool = False, + running_state_change_synced: datetime | None = None, + ): + """Initialize the dispatch. + + Args: + client_dispatch: The client dispatch. + deleted: Whether the dispatch is deleted. + running_state_change_synced: Timestamp of the last running state change message. + """ + super().__init__(**client_dispatch.__dict__) + # Work around frozen to set deleted + object.__setattr__(self, "deleted", deleted) + object.__setattr__( + self, + "running_state_change_synced", + running_state_change_synced, + ) + + def _set_deleted(self) -> None: + """Mark the dispatch as deleted.""" + object.__setattr__(self, "deleted", True) + + @property + def _running_status_notified(self) -> bool: + """Check that the latest running state change notification was sent. + + Returns: + True if the latest running state change notification was sent, False otherwise. + """ + return self.running_state_change_synced == self.update_time + + def _set_running_status_notified(self) -> None: + """Mark the latest running state change notification as sent.""" + object.__setattr__(self, "running_state_change_synced", self.update_time) + + def running(self, type_: str) -> RunningState: + """Check if the dispatch is currently supposed to be running. + + Args: + type_: The type of the dispatch that should be running. + + Returns: + RUNNING if the dispatch is running, + STOPPED if it is stopped, + DIFFERENT_TYPE if it is for a different type. + """ + if self.type != type_: + return RunningState.DIFFERENT_TYPE + + if not self.active or self.deleted: + return RunningState.STOPPED + + now = datetime.now(tz=timezone.utc) + if until := self._until(now): + return RunningState.RUNNING if now < until else RunningState.STOPPED + + return RunningState.STOPPED + + @property + def until(self) -> datetime | None: + """Time when the dispatch should end. + + Returns the time that a running dispatch should end. + If the dispatch is not running, None is returned. + + Returns: + The time when the dispatch should end or None if the dispatch is not running. + """ + if not self.active or self.deleted: + return None + + now = datetime.now(tz=timezone.utc) + return self._until(now) + + @property + # noqa is needed because of a bug in pydoclint that makes it think a `return` without a return + # value needs documenting + def missed_runs(self) -> Iterator[datetime]: # noqa: DOC405 + """Yield all missed runs of a dispatch. + + Yields all missed runs of a dispatch. + + If a running state change notification was not sent in time + due to connection issues, this method will yield all missed runs + since the last sent notification. + + Returns: + A generator that yields all missed runs of a dispatch. + """ + if self.update_time == self.running_state_change_synced: + return + + from_time = self.update_time + now = datetime.now(tz=timezone.utc) + + while (next_run := self.next_run_after(from_time)) and next_run < now: + yield next_run + from_time = next_run + + @property + def next_run(self) -> datetime | None: + """Calculate the next run of a dispatch. + + Returns: + The next run of the dispatch or None if the dispatch is finished. + """ + return self.next_run_after(datetime.now(tz=timezone.utc)) + + def next_run_after(self, after: datetime) -> datetime | None: + """Calculate the next run of a dispatch. + + Args: + after: The time to calculate the next run from. + + Returns: + The next run of the dispatch or None if the dispatch is finished. + """ + if ( + not self.recurrence.frequency + or self.recurrence.frequency == Frequency.UNSPECIFIED + ): + if after > self.start_time: + return None + return self.start_time + + # Make sure no weekday is UNSPECIFIED + if Weekday.UNSPECIFIED in self.recurrence.byweekdays: + _logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id) + return None + + # No type information for rrule, so we need to cast + return cast(datetime | None, self._prepare_rrule().after(after, inc=True)) + + def _prepare_rrule(self) -> rrule.rrule: + """Prepare the rrule object. + + Returns: + The rrule object. + """ + count, until = (None, None) + if end := self.recurrence.end_criteria: + count = end.count + until = end.until + + rrule_obj = rrule.rrule( + freq=_RRULE_FREQ_MAP[self.recurrence.frequency], + dtstart=self.start_time, + count=count, + until=until, + byminute=self.recurrence.byminutes, + byhour=self.recurrence.byhours, + byweekday=[ + _RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays + ], + bymonthday=self.recurrence.bymonthdays, + bymonth=self.recurrence.bymonths, + interval=self.recurrence.interval, + ) + + return rrule_obj + + def _until(self, now: datetime) -> datetime | None: + """Calculate the time when the dispatch should end. + + If no previous run is found, None is returned. + + Args: + now: The current time. + + Returns: + The time when the dispatch should end or None if the dispatch is not running. + """ + if ( + not self.recurrence.frequency + or self.recurrence.frequency == Frequency.UNSPECIFIED + ): + return self.start_time + self.duration + + latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True) + + if not latest_past_start: + return None + + return latest_past_start + self.duration diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 29f1051..14b8319 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -8,10 +8,11 @@ import grpc.aio from frequenz.channels import Broadcast, Receiver -from frequenz.client.dispatch.types import Dispatch +from frequenz.client.dispatch import Client -from frequenz.dispatch._event import DispatchEvent -from frequenz.dispatch.actor import DispatchingActor +from ._dispatch import Dispatch +from ._event import DispatchEvent +from .actor import DispatchingActor ReceivedT_co = TypeVar("ReceivedT_co", covariant=True) """The type being received.""" @@ -41,44 +42,78 @@ class Dispatcher: This class provides a highlevel interface to the dispatch API. It provides two channels: - One that sends a dispatch event message whenever a dispatch is created, updated or deleted. + Lifecycle events: + A channel that sends a dispatch event message whenever a dispatch + is created, updated or deleted. - The other sends a dispatch message whenever a dispatch is ready to be - executed according to the schedule. + Running status change: + Sends a dispatch message whenever a dispatch is ready + to be executed according to the schedule or the running status of the + dispatch changed in a way that could potentially require the consumer to start, + stop or reconfigure itself. - allows to receive new dispatches and ready dispatches. - - Example: Processing ready-to-execute dispatches + Example: Processing running state change dispatches ```python + import os import grpc.aio + from frequenz.dispatch import Dispatcher, RunningState + from unittest.mock import MagicMock async def run(): - grpc_channel = grpc.aio.insecure_channel("localhost:50051") - microgrid_id = 1 - service_address = "localhost:50051" + host = os.getenv("DISPATCH_API_HOST", "localhost") + port = os.getenv("DISPATCH_API_PORT", "50051") + service_address = f"{host}:{port}" + grpc_channel = grpc.aio.insecure_channel(service_address) + microgrid_id = 1 dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) - dispatcher.start() # this will start the actor - - ready_receiver = dispatcher.ready_to_execute.new_receiver() - - async for dispatch in ready_receiver: - print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") - # execute the dispatch + await dispatcher.start() + + actor = MagicMock() # replace with your actor + + changed_running_status = dispatcher.running_status_change.new_receiver() + + async for dispatch in changed_running_status: + match dispatch.running("DEMO_TYPE"): + case RunningState.RUNNING: + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") + if actor.is_running: + actor.reconfigure( + components=dispatch.selector, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) # this will reconfigure the actor + else: + # this will start a new actor with the given components + # and run it for the duration of the dispatch + actor.start( + components=dispatch.selector, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) + case RunningState.STOPPED: + actor.stop() # this will stop the actor + case RunningState.DIFFERENT_TYPE: + pass # dispatch not for this type ``` Example: Getting notification about dispatch lifecycle events ```python + import os from typing import assert_never import grpc.aio from frequenz.dispatch import Created, Deleted, Dispatcher, Updated - async def run(): - grpc_channel = grpc.aio.insecure_channel("localhost:50051") + host = os.getenv("DISPATCH_API_HOST", "localhost") + port = os.getenv("DISPATCH_API_PORT", "50051") + + service_address = f"{host}:{port}" + grpc_channel = grpc.aio.insecure_channel(service_address) microgrid_id = 1 - service_address = "localhost:50051" dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) dispatcher.start() # this will start the actor @@ -95,6 +130,48 @@ async def run(): case _ as unhandled: assert_never(unhandled) ``` + + Example: Creating a new dispatch and then modifying it. + Note that this uses the lower-level `Client` class to create and update the dispatch. + + ```python + import os + from datetime import datetime, timedelta, timezone + + import grpc.aio + from frequenz.client.common.microgrid.components import ComponentCategory + + from frequenz.dispatch import Dispatcher + + async def run(): + host = os.getenv("DISPATCH_API_HOST", "localhost") + port = os.getenv("DISPATCH_API_PORT", "50051") + + service_address = f"{host}:{port}" + grpc_channel = grpc.aio.insecure_channel(service_address) + microgrid_id = 1 + dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) + await dispatcher.start() # this will start the actor + + # Create a new dispatch + new_dispatch = await dispatcher.client.create( + microgrid_id=microgrid_id, + _type="ECHO_FREQUENCY", # replace with your own type + start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10), + duration=timedelta(minutes=5), + selector=ComponentCategory.INVERTER, + payload={"font": "Times New Roman"}, # Arbitrary payload data + ) + + # Modify the dispatch + await dispatcher.client.update( + dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)} + ) + + # Validate the modification + modified_dispatch = await dispatcher.client.get(new_dispatch.id) + assert modified_dispatch.duration == timedelta(minutes=10) + ``` """ def __init__( @@ -107,34 +184,69 @@ def __init__( grpc_channel: The gRPC channel. svc_addr: The service address. """ - self._ready_channel = Broadcast[Dispatch](name="ready_dispatches") - self._updated_channel = Broadcast[DispatchEvent](name="new_dispatches") + self._running_state_channel = Broadcast[Dispatch](name="running_state_change") + self._lifecycle_events_channel = Broadcast[DispatchEvent]( + name="lifecycle_events" + ) + self._client = Client(grpc_channel, svc_addr) self._actor = DispatchingActor( microgrid_id, - grpc_channel, - svc_addr, - self._updated_channel.new_sender(), - self._ready_channel.new_sender(), + self._client, + self._lifecycle_events_channel.new_sender(), + self._running_state_channel.new_sender(), ) async def start(self) -> None: """Start the actor.""" self._actor.start() + @property + def client(self) -> Client: + """Return the client.""" + return self._client + @property def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]: - """Return new, updated or deleted dispatches receiver. + """Return new, updated or deleted dispatches receiver fetcher. Returns: A new receiver for new dispatches. """ - return self._updated_channel + return self._lifecycle_events_channel @property - def ready_to_execute(self) -> ReceiverFetcher[Dispatch]: - """Return ready dispatches receiver. + def running_status_change(self) -> ReceiverFetcher[Dispatch]: + """Return running status change receiver fetcher. + + This receiver will receive a message whenever the current running + status of a dispatch changes. + + Usually, one message per scheduled run is to be expected. + However, things get complicated when a dispatch was modified: + + If it was currently running and the modification now says + it should not be running or running with different parameters, + then a message will be sent. + + In other words: Any change that is expected to make an actor start, stop + or reconfigure itself with new parameters causes a message to be + sent. + + A non-exhaustive list of possible changes that will cause a message to be sent: + - The normal scheduled start_time has been reached + - The duration of the dispatch has been modified + - The start_time has been modified to be in the future + - The component selection changed + - The active status changed + - The dry_run status changed + - The payload changed + - The dispatch was deleted + + Note: Reaching the end time (start_time + duration) will not + send a message, except when it was reached by modifying the duration. + Returns: - A new receiver for ready dispatches. + A new receiver for dispatches whose running status changed. """ - return self._ready_channel + return self._running_state_channel diff --git a/src/frequenz/dispatch/_event.py b/src/frequenz/dispatch/_event.py index 670c501..f70d0dd 100644 --- a/src/frequenz/dispatch/_event.py +++ b/src/frequenz/dispatch/_event.py @@ -5,7 +5,7 @@ from dataclasses import dataclass -from frequenz.client.dispatch.types import Dispatch +from ._dispatch import Dispatch @dataclass(frozen=True) diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 5ca072f..30e137d 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -6,17 +6,15 @@ import asyncio import logging from datetime import datetime, timedelta, timezone -from typing import cast import grpc.aio -from dateutil import rrule from frequenz.channels import Sender from frequenz.channels.timer import SkipMissedAndDrift, Timer from frequenz.client.dispatch import Client -from frequenz.client.dispatch.types import Dispatch, Frequency, Weekday from frequenz.sdk.actor import Actor -from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated +from ._dispatch import Dispatch, RunningState +from ._event import Created, Deleted, DispatchEvent, Updated _MAX_AHEAD_SCHEDULE = timedelta(hours=5) """The maximum time ahead to schedule a dispatch. @@ -36,27 +34,6 @@ """The logger for this module.""" -_RRULE_FREQ_MAP = { - Frequency.MINUTELY: rrule.MINUTELY, - Frequency.HOURLY: rrule.HOURLY, - Frequency.DAILY: rrule.DAILY, - Frequency.WEEKLY: rrule.WEEKLY, - Frequency.MONTHLY: rrule.MONTHLY, -} -"""To map from our Frequency enum to the dateutil library enum.""" - -_RRULE_WEEKDAY_MAP = { - Weekday.MONDAY: rrule.MO, - Weekday.TUESDAY: rrule.TU, - Weekday.WEDNESDAY: rrule.WE, - Weekday.THURSDAY: rrule.TH, - Weekday.FRIDAY: rrule.FR, - Weekday.SATURDAY: rrule.SA, - Weekday.SUNDAY: rrule.SU, -} -"""To map from our Weekday enum to the dateutil library enum.""" - - class DispatchingActor(Actor): """Dispatch actor. @@ -70,30 +47,28 @@ class DispatchingActor(Actor): def __init__( self, microgrid_id: int, - grpc_channel: grpc.aio.Channel, - svc_addr: str, - updated_dispatch_sender: Sender[DispatchEvent], - ready_dispatch_sender: Sender[Dispatch], + client: Client, + lifecycle_updates_sender: Sender[DispatchEvent], + running_state_change_sender: Sender[Dispatch], poll_interval: timedelta = _DEFAULT_POLL_INTERVAL, ) -> None: """Initialize the actor. Args: microgrid_id: The microgrid ID to handle dispatches for. - grpc_channel: The gRPC channel to use for communication with the API. - svc_addr: Address of the service to connect to. - updated_dispatch_sender: A sender for new or updated dispatches. - ready_dispatch_sender: A sender for ready dispatches. + client: The client to use for fetching dispatches. + lifecycle_updates_sender: A sender for dispatch lifecycle events. + running_state_change_sender: A sender for dispatch running state changes. poll_interval: The interval to poll the API for dispatche changes. """ super().__init__(name="dispatch") - self._client = Client(grpc_channel, svc_addr) + self._client = client self._dispatches: dict[int, Dispatch] = {} self._scheduled: dict[int, asyncio.Task[None]] = {} self._microgrid_id = microgrid_id - self._updated_dispatch_sender = updated_dispatch_sender - self._ready_dispatch_sender = ready_dispatch_sender + self._lifecycle_updates_sender = lifecycle_updates_sender + self._running_state_change_sender = running_state_change_sender self._poll_timer = Timer(poll_interval, SkipMissedAndDrift()) async def _run(self) -> None: @@ -114,18 +89,28 @@ async def _fetch(self) -> None: try: _logger.info("Fetching dispatches for microgrid %s", self._microgrid_id) - async for dispatch in self._client.list(microgrid_id=self._microgrid_id): - self._dispatches[dispatch.id] = dispatch + async for client_dispatch in self._client.list( + microgrid_id=self._microgrid_id + ): + dispatch = Dispatch(client_dispatch) + self._dispatches[dispatch.id] = Dispatch(client_dispatch) old_dispatch = old_dispatches.pop(dispatch.id, None) if not old_dispatch: self._update_dispatch_schedule(dispatch, None) _logger.info("New dispatch: %s", dispatch) - await self._updated_dispatch_sender.send(Created(dispatch=dispatch)) + await self._lifecycle_updates_sender.send( + Created(dispatch=dispatch) + ) elif dispatch.update_time != old_dispatch.update_time: self._update_dispatch_schedule(dispatch, old_dispatch) _logger.info("Updated dispatch: %s", dispatch) - await self._updated_dispatch_sender.send(Updated(dispatch=dispatch)) + await self._lifecycle_updates_sender.send( + Updated(dispatch=dispatch) + ) + + if self._running_state_change(dispatch, old_dispatch): + await self._send_running_state_change(dispatch) except grpc.aio.AioRpcError as error: _logger.error("Error fetching dispatches: %s", error) @@ -134,10 +119,14 @@ async def _fetch(self) -> None: for dispatch in old_dispatches.values(): _logger.info("Deleted dispatch: %s", dispatch) - await self._updated_dispatch_sender.send(Deleted(dispatch=dispatch)) + dispatch._set_deleted() # pylint: disable=protected-access + await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) if task := self._scheduled.pop(dispatch.id, None): task.cancel() + if self._running_state_change(None, dispatch): + await self._send_running_state_change(dispatch) + def _update_dispatch_schedule( self, dispatch: Dispatch, old_dispatch: Dispatch | None ) -> None: @@ -178,7 +167,7 @@ async def _schedule_task(self, dispatch: Dispatch) -> None: def next_run_info() -> tuple[datetime, datetime] | None: now = datetime.now(tz=timezone.utc) - next_run = self.calculate_next_run(dispatch, now) + next_run = dispatch.next_run_after(now) if next_run is None: return None @@ -196,56 +185,72 @@ def next_run_info() -> tuple[datetime, datetime] | None: await asyncio.sleep((next_time - now).total_seconds()) _logger.info("Dispatch ready: %s", dispatch) - await self._ready_dispatch_sender.send(dispatch) + await self._running_state_change_sender.send(dispatch) _logger.info("Dispatch finished: %s", dispatch) self._scheduled.pop(dispatch.id) - @classmethod - def calculate_next_run(cls, dispatch: Dispatch, _from: datetime) -> datetime | None: - """Calculate the next run of a dispatch. + def _running_state_change( + self, updated_dispatch: Dispatch | None, previous_dispatch: Dispatch | None + ) -> bool: + """Check if the running state of a dispatch has changed. + + Checks if any of the running state changes to the dispatch + require a new message to be sent to the actor so that it can potentially + change its runtime configuration or start/stop itself. + + Also checks if a dispatch update was not sent due to connection issues + in which case we need to send the message now. Args: - dispatch: The dispatch to calculate the next run for. - _from: The time to calculate the next run from. + updated_dispatch: The new dispatch, if available. + previous_dispatch: The old dispatch, if available. Returns: - The next run of the dispatch or None if the dispatch is finished. + True if the running state has changed, False otherwise. """ - if ( - not dispatch.recurrence.frequency - or dispatch.recurrence.frequency == Frequency.UNSPECIFIED - ): - if _from > dispatch.start_time: - return None - return dispatch.start_time - - # Make sure no weekday is UNSPECIFIED - if Weekday.UNSPECIFIED in dispatch.recurrence.byweekdays: - _logger.warning( - "Dispatch %s has UNSPECIFIED weekday, ignoring...", dispatch.id + # New dispatch + if previous_dispatch is None: + assert updated_dispatch is not None + + # Client was not informed about the dispatch, do it now + # pylint: disable=protected-access + if not updated_dispatch._running_status_notified: + return True + + # Deleted dispatch + if updated_dispatch is None: + assert previous_dispatch is not None + return ( + previous_dispatch.running(previous_dispatch.type) + == RunningState.RUNNING ) - return None - - count, until = (None, None) - if end := dispatch.recurrence.end_criteria: - count = end.count - until = end.until - - next_run = rrule.rrule( - freq=_RRULE_FREQ_MAP[dispatch.recurrence.frequency], - dtstart=dispatch.start_time, - count=count, - until=until, - byminute=dispatch.recurrence.byminutes, - byhour=dispatch.recurrence.byhours, - byweekday=[ - _RRULE_WEEKDAY_MAP[weekday] - for weekday in dispatch.recurrence.byweekdays - ], - bymonthday=dispatch.recurrence.bymonthdays, - bymonth=dispatch.recurrence.bymonths, - interval=dispatch.recurrence.interval, - ) - - return cast(datetime | None, next_run.after(_from, inc=True)) + + # If any of the runtime attributes changed, we need to send a message + runtime_state_attributes = [ + "running", + "type", + "selector", + "duration", + "dry_run", + "payload", + ] + + for attribute in runtime_state_attributes: + if getattr(updated_dispatch, attribute) != getattr( + previous_dispatch, attribute + ): + return True + + return False + + async def _send_running_state_change(self, dispatch: Dispatch) -> None: + """Send a running state change message. + + Args: + dispatch: The dispatch that changed. + """ + await self._running_state_change_sender.send(dispatch) + # Update the last sent notification time + # so we know if this change was already sent + dispatch._set_running_status_notified() # pylint: disable=protected-access diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 194be06..74c5521 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -8,17 +8,17 @@ from datetime import datetime, timedelta, timezone from random import randint from typing import AsyncIterator, Iterator -from unittest.mock import MagicMock import async_solipsism import time_machine from frequenz.channels import Broadcast, Receiver from frequenz.client.dispatch.test.client import FakeClient, to_create_params from frequenz.client.dispatch.test.generator import DispatchGenerator -from frequenz.client.dispatch.types import Dispatch, Frequency +from frequenz.client.dispatch.types import Dispatch as BaseDispatch +from frequenz.client.dispatch.types import Frequency from pytest import fixture -from frequenz.dispatch import Created, Deleted, DispatchEvent, Updated +from frequenz.dispatch import Created, Deleted, Dispatch, DispatchEvent, Updated from frequenz.dispatch.actor import DispatchingActor @@ -62,27 +62,24 @@ class ActorTestEnv: @fixture async def actor_env() -> AsyncIterator[ActorTestEnv]: """Return an actor test environment.""" - updated_dispatches = Broadcast[DispatchEvent](name="updated_dispatches") - ready_dispatches = Broadcast[Dispatch](name="ready_dispatches") + lifecycle_updates_dispatches = Broadcast[DispatchEvent](name="lifecycle_updates") + running_state_change_dispatches = Broadcast[Dispatch](name="running_state_change") microgrid_id = randint(1, 100) + client = FakeClient() actor = DispatchingActor( microgrid_id=microgrid_id, - grpc_channel=MagicMock(), - svc_addr="localhost", - updated_dispatch_sender=updated_dispatches.new_sender(), - ready_dispatch_sender=ready_dispatches.new_sender(), + lifecycle_updates_sender=lifecycle_updates_dispatches.new_sender(), + running_state_change_sender=running_state_change_dispatches.new_sender(), + client=client, ) - client = FakeClient() - actor._client = client # pylint: disable=protected-access - actor.start() yield ActorTestEnv( actor, - updated_dispatches.new_receiver(), - ready_dispatches.new_receiver(), + lifecycle_updates_dispatches.new_receiver(), + running_state_change_dispatches.new_receiver(), client, microgrid_id, ) @@ -106,10 +103,28 @@ async def test_new_dispatch_created( await _test_new_dispatch_created(actor_env, sample) +def update_dispatch(sample: BaseDispatch, dispatch: BaseDispatch) -> BaseDispatch: + """Update the sample dispatch with the creation fields from the dispatch. + + Args: + sample: The sample dispatch to update + dispatch: The dispatch to update the sample with + + Returns: + The updated sample dispatch + """ + return replace( + sample, + update_time=dispatch.update_time, + create_time=dispatch.create_time, + id=dispatch.id, + ) + + async def _test_new_dispatch_created( actor_env: ActorTestEnv, - sample: Dispatch, -) -> Dispatch: + sample: BaseDispatch, +) -> BaseDispatch: """Test that a new dispatch is created. Args: @@ -127,13 +142,8 @@ async def _test_new_dispatch_created( case Deleted(dispatch) | Updated(dispatch): assert False, "Expected a created event" case Created(dispatch): - sample = replace( - sample, - update_time=dispatch.update_time, - create_time=dispatch.create_time, - id=dispatch.id, - ) - assert dispatch == sample + sample = update_dispatch(sample, dispatch) + assert dispatch == Dispatch(sample) return sample @@ -170,14 +180,17 @@ async def test_existing_dispatch_updated( case Created(dispatch) | Deleted(dispatch): assert False, "Expected an updated event" case Updated(dispatch): + sample = update_dispatch(sample, dispatch) sample = replace( sample, active=True, recurrence=replace(sample.recurrence, frequency=Frequency.UNSPECIFIED), - update_time=dispatch.update_time, ) - assert dispatch == sample + assert dispatch == Dispatch( + sample, + running_state_change_synced=dispatch.running_state_change_synced, + ) async def test_existing_dispatch_deleted( @@ -191,15 +204,17 @@ async def test_existing_dispatch_deleted( sample = await _test_new_dispatch_created(actor_env, sample) await actor_env.client.delete(sample.id) - fake_time.shift(timedelta(seconds=1)) + fake_time.shift(timedelta(seconds=10)) + await asyncio.sleep(10) + print("Awaiting deleted dispatch update") dispatch_event = await actor_env.updated_dispatches.receive() match dispatch_event: case Created(dispatch) | Updated(dispatch): assert False, "Expected a deleted event" case Deleted(dispatch): - sample = replace(sample, update_time=dispatch.update_time) - assert dispatch == sample + sample = update_dispatch(sample, dispatch) + assert dispatch == Dispatch(sample, deleted=True) async def test_dispatch_schedule( @@ -210,9 +225,9 @@ async def test_dispatch_schedule( """Test that a random dispatch is scheduled correctly.""" sample = generator.generate_dispatch(actor_env.microgrid_id) await actor_env.client.create(**to_create_params(sample)) - dispatch = actor_env.client.dispatches[0] + dispatch = Dispatch(actor_env.client.dispatches[0]) - next_run = DispatchingActor.calculate_next_run(dispatch, _now()) + next_run = dispatch.next_run_after(_now()) assert next_run is not None fake_time.shift(next_run - _now() - timedelta(seconds=1))