Skip to content

Make ready_to_execute channel more general #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 52 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,61 @@
## Introduction

A highlevel interface for the dispatch API.
The interface is made of the dispatch actor which should run once per microgrid.
It provides two channels for clients:
- "new_dispatches" for newly created dispatches
- "ready_dispatches" for dispatches that are ready to be executed

## Example Usage
See [the documentation](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch) for more information.

## Usage

The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher), the main entry point for the API, provides two channels:

* [Lifecycle events](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher.lifecycle_events): A channel that sends a message whenever a [Dispatch][frequenz.dispatch.Dispatch] is created, updated or deleted.
* [Running status change](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher.running_status_change): Sends a dispatch message whenever a dispatch is ready to be executed according to the schedule or the running status of the dispatch changed in a way that could potentially require the actor to start, stop or reconfigure itself.
Comment on lines +15 to +18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About these links, we were using redirect for version aliases because GitHub didn't support symlinks. With redirects we can't use direct links to an alias, but with symlinks we can. You could update the alias_type configuration from redirect to symlink and then link to the latest alias for example, instead of v0.1.

That said, any linking to any particular version will be bad as things are now, because the README is also rendered as the home page of the project, and if you are visiting the docs for any particular version, the links in that version home page will point to a different version.

This is why I'm more inclined to not provide deep links to docs in the README, just explain in plain text and add a link to the docs home page (without any version specification, just https://frequenz-floss.github.io/frequenz-dispatch-python/) for people looking for more info.

That, or stop including the README entirely in the docs/index.md and do some partial inclusions (and leave stuff with deep links to docs out of the included parts) like we do with channels:

Happy to leave this for a future PR though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do that in the next iteration that we already plan :) I am afraid to add more stuff here currently


### Example using the running status change channel

```python
async def run():
# dispatch helper sends out dispatches when they are due
dispatch_arrived = dispatch_helper.updated_dispatches().new_receiver()
dispatch_ready = dispatch_helper.ready_dispatches().new_receiver()

async for selected in select(dispatch_ready, dispatch_arrived):
if selected_from(selected, dispatch_ready):
dispatch = selected.value
match dispatch.type:
case DISPATCH_TYPE_BATTERY_CHARGE:
battery_pool = microgrid.battery_pool(dispatch.battery_set, task_id)
battery_pool.set_charge(dispatch.power)
...
if selected_from(selected, dispatch_arrived):
match selected.value:
case Created(dispatch):
log.info("New dispatch arrived %s", dispatch)
...
case Updated(dispatch):
log.info("Dispatch updated %s", dispatch)
...
case Deleted(dispatch):
log.info("Dispatch deleted %s", dispatch)
...
import os
import grpc.aio
from unittest.mock import MagicMock

async def run():
host = os.getenv("DISPATCH_API_HOST", "localhost")
port = os.getenv("DISPATCH_API_PORT", "50051")

service_address = f"{host}:{port}"
grpc_channel = grpc.aio.insecure_channel(service_address)
microgrid_id = 1
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
await dispatcher.start()

actor = MagicMock() # replace with your actor

changed_running_status_rx = dispatcher.running_status_change.new_receiver()

async for dispatch in changed_running_status_rx:
match dispatch.running("DEMO_TYPE"):
case RunningState.RUNNING:
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
if actor.is_running:
actor.reconfigure(
components=dispatch.selector,
run_parameters=dispatch.payload, # custom actor parameters
dry_run=dispatch.dry_run,
until=dispatch.until,
) # this will reconfigure the actor
else:
# this will start a new actor with the given components
# and run it for the duration of the dispatch
actor.start(
components=dispatch.selector,
run_parameters=dispatch.payload, # custom actor parameters
dry_run=dispatch.dry_run,
until=dispatch.until,
)
case RunningState.STOPPED:
actor.stop() # this will stop the actor
case RunningState.DIFFERENT_TYPE:
pass # dispatch not for this type
```

