Skip to content

Commit e0c5d8d

Browse files
committed
Turn Dispatcher into a background service
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent f3b4146 commit e0c5d8d

File tree

1 file changed

+28
-7
lines changed

1 file changed

+28
-7
lines changed

src/frequenz/dispatch/_dispatcher.py

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77

88
import logging
99
from asyncio import Event
10-
from typing import Any, Callable, Generator
10+
from typing import Callable
1111

1212
from frequenz.channels import Receiver
1313
from frequenz.client.dispatch import Client
14-
from frequenz.sdk.actor import Actor
14+
from frequenz.sdk.actor import Actor, BackgroundService
15+
from typing_extensions import override
1516

1617
from ._actor_dispatcher import ActorDispatcher, DispatchInfo
1718
from ._bg_service import DispatchScheduler, MergeStrategy
@@ -22,7 +23,7 @@
2223
_logger = logging.getLogger(__name__)
2324

2425

25-
class Dispatcher:
26+
class Dispatcher(BackgroundService):
2627
"""A highlevel interface for the dispatch API.
2728
2829
This class provides a highlevel interface to the dispatch API.
@@ -183,6 +184,8 @@ def __init__(
183184
server_url: The URL of the dispatch service.
184185
key: The key to access the service.
185186
"""
187+
super().__init__(name="Dispatcher")
188+
186189
self._client = Client(server_url=server_url, key=key)
187190
self._bg_service = DispatchScheduler(
188191
microgrid_id,
@@ -192,10 +195,32 @@ def __init__(
192195
self._empty_event = Event()
193196
self._empty_event.set()
194197

198+
@override
195199
def start(self) -> None:
196200
"""Start the local dispatch service."""
197201
self._bg_service.start()
198202

203+
@property
204+
@override
205+
def is_running(self) -> bool:
206+
"""Whether the local dispatch service is running."""
207+
return self._bg_service.is_running
208+
209+
@override
210+
async def wait(self) -> None:
211+
"""Wait until all actor dispatches are stopped."""
212+
await self._empty_event.wait()
213+
214+
@override
215+
def cancel(self, msg: str | None = None) -> None:
216+
"""Stop the local dispatch service."""
217+
self._bg_service.cancel(msg)
218+
219+
for instance in self._actor_dispatchers.values():
220+
instance.cancel()
221+
222+
self._actor_dispatchers.clear()
223+
199224
async def start_dispatching(
200225
self,
201226
dispatch_type: str,
@@ -256,10 +281,6 @@ async def stop_dispatching(self, dispatch_type: str) -> None:
256281
if not self._actor_dispatchers:
257282
self._empty_event.set()
258283

259-
def __await__(self) -> Generator[Any, None, bool]:
260-
"""Wait until all actor dispatches are stopped."""
261-
return self._empty_event.wait().__await__()
262-
263284
@property
264285
def client(self) -> Client:
265286
"""Return the client."""

0 commit comments

Comments
 (0)