Skip to content

Commit 6af21be

Browse files
committed
Update & extend usage examples
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent f65dac0 commit 6af21be

File tree

1 file changed

+64
-10
lines changed

1 file changed

+64
-10
lines changed

src/frequenz/dispatch/_dispatcher.py

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,47 +51,61 @@ class Dispatcher:
5151
5252
Example: Processing running state change dispatches
5353
```python
54+
import os
5455
import grpc.aio
5556
from unittest.mock import MagicMock
5657
5758
async def run():
58-
grpc_channel = grpc.aio.insecure_channel("localhost:50051")
59-
microgrid_id = 1
60-
service_address = "localhost:50051"
59+
host = os.getenv("DISPATCH_API_HOST", "localhost")
60+
port = os.getenv("DISPATCH_API_PORT", "50051")
6161
62+
service_address = f"{host}:{port}"
63+
grpc_channel = grpc.aio.insecure_channel(service_address)
64+
microgrid_id = 1
6265
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
66+
await dispatcher.start()
67+
6368
actor = MagicMock() # replace with your actor
6469
6570
changed_running_status_rx = dispatcher.running_status_change.new_receiver()
6671
6772
async for dispatch in changed_running_status_rx:
73+
if dispatch.type != "DEMO_TYPE":
74+
continue
75+
6876
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
6977
if dispatch.running:
7078
if actor.is_running:
7179
actor.reconfigure(
72-
components=dispatch.selector,
73-
run_parameters=dispatch.payload
7480
) # this will reconfigure the actor
7581
else:
76-
# this will start the actor
82+
# this will start a new or reconfigure a running actor
7783
# and run it for the duration of the dispatch
78-
actor.start(duration=dispatch.duration, dry_run=dispatch.dry_run)
84+
actor.start_or_reconfigure(
85+
components=dispatch.selector,
86+
run_parameters=dispatch.payload, # custom actor parameters
87+
dry_run=dispatch.dry_run,
88+
until=dispatch.until,
89+
)
7990
else:
8091
actor.stop() # this will stop the actor
8192
```
8293
8394
Example: Getting notification about dispatch lifecycle events
8495
```python
96+
import os
8597
from typing import assert_never
8698
8799
import grpc.aio
88100
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
89101
90-
91102
async def run():
92-
grpc_channel = grpc.aio.insecure_channel("localhost:50051")
103+
host = os.getenv("DISPATCH_API_HOST", "localhost")
104+
port = os.getenv("DISPATCH_API_PORT", "50051")
105+
106+
service_address = f"{host}:{port}"
107+
grpc_channel = grpc.aio.insecure_channel(service_address)
93108
microgrid_id = 1
94-
service_address = "localhost:50051"
95109
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
96110
dispatcher.start() # this will start the actor
97111
@@ -108,6 +122,46 @@ async def run():
108122
case _ as unhandled:
109123
assert_never(unhandled)
110124
```
125+
Example: Creating a new dispatch and then modifying it. Note that this uses
126+
the lower-level `Client` class to create and update the dispatch.
127+
```python
128+
import os
129+
from datetime import datetime, timedelta, timezone
130+
131+
import grpc.aio
132+
from frequenz.client.common.microgrid.components import ComponentCategory
133+
134+
from frequenz.dispatch import Dispatcher
135+
136+
async def run():
137+
host = os.getenv("DISPATCH_API_HOST", "localhost")
138+
port = os.getenv("DISPATCH_API_PORT", "50051")
139+
140+
service_address = f"{host}:{port}"
141+
grpc_channel = grpc.aio.insecure_channel(service_address)
142+
microgrid_id = 1
143+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
144+
await dispatcher.start() # this will start the actor
145+
146+
# Create a new dispatch
147+
new_dispatch = await dispatcher.client.create(
148+
microgrid_id=microgrid_id,
149+
_type="ECHO_FREQUENCY", # replace with your own type
150+
start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
151+
duration=timedelta(minutes=5),
152+
selector=ComponentCategory.INVERTER,
153+
payload={"font": "Times New Roman"}, # Arbitrary payload data
154+
)
155+
156+
# Modify the dispatch
157+
await dispatcher.client.update(
158+
dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)}
159+
)
160+
161+
# Validate the modification
162+
modified_dispatch = await dispatcher.client.get(new_dispatch.id)
163+
assert modified_dispatch.duration == timedelta(minutes=10)
164+
```
111165
"""
112166

113167
def __init__(

0 commit comments

Comments
 (0)