## Supported Platforms
Expand Down
5 changes: 3 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
* `Dispatcher.ready_to_execute()` was renamed to `Dispatcher.running_status_change()`

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* Introduced new class `Dispatch` (based on the client class) that contains useful functions and extended information about the received dispatch.
* `Dispatcher.client` was added to provide an easy access to the client for updating, deleting and creating dispatches

## Bug Fixes

Expand Down
19 changes: 16 additions & 3 deletions src/frequenz/dispatch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""A highlevel interface for the dispatch API."""
"""A highlevel interface for the dispatch API.

from frequenz.dispatch._dispatcher import Dispatcher, ReceiverFetcher
from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated
A small overview of the most important classes in this module:

* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
* [Created][frequenz.dispatch.Created],
[Updated][frequenz.dispatch.Updated],
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.

"""

from ._dispatch import Dispatch, RunningState
from ._dispatcher import Dispatcher, ReceiverFetcher
from ._event import Created, Deleted, DispatchEvent, Updated

__all__ = [
"Created",
Expand All @@ -13,4 +24,6 @@
"Dispatcher",
"ReceiverFetcher",
"Updated",
"Dispatch",
"RunningState",
]
251 changes: 251 additions & 0 deletions src/frequenz/dispatch/_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Dispatch type with support for next_run calculation."""


import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Iterator, cast

from dateutil import rrule
from frequenz.client.dispatch.types import Dispatch as BaseDispatch
from frequenz.client.dispatch.types import Frequency, Weekday
Comment on lines +14 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(unrelated) I don't remember the rationale about using .types here but to me it is still funny-looking that it is not just from frequenz.client.dispatch import ....

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also unrelated and just purely brainstorming, but it we reversed the namespaces here, it might be more natural to use with google style imports:

from frequenz.dispatch import client

...

class Dispatch(client.Dispatch):
    ...


_logger = logging.getLogger(__name__)
"""The logger for this module."""

_RRULE_FREQ_MAP = {
Frequency.MINUTELY: rrule.MINUTELY,
Frequency.HOURLY: rrule.HOURLY,
Frequency.DAILY: rrule.DAILY,
Frequency.WEEKLY: rrule.WEEKLY,
Frequency.MONTHLY: rrule.MONTHLY,
}
"""To map from our Frequency enum to the dateutil library enum."""

_RRULE_WEEKDAY_MAP = {
Weekday.MONDAY: rrule.MO,
Weekday.TUESDAY: rrule.TU,
Weekday.WEDNESDAY: rrule.WE,
Weekday.THURSDAY: rrule.TH,
Weekday.FRIDAY: rrule.FR,
Weekday.SATURDAY: rrule.SA,
Weekday.SUNDAY: rrule.SU,
}
"""To map from our Weekday enum to the dateutil library enum."""


class RunningState(Enum):
"""The running state of a dispatch."""

RUNNING = "RUNNING"
"""The dispatch is running."""

STOPPED = "STOPPED"
"""The dispatch is stopped."""

DIFFERENT_TYPE = "DIFFERENT_TYPE"
"""The dispatch is for a different type."""


@dataclass(frozen=True)
class Dispatch(BaseDispatch):
"""Dispatch type with extra functionality."""

deleted: bool = False
"""Whether the dispatch is deleted."""

running_state_change_synced: datetime | None = None
"""The last time a message was sent about the running state change."""

def __init__(
self,
client_dispatch: BaseDispatch,
deleted: bool = False,
running_state_change_synced: datetime | None = None,
):
"""Initialize the dispatch.

Args:
client_dispatch: The client dispatch.
deleted: Whether the dispatch is deleted.
running_state_change_synced: Timestamp of the last running state change message.
"""
super().__init__(**client_dispatch.__dict__)
# Work around frozen to set deleted
object.__setattr__(self, "deleted", deleted)
object.__setattr__(
self,
"running_state_change_synced",
running_state_change_synced,
)

def _set_deleted(self) -> None:
"""Mark the dispatch as deleted."""
object.__setattr__(self, "deleted", True)

@property
def _running_status_notified(self) -> bool:
"""Check that the latest running state change notification was sent.

Returns:
True if the latest running state change notification was sent, False otherwise.
"""
return self.running_state_change_synced == self.update_time

def _set_running_status_notified(self) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def _set_running_status_notified(self) -> None:
@_running_status_notified.setter
def _running_status_notified(self) -> None:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how I'd call this setter given that I provide no value...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, sorry, I missed there was no argument. I think it would be better to either take an argument or maybe rename to something like _mark_as_sent or something conveying what this is actually doing better. But it is a minor thing if we plan to change this in the future, I'm fine with merging as is if it is going to be changed soon.

"""Mark the latest running state change notification as sent."""
object.__setattr__(self, "running_state_change_synced", self.update_time)

def running(self, type_: str) -> RunningState:
"""Check if the dispatch is currently supposed to be running.

Args:
type_: The type of the dispatch that should be running.

Returns:
RUNNING if the dispatch is running,
STOPPED if it is stopped,
DIFFERENT_TYPE if it is for a different type.
"""
if self.type != type_:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am just thinking, maybe this should return None for this case, as in "Information not available"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe a three-state enum?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about raising an exception? I think a 3-state enum will be annoying to use and None will be, for most cases, the same as using False, as people will be still able to use it like if dispatch.running(): ...

If type changes are not allowed by the API this could only happen due to bugs or intentionally bad players, right?

If you don't like the exception approach I guess returning None might be the most convenient option, if it is relevant to anyone, they can do a if dispatch.running() is None: and if you don´t care you can use it as a bool (the only issue is it will probably lead to people not thinking about the None option at all, which could potentially mask issues).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm, looking at the type_ argument I'm more lost than before, this interface looks very confusing to me. Why are we checking against a type passed by the user at all? Can't the user check this by itself if they are interested in it?

Copy link
Contributor Author

@Marenz Marenz Apr 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every actor receives every dispatch, so they have to check if it is for them.

To make sure they have to check, the running() method requires the type and if the types don't match no action at all is expected.

If the users don't handle the three-state enum, they instead need to filter the dispatch. Seems at the end the same amount of work to me, only that this way forces the user and makes it impossible to forget

Copy link
Contributor

@llucax llucax Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably for a separate PR, but I was thinking that maybe this is the wrong approach. If we know any users will only be interested in a sub-set of dispatch types, then it would probably be better to have an interface like:

class FcrDispatch(Dispatch):
    def __init__(self, dispatch: Dispatch) -> None:
        ...
        # Probably parsing the payload here too, using pydantic or some other schema
        # we could even offer tools for this
        super().__init__(dispatch, FcrPayloadSchema())

class OtherDispatch(Dispatch):
    ...
        
async for dispatch in dispatcher.active.new_receiver({"FCR": FcrDispatch, "OTHER": OtherDispatch}):
    match dispatch:
        case FcrDispatch() as fcr_dispatch:
            ...
        case OtherDispatch() as other_dispatch:
            ...
        case _ as unhandled:
            assert_never(unhandled)

This way you are forced to check the type, and you can have a more specific interface for the types you care about.

So the new_receiver() method would become more complex, it needs to do the filtering and the conversion of the general Dispatch type to the specific one provided by the user, and be generic. It might be a challenge to do it properly. I guess worse case we can always only just do it with a simple T and then use: dispatcher.active.new_receiver[FcrDispatch | OtherDispatch]({"FCR": FcrDispatch, "OTHER": OtherDispatch}) which is a bit too verbose, but well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we decided to go with this approach, and also the class approach on top of it, so simple dispatch executing actors should be trivial to implement, and if you need something a bit more complex, there is still a pretty high-level interface that takes most of the complexity out.

But we'll do this in a separate PR. @Marenz do you want to create a new issue with this info or do you plan to start implementing it immediately after this is merged?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll implement it right a way one FCR dispatching is up and running

return RunningState.DIFFERENT_TYPE

if not self.active or self.deleted:
return RunningState.STOPPED

now = datetime.now(tz=timezone.utc)
if until := self._until(now):
return RunningState.RUNNING if now < until else RunningState.STOPPED

return RunningState.STOPPED

@property
def until(self) -> datetime | None:
"""Time when the dispatch should end.

Returns the time that a running dispatch should end.
If the dispatch is not running, None is returned.

Returns:
The time when the dispatch should end or None if the dispatch is not running.
"""
if not self.active or self.deleted:
return None

now = datetime.now(tz=timezone.utc)
return self._until(now)

@property
# noqa is needed because of a bug in pydoclint that makes it think a `return` without a return
# value needs documenting
def missed_runs(self) -> Iterator[datetime]: # noqa: DOC405
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe explain te # noqa?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe? 🙏 😆

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what to say.... the whole thing is your fault anyway, wanting to use Iterator :p

src/frequenz/dispatch/_dispatch.py:144:1: DOC405 Method Dispatch.missed_runs has both "return" and "yield" statements. Please use Generator[YieldType, SendType, ReturnType] as the return ty
pe annotation, and put your yield type in YieldType and return type in ReturnType. More details in https://jsh9.github.io/pydoclint/notes_generator_vs_iterator.html

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the new comment. We should probably also report the issue, it shouldn't be hard to fix I guess, just treating a return without a value specially.

"""Yield all missed runs of a dispatch.

Yields all missed runs of a dispatch.

If a running state change notification was not sent in time
due to connection issues, this method will yield all missed runs
since the last sent notification.

Returns:
A generator that yields all missed runs of a dispatch.
"""
if self.update_time == self.running_state_change_synced:
return

from_time = self.update_time
now = datetime.now(tz=timezone.utc)

while (next_run := self.next_run_after(from_time)) and next_run < now:
yield next_run
from_time = next_run

@property
def next_run(self) -> datetime | None:
"""Calculate the next run of a dispatch.

Returns:
The next run of the dispatch or None if the dispatch is finished.
"""
return self.next_run_after(datetime.now(tz=timezone.utc))

def next_run_after(self, after: datetime) -> datetime | None:
"""Calculate the next run of a dispatch.

Args:
after: The time to calculate the next run from.

Returns:
The next run of the dispatch or None if the dispatch is finished.
"""
if (
not self.recurrence.frequency
or self.recurrence.frequency == Frequency.UNSPECIFIED
):
if after > self.start_time:
return None
return self.start_time

# Make sure no weekday is UNSPECIFIED
if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
_logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id)
return None

# No type information for rrule, so we need to cast
return cast(datetime | None, self._prepare_rrule().after(after, inc=True))

def _prepare_rrule(self) -> rrule.rrule:
"""Prepare the rrule object.

Returns:
The rrule object.
"""
count, until = (None, None)
if end := self.recurrence.end_criteria:
count = end.count
until = end.until

rrule_obj = rrule.rrule(
freq=_RRULE_FREQ_MAP[self.recurrence.frequency],
dtstart=self.start_time,
count=count,
until=until,
byminute=self.recurrence.byminutes,
byhour=self.recurrence.byhours,
byweekday=[
_RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays
],
bymonthday=self.recurrence.bymonthdays,
bymonth=self.recurrence.bymonths,
interval=self.recurrence.interval,
)

return rrule_obj

def _until(self, now: datetime) -> datetime | None:
"""Calculate the time when the dispatch should end.

If no previous run is found, None is returned.

Args:
now: The current time.

Returns:
The time when the dispatch should end or None if the dispatch is not running.
"""
if (
not self.recurrence.frequency
or self.recurrence.frequency == Frequency.UNSPECIFIED
):
return self.start_time + self.duration

latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True)

if not latest_past_start:
return None

return latest_past_start + self.duration
Loading
Loading