From 173eb9a239cb2867b3800f766d4c0e7955412706 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Thu, 4 Apr 2024 19:09:06 +0200 Subject: [PATCH 1/7] Dispatch python interface changes Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 4 +- src/frequenz/dispatch/__init__.py | 3 + src/frequenz/dispatch/_dispatch.py | 234 +++++++++++++++++++++++++++ src/frequenz/dispatch/_dispatcher.py | 84 +++++++--- src/frequenz/dispatch/_event.py | 2 +- src/frequenz/dispatch/actor.py | 162 ++++++++++--------- tests/test_frequenz_dispatch.py | 67 +++++--- 7 files changed, 429 insertions(+), 127 deletions(-) create mode 100644 src/frequenz/dispatch/_dispatch.py diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d4546f4..2f3db22 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,11 +6,11 @@ ## Upgrading - +* `Dispatcher.ready_to_execute()` was renamed to `Dispatcher.running_status_change()` ## New Features - +* Introduced new class `Dispatch` (based on the client class) that contains useful functions and extended information about the received dispatch. ## Bug Fixes diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index 6435da9..4f2bc5b 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -6,6 +6,8 @@ from frequenz.dispatch._dispatcher import Dispatcher, ReceiverFetcher from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated +from ._dispatch import Dispatch + __all__ = [ "Created", "Deleted", @@ -13,4 +15,5 @@ "Dispatcher", "ReceiverFetcher", "Updated", + "Dispatch", ] diff --git a/src/frequenz/dispatch/_dispatch.py b/src/frequenz/dispatch/_dispatch.py new file mode 100644 index 0000000..1210b94 --- /dev/null +++ b/src/frequenz/dispatch/_dispatch.py @@ -0,0 +1,234 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Dispatch type with support for next_run calculation.""" + + +import logging +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Iterator, cast + +from dateutil import rrule +from frequenz.client.dispatch.types import Dispatch as BaseDispatch +from frequenz.client.dispatch.types import Frequency, Weekday + +_logger = logging.getLogger(__name__) +"""The logger for this module.""" + +_RRULE_FREQ_MAP = { + Frequency.MINUTELY: rrule.MINUTELY, + Frequency.HOURLY: rrule.HOURLY, + Frequency.DAILY: rrule.DAILY, + Frequency.WEEKLY: rrule.WEEKLY, + Frequency.MONTHLY: rrule.MONTHLY, +} +"""To map from our Frequency enum to the dateutil library enum.""" + +_RRULE_WEEKDAY_MAP = { + Weekday.MONDAY: rrule.MO, + Weekday.TUESDAY: rrule.TU, + Weekday.WEDNESDAY: rrule.WE, + Weekday.THURSDAY: rrule.TH, + Weekday.FRIDAY: rrule.FR, + Weekday.SATURDAY: rrule.SA, + Weekday.SUNDAY: rrule.SU, +} +"""To map from our Weekday enum to the dateutil library enum.""" + + +@dataclass(frozen=True) +class Dispatch(BaseDispatch): + """Dispatch type with extra functionality.""" + + deleted: bool = False + """Whether the dispatch is deleted.""" + + running_state_change_synced: datetime | None = None + """The last time a message was sent about the running state change.""" + + def __init__( + self, + client_dispatch: BaseDispatch, + deleted: bool = False, + running_state_change_synced: datetime | None = None, + ): + """Initialize the dispatch. + + Args: + client_dispatch: The client dispatch. + deleted: Whether the dispatch is deleted. + running_state_change_synced: Timestamp of the last running state change message. + """ + super().__init__(**client_dispatch.__dict__) + # Work around frozen to set deleted + object.__setattr__(self, "deleted", deleted) + object.__setattr__( + self, + "running_state_change_synced", + running_state_change_synced, + ) + + def set_deleted(self) -> None: + """Mark the dispatch as deleted.""" + object.__setattr__(self, "deleted", True) + + def running_status_notified(self) -> bool: + """Check that the latest running state change notification was sent. + + Returns: + True if the latest running state change notification was sent, False otherwise. + """ + return self.running_state_change_synced == self.update_time + + def set_running_status_notified(self) -> None: + """Mark the latest running state change notification as sent.""" + object.__setattr__(self, "running_state_change_synced", self.update_time) + + def running(self, type_: str) -> bool: + """Check if the dispatch is currently supposed to be running. + + Args: + type_: The type of the dispatch that should be running. + + Returns: + True if the dispatch is currently meant to be running, False otherwise. + """ + if self.type != type_: + return False + + if not self.active or self.deleted: + return False + + now = datetime.now(tz=timezone.utc) + if until := self._until(now): + return now < until + + return False + + @property + def until(self) -> datetime | None: + """Time when the dispatch should end. + + Returns the time that a running dispatch should end. + If the dispatch is not running, None is returned. + + Returns: + The time when the dispatch should end or None if the dispatch is not running. + """ + if not self.active or self.deleted: + return None + + now = datetime.now(tz=timezone.utc) + return self._until(now) + + @property + # noqa is needed because of a bug in pydoclint that makes it think a `return` without a return + # value needs documenting + def missed_runs(self) -> Iterator[datetime]: # noqa: DOC405 + """Yield all missed runs of a dispatch. + + Yields all missed runs of a dispatch. + + If a running state change notification was not sent in time + due to connection issues, this method will yield all missed runs + since the last sent notification. + + Returns: + A generator that yields all missed runs of a dispatch. + """ + if self.update_time == self.running_state_change_synced: + return + + from_time = self.update_time + now = datetime.now(tz=timezone.utc) + + while (next_run := self.next_run_after(from_time)) and next_run < now: + yield next_run + from_time = next_run + + @property + def next_run(self) -> datetime | None: + """Calculate the next run of a dispatch. + + Returns: + The next run of the dispatch or None if the dispatch is finished. + """ + return self.next_run_after(datetime.now(tz=timezone.utc)) + + def next_run_after(self, after: datetime) -> datetime | None: + """Calculate the next run of a dispatch. + + Args: + after: The time to calculate the next run from. + + Returns: + The next run of the dispatch or None if the dispatch is finished. + """ + if ( + not self.recurrence.frequency + or self.recurrence.frequency == Frequency.UNSPECIFIED + ): + if after > self.start_time: + return None + return self.start_time + + # Make sure no weekday is UNSPECIFIED + if Weekday.UNSPECIFIED in self.recurrence.byweekdays: + _logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id) + return None + + # No type information for rrule, so we need to cast + return cast(datetime | None, self._prepare_rrule().after(after, inc=True)) + + def _prepare_rrule(self) -> rrule.rrule: + """Prepare the rrule object. + + Returns: + The rrule object. + """ + count, until = (None, None) + if end := self.recurrence.end_criteria: + count = end.count + until = end.until + + rrule_obj = rrule.rrule( + freq=_RRULE_FREQ_MAP[self.recurrence.frequency], + dtstart=self.start_time, + count=count, + until=until, + byminute=self.recurrence.byminutes, + byhour=self.recurrence.byhours, + byweekday=[ + _RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays + ], + bymonthday=self.recurrence.bymonthdays, + bymonth=self.recurrence.bymonths, + interval=self.recurrence.interval, + ) + + return rrule_obj + + def _until(self, now: datetime) -> datetime | None: + """Calculate the time when the dispatch should end. + + If no previous run is found, None is returned. + + Args: + now: The current time. + + Returns: + The time when the dispatch should end or None if the dispatch is not running. + """ + if ( + not self.recurrence.frequency + or self.recurrence.frequency == Frequency.UNSPECIFIED + ): + return self.start_time + self.duration + + latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True) + + if not latest_past_start: + return None + + return latest_past_start + self.duration diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 29f1051..b93cecb 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -8,10 +8,10 @@ import grpc.aio from frequenz.channels import Broadcast, Receiver -from frequenz.client.dispatch.types import Dispatch -from frequenz.dispatch._event import DispatchEvent -from frequenz.dispatch.actor import DispatchingActor +from ._dispatch import Dispatch +from ._event import DispatchEvent +from .actor import DispatchingActor ReceivedT_co = TypeVar("ReceivedT_co", covariant=True) """The type being received.""" @@ -44,13 +44,14 @@ class Dispatcher: One that sends a dispatch event message whenever a dispatch is created, updated or deleted. The other sends a dispatch message whenever a dispatch is ready to be - executed according to the schedule. + executed according to the schedule or the running status of the dispatch + changed in a way that could potentially require the actor to start, stop or + reconfigure itself. - allows to receive new dispatches and ready dispatches. - - Example: Processing ready-to-execute dispatches + Example: Processing running state change dispatches ```python import grpc.aio + from unittest.mock import MagicMock async def run(): grpc_channel = grpc.aio.insecure_channel("localhost:50051") @@ -58,13 +59,24 @@ async def run(): service_address = "localhost:50051" dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) - dispatcher.start() # this will start the actor + actor = MagicMock() # replace with your actor - ready_receiver = dispatcher.ready_to_execute.new_receiver() + changed_running_status_rx = dispatcher.running_status_change.new_receiver() - async for dispatch in ready_receiver: + async for dispatch in changed_running_status_rx: print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") - # execute the dispatch + if dispatch.running: + if actor.is_running: + actor.reconfigure( + components=dispatch.selector, + run_parameters=dispatch.payload + ) # this will reconfigure the actor + else: + # this will start the actor + # and run it for the duration of the dispatch + actor.start(duration=dispatch.duration, dry_run=dispatch.dry_run) + else: + actor.stop() # this will stop the actor ``` Example: Getting notification about dispatch lifecycle events @@ -107,14 +119,16 @@ def __init__( grpc_channel: The gRPC channel. svc_addr: The service address. """ - self._ready_channel = Broadcast[Dispatch](name="ready_dispatches") - self._updated_channel = Broadcast[DispatchEvent](name="new_dispatches") + self._running_state_channel = Broadcast[Dispatch](name="running_state_change") + self._lifecycle_events_channel = Broadcast[DispatchEvent]( + name="lifecycle_events" + ) self._actor = DispatchingActor( microgrid_id, grpc_channel, svc_addr, - self._updated_channel.new_sender(), - self._ready_channel.new_sender(), + self._lifecycle_events_channel.new_sender(), + self._running_state_channel.new_sender(), ) async def start(self) -> None: @@ -123,18 +137,46 @@ async def start(self) -> None: @property def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]: - """Return new, updated or deleted dispatches receiver. + """Return new, updated or deleted dispatches receiver fetcher. Returns: A new receiver for new dispatches. """ - return self._updated_channel + return self._lifecycle_events_channel @property - def ready_to_execute(self) -> ReceiverFetcher[Dispatch]: - """Return ready dispatches receiver. + def running_status_change(self) -> ReceiverFetcher[Dispatch]: + """Return running status change receiver fetcher. + + This receiver will receive a message whenever the current running + status of a dispatch changes. + + Usually, one message per scheduled run is to be expected. + However, things get complicated when a dispatch was modified: + + If it was currently running and the modification now says + it should not be running or running with different parameters, + then a message will be sent. + + In other words: Any change that is expected to make an actor start, stop + or reconfigure itself with new parameters causes a message to be + sent. + + A non-exhaustive list of possible changes that will cause a message to be sent: + - The normal scheduled start_time has been reached + - The duration of the dispatch has been modified + - The start_time has been modified to be in the future + - The component selection changed + - The active status changed + - The dry_run status changed + - The payload changed + - The dispatch was deleted + + Note: Reaching the end time (start_time + duration) will not + send a message, except when it was reached by modifying the duration. + Returns: - A new receiver for ready dispatches. + A new receiver for dispatches whose running status changed. """ - return self._ready_channel + return self._running_state_channel diff --git a/src/frequenz/dispatch/_event.py b/src/frequenz/dispatch/_event.py index 670c501..f70d0dd 100644 --- a/src/frequenz/dispatch/_event.py +++ b/src/frequenz/dispatch/_event.py @@ -5,7 +5,7 @@ from dataclasses import dataclass -from frequenz.client.dispatch.types import Dispatch +from ._dispatch import Dispatch @dataclass(frozen=True) diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 5ca072f..0fa1387 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -6,18 +6,17 @@ import asyncio import logging from datetime import datetime, timedelta, timezone -from typing import cast import grpc.aio -from dateutil import rrule from frequenz.channels import Sender from frequenz.channels.timer import SkipMissedAndDrift, Timer from frequenz.client.dispatch import Client -from frequenz.client.dispatch.types import Dispatch, Frequency, Weekday from frequenz.sdk.actor import Actor from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated +from ._dispatch import Dispatch + _MAX_AHEAD_SCHEDULE = timedelta(hours=5) """The maximum time ahead to schedule a dispatch. @@ -36,27 +35,6 @@ """The logger for this module.""" -_RRULE_FREQ_MAP = { - Frequency.MINUTELY: rrule.MINUTELY, - Frequency.HOURLY: rrule.HOURLY, - Frequency.DAILY: rrule.DAILY, - Frequency.WEEKLY: rrule.WEEKLY, - Frequency.MONTHLY: rrule.MONTHLY, -} -"""To map from our Frequency enum to the dateutil library enum.""" - -_RRULE_WEEKDAY_MAP = { - Weekday.MONDAY: rrule.MO, - Weekday.TUESDAY: rrule.TU, - Weekday.WEDNESDAY: rrule.WE, - Weekday.THURSDAY: rrule.TH, - Weekday.FRIDAY: rrule.FR, - Weekday.SATURDAY: rrule.SA, - Weekday.SUNDAY: rrule.SU, -} -"""To map from our Weekday enum to the dateutil library enum.""" - - class DispatchingActor(Actor): """Dispatch actor. @@ -72,8 +50,8 @@ def __init__( microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str, - updated_dispatch_sender: Sender[DispatchEvent], - ready_dispatch_sender: Sender[Dispatch], + lifecycle_updates_sender: Sender[DispatchEvent], + running_state_change_sender: Sender[Dispatch], poll_interval: timedelta = _DEFAULT_POLL_INTERVAL, ) -> None: """Initialize the actor. @@ -82,8 +60,8 @@ def __init__( microgrid_id: The microgrid ID to handle dispatches for. grpc_channel: The gRPC channel to use for communication with the API. svc_addr: Address of the service to connect to. - updated_dispatch_sender: A sender for new or updated dispatches. - ready_dispatch_sender: A sender for ready dispatches. + lifecycle_updates_sender: A sender for dispatch lifecycle events. + running_state_change_sender: A sender for dispatch running state changes. poll_interval: The interval to poll the API for dispatche changes. """ super().__init__(name="dispatch") @@ -92,8 +70,8 @@ def __init__( self._dispatches: dict[int, Dispatch] = {} self._scheduled: dict[int, asyncio.Task[None]] = {} self._microgrid_id = microgrid_id - self._updated_dispatch_sender = updated_dispatch_sender - self._ready_dispatch_sender = ready_dispatch_sender + self._lifecycle_updates_sender = lifecycle_updates_sender + self._running_state_change_sender = running_state_change_sender self._poll_timer = Timer(poll_interval, SkipMissedAndDrift()) async def _run(self) -> None: @@ -114,18 +92,28 @@ async def _fetch(self) -> None: try: _logger.info("Fetching dispatches for microgrid %s", self._microgrid_id) - async for dispatch in self._client.list(microgrid_id=self._microgrid_id): - self._dispatches[dispatch.id] = dispatch + async for client_dispatch in self._client.list( + microgrid_id=self._microgrid_id + ): + dispatch = Dispatch(client_dispatch) + self._dispatches[dispatch.id] = Dispatch(client_dispatch) old_dispatch = old_dispatches.pop(dispatch.id, None) if not old_dispatch: self._update_dispatch_schedule(dispatch, None) _logger.info("New dispatch: %s", dispatch) - await self._updated_dispatch_sender.send(Created(dispatch=dispatch)) + await self._lifecycle_updates_sender.send( + Created(dispatch=dispatch) + ) elif dispatch.update_time != old_dispatch.update_time: self._update_dispatch_schedule(dispatch, old_dispatch) _logger.info("Updated dispatch: %s", dispatch) - await self._updated_dispatch_sender.send(Updated(dispatch=dispatch)) + await self._lifecycle_updates_sender.send( + Updated(dispatch=dispatch) + ) + + if self._running_state_change(dispatch, old_dispatch): + await self._send_running_state_change(dispatch) except grpc.aio.AioRpcError as error: _logger.error("Error fetching dispatches: %s", error) @@ -134,10 +122,14 @@ async def _fetch(self) -> None: for dispatch in old_dispatches.values(): _logger.info("Deleted dispatch: %s", dispatch) - await self._updated_dispatch_sender.send(Deleted(dispatch=dispatch)) + dispatch.set_deleted() + await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) if task := self._scheduled.pop(dispatch.id, None): task.cancel() + if self._running_state_change(None, dispatch): + await self._send_running_state_change(dispatch) + def _update_dispatch_schedule( self, dispatch: Dispatch, old_dispatch: Dispatch | None ) -> None: @@ -178,7 +170,7 @@ async def _schedule_task(self, dispatch: Dispatch) -> None: def next_run_info() -> tuple[datetime, datetime] | None: now = datetime.now(tz=timezone.utc) - next_run = self.calculate_next_run(dispatch, now) + next_run = dispatch.next_run_after(now) if next_run is None: return None @@ -196,56 +188,68 @@ def next_run_info() -> tuple[datetime, datetime] | None: await asyncio.sleep((next_time - now).total_seconds()) _logger.info("Dispatch ready: %s", dispatch) - await self._ready_dispatch_sender.send(dispatch) + await self._running_state_change_sender.send(dispatch) _logger.info("Dispatch finished: %s", dispatch) self._scheduled.pop(dispatch.id) - @classmethod - def calculate_next_run(cls, dispatch: Dispatch, _from: datetime) -> datetime | None: - """Calculate the next run of a dispatch. + def _running_state_change( + self, updated_dispatch: Dispatch | None, previous_dispatch: Dispatch | None + ) -> bool: + """Check if the running state of a dispatch has changed. + + Checks if any of the running state changes to the dispatch + require a new message to be sent to the actor so that it can potentially + change its runtime configuration or start/stop itself. + + Also checks if a dispatch update was not sent due to connection issues + in which case we need to send the message now. Args: - dispatch: The dispatch to calculate the next run for. - _from: The time to calculate the next run from. + updated_dispatch: The new dispatch, if available. + previous_dispatch: The old dispatch, if available. Returns: - The next run of the dispatch or None if the dispatch is finished. + True if the running state has changed, False otherwise. """ - if ( - not dispatch.recurrence.frequency - or dispatch.recurrence.frequency == Frequency.UNSPECIFIED - ): - if _from > dispatch.start_time: - return None - return dispatch.start_time + # New dispatch + if previous_dispatch is None: + assert updated_dispatch is not None + + # Client was not informed about the dispatch, do it now + if not updated_dispatch.running_status_notified(): + return True + + # Deleted dispatch + if updated_dispatch is None: + assert previous_dispatch is not None + return previous_dispatch.running(previous_dispatch.type) + + # If any of the runtime attributes changed, we need to send a message + runtime_state_attributes = [ + "running", + "type", + "selector", + "duration", + "dry_run", + "payload", + ] + + for attribute in runtime_state_attributes: + if getattr(updated_dispatch, attribute) != getattr( + previous_dispatch, attribute + ): + return True + + return False + + async def _send_running_state_change(self, dispatch: Dispatch) -> None: + """Send a running state change message. - # Make sure no weekday is UNSPECIFIED - if Weekday.UNSPECIFIED in dispatch.recurrence.byweekdays: - _logger.warning( - "Dispatch %s has UNSPECIFIED weekday, ignoring...", dispatch.id - ) - return None - - count, until = (None, None) - if end := dispatch.recurrence.end_criteria: - count = end.count - until = end.until - - next_run = rrule.rrule( - freq=_RRULE_FREQ_MAP[dispatch.recurrence.frequency], - dtstart=dispatch.start_time, - count=count, - until=until, - byminute=dispatch.recurrence.byminutes, - byhour=dispatch.recurrence.byhours, - byweekday=[ - _RRULE_WEEKDAY_MAP[weekday] - for weekday in dispatch.recurrence.byweekdays - ], - bymonthday=dispatch.recurrence.bymonthdays, - bymonth=dispatch.recurrence.bymonths, - interval=dispatch.recurrence.interval, - ) - - return cast(datetime | None, next_run.after(_from, inc=True)) + Args: + dispatch: The dispatch that changed. + """ + await self._running_state_change_sender.send(dispatch) + # Update the last sent notification time + # so we know if this change was already sent + dispatch.set_running_status_notified() diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 194be06..3bb4e44 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -15,10 +15,11 @@ from frequenz.channels import Broadcast, Receiver from frequenz.client.dispatch.test.client import FakeClient, to_create_params from frequenz.client.dispatch.test.generator import DispatchGenerator -from frequenz.client.dispatch.types import Dispatch, Frequency +from frequenz.client.dispatch.types import Dispatch as BaseDispatch +from frequenz.client.dispatch.types import Frequency from pytest import fixture -from frequenz.dispatch import Created, Deleted, DispatchEvent, Updated +from frequenz.dispatch import Created, Deleted, Dispatch, DispatchEvent, Updated from frequenz.dispatch.actor import DispatchingActor @@ -62,16 +63,16 @@ class ActorTestEnv: @fixture async def actor_env() -> AsyncIterator[ActorTestEnv]: """Return an actor test environment.""" - updated_dispatches = Broadcast[DispatchEvent](name="updated_dispatches") - ready_dispatches = Broadcast[Dispatch](name="ready_dispatches") + lifecycle_updates_dispatches = Broadcast[DispatchEvent](name="lifecycle_updates") + running_state_change_dispatches = Broadcast[Dispatch](name="running_state_change") microgrid_id = randint(1, 100) actor = DispatchingActor( microgrid_id=microgrid_id, grpc_channel=MagicMock(), svc_addr="localhost", - updated_dispatch_sender=updated_dispatches.new_sender(), - ready_dispatch_sender=ready_dispatches.new_sender(), + lifecycle_updates_sender=lifecycle_updates_dispatches.new_sender(), + running_state_change_sender=running_state_change_dispatches.new_sender(), ) client = FakeClient() @@ -81,8 +82,8 @@ async def actor_env() -> AsyncIterator[ActorTestEnv]: yield ActorTestEnv( actor, - updated_dispatches.new_receiver(), - ready_dispatches.new_receiver(), + lifecycle_updates_dispatches.new_receiver(), + running_state_change_dispatches.new_receiver(), client, microgrid_id, ) @@ -106,10 +107,28 @@ async def test_new_dispatch_created( await _test_new_dispatch_created(actor_env, sample) +def update_dispatch(sample: BaseDispatch, dispatch: BaseDispatch) -> BaseDispatch: + """Update the sample dispatch with the creation fields from the dispatch. + + Args: + sample: The sample dispatch to update + dispatch: The dispatch to update the sample with + + Returns: + The updated sample dispatch + """ + return replace( + sample, + update_time=dispatch.update_time, + create_time=dispatch.create_time, + id=dispatch.id, + ) + + async def _test_new_dispatch_created( actor_env: ActorTestEnv, - sample: Dispatch, -) -> Dispatch: + sample: BaseDispatch, +) -> BaseDispatch: """Test that a new dispatch is created. Args: @@ -127,13 +146,8 @@ async def _test_new_dispatch_created( case Deleted(dispatch) | Updated(dispatch): assert False, "Expected a created event" case Created(dispatch): - sample = replace( - sample, - update_time=dispatch.update_time, - create_time=dispatch.create_time, - id=dispatch.id, - ) - assert dispatch == sample + sample = update_dispatch(sample, dispatch) + assert dispatch == Dispatch(sample) return sample @@ -170,14 +184,17 @@ async def test_existing_dispatch_updated( case Created(dispatch) | Deleted(dispatch): assert False, "Expected an updated event" case Updated(dispatch): + sample = update_dispatch(sample, dispatch) sample = replace( sample, active=True, recurrence=replace(sample.recurrence, frequency=Frequency.UNSPECIFIED), - update_time=dispatch.update_time, ) - assert dispatch == sample + assert dispatch == Dispatch( + sample, + running_state_change_synced=dispatch.running_state_change_synced, + ) async def test_existing_dispatch_deleted( @@ -191,15 +208,17 @@ async def test_existing_dispatch_deleted( sample = await _test_new_dispatch_created(actor_env, sample) await actor_env.client.delete(sample.id) - fake_time.shift(timedelta(seconds=1)) + fake_time.shift(timedelta(seconds=10)) + await asyncio.sleep(10) + print("Awaiting deleted dispatch update") dispatch_event = await actor_env.updated_dispatches.receive() match dispatch_event: case Created(dispatch) | Updated(dispatch): assert False, "Expected a deleted event" case Deleted(dispatch): - sample = replace(sample, update_time=dispatch.update_time) - assert dispatch == sample + sample = update_dispatch(sample, dispatch) + assert dispatch == Dispatch(sample, deleted=True) async def test_dispatch_schedule( @@ -210,9 +229,9 @@ async def test_dispatch_schedule( """Test that a random dispatch is scheduled correctly.""" sample = generator.generate_dispatch(actor_env.microgrid_id) await actor_env.client.create(**to_create_params(sample)) - dispatch = actor_env.client.dispatches[0] + dispatch = Dispatch(actor_env.client.dispatches[0]) - next_run = DispatchingActor.calculate_next_run(dispatch, _now()) + next_run = dispatch.next_run_after(_now()) assert next_run is not None fake_time.shift(next_run - _now() - timedelta(seconds=1)) From 20e6c21d0c1c0bf1a92f23f143028da224b94241 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 15 Apr 2024 15:41:44 +0200 Subject: [PATCH 2/7] Use relative imports Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/__init__.py | 5 ++--- src/frequenz/dispatch/actor.py | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index 4f2bc5b..ee7e261 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -3,10 +3,9 @@ """A highlevel interface for the dispatch API.""" -from frequenz.dispatch._dispatcher import Dispatcher, ReceiverFetcher -from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated - from ._dispatch import Dispatch +from ._dispatcher import Dispatcher, ReceiverFetcher +from ._event import Created, Deleted, DispatchEvent, Updated __all__ = [ "Created", diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 0fa1387..6e821c1 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -13,9 +13,8 @@ from frequenz.client.dispatch import Client from frequenz.sdk.actor import Actor -from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated - from ._dispatch import Dispatch +from ._event import Created, Deleted, DispatchEvent, Updated _MAX_AHEAD_SCHEDULE = timedelta(hours=5) """The maximum time ahead to schedule a dispatch. From b2b482320b802ca4fb7078f55da64631f6d108b3 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 17 Apr 2024 19:17:31 +0200 Subject: [PATCH 3/7] Add `Dispatcher.client` property for easy client access When you want to manipulate dispatches, you will need direct access to the client. Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 1 + src/frequenz/dispatch/_dispatcher.py | 10 ++++++++-- src/frequenz/dispatch/actor.py | 8 +++----- tests/test_frequenz_dispatch.py | 8 ++------ 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 2f3db22..48e894a 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -11,6 +11,7 @@ ## New Features * Introduced new class `Dispatch` (based on the client class) that contains useful functions and extended information about the received dispatch. +* `Dispatcher.client` was added to provide an easy access to the client for updating, deleting and creating dispatches ## Bug Fixes diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index b93cecb..ce6f3a2 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -8,6 +8,7 @@ import grpc.aio from frequenz.channels import Broadcast, Receiver +from frequenz.client.dispatch import Client from ._dispatch import Dispatch from ._event import DispatchEvent @@ -123,10 +124,10 @@ def __init__( self._lifecycle_events_channel = Broadcast[DispatchEvent]( name="lifecycle_events" ) + self._client = Client(grpc_channel, svc_addr) self._actor = DispatchingActor( microgrid_id, - grpc_channel, - svc_addr, + self._client, self._lifecycle_events_channel.new_sender(), self._running_state_channel.new_sender(), ) @@ -135,6 +136,11 @@ async def start(self) -> None: """Start the actor.""" self._actor.start() + @property + def client(self) -> Client: + """Return the client.""" + return self._client + @property def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]: """Return new, updated or deleted dispatches receiver fetcher. diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 6e821c1..8589682 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -47,8 +47,7 @@ class DispatchingActor(Actor): def __init__( self, microgrid_id: int, - grpc_channel: grpc.aio.Channel, - svc_addr: str, + client: Client, lifecycle_updates_sender: Sender[DispatchEvent], running_state_change_sender: Sender[Dispatch], poll_interval: timedelta = _DEFAULT_POLL_INTERVAL, @@ -57,15 +56,14 @@ def __init__( Args: microgrid_id: The microgrid ID to handle dispatches for. - grpc_channel: The gRPC channel to use for communication with the API. - svc_addr: Address of the service to connect to. + client: The client to use for fetching dispatches. lifecycle_updates_sender: A sender for dispatch lifecycle events. running_state_change_sender: A sender for dispatch running state changes. poll_interval: The interval to poll the API for dispatche changes. """ super().__init__(name="dispatch") - self._client = Client(grpc_channel, svc_addr) + self._client = client self._dispatches: dict[int, Dispatch] = {} self._scheduled: dict[int, asyncio.Task[None]] = {} self._microgrid_id = microgrid_id diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 3bb4e44..74c5521 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -8,7 +8,6 @@ from datetime import datetime, timedelta, timezone from random import randint from typing import AsyncIterator, Iterator -from unittest.mock import MagicMock import async_solipsism import time_machine @@ -66,18 +65,15 @@ async def actor_env() -> AsyncIterator[ActorTestEnv]: lifecycle_updates_dispatches = Broadcast[DispatchEvent](name="lifecycle_updates") running_state_change_dispatches = Broadcast[Dispatch](name="running_state_change") microgrid_id = randint(1, 100) + client = FakeClient() actor = DispatchingActor( microgrid_id=microgrid_id, - grpc_channel=MagicMock(), - svc_addr="localhost", lifecycle_updates_sender=lifecycle_updates_dispatches.new_sender(), running_state_change_sender=running_state_change_dispatches.new_sender(), + client=client, ) - client = FakeClient() - actor._client = client # pylint: disable=protected-access - actor.start() yield ActorTestEnv( From 901649adeeb863d8ee5f6814306b629f9b70d892 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 17 Apr 2024 19:17:58 +0200 Subject: [PATCH 4/7] Update & extend usage examples Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_dispatcher.py | 74 ++++++++++++++++++++++++---- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index ce6f3a2..26a52f1 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -51,47 +51,61 @@ class Dispatcher: Example: Processing running state change dispatches ```python + import os import grpc.aio from unittest.mock import MagicMock async def run(): - grpc_channel = grpc.aio.insecure_channel("localhost:50051") - microgrid_id = 1 - service_address = "localhost:50051" + host = os.getenv("DISPATCH_API_HOST", "localhost") + port = os.getenv("DISPATCH_API_PORT", "50051") + service_address = f"{host}:{port}" + grpc_channel = grpc.aio.insecure_channel(service_address) + microgrid_id = 1 dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) + await dispatcher.start() + actor = MagicMock() # replace with your actor changed_running_status_rx = dispatcher.running_status_change.new_receiver() async for dispatch in changed_running_status_rx: + if dispatch.type != "DEMO_TYPE": + continue + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") if dispatch.running: if actor.is_running: actor.reconfigure( - components=dispatch.selector, - run_parameters=dispatch.payload ) # this will reconfigure the actor else: - # this will start the actor + # this will start a new or reconfigure a running actor # and run it for the duration of the dispatch - actor.start(duration=dispatch.duration, dry_run=dispatch.dry_run) + actor.start_or_reconfigure( + components=dispatch.selector, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) else: actor.stop() # this will stop the actor ``` Example: Getting notification about dispatch lifecycle events ```python + import os from typing import assert_never import grpc.aio from frequenz.dispatch import Created, Deleted, Dispatcher, Updated - async def run(): - grpc_channel = grpc.aio.insecure_channel("localhost:50051") + host = os.getenv("DISPATCH_API_HOST", "localhost") + port = os.getenv("DISPATCH_API_PORT", "50051") + + service_address = f"{host}:{port}" + grpc_channel = grpc.aio.insecure_channel(service_address) microgrid_id = 1 - service_address = "localhost:50051" dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) dispatcher.start() # this will start the actor @@ -108,6 +122,46 @@ async def run(): case _ as unhandled: assert_never(unhandled) ``` + Example: Creating a new dispatch and then modifying it. Note that this uses + the lower-level `Client` class to create and update the dispatch. + ```python + import os + from datetime import datetime, timedelta, timezone + + import grpc.aio + from frequenz.client.common.microgrid.components import ComponentCategory + + from frequenz.dispatch import Dispatcher + + async def run(): + host = os.getenv("DISPATCH_API_HOST", "localhost") + port = os.getenv("DISPATCH_API_PORT", "50051") + + service_address = f"{host}:{port}" + grpc_channel = grpc.aio.insecure_channel(service_address) + microgrid_id = 1 + dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) + await dispatcher.start() # this will start the actor + + # Create a new dispatch + new_dispatch = await dispatcher.client.create( + microgrid_id=microgrid_id, + _type="ECHO_FREQUENCY", # replace with your own type + start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10), + duration=timedelta(minutes=5), + selector=ComponentCategory.INVERTER, + payload={"font": "Times New Roman"}, # Arbitrary payload data + ) + + # Modify the dispatch + await dispatcher.client.update( + dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)} + ) + + # Validate the modification + modified_dispatch = await dispatcher.client.get(new_dispatch.id) + assert modified_dispatch.duration == timedelta(minutes=10) + ``` """ def __init__( From 1afaf09d82a92d02d78ad5bd516bf489508b51bc Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 29 Apr 2024 13:05:58 +0200 Subject: [PATCH 5/7] Use enum for the values returned by `is_running()` Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/__init__.py | 3 +- src/frequenz/dispatch/_dispatch.py | 28 +++++++++++---- src/frequenz/dispatch/_dispatcher.py | 51 +++++++++++++++------------- src/frequenz/dispatch/actor.py | 7 ++-- 4 files changed, 57 insertions(+), 32 deletions(-) diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index ee7e261..f2f8cbb 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -3,7 +3,7 @@ """A highlevel interface for the dispatch API.""" -from ._dispatch import Dispatch +from ._dispatch import Dispatch, RunningState from ._dispatcher import Dispatcher, ReceiverFetcher from ._event import Created, Deleted, DispatchEvent, Updated @@ -15,4 +15,5 @@ "ReceiverFetcher", "Updated", "Dispatch", + "RunningState", ] diff --git a/src/frequenz/dispatch/_dispatch.py b/src/frequenz/dispatch/_dispatch.py index 1210b94..4fd5c94 100644 --- a/src/frequenz/dispatch/_dispatch.py +++ b/src/frequenz/dispatch/_dispatch.py @@ -7,6 +7,7 @@ import logging from dataclasses import dataclass from datetime import datetime, timezone +from enum import Enum from typing import Iterator, cast from dateutil import rrule @@ -37,6 +38,19 @@ """To map from our Weekday enum to the dateutil library enum.""" +class RunningState(Enum): + """The running state of a dispatch.""" + + RUNNING = "RUNNING" + """The dispatch is running.""" + + STOPPED = "STOPPED" + """The dispatch is stopped.""" + + DIFFERENT_TYPE = "DIFFERENT_TYPE" + """The dispatch is for a different type.""" + + @dataclass(frozen=True) class Dispatch(BaseDispatch): """Dispatch type with extra functionality.""" @@ -85,26 +99,28 @@ def set_running_status_notified(self) -> None: """Mark the latest running state change notification as sent.""" object.__setattr__(self, "running_state_change_synced", self.update_time) - def running(self, type_: str) -> bool: + def running(self, type_: str) -> RunningState: """Check if the dispatch is currently supposed to be running. Args: type_: The type of the dispatch that should be running. Returns: - True if the dispatch is currently meant to be running, False otherwise. + RUNNING if the dispatch is running, + STOPPED if it is stopped, + DIFFERENT_TYPE if it is for a different type. """ if self.type != type_: - return False + return RunningState.DIFFERENT_TYPE if not self.active or self.deleted: - return False + return RunningState.STOPPED now = datetime.now(tz=timezone.utc) if until := self._until(now): - return now < until + return RunningState.RUNNING if now < until else RunningState.STOPPED - return False + return RunningState.STOPPED @property def until(self) -> datetime | None: diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 26a52f1..6656229 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -50,46 +50,51 @@ class Dispatcher: reconfigure itself. Example: Processing running state change dispatches - ```python - import os - import grpc.aio - from unittest.mock import MagicMock - async def run(): - host = os.getenv("DISPATCH_API_HOST", "localhost") - port = os.getenv("DISPATCH_API_PORT", "50051") + ```python + import os + import grpc.aio + from unittest.mock import MagicMock - service_address = f"{host}:{port}" - grpc_channel = grpc.aio.insecure_channel(service_address) - microgrid_id = 1 - dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) - await dispatcher.start() + async def run(): + host = os.getenv("DISPATCH_API_HOST", "localhost") + port = os.getenv("DISPATCH_API_PORT", "50051") - actor = MagicMock() # replace with your actor + service_address = f"{host}:{port}" + grpc_channel = grpc.aio.insecure_channel(service_address) + microgrid_id = 1 + dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) + await dispatcher.start() - changed_running_status_rx = dispatcher.running_status_change.new_receiver() + actor = MagicMock() # replace with your actor - async for dispatch in changed_running_status_rx: - if dispatch.type != "DEMO_TYPE": - continue + changed_running_status_rx = dispatcher.running_status_change.new_receiver() - print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") - if dispatch.running: + async for dispatch in changed_running_status_rx: + match dispatch.running("DEMO_TYPE"): + case RunningState.RUNNING: + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") if actor.is_running: actor.reconfigure( + components=dispatch.selector, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, ) # this will reconfigure the actor else: - # this will start a new or reconfigure a running actor + # this will start a new actor with the given components # and run it for the duration of the dispatch - actor.start_or_reconfigure( + actor.start( components=dispatch.selector, run_parameters=dispatch.payload, # custom actor parameters dry_run=dispatch.dry_run, until=dispatch.until, ) - else: + case RunningState.STOPPED: actor.stop() # this will stop the actor - ``` + case RunningState.DIFFERENT_TYPE: + pass # dispatch not for this type + ``` Example: Getting notification about dispatch lifecycle events ```python diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 8589682..4bdb0c8 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -13,7 +13,7 @@ from frequenz.client.dispatch import Client from frequenz.sdk.actor import Actor -from ._dispatch import Dispatch +from ._dispatch import Dispatch, RunningState from ._event import Created, Deleted, DispatchEvent, Updated _MAX_AHEAD_SCHEDULE = timedelta(hours=5) @@ -220,7 +220,10 @@ def _running_state_change( # Deleted dispatch if updated_dispatch is None: assert previous_dispatch is not None - return previous_dispatch.running(previous_dispatch.type) + return ( + previous_dispatch.running(previous_dispatch.type) + == RunningState.RUNNING + ) # If any of the runtime attributes changed, we need to send a message runtime_state_attributes = [ From 52d22f17112876c6ce490c5c49f82875a606f168 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 30 Apr 2024 14:39:45 +0200 Subject: [PATCH 6/7] Enhance & extend documentation and examples Signed-off-by: Mathias L. Baumann --- README.md | 81 ++++++++++++-------- src/frequenz/dispatch/__init__.py | 12 ++- src/frequenz/dispatch/_dispatcher.py | 107 ++++++++++++++------------- 3 files changed, 119 insertions(+), 81 deletions(-) diff --git a/README.md b/README.md index f8cb1b6..c50cb3f 100644 --- a/README.md +++ b/README.md @@ -7,38 +7,61 @@ ## Introduction A highlevel interface for the dispatch API. -The interface is made of the dispatch actor which should run once per microgrid. -It provides two channels for clients: -- "new_dispatches" for newly created dispatches -- "ready_dispatches" for dispatches that are ready to be executed -## Example Usage +See [the documentation](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch) for more information. + +## Usage + +The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher), the main entry point for the API, provides two channels: + +* [Lifecycle events](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher.lifecycle_events): A channel that sends a message whenever a [Dispatch][frequenz.dispatch.Dispatch] is created, updated or deleted. +* [Running status change](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher.running_status_change): Sends a dispatch message whenever a dispatch is ready to be executed according to the schedule or the running status of the dispatch changed in a way that could potentially require the actor to start, stop or reconfigure itself. + +### Example using the running status change channel ```python - async def run(): - # dispatch helper sends out dispatches when they are due - dispatch_arrived = dispatch_helper.updated_dispatches().new_receiver() - dispatch_ready = dispatch_helper.ready_dispatches().new_receiver() - - async for selected in select(dispatch_ready, dispatch_arrived): - if selected_from(selected, dispatch_ready): - dispatch = selected.value - match dispatch.type: - case DISPATCH_TYPE_BATTERY_CHARGE: - battery_pool = microgrid.battery_pool(dispatch.battery_set, task_id) - battery_pool.set_charge(dispatch.power) - ... - if selected_from(selected, dispatch_arrived): - match selected.value: - case Created(dispatch): - log.info("New dispatch arrived %s", dispatch) - ... - case Updated(dispatch): - log.info("Dispatch updated %s", dispatch) - ... - case Deleted(dispatch): - log.info("Dispatch deleted %s", dispatch) - ... +import os +import grpc.aio +from unittest.mock import MagicMock + +async def run(): + host = os.getenv("DISPATCH_API_HOST", "localhost") + port = os.getenv("DISPATCH_API_PORT", "50051") + + service_address = f"{host}:{port}" + grpc_channel = grpc.aio.insecure_channel(service_address) + microgrid_id = 1 + dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) + await dispatcher.start() + + actor = MagicMock() # replace with your actor + + changed_running_status_rx = dispatcher.running_status_change.new_receiver() + + async for dispatch in changed_running_status_rx: + match dispatch.running("DEMO_TYPE"): + case RunningState.RUNNING: + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") + if actor.is_running: + actor.reconfigure( + components=dispatch.selector, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) # this will reconfigure the actor + else: + # this will start a new actor with the given components + # and run it for the duration of the dispatch + actor.start( + components=dispatch.selector, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) + case RunningState.STOPPED: + actor.stop() # this will stop the actor + case RunningState.DIFFERENT_TYPE: + pass # dispatch not for this type ``` ## Supported Platforms diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index f2f8cbb..16df1b5 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -1,7 +1,17 @@ # License: MIT # Copyright © 2024 Frequenz Energy-as-a-Service GmbH -"""A highlevel interface for the dispatch API.""" +"""A highlevel interface for the dispatch API. + +A small overview of the most important classes in this module: + +* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API. +* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality. +* [Created][frequenz.dispatch.Created], + [Updated][frequenz.dispatch.Updated], + [Deleted][frequenz.dispatch.Deleted]: Dispatch event types. + +""" from ._dispatch import Dispatch, RunningState from ._dispatcher import Dispatcher, ReceiverFetcher diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 6656229..14b8319 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -42,59 +42,62 @@ class Dispatcher: This class provides a highlevel interface to the dispatch API. It provides two channels: - One that sends a dispatch event message whenever a dispatch is created, updated or deleted. + Lifecycle events: + A channel that sends a dispatch event message whenever a dispatch + is created, updated or deleted. - The other sends a dispatch message whenever a dispatch is ready to be - executed according to the schedule or the running status of the dispatch - changed in a way that could potentially require the actor to start, stop or - reconfigure itself. + Running status change: + Sends a dispatch message whenever a dispatch is ready + to be executed according to the schedule or the running status of the + dispatch changed in a way that could potentially require the consumer to start, + stop or reconfigure itself. Example: Processing running state change dispatches + ```python + import os + import grpc.aio + from frequenz.dispatch import Dispatcher, RunningState + from unittest.mock import MagicMock + + async def run(): + host = os.getenv("DISPATCH_API_HOST", "localhost") + port = os.getenv("DISPATCH_API_PORT", "50051") - ```python - import os - import grpc.aio - from unittest.mock import MagicMock - - async def run(): - host = os.getenv("DISPATCH_API_HOST", "localhost") - port = os.getenv("DISPATCH_API_PORT", "50051") - - service_address = f"{host}:{port}" - grpc_channel = grpc.aio.insecure_channel(service_address) - microgrid_id = 1 - dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) - await dispatcher.start() - - actor = MagicMock() # replace with your actor - - changed_running_status_rx = dispatcher.running_status_change.new_receiver() - - async for dispatch in changed_running_status_rx: - match dispatch.running("DEMO_TYPE"): - case RunningState.RUNNING: - print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") - if actor.is_running: - actor.reconfigure( - components=dispatch.selector, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) # this will reconfigure the actor - else: - # this will start a new actor with the given components - # and run it for the duration of the dispatch - actor.start( - components=dispatch.selector, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) - case RunningState.STOPPED: - actor.stop() # this will stop the actor - case RunningState.DIFFERENT_TYPE: - pass # dispatch not for this type - ``` + service_address = f"{host}:{port}" + grpc_channel = grpc.aio.insecure_channel(service_address) + microgrid_id = 1 + dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) + await dispatcher.start() + + actor = MagicMock() # replace with your actor + + changed_running_status = dispatcher.running_status_change.new_receiver() + + async for dispatch in changed_running_status: + match dispatch.running("DEMO_TYPE"): + case RunningState.RUNNING: + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") + if actor.is_running: + actor.reconfigure( + components=dispatch.selector, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) # this will reconfigure the actor + else: + # this will start a new actor with the given components + # and run it for the duration of the dispatch + actor.start( + components=dispatch.selector, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) + case RunningState.STOPPED: + actor.stop() # this will stop the actor + case RunningState.DIFFERENT_TYPE: + pass # dispatch not for this type + ``` Example: Getting notification about dispatch lifecycle events ```python @@ -127,8 +130,10 @@ async def run(): case _ as unhandled: assert_never(unhandled) ``` - Example: Creating a new dispatch and then modifying it. Note that this uses - the lower-level `Client` class to create and update the dispatch. + + Example: Creating a new dispatch and then modifying it. + Note that this uses the lower-level `Client` class to create and update the dispatch. + ```python import os from datetime import datetime, timedelta, timezone From addb81d1e19e5608a3b4f87bb27e14db5fbeff5c Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 30 Apr 2024 14:40:46 +0200 Subject: [PATCH 7/7] Make running status methods private They are meant to be used only by the dispatch actor. Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_dispatch.py | 7 ++++--- src/frequenz/dispatch/actor.py | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/frequenz/dispatch/_dispatch.py b/src/frequenz/dispatch/_dispatch.py index 4fd5c94..cd67bd1 100644 --- a/src/frequenz/dispatch/_dispatch.py +++ b/src/frequenz/dispatch/_dispatch.py @@ -83,11 +83,12 @@ def __init__( running_state_change_synced, ) - def set_deleted(self) -> None: + def _set_deleted(self) -> None: """Mark the dispatch as deleted.""" object.__setattr__(self, "deleted", True) - def running_status_notified(self) -> bool: + @property + def _running_status_notified(self) -> bool: """Check that the latest running state change notification was sent. Returns: @@ -95,7 +96,7 @@ def running_status_notified(self) -> bool: """ return self.running_state_change_synced == self.update_time - def set_running_status_notified(self) -> None: + def _set_running_status_notified(self) -> None: """Mark the latest running state change notification as sent.""" object.__setattr__(self, "running_state_change_synced", self.update_time) diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 4bdb0c8..30e137d 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -119,7 +119,7 @@ async def _fetch(self) -> None: for dispatch in old_dispatches.values(): _logger.info("Deleted dispatch: %s", dispatch) - dispatch.set_deleted() + dispatch._set_deleted() # pylint: disable=protected-access await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) if task := self._scheduled.pop(dispatch.id, None): task.cancel() @@ -214,7 +214,8 @@ def _running_state_change( assert updated_dispatch is not None # Client was not informed about the dispatch, do it now - if not updated_dispatch.running_status_notified(): + # pylint: disable=protected-access + if not updated_dispatch._running_status_notified: return True # Deleted dispatch @@ -252,4 +253,4 @@ async def _send_running_state_change(self, dispatch: Dispatch) -> None: await self._running_state_change_sender.send(dispatch) # Update the last sent notification time # so we know if this change was already sent - dispatch.set_running_status_notified() + dispatch._set_running_status_notified() # pylint: disable=protected-access