@@ -51,47 +51,61 @@ class Dispatcher:
51
51
52
52
Example: Processing running state change dispatches
53
53
```python
54
+ import os
54
55
import grpc.aio
55
56
from unittest.mock import MagicMock
56
57
57
58
async def run():
58
- grpc_channel = grpc.aio.insecure_channel("localhost:50051")
59
- microgrid_id = 1
60
- service_address = "localhost:50051"
59
+ host = os.getenv("DISPATCH_API_HOST", "localhost")
60
+ port = os.getenv("DISPATCH_API_PORT", "50051")
61
61
62
+ service_address = f"{host}:{port}"
63
+ grpc_channel = grpc.aio.insecure_channel(service_address)
64
+ microgrid_id = 1
62
65
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
66
+ await dispatcher.start()
67
+
63
68
actor = MagicMock() # replace with your actor
64
69
65
70
changed_running_status_rx = dispatcher.running_status_change.new_receiver()
66
71
67
72
async for dispatch in changed_running_status_rx:
73
+ if dispatch.type != "DEMO_TYPE":
74
+ continue
75
+
68
76
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
69
77
if dispatch.running:
70
78
if actor.is_running:
71
79
actor.reconfigure(
72
- components=dispatch.selector,
73
- run_parameters=dispatch.payload
74
80
) # this will reconfigure the actor
75
81
else:
76
- # this will start the actor
82
+ # this will start a new or reconfigure a running actor
77
83
# and run it for the duration of the dispatch
78
- actor.start(duration=dispatch.duration, dry_run=dispatch.dry_run)
84
+ actor.start_or_reconfigure(
85
+ components=dispatch.selector,
86
+ run_parameters=dispatch.payload, # custom actor parameters
87
+ dry_run=dispatch.dry_run,
88
+ until=dispatch.until,
89
+ )
79
90
else:
80
91
actor.stop() # this will stop the actor
81
92
```
82
93
83
94
Example: Getting notification about dispatch lifecycle events
84
95
```python
96
+ import os
85
97
from typing import assert_never
86
98
87
99
import grpc.aio
88
100
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
89
101
90
-
91
102
async def run():
92
- grpc_channel = grpc.aio.insecure_channel("localhost:50051")
103
+ host = os.getenv("DISPATCH_API_HOST", "localhost")
104
+ port = os.getenv("DISPATCH_API_PORT", "50051")
105
+
106
+ service_address = f"{host}:{port}"
107
+ grpc_channel = grpc.aio.insecure_channel(service_address)
93
108
microgrid_id = 1
94
- service_address = "localhost:50051"
95
109
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
96
110
dispatcher.start() # this will start the actor
97
111
@@ -108,6 +122,46 @@ async def run():
108
122
case _ as unhandled:
109
123
assert_never(unhandled)
110
124
```
125
+ Example: Creating a new dispatch and then modifying it. Note that this uses
126
+ the lower-level `Client` class to create and update the dispatch.
127
+ ```python
128
+ import os
129
+ from datetime import datetime, timedelta, timezone
130
+
131
+ import grpc.aio
132
+ from frequenz.client.common.microgrid.components import ComponentCategory
133
+
134
+ from frequenz.dispatch import Dispatcher
135
+
136
+ async def run():
137
+ host = os.getenv("DISPATCH_API_HOST", "localhost")
138
+ port = os.getenv("DISPATCH_API_PORT", "50051")
139
+
140
+ service_address = f"{host}:{port}"
141
+ grpc_channel = grpc.aio.insecure_channel(service_address)
142
+ microgrid_id = 1
143
+ dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
144
+ await dispatcher.start() # this will start the actor
145
+
146
+ # Create a new dispatch
147
+ new_dispatch = await dispatcher.client.create(
148
+ microgrid_id=microgrid_id,
149
+ _type="ECHO_FREQUENCY", # replace with your own type
150
+ start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
151
+ duration=timedelta(minutes=5),
152
+ selector=ComponentCategory.INVERTER,
153
+ payload={"font": "Times New Roman"}, # Arbitrary payload data
154
+ )
155
+
156
+ # Modify the dispatch
157
+ await dispatcher.client.update(
158
+ dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)}
159
+ )
160
+
161
+ # Validate the modification
162
+ modified_dispatch = await dispatcher.client.get(new_dispatch.id)
163
+ assert modified_dispatch.duration == timedelta(minutes=10)
164
+ ```
111
165
"""
112
166
113
167
def __init__ (
0 commit comments