Skip to content

Commit dd42670

Browse files
committed
Enhance & extend documentation and examples
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent e2f6a70 commit dd42670

File tree

3 files changed

+175
-103
lines changed

3 files changed

+175
-103
lines changed

README.md

Lines changed: 89 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,38 +7,98 @@
77
## Introduction
88

99
A highlevel interface for the dispatch API.
10-
The interface is made of the dispatch actor which should run once per microgrid.
11-
It provides two channels for clients:
12-
- "new_dispatches" for newly created dispatches
13-
- "ready_dispatches" for dispatches that are ready to be executed
1410

15-
## Example Usage
11+
A small overview of the most important classes in this package:
12+
13+
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
14+
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
15+
* [Created][frequenz.dispatch.Created], [Updated][frequenz.dispatch.Updated], [Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
16+
17+
## Usage
18+
19+
The `Dispatcher` class, the main entry point for the API, provides two channels:
20+
21+
* [Lifecycle events][frequenz.dispatch.Dispatcher.lifecycle_events]: A channel that sends a message whenever a [Dispatch][frequenz.dispatch.Dispatch] is created, updated or deleted.
22+
* [Running status change][frequenz.dispatch.Dispatcher.running_status_change]: Sends a dispatch message whenever a dispatch is ready to be executed according to the schedule or the running status of the dispatch changed in a way that could potentially require the actor to start, stop or reconfigure itself.
23+
24+
### Example using the running status change channel
1625

1726
```python
18-
async def run():
19-
# dispatch helper sends out dispatches when they are due
20-
dispatch_arrived = dispatch_helper.updated_dispatches().new_receiver()
21-
dispatch_ready = dispatch_helper.ready_dispatches().new_receiver()
22-
23-
async for selected in select(dispatch_ready, dispatch_arrived):
24-
if selected_from(selected, dispatch_ready):
25-
dispatch = selected.value
26-
match dispatch.type:
27-
case DISPATCH_TYPE_BATTERY_CHARGE:
28-
battery_pool = microgrid.battery_pool(dispatch.battery_set, task_id)
29-
battery_pool.set_charge(dispatch.power)
30-
...
31-
if selected_from(selected, dispatch_arrived):
32-
match selected.value:
33-
case Created(dispatch):
34-
log.info("New dispatch arrived %s", dispatch)
35-
...
36-
case Updated(dispatch):
37-
log.info("Dispatch updated %s", dispatch)
38-
...
39-
case Deleted(dispatch):
40-
log.info("Dispatch deleted %s", dispatch)
41-
...
27+
import os
28+
import grpc.aio
29+
from unittest.mock import MagicMock
30+
31+
async def run():
32+
host = os.getenv("DISPATCH_API_HOST", "localhost")
33+
port = os.getenv("DISPATCH_API_PORT", "50051")
34+
35+
service_address = f"{host}:{port}"
36+
grpc_channel = grpc.aio.insecure_channel(service_address)
37+
microgrid_id = 1
38+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
39+
await dispatcher.start()
40+
41+
actor = MagicMock() # replace with your actor
42+
43+
changed_running_status_rx = dispatcher.running_status_change.new_receiver()
44+
45+
async for dispatch in changed_running_status_rx:
46+
match dispatch.running("DEMO_TYPE"):
47+
case RunningState.RUNNING:
48+
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
49+
if actor.is_running:
50+
actor.reconfigure(
51+
components=dispatch.selector,
52+
run_parameters=dispatch.payload, # custom actor parameters
53+
dry_run=dispatch.dry_run,
54+
until=dispatch.until,
55+
) # this will reconfigure the actor
56+
else:
57+
# this will start a new actor with the given components
58+
# and run it for the duration of the dispatch
59+
actor.start(
60+
components=dispatch.selector,
61+
run_parameters=dispatch.payload, # custom actor parameters
62+
dry_run=dispatch.dry_run,
63+
until=dispatch.until,
64+
)
65+
case RunningState.STOPPED:
66+
actor.stop() # this will stop the actor
67+
case RunningState.DIFFERENT_TYPE:
68+
pass # dispatch not for this type
69+
```
70+
71+
### Example using the lifecycle events channel
72+
73+
```python
74+
import os
75+
from typing import assert_never
76+
77+
import grpc.aio
78+
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
79+
80+
async def run():
81+
host = os.getenv("DISPATCH_API_HOST", "localhost")
82+
port = os.getenv("DISPATCH_API_PORT", "50051")
83+
84+
service_address = f"{host}:{port}"
85+
grpc_channel = grpc.aio.insecure_channel(service_address)
86+
microgrid_id = 1
87+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
88+
dispatcher.start() # this will start the actor
89+
90+
events_receiver = dispatcher.lifecycle_events.new_receiver()
91+
92+
async for event in events_receiver:
93+
match event:
94+
case Created(dispatch):
95+
print(f"A dispatch was created: {dispatch}")
96+
case Deleted(dispatch):
97+
print(f"A dispatch was deleted: {dispatch}")
98+
case Updated(dispatch):
99+
print(f"A dispatch was updated: {dispatch}")
100+
case _ as unhandled:
101+
assert_never(unhandled)
42102
```
43103

44104
## Supported Platforms

src/frequenz/dispatch/__init__.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
# License: MIT
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

4-
"""A highlevel interface for the dispatch API."""
4+
"""A highlevel interface for the dispatch API.
5+
6+
A small overview of the most important classes in this module:
7+
8+
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
9+
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
10+
* [Created][frequenz.dispatch.Created], [Updated][frequenz.dispatch.Updated], [Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
11+
12+
"""
513

614
from ._dispatch import Dispatch, RunningState
715
from ._dispatcher import Dispatcher, ReceiverFetcher

src/frequenz/dispatch/_dispatcher.py

Lines changed: 77 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,13 @@ class Dispatcher:
4242
This class provides a highlevel interface to the dispatch API.
4343
It provides two channels:
4444
45-
One that sends a dispatch event message whenever a dispatch is created, updated or deleted.
45+
Lifecycle events: A channel that sends a dispatch event message whenever a
46+
dispatch is created, updated or deleted.
4647
47-
The other sends a dispatch message whenever a dispatch is ready to be
48-
executed according to the schedule or the running status of the dispatch
49-
changed in a way that could potentially require the actor to start, stop or
50-
reconfigure itself.
48+
Running status change: Sends a dispatch message whenever a dispatch is ready
49+
to be executed according to the schedule or the running status of the
50+
dispatch changed in a way that could potentially require the actor to start,
51+
stop or reconfigure itself.
5152
5253
Example: Processing running state change dispatches
5354
@@ -97,76 +98,79 @@ async def run():
9798
```
9899
99100
Example: Getting notification about dispatch lifecycle events
100-
```python
101-
import os
102-
from typing import assert_never
103-
104-
import grpc.aio
105-
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
106-
107-
async def run():
108-
host = os.getenv("DISPATCH_API_HOST", "localhost")
109-
port = os.getenv("DISPATCH_API_PORT", "50051")
110-
111-
service_address = f"{host}:{port}"
112-
grpc_channel = grpc.aio.insecure_channel(service_address)
113-
microgrid_id = 1
114-
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
115-
dispatcher.start() # this will start the actor
116-
117-
events_receiver = dispatcher.lifecycle_events.new_receiver()
118-
119-
async for event in events_receiver:
120-
match event:
121-
case Created(dispatch):
122-
print(f"A dispatch was created: {dispatch}")
123-
case Deleted(dispatch):
124-
print(f"A dispatch was deleted: {dispatch}")
125-
case Updated(dispatch):
126-
print(f"A dispatch was updated: {dispatch}")
127-
case _ as unhandled:
128-
assert_never(unhandled)
129-
```
101+
102+
```python
103+
import os
104+
from typing import assert_never
105+
106+
import grpc.aio
107+
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
108+
109+
async def run():
110+
host = os.getenv("DISPATCH_API_HOST", "localhost")
111+
port = os.getenv("DISPATCH_API_PORT", "50051")
112+
113+
service_address = f"{host}:{port}"
114+
grpc_channel = grpc.aio.insecure_channel(service_address)
115+
microgrid_id = 1
116+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
117+
dispatcher.start() # this will start the actor
118+
119+
events_receiver = dispatcher.lifecycle_events.new_receiver()
120+
121+
async for event in events_receiver:
122+
match event:
123+
case Created(dispatch):
124+
print(f"A dispatch was created: {dispatch}")
125+
case Deleted(dispatch):
126+
print(f"A dispatch was deleted: {dispatch}")
127+
case Updated(dispatch):
128+
print(f"A dispatch was updated: {dispatch}")
129+
case _ as unhandled:
130+
assert_never(unhandled)
131+
```
132+
130133
Example: Creating a new dispatch and then modifying it. Note that this uses
131134
the lower-level `Client` class to create and update the dispatch.
132-
```python
133-
import os
134-
from datetime import datetime, timedelta, timezone
135-
136-
import grpc.aio
137-
from frequenz.client.common.microgrid.components import ComponentCategory
138-
139-
from frequenz.dispatch import Dispatcher
140-
141-
async def run():
142-
host = os.getenv("DISPATCH_API_HOST", "localhost")
143-
port = os.getenv("DISPATCH_API_PORT", "50051")
144-
145-
service_address = f"{host}:{port}"
146-
grpc_channel = grpc.aio.insecure_channel(service_address)
147-
microgrid_id = 1
148-
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
149-
await dispatcher.start() # this will start the actor
150-
151-
# Create a new dispatch
152-
new_dispatch = await dispatcher.client.create(
153-
microgrid_id=microgrid_id,
154-
_type="ECHO_FREQUENCY", # replace with your own type
155-
start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
156-
duration=timedelta(minutes=5),
157-
selector=ComponentCategory.INVERTER,
158-
payload={"font": "Times New Roman"}, # Arbitrary payload data
159-
)
160-
161-
# Modify the dispatch
162-
await dispatcher.client.update(
163-
dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)}
164-
)
165-
166-
# Validate the modification
167-
modified_dispatch = await dispatcher.client.get(new_dispatch.id)
168-
assert modified_dispatch.duration == timedelta(minutes=10)
169-
```
135+
136+
```python
137+
import os
138+
from datetime import datetime, timedelta, timezone
139+
140+
import grpc.aio
141+
from frequenz.client.common.microgrid.components import ComponentCategory
142+
143+
from frequenz.dispatch import Dispatcher
144+
145+
async def run():
146+
host = os.getenv("DISPATCH_API_HOST", "localhost")
147+
port = os.getenv("DISPATCH_API_PORT", "50051")
148+
149+
service_address = f"{host}:{port}"
150+
grpc_channel = grpc.aio.insecure_channel(service_address)
151+
microgrid_id = 1
152+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
153+
await dispatcher.start() # this will start the actor
154+
155+
# Create a new dispatch
156+
new_dispatch = await dispatcher.client.create(
157+
microgrid_id=microgrid_id,
158+
_type="ECHO_FREQUENCY", # replace with your own type
159+
start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
160+
duration=timedelta(minutes=5),
161+
selector=ComponentCategory.INVERTER,
162+
payload={"font": "Times New Roman"}, # Arbitrary payload data
163+
)
164+
165+
# Modify the dispatch
166+
await dispatcher.client.update(
167+
dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)}
168+
)
169+
170+
# Validate the modification
171+
modified_dispatch = await dispatcher.client.get(new_dispatch.id)
172+
assert modified_dispatch.duration == timedelta(minutes=10)
173+
```
170174
"""
171175

172176
def __init__(

0 commit comments

Comments
 (0)