-
Notifications
You must be signed in to change notification settings - Fork 5
Make ready_to_execute
channel more general
#22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
173eb9a
20e6c21
b2b4823
901649a
1afaf09
52d22f1
addb81d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,251 @@ | ||||||||
# License: MIT | ||||||||
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH | ||||||||
|
||||||||
"""Dispatch type with support for next_run calculation.""" | ||||||||
|
||||||||
|
||||||||
import logging | ||||||||
from dataclasses import dataclass | ||||||||
from datetime import datetime, timezone | ||||||||
from enum import Enum | ||||||||
from typing import Iterator, cast | ||||||||
|
||||||||
from dateutil import rrule | ||||||||
from frequenz.client.dispatch.types import Dispatch as BaseDispatch | ||||||||
from frequenz.client.dispatch.types import Frequency, Weekday | ||||||||
Comment on lines
+14
to
+15
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (unrelated) I don't remember the rationale about using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also unrelated and just purely brainstorming, but it we reversed the namespaces here, it might be more natural to use with google style imports: from frequenz.dispatch import client
...
class Dispatch(client.Dispatch):
... |
||||||||
|
||||||||
_logger = logging.getLogger(__name__) | ||||||||
"""The logger for this module.""" | ||||||||
|
||||||||
_RRULE_FREQ_MAP = { | ||||||||
Frequency.MINUTELY: rrule.MINUTELY, | ||||||||
Frequency.HOURLY: rrule.HOURLY, | ||||||||
Frequency.DAILY: rrule.DAILY, | ||||||||
Frequency.WEEKLY: rrule.WEEKLY, | ||||||||
Frequency.MONTHLY: rrule.MONTHLY, | ||||||||
} | ||||||||
"""To map from our Frequency enum to the dateutil library enum.""" | ||||||||
|
||||||||
_RRULE_WEEKDAY_MAP = { | ||||||||
Weekday.MONDAY: rrule.MO, | ||||||||
Weekday.TUESDAY: rrule.TU, | ||||||||
Weekday.WEDNESDAY: rrule.WE, | ||||||||
Weekday.THURSDAY: rrule.TH, | ||||||||
Weekday.FRIDAY: rrule.FR, | ||||||||
Weekday.SATURDAY: rrule.SA, | ||||||||
Weekday.SUNDAY: rrule.SU, | ||||||||
} | ||||||||
"""To map from our Weekday enum to the dateutil library enum.""" | ||||||||
|
||||||||
|
||||||||
class RunningState(Enum): | ||||||||
"""The running state of a dispatch.""" | ||||||||
|
||||||||
RUNNING = "RUNNING" | ||||||||
"""The dispatch is running.""" | ||||||||
|
||||||||
STOPPED = "STOPPED" | ||||||||
"""The dispatch is stopped.""" | ||||||||
|
||||||||
DIFFERENT_TYPE = "DIFFERENT_TYPE" | ||||||||
"""The dispatch is for a different type.""" | ||||||||
|
||||||||
|
||||||||
@dataclass(frozen=True) | ||||||||
class Dispatch(BaseDispatch): | ||||||||
"""Dispatch type with extra functionality.""" | ||||||||
|
||||||||
deleted: bool = False | ||||||||
"""Whether the dispatch is deleted.""" | ||||||||
|
||||||||
running_state_change_synced: datetime | None = None | ||||||||
"""The last time a message was sent about the running state change.""" | ||||||||
|
||||||||
def __init__( | ||||||||
self, | ||||||||
client_dispatch: BaseDispatch, | ||||||||
deleted: bool = False, | ||||||||
running_state_change_synced: datetime | None = None, | ||||||||
): | ||||||||
"""Initialize the dispatch. | ||||||||
|
||||||||
Args: | ||||||||
client_dispatch: The client dispatch. | ||||||||
deleted: Whether the dispatch is deleted. | ||||||||
running_state_change_synced: Timestamp of the last running state change message. | ||||||||
""" | ||||||||
super().__init__(**client_dispatch.__dict__) | ||||||||
Marenz marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
# Work around frozen to set deleted | ||||||||
object.__setattr__(self, "deleted", deleted) | ||||||||
object.__setattr__( | ||||||||
self, | ||||||||
"running_state_change_synced", | ||||||||
running_state_change_synced, | ||||||||
) | ||||||||
|
||||||||
def _set_deleted(self) -> None: | ||||||||
"""Mark the dispatch as deleted.""" | ||||||||
object.__setattr__(self, "deleted", True) | ||||||||
|
||||||||
@property | ||||||||
def _running_status_notified(self) -> bool: | ||||||||
"""Check that the latest running state change notification was sent. | ||||||||
|
||||||||
Returns: | ||||||||
True if the latest running state change notification was sent, False otherwise. | ||||||||
""" | ||||||||
return self.running_state_change_synced == self.update_time | ||||||||
|
||||||||
def _set_running_status_notified(self) -> None: | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure how I'd call this setter given that I provide no value... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, sorry, I missed there was no argument. I think it would be better to either take an argument or maybe rename to something like |
||||||||
"""Mark the latest running state change notification as sent.""" | ||||||||
object.__setattr__(self, "running_state_change_synced", self.update_time) | ||||||||
|
||||||||
def running(self, type_: str) -> RunningState: | ||||||||
"""Check if the dispatch is currently supposed to be running. | ||||||||
|
||||||||
Args: | ||||||||
type_: The type of the dispatch that should be running. | ||||||||
|
||||||||
Returns: | ||||||||
RUNNING if the dispatch is running, | ||||||||
STOPPED if it is stopped, | ||||||||
DIFFERENT_TYPE if it is for a different type. | ||||||||
""" | ||||||||
if self.type != type_: | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am just thinking, maybe this should return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or maybe a three-state enum? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about raising an exception? I think a 3-state enum will be annoying to use and If type changes are not allowed by the API this could only happen due to bugs or intentionally bad players, right? If you don't like the exception approach I guess returning There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mmm, looking at the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Every actor receives every dispatch, so they have to check if it is for them. To make sure they have to check, the If the users don't handle the three-state enum, they instead need to filter the dispatch. Seems at the end the same amount of work to me, only that this way forces the user and makes it impossible to forget There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably for a separate PR, but I was thinking that maybe this is the wrong approach. If we know any users will only be interested in a sub-set of dispatch types, then it would probably be better to have an interface like: class FcrDispatch(Dispatch):
def __init__(self, dispatch: Dispatch) -> None:
...
# Probably parsing the payload here too, using pydantic or some other schema
# we could even offer tools for this
super().__init__(dispatch, FcrPayloadSchema())
class OtherDispatch(Dispatch):
...
async for dispatch in dispatcher.active.new_receiver({"FCR": FcrDispatch, "OTHER": OtherDispatch}):
match dispatch:
case FcrDispatch() as fcr_dispatch:
...
case OtherDispatch() as other_dispatch:
...
case _ as unhandled:
assert_never(unhandled) This way you are forced to check the type, and you can have a more specific interface for the types you care about. So the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So we decided to go with this approach, and also the class approach on top of it, so simple dispatch executing actors should be trivial to implement, and if you need something a bit more complex, there is still a pretty high-level interface that takes most of the complexity out. But we'll do this in a separate PR. @Marenz do you want to create a new issue with this info or do you plan to start implementing it immediately after this is merged? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll implement it right a way one FCR dispatching is up and running |
||||||||
return RunningState.DIFFERENT_TYPE | ||||||||
|
||||||||
if not self.active or self.deleted: | ||||||||
return RunningState.STOPPED | ||||||||
|
||||||||
now = datetime.now(tz=timezone.utc) | ||||||||
if until := self._until(now): | ||||||||
return RunningState.RUNNING if now < until else RunningState.STOPPED | ||||||||
|
||||||||
return RunningState.STOPPED | ||||||||
|
||||||||
@property | ||||||||
def until(self) -> datetime | None: | ||||||||
"""Time when the dispatch should end. | ||||||||
|
||||||||
Returns the time that a running dispatch should end. | ||||||||
If the dispatch is not running, None is returned. | ||||||||
|
||||||||
Returns: | ||||||||
The time when the dispatch should end or None if the dispatch is not running. | ||||||||
""" | ||||||||
if not self.active or self.deleted: | ||||||||
return None | ||||||||
|
||||||||
now = datetime.now(tz=timezone.utc) | ||||||||
return self._until(now) | ||||||||
|
||||||||
@property | ||||||||
# noqa is needed because of a bug in pydoclint that makes it think a `return` without a return | ||||||||
# value needs documenting | ||||||||
def missed_runs(self) -> Iterator[datetime]: # noqa: DOC405 | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe explain te There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe? 🙏 😆 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure what to say.... the whole thing is your fault anyway, wanting to use
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See the new comment. We should probably also report the issue, it shouldn't be hard to fix I guess, just treating a |
||||||||
"""Yield all missed runs of a dispatch. | ||||||||
|
||||||||
Yields all missed runs of a dispatch. | ||||||||
|
||||||||
If a running state change notification was not sent in time | ||||||||
due to connection issues, this method will yield all missed runs | ||||||||
since the last sent notification. | ||||||||
|
||||||||
Returns: | ||||||||
A generator that yields all missed runs of a dispatch. | ||||||||
""" | ||||||||
if self.update_time == self.running_state_change_synced: | ||||||||
return | ||||||||
|
||||||||
from_time = self.update_time | ||||||||
now = datetime.now(tz=timezone.utc) | ||||||||
|
||||||||
while (next_run := self.next_run_after(from_time)) and next_run < now: | ||||||||
yield next_run | ||||||||
from_time = next_run | ||||||||
|
||||||||
@property | ||||||||
def next_run(self) -> datetime | None: | ||||||||
"""Calculate the next run of a dispatch. | ||||||||
|
||||||||
Returns: | ||||||||
The next run of the dispatch or None if the dispatch is finished. | ||||||||
""" | ||||||||
return self.next_run_after(datetime.now(tz=timezone.utc)) | ||||||||
|
||||||||
def next_run_after(self, after: datetime) -> datetime | None: | ||||||||
"""Calculate the next run of a dispatch. | ||||||||
|
||||||||
Args: | ||||||||
after: The time to calculate the next run from. | ||||||||
|
||||||||
Returns: | ||||||||
The next run of the dispatch or None if the dispatch is finished. | ||||||||
""" | ||||||||
if ( | ||||||||
not self.recurrence.frequency | ||||||||
or self.recurrence.frequency == Frequency.UNSPECIFIED | ||||||||
): | ||||||||
if after > self.start_time: | ||||||||
return None | ||||||||
return self.start_time | ||||||||
|
||||||||
# Make sure no weekday is UNSPECIFIED | ||||||||
if Weekday.UNSPECIFIED in self.recurrence.byweekdays: | ||||||||
_logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id) | ||||||||
return None | ||||||||
|
||||||||
# No type information for rrule, so we need to cast | ||||||||
return cast(datetime | None, self._prepare_rrule().after(after, inc=True)) | ||||||||
|
||||||||
def _prepare_rrule(self) -> rrule.rrule: | ||||||||
"""Prepare the rrule object. | ||||||||
|
||||||||
Returns: | ||||||||
The rrule object. | ||||||||
""" | ||||||||
count, until = (None, None) | ||||||||
if end := self.recurrence.end_criteria: | ||||||||
count = end.count | ||||||||
until = end.until | ||||||||
|
||||||||
rrule_obj = rrule.rrule( | ||||||||
freq=_RRULE_FREQ_MAP[self.recurrence.frequency], | ||||||||
dtstart=self.start_time, | ||||||||
count=count, | ||||||||
until=until, | ||||||||
byminute=self.recurrence.byminutes, | ||||||||
byhour=self.recurrence.byhours, | ||||||||
byweekday=[ | ||||||||
_RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays | ||||||||
], | ||||||||
bymonthday=self.recurrence.bymonthdays, | ||||||||
bymonth=self.recurrence.bymonths, | ||||||||
interval=self.recurrence.interval, | ||||||||
) | ||||||||
|
||||||||
return rrule_obj | ||||||||
|
||||||||
def _until(self, now: datetime) -> datetime | None: | ||||||||
"""Calculate the time when the dispatch should end. | ||||||||
|
||||||||
If no previous run is found, None is returned. | ||||||||
|
||||||||
Args: | ||||||||
now: The current time. | ||||||||
|
||||||||
Returns: | ||||||||
The time when the dispatch should end or None if the dispatch is not running. | ||||||||
""" | ||||||||
if ( | ||||||||
not self.recurrence.frequency | ||||||||
or self.recurrence.frequency == Frequency.UNSPECIFIED | ||||||||
): | ||||||||
return self.start_time + self.duration | ||||||||
|
||||||||
latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True) | ||||||||
|
||||||||
if not latest_past_start: | ||||||||
return None | ||||||||
|
||||||||
return latest_past_start + self.duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
About these links, we were using redirect for version aliases because GitHub didn't support symlinks. With redirects we can't use direct links to an alias, but with symlinks we can. You could update the
alias_type
configuration fromredirect
tosymlink
and then link to thelatest
alias for example, instead ofv0.1
.That said, any linking to any particular version will be bad as things are now, because the README is also rendered as the home page of the project, and if you are visiting the docs for any particular version, the links in that version home page will point to a different version.
This is why I'm more inclined to not provide deep links to docs in the README, just explain in plain text and add a link to the docs home page (without any version specification, just https://frequenz-floss.github.io/frequenz-dispatch-python/) for people looking for more info.
That, or stop including the README entirely in the
docs/index.md
and do some partial inclusions (and leave stuff with deep links to docs out of the included parts) like we do with channels:Happy to leave this for a future PR though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do that in the next iteration that we already plan :) I am afraid to add more stuff here currently