Skip to content

Commit de4682a

Browse files
committed
Add dispatch mananging actor
A useful actor to help control and manange another actor using dispatches. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent f316833 commit de4682a

File tree

5 files changed

+349
-2
lines changed

5 files changed

+349
-2
lines changed

RELEASE_NOTES.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* We now provide the `DispatchManagingActor` class, a class to manage actors based on incoming dispatches.
1414

1515
## Bug Fixes
1616

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ dependencies = [
4141
# plugins.mkdocstrings.handlers.python.import)
4242
"frequenz-sdk == 1.0.0-rc900",
4343
# "frequenz-channels >= 1.1.0, < 2.0.0",
44-
"frequenz-channels @ git+https://github.com/frequenz-floss/frequenz-channels-python.git@refs/pull/323/head",
44+
"frequenz-channels @ git+https://github.com/frequenz-floss/frequenz-channels-python.git@v1.x.x",
4545
# "frequenz-client-dispatch >= 0.6.0, < 0.7.0",
4646
"frequenz-client-dispatch @ git+https://github.com/frequenz-floss/frequenz-client-dispatch-python.git@refs/pull/87/head",
4747
]

src/frequenz/dispatch/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
88
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
99
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
10+
* [DispatchManagingActor][frequenz.dispatch.DispatchManagingActor]: An actor to
11+
manage other actors based on incoming dispatches.
1012
* [Created][frequenz.dispatch.Created],
1113
[Updated][frequenz.dispatch.Updated],
1214
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
@@ -16,6 +18,7 @@
1618
from ._dispatch import Dispatch, RunningState
1719
from ._dispatcher import Dispatcher, ReceiverFetcher
1820
from ._event import Created, Deleted, DispatchEvent, Updated
21+
from ._managing_actor import DispatchManagingActor, DispatchUpdate
1922

2023
__all__ = [
2124
"Created",
@@ -26,4 +29,6 @@
2629
"Updated",
2730
"Dispatch",
2831
"RunningState",
32+
"DispatchManagingActor",
33+
"DispatchUpdate",
2934
]
+178
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
# License: All rights reserved
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Helper class to manage actors based on dispatches."""
5+
6+
import logging
7+
from dataclasses import dataclass
8+
from typing import Any
9+
10+
from frequenz.channels import Receiver, Sender
11+
from frequenz.client.dispatch.types import ComponentSelector
12+
from frequenz.sdk.actor import Actor
13+
14+
from ._dispatch import Dispatch, RunningState
15+
16+
_logger = logging.getLogger(__name__)
17+
18+
19+
@dataclass(frozen=True, kw_only=True)
20+
class DispatchUpdate:
21+
"""Event emitted when the dispatch changes."""
22+
23+
components: ComponentSelector
24+
"""Components to be used."""
25+
26+
dry_run: bool
27+
"""Whether this is a dry run."""
28+
29+
options: dict[str, Any]
30+
"""Additional options."""
31+
32+
33+
class DispatchManagingActor(Actor):
34+
"""Helper class to manage actors based on dispatches.
35+
36+
Example usage:
37+
38+
```python
39+
import os
40+
import asyncio
41+
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
42+
from frequenz.client.dispatch.types import ComponentSelector
43+
from frequenz.client.common.microgrid.components import ComponentCategory
44+
45+
from frequenz.channels import Receiver, Broadcast
46+
47+
class MyActor(Actor):
48+
def __init__(self, updates_channel: Receiver[DispatchUpdate]):
49+
super().__init__()
50+
self._updates_channel = updates_channel
51+
self._dry_run: bool
52+
self._options : dict[str, Any]
53+
54+
async def _run(self) -> None:
55+
while True:
56+
update = await self._updates_channel.receive()
57+
print("Received update:", update)
58+
59+
self.set_components(update.components)
60+
self._dry_run = update.dry_run
61+
self._options = update.options
62+
63+
def set_components(self, components: ComponentSelector) -> None:
64+
match components:
65+
case [int(), *_] as component_ids:
66+
print("Dispatch: Setting components to %s", components)
67+
case [ComponentCategory.BATTERY, *_]:
68+
print("Dispatch: Using all battery components")
69+
case unsupported:
70+
print(
71+
"Dispatch: Requested an unsupported selector %r, "
72+
"but only component IDs or category BATTERY are supported.",
73+
unsupported,
74+
)
75+
76+
async def run():
77+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
78+
key = os.getenv("DISPATCH_API_KEY", "some-key")
79+
80+
microgrid_id = 1
81+
82+
dispatcher = Dispatcher(
83+
microgrid_id=microgrid_id,
84+
server_url=url,
85+
key=key
86+
)
87+
88+
# Create update channel to receive dispatch update events pre-start and mid-run
89+
dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")
90+
91+
# Start actor and supporting actor, give each a config channel receiver
92+
my_actor = MyActor(dispatch_updates_channel.new_receiver())
93+
94+
status_receiver = dispatcher.running_status_change.new_receiver()
95+
96+
dispatch_runner = DispatchManagingActor(
97+
actor=my_actor,
98+
dispatch_type="EXAMPLE",
99+
running_status_receiver=status_receiver,
100+
updates_sender=dispatch_updates_channel.new_sender(),
101+
)
102+
103+
await asyncio.gather(dispatcher.start(), dispatch_runner.start())
104+
```
105+
"""
106+
107+
def __init__(
108+
self,
109+
actor: Actor,
110+
dispatch_type: str,
111+
running_status_receiver: Receiver[Dispatch],
112+
updates_sender: Sender[DispatchUpdate] | None = None,
113+
) -> None:
114+
"""Initialize the dispatch handler.
115+
116+
Args:
117+
actor: The actor to manage.
118+
dispatch_type: The type of dispatches to handle.
119+
running_status_receiver: The receiver for dispatch running status changes.
120+
updates_sender: The sender for dispatch events
121+
"""
122+
super().__init__()
123+
self._dispatch_rx = running_status_receiver
124+
self._actor = actor
125+
self._dispatch_type = dispatch_type
126+
self._updates_sender = updates_sender
127+
128+
def _start_actor(self) -> None:
129+
"""Start the actor."""
130+
if self._actor.is_running:
131+
_logger.warning("Actor %s is already running", self._actor.name)
132+
else:
133+
self._actor.start()
134+
135+
async def _stop_actor(self, msg: str) -> None:
136+
"""Stop the actor.
137+
138+
Args:
139+
msg: The message to be passed to the actor being stopped.
140+
"""
141+
if self._actor.is_running:
142+
await self._actor.stop(msg)
143+
else:
144+
_logger.warning("Actor %s is not running", self._actor.name)
145+
146+
async def _run(self) -> None:
147+
"""Wait for dispatches and handle them."""
148+
async for dispatch in self._dispatch_rx:
149+
await self._handle_dispatch(dispatch=dispatch)
150+
151+
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
152+
"""Handle a dispatch.
153+
154+
Args:
155+
dispatch: The dispatch to handle.
156+
"""
157+
running = dispatch.running(self._dispatch_type)
158+
match running:
159+
case RunningState.STOPPED:
160+
_logger.info("Stopped by dispatch %s", dispatch.id)
161+
await self._stop_actor("Dispatch stopped")
162+
case RunningState.RUNNING:
163+
if self._updates_sender is not None:
164+
_logger.info("Updated by dispatch %s", dispatch.id)
165+
await self._updates_sender.send(
166+
DispatchUpdate(
167+
components=dispatch.selector,
168+
dry_run=dispatch.dry_run,
169+
options=dispatch.payload,
170+
)
171+
)
172+
173+
_logger.info("Started by dispatch %s", dispatch.id)
174+
self._start_actor()
175+
case RunningState.DIFFERENT_TYPE:
176+
_logger.debug(
177+
"Unknown dispatch! Ignoring dispatch of type %s", dispatch.type
178+
)

tests/test_mananging_actor.py

+164
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
# LICENSE: ALL RIGHTS RESERVED
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Test the dispatch runner."""
5+
6+
import asyncio
7+
from dataclasses import dataclass, replace
8+
from datetime import datetime, timedelta, timezone
9+
from typing import AsyncIterator, Iterator
10+
11+
import async_solipsism
12+
import pytest
13+
import time_machine
14+
from frequenz.channels import Broadcast, Receiver, Sender
15+
from frequenz.client.dispatch.test.generator import DispatchGenerator
16+
from frequenz.client.dispatch.types import Frequency
17+
from frequenz.sdk.actor import Actor
18+
from pytest import fixture
19+
20+
from frequenz.dispatch import Dispatch, DispatchManagingActor, DispatchUpdate
21+
22+
23+
# This method replaces the event loop for all tests in the file.
24+
@pytest.fixture
25+
def event_loop_policy() -> async_solipsism.EventLoopPolicy:
26+
"""Return an event loop policy that uses the async solipsism event loop."""
27+
return async_solipsism.EventLoopPolicy()
28+
29+
30+
@fixture
31+
def fake_time() -> Iterator[time_machine.Coordinates]:
32+
"""Replace real time with a time machine that doesn't automatically tick."""
33+
# destination can be a datetime or a timestamp (int), so are moving to the
34+
# epoch (in UTC!)
35+
with time_machine.travel(destination=0, tick=False) as traveller:
36+
yield traveller
37+
38+
39+
def _now() -> datetime:
40+
"""Return the current time in UTC."""
41+
return datetime.now(tz=timezone.utc)
42+
43+
44+
class MockActor(Actor):
45+
"""Mock actor for testing."""
46+
47+
async def _run(self) -> None:
48+
while True:
49+
await asyncio.sleep(1)
50+
51+
52+
@dataclass
53+
class TestEnv:
54+
"""Test environment."""
55+
56+
actor: Actor
57+
runner_actor: DispatchManagingActor
58+
running_status_sender: Sender[Dispatch]
59+
configuration_receiver: Receiver[DispatchUpdate]
60+
generator: DispatchGenerator = DispatchGenerator()
61+
62+
63+
@fixture
64+
async def test_env() -> AsyncIterator[TestEnv]:
65+
"""Create a test environment."""
66+
channel = Broadcast[Dispatch](name="dispatch ready test channel")
67+
config_channel = Broadcast[DispatchUpdate](name="dispatch config test channel")
68+
69+
actor = MockActor()
70+
71+
runner_actor = DispatchManagingActor(
72+
actor=actor,
73+
dispatch_type="UNIT_TEST",
74+
running_status_receiver=channel.new_receiver(),
75+
updates_sender=config_channel.new_sender(),
76+
)
77+
78+
runner_actor.start()
79+
80+
yield TestEnv(
81+
actor=actor,
82+
runner_actor=runner_actor,
83+
running_status_sender=channel.new_sender(),
84+
configuration_receiver=config_channel.new_receiver(),
85+
)
86+
87+
await runner_actor.stop()
88+
89+
90+
async def test_simple_start_stop(
91+
test_env: TestEnv,
92+
fake_time: time_machine.Coordinates,
93+
) -> None:
94+
"""Test behavior when receiving start/stop messages."""
95+
now = _now()
96+
duration = timedelta(minutes=10)
97+
dispatch = test_env.generator.generate_dispatch()
98+
dispatch = replace(
99+
dispatch,
100+
active=True,
101+
dry_run=False,
102+
duration=duration,
103+
start_time=now,
104+
payload={"test": True},
105+
type="UNIT_TEST",
106+
recurrence=replace(
107+
dispatch.recurrence,
108+
frequency=Frequency.UNSPECIFIED,
109+
),
110+
)
111+
112+
await test_env.running_status_sender.send(Dispatch(dispatch))
113+
fake_time.shift(timedelta(seconds=1))
114+
115+
event = await test_env.configuration_receiver.receive()
116+
assert event.options == {"test": True}
117+
assert event.components == dispatch.selector
118+
assert event.dry_run is False
119+
120+
assert test_env.actor.is_running is True
121+
122+
fake_time.shift(duration)
123+
await test_env.running_status_sender.send(Dispatch(dispatch))
124+
125+
# Give await actor.stop a chance to run in DispatchManagingActor
126+
await asyncio.sleep(0.1)
127+
128+
assert test_env.actor.is_running is False
129+
130+
131+
async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None:
132+
"""Test the dry run mode."""
133+
dispatch = test_env.generator.generate_dispatch()
134+
dispatch = replace(
135+
dispatch,
136+
dry_run=True,
137+
active=True,
138+
start_time=_now(),
139+
duration=timedelta(minutes=10),
140+
type="UNIT_TEST",
141+
recurrence=replace(
142+
dispatch.recurrence,
143+
frequency=Frequency.UNSPECIFIED,
144+
),
145+
)
146+
147+
await test_env.running_status_sender.send(Dispatch(dispatch))
148+
fake_time.shift(timedelta(seconds=1))
149+
150+
event = await test_env.configuration_receiver.receive()
151+
152+
assert event.dry_run is dispatch.dry_run
153+
assert event.components == dispatch.selector
154+
assert event.options == dispatch.payload
155+
assert test_env.actor.is_running is True
156+
157+
assert dispatch.duration is not None
158+
fake_time.shift(dispatch.duration)
159+
await test_env.running_status_sender.send(Dispatch(dispatch))
160+
161+
# Give await actor.stop a chance to run in DispatchManagingActor
162+
await asyncio.sleep(0.1)
163+
164+
assert test_env.actor.is_running is False

0 commit comments

Comments
 (0)