Skip to content

Commit 66cbcb5

Browse files
committed
Enhance & extend documentation and examples
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent e2f6a70 commit 66cbcb5

File tree

3 files changed

+178
-103
lines changed

3 files changed

+178
-103
lines changed

README.md

Lines changed: 89 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,38 +7,98 @@
77
## Introduction
88

99
A highlevel interface for the dispatch API.
10-
The interface is made of the dispatch actor which should run once per microgrid.
11-
It provides two channels for clients:
12-
- "new_dispatches" for newly created dispatches
13-
- "ready_dispatches" for dispatches that are ready to be executed
1410

15-
## Example Usage
11+
A small overview of the most important classes in this package:
12+
13+
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
14+
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
15+
* [Created][frequenz.dispatch.Created], [Updated][frequenz.dispatch.Updated], [Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
16+
17+
## Usage
18+
19+
The `Dispatcher` class, the main entry point for the API, provides two channels:
20+
21+
* [Lifecycle events][frequenz.dispatch.Dispatcher.lifecycle_events]: A channel that sends a message whenever a [Dispatch][frequenz.dispatch.Dispatch] is created, updated or deleted.
22+
* [Running status change][frequenz.dispatch.Dispatcher.running_status_change]: Sends a dispatch message whenever a dispatch is ready to be executed according to the schedule or the running status of the dispatch changed in a way that could potentially require the actor to start, stop or reconfigure itself.
23+
24+
### Example using the running status change channel
1625

1726
```python
18-
async def run():
19-
# dispatch helper sends out dispatches when they are due
20-
dispatch_arrived = dispatch_helper.updated_dispatches().new_receiver()
21-
dispatch_ready = dispatch_helper.ready_dispatches().new_receiver()
22-
23-
async for selected in select(dispatch_ready, dispatch_arrived):
24-
if selected_from(selected, dispatch_ready):
25-
dispatch = selected.value
26-
match dispatch.type:
27-
case DISPATCH_TYPE_BATTERY_CHARGE:
28-
battery_pool = microgrid.battery_pool(dispatch.battery_set, task_id)
29-
battery_pool.set_charge(dispatch.power)
30-
...
31-
if selected_from(selected, dispatch_arrived):
32-
match selected.value:
33-
case Created(dispatch):
34-
log.info("New dispatch arrived %s", dispatch)
35-
...
36-
case Updated(dispatch):
37-
log.info("Dispatch updated %s", dispatch)
38-
...
39-
case Deleted(dispatch):
40-
log.info("Dispatch deleted %s", dispatch)
41-
...
27+
import os
28+
import grpc.aio
29+
from unittest.mock import MagicMock
30+
31+
async def run():
32+
host = os.getenv("DISPATCH_API_HOST", "localhost")
33+
port = os.getenv("DISPATCH_API_PORT", "50051")
34+
35+
service_address = f"{host}:{port}"
36+
grpc_channel = grpc.aio.insecure_channel(service_address)
37+
microgrid_id = 1
38+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
39+
await dispatcher.start()
40+
41+
actor = MagicMock() # replace with your actor
42+
43+
changed_running_status_rx = dispatcher.running_status_change.new_receiver()
44+
45+
async for dispatch in changed_running_status_rx:
46+
match dispatch.running("DEMO_TYPE"):
47+
case RunningState.RUNNING:
48+
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
49+
if actor.is_running:
50+
actor.reconfigure(
51+
components=dispatch.selector,
52+
run_parameters=dispatch.payload, # custom actor parameters
53+
dry_run=dispatch.dry_run,
54+
until=dispatch.until,
55+
) # this will reconfigure the actor
56+
else:
57+
# this will start a new actor with the given components
58+
# and run it for the duration of the dispatch
59+
actor.start(
60+
components=dispatch.selector,
61+
run_parameters=dispatch.payload, # custom actor parameters
62+
dry_run=dispatch.dry_run,
63+
until=dispatch.until,
64+
)
65+
case RunningState.STOPPED:
66+
actor.stop() # this will stop the actor
67+
case RunningState.DIFFERENT_TYPE:
68+
pass # dispatch not for this type
69+
```
70+
71+
### Example using the lifecycle events channel
72+
73+
```python
74+
import os
75+
from typing import assert_never
76+
77+
import grpc.aio
78+
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
79+
80+
async def run():
81+
host = os.getenv("DISPATCH_API_HOST", "localhost")
82+
port = os.getenv("DISPATCH_API_PORT", "50051")
83+
84+
service_address = f"{host}:{port}"
85+
grpc_channel = grpc.aio.insecure_channel(service_address)
86+
microgrid_id = 1
87+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
88+
dispatcher.start() # this will start the actor
89+
90+
events_receiver = dispatcher.lifecycle_events.new_receiver()
91+
92+
async for event in events_receiver:
93+
match event:
94+
case Created(dispatch):
95+
print(f"A dispatch was created: {dispatch}")
96+
case Deleted(dispatch):
97+
print(f"A dispatch was deleted: {dispatch}")
98+
case Updated(dispatch):
99+
print(f"A dispatch was updated: {dispatch}")
100+
case _ as unhandled:
101+
assert_never(unhandled)
42102
```
43103

44104
## Supported Platforms

src/frequenz/dispatch/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
11
# License: MIT
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

4-
"""A highlevel interface for the dispatch API."""
4+
"""A highlevel interface for the dispatch API.
5+
6+
A small overview of the most important classes in this module:
7+
8+
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
9+
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
10+
* [Created][frequenz.dispatch.Created],
11+
[Updated][frequenz.dispatch.Updated],
12+
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
13+
14+
"""
515

616
from ._dispatch import Dispatch, RunningState
717
from ._dispatcher import Dispatcher, ReceiverFetcher

src/frequenz/dispatch/_dispatcher.py

Lines changed: 78 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,20 @@ class Dispatcher:
4242
This class provides a highlevel interface to the dispatch API.
4343
It provides two channels:
4444
45-
One that sends a dispatch event message whenever a dispatch is created, updated or deleted.
45+
Lifecycle events: A channel that sends a dispatch event message whenever a
46+
dispatch is created, updated or deleted.
4647
47-
The other sends a dispatch message whenever a dispatch is ready to be
48-
executed according to the schedule or the running status of the dispatch
49-
changed in a way that could potentially require the actor to start, stop or
50-
reconfigure itself.
48+
Running status change: Sends a dispatch message whenever a dispatch is ready
49+
to be executed according to the schedule or the running status of the
50+
dispatch changed in a way that could potentially require the actor to start,
51+
stop or reconfigure itself.
5152
5253
Example: Processing running state change dispatches
5354
5455
```python
5556
import os
5657
import grpc.aio
58+
from frequenz.dispatch import Dispatcher, RunningState
5759
from unittest.mock import MagicMock
5860
5961
async def run():
@@ -97,76 +99,79 @@ async def run():
9799
```
98100
99101
Example: Getting notification about dispatch lifecycle events
100-
```python
101-
import os
102-
from typing import assert_never
103-
104-
import grpc.aio
105-
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
106-
107-
async def run():
108-
host = os.getenv("DISPATCH_API_HOST", "localhost")
109-
port = os.getenv("DISPATCH_API_PORT", "50051")
110-
111-
service_address = f"{host}:{port}"
112-
grpc_channel = grpc.aio.insecure_channel(service_address)
113-
microgrid_id = 1
114-
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
115-
dispatcher.start() # this will start the actor
116-
117-
events_receiver = dispatcher.lifecycle_events.new_receiver()
118-
119-
async for event in events_receiver:
120-
match event:
121-
case Created(dispatch):
122-
print(f"A dispatch was created: {dispatch}")
123-
case Deleted(dispatch):
124-
print(f"A dispatch was deleted: {dispatch}")
125-
case Updated(dispatch):
126-
print(f"A dispatch was updated: {dispatch}")
127-
case _ as unhandled:
128-
assert_never(unhandled)
129-
```
102+
103+
```python
104+
import os
105+
from typing import assert_never
106+
107+
import grpc.aio
108+
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
109+
110+
async def run():
111+
host = os.getenv("DISPATCH_API_HOST", "localhost")
112+
port = os.getenv("DISPATCH_API_PORT", "50051")
113+
114+
service_address = f"{host}:{port}"
115+
grpc_channel = grpc.aio.insecure_channel(service_address)
116+
microgrid_id = 1
117+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
118+
dispatcher.start() # this will start the actor
119+
120+
events_receiver = dispatcher.lifecycle_events.new_receiver()
121+
122+
async for event in events_receiver:
123+
match event:
124+
case Created(dispatch):
125+
print(f"A dispatch was created: {dispatch}")
126+
case Deleted(dispatch):
127+
print(f"A dispatch was deleted: {dispatch}")
128+
case Updated(dispatch):
129+
print(f"A dispatch was updated: {dispatch}")
130+
case _ as unhandled:
131+
assert_never(unhandled)
132+
```
133+
130134
Example: Creating a new dispatch and then modifying it. Note that this uses
131135
the lower-level `Client` class to create and update the dispatch.
132-
```python
133-
import os
134-
from datetime import datetime, timedelta, timezone
135-
136-
import grpc.aio
137-
from frequenz.client.common.microgrid.components import ComponentCategory
138-
139-
from frequenz.dispatch import Dispatcher
140-
141-
async def run():
142-
host = os.getenv("DISPATCH_API_HOST", "localhost")
143-
port = os.getenv("DISPATCH_API_PORT", "50051")
144-
145-
service_address = f"{host}:{port}"
146-
grpc_channel = grpc.aio.insecure_channel(service_address)
147-
microgrid_id = 1
148-
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
149-
await dispatcher.start() # this will start the actor
150-
151-
# Create a new dispatch
152-
new_dispatch = await dispatcher.client.create(
153-
microgrid_id=microgrid_id,
154-
_type="ECHO_FREQUENCY", # replace with your own type
155-
start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
156-
duration=timedelta(minutes=5),
157-
selector=ComponentCategory.INVERTER,
158-
payload={"font": "Times New Roman"}, # Arbitrary payload data
159-
)
160-
161-
# Modify the dispatch
162-
await dispatcher.client.update(
163-
dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)}
164-
)
165-
166-
# Validate the modification
167-
modified_dispatch = await dispatcher.client.get(new_dispatch.id)
168-
assert modified_dispatch.duration == timedelta(minutes=10)
169-
```
136+
137+
```python
138+
import os
139+
from datetime import datetime, timedelta, timezone
140+
141+
import grpc.aio
142+
from frequenz.client.common.microgrid.components import ComponentCategory
143+
144+
from frequenz.dispatch import Dispatcher
145+
146+
async def run():
147+
host = os.getenv("DISPATCH_API_HOST", "localhost")
148+
port = os.getenv("DISPATCH_API_PORT", "50051")
149+
150+
service_address = f"{host}:{port}"
151+
grpc_channel = grpc.aio.insecure_channel(service_address)
152+
microgrid_id = 1
153+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
154+
await dispatcher.start() # this will start the actor
155+
156+
# Create a new dispatch
157+
new_dispatch = await dispatcher.client.create(
158+
microgrid_id=microgrid_id,
159+
_type="ECHO_FREQUENCY", # replace with your own type
160+
start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
161+
duration=timedelta(minutes=5),
162+
selector=ComponentCategory.INVERTER,
163+
payload={"font": "Times New Roman"}, # Arbitrary payload data
164+
)
165+
166+
# Modify the dispatch
167+
await dispatcher.client.update(
168+
dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)}
169+
)
170+
171+
# Validate the modification
172+
modified_dispatch = await dispatcher.client.get(new_dispatch.id)
173+
assert modified_dispatch.duration == timedelta(minutes=10)
174+
```
170175
"""
171176

172177
def __init__(

0 commit comments

Comments
 (0)