Skip to content

Commit 0ff0cdf

Browse files
committed
Fix every actor instance receiving every dispatch update for its type
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 77a581e commit 0ff0cdf

File tree

3 files changed

+142
-21
lines changed

3 files changed

+142
-21
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@
1414

1515
## Bug Fixes
1616

17-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
17+
* ActorDispatcher: Fix a bug where every actor instance of the same dispatch type received every dispatch update. Each actor instance now gets its own update channel.

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from datetime import timedelta
1111
from typing import Any, Awaitable
1212

13-
from frequenz.channels import Broadcast, Receiver, select
13+
from frequenz.channels import Broadcast, Receiver, Sender, select
1414
from frequenz.client.dispatch.types import TargetComponents
1515
from frequenz.sdk.actor import Actor, BackgroundService
1616

@@ -189,6 +189,19 @@ async def _retry_after_delay(self, dispatch: Dispatch) -> None:
189189
_logger.info("Retrying dispatch %s now", dispatch.id)
190190
await self._sender.send(dispatch)
191191

192+
@dataclass(frozen=True, kw_only=True)
193+
class ActorAndChannel:
193+
"""Actor together with its dispatch-update channel and sender."""
195+
196+
actor: Actor
197+
"""The actor."""
198+
199+
channel: Broadcast[DispatchInfo]
200+
"""The channel for dispatch updates."""
201+
202+
sender: Sender[DispatchInfo]
203+
"""The sender for dispatch updates."""
204+
192205
def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments
193206
self,
194207
actor_factory: Callable[
@@ -215,11 +228,8 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
215228

216229
self._dispatch_rx = running_status_receiver
217230
self._actor_factory = actor_factory
218-
self._actors: dict[int, Actor] = {}
219-
self._updates_channel = Broadcast[DispatchInfo](
220-
name="dispatch_updates_channel", resend_latest=True
221-
)
222-
self._updates_sender = self._updates_channel.new_sender()
231+
self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {}
232+
223233
self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval)
224234

225235
def start(self) -> None:
@@ -236,24 +246,25 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
236246
)
237247

238248
identity = self._dispatch_identity(dispatch)
239-
actor: Actor | None = self._actors.get(identity)
249+
actor_and_channel = self._actors.get(identity)
240250

241-
if actor:
242-
sent_str = ""
243-
if self._updates_sender is not None:
244-
sent_str = ", sent a dispatch update instead of creating a new actor"
245-
await self._updates_sender.send(dispatch_update)
251+
if actor_and_channel:
252+
await actor_and_channel.sender.send(dispatch_update)
246253
_logger.info(
247-
"Actor for dispatch type %r is already running%s",
254+
"Actor for dispatch type %r is already running, "
255+
"sent a dispatch update instead of creating a new actor",
248256
dispatch.type,
249-
sent_str,
250257
)
251258
else:
252259
try:
253260
_logger.info("Starting actor for dispatch type %r", dispatch.type)
261+
channel = Broadcast[DispatchInfo](
262+
name=f"dispatch_updates_channel_instance={identity}",
263+
resend_latest=True,
264+
)
254265
actor = await self._actor_factory(
255266
dispatch_update,
256-
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
267+
channel.new_receiver(limit=1, warn_on_overflow=False),
257268
)
258269

259270
actor.start()
@@ -267,7 +278,9 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
267278
self._retrier.retry(dispatch)
268279
else:
269280
# No exception occurred, so we can register the actor under its dispatch identity
270-
self._actors[identity] = actor
281+
self._actors[identity] = ActorDispatcher.ActorAndChannel(
282+
actor=actor, channel=channel, sender=channel.new_sender()
283+
)
271284

272285
async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
273286
"""Stop the actor associated with the given dispatch.
@@ -278,8 +291,9 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
278291
"""
279292
identity = self._dispatch_identity(stopping_dispatch)
280293

281-
if actor := self._actors.pop(identity, None):
282-
await actor.stop(msg)
294+
if actor_and_channel := self._actors.pop(identity, None):
295+
await actor_and_channel.actor.stop(msg)
296+
await actor_and_channel.channel.close()
283297
else:
284298
_logger.warning(
285299
"Actor for dispatch type %r is not running", stopping_dispatch.type

tests/test_mananging_actor.py

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import time_machine
1616
from frequenz.channels import Broadcast, Receiver, Sender
1717
from frequenz.client.dispatch import recurrence
18-
from frequenz.client.dispatch.recurrence import Frequency
18+
from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule
1919
from frequenz.client.dispatch.test.client import FakeClient
2020
from frequenz.client.dispatch.test.generator import DispatchGenerator
2121
from frequenz.sdk.actor import Actor
@@ -104,9 +104,17 @@ def actor(self, identity: int) -> MockActor:
104104
"""Return the actor."""
105105
# pylint: disable=protected-access
106106
assert identity in self.actors_service._actors
107-
return cast(MockActor, self.actors_service._actors[identity])
107+
return cast(MockActor, self.actors_service._actors[identity].actor)
108108
# pylint: enable=protected-access
109109

110+
def is_running(self, identity: int) -> bool:
111+
"""Return whether the actor is running."""
112+
# pylint: disable-next=protected-access
113+
if identity not in self.actors_service._actors:
114+
return False
115+
116+
return self.actor(identity).is_running
117+
110118

111119
@fixture
112120
async def test_env() -> AsyncIterator[_TestEnv]:
@@ -383,3 +391,102 @@ async def new_mock_receiver(
383391

384392
# Check if actor instance is created
385393
assert identity(dispatch) in actor_manager._actors
394+
395+
396+
async def test_actor_dispatcher_update_isolation(
397+
test_env: _TestEnv,
398+
fake_time: time_machine.Coordinates,
399+
) -> None:
400+
"""Test that updates for one dispatch don't affect other actors of the same type."""
401+
dispatch_type = "ISOLATION_TEST"
402+
start_time = _now()
403+
duration = timedelta(minutes=5)
404+
405+
# Create first dispatch
406+
dispatch1_spec = replace(
407+
test_env.generator.generate_dispatch(),
408+
id=101, # Unique ID
409+
type=dispatch_type,
410+
active=True,
411+
dry_run=False,
412+
start_time=start_time + timedelta(seconds=1), # Stagger start slightly
413+
duration=duration,
414+
payload={"instance": 1},
415+
recurrence=RecurrenceRule(),
416+
)
417+
dispatch1 = Dispatch(dispatch1_spec)
418+
419+
# Create second dispatch of the same type, different ID
420+
dispatch2_spec = replace(
421+
test_env.generator.generate_dispatch(),
422+
id=102, # Unique ID
423+
type=dispatch_type, # Same type
424+
active=True,
425+
dry_run=False,
426+
start_time=start_time + timedelta(seconds=2), # Stagger start slightly
427+
duration=duration,
428+
payload={"instance": 2},
429+
recurrence=RecurrenceRule(),
430+
)
431+
dispatch2 = Dispatch(dispatch2_spec)
432+
433+
# Send dispatch 1 to start actor 1
434+
# print(f"Sending dispatch 1: {dispatch1}")
435+
await test_env.running_status_sender.send(dispatch1)
436+
fake_time.shift(timedelta(seconds=1.1)) # Move time past dispatch1 start
437+
await asyncio.sleep(0.1) # Allow actor to start
438+
439+
assert test_env.is_running(101), "Actor 1 should be running"
440+
actor1 = test_env.actor(101)
441+
assert actor1 is not None
442+
# pylint: disable-next=protected-access
443+
assert actor1.initial_dispatch._src.id == 101
444+
assert actor1.initial_dispatch.options == {"instance": 1}
445+
assert not test_env.is_running(102), "Actor 2 should not be running yet"
446+
447+
# Send dispatch 2 to start actor 2
448+
# print(f"Sending dispatch 2: {dispatch2}")
449+
await test_env.running_status_sender.send(dispatch2)
450+
fake_time.shift(timedelta(seconds=1)) # Move time past dispatch2 start
451+
await asyncio.sleep(0.1) # Allow actor to start
452+
453+
assert test_env.actor(101).is_running, "Actor 1 should still be running"
454+
assert test_env.actor(102).is_running, "Actor 2 should now be running"
455+
actor2 = test_env.actor(102)
456+
assert actor2 is not None
457+
# pylint: disable-next=protected-access
458+
assert actor2.initial_dispatch._src.id == 102
459+
assert actor2.initial_dispatch.options == {"instance": 2}
460+
461+
# Now, send an update to stop dispatch 1
462+
dispatch1_stop = Dispatch(
463+
replace(dispatch1_spec, duration=timedelta(seconds=1), active=False)
464+
)
465+
# print(f"Sending stop for dispatch 1: {dispatch1_stop}")
466+
await test_env.running_status_sender.send(dispatch1_stop)
467+
await asyncio.sleep(0.1) # Allow ActorDispatcher to process the stop
468+
469+
# THE CORE ASSERTION: Actor 1 should stop, Actor 2 should remain running
470+
# pylint: disable=protected-access
471+
assert (
472+
101 not in test_env.actors_service._actors
473+
), "Actor 1 should have been removed"
474+
# pylint: enable=protected-access
475+
assert (
476+
test_env.actor(102).is_running is True
477+
), "Actor 2 should be running after Actor 1 stopped"
478+
# Double check actor1 object state if needed (though removal is stronger check)
479+
# assert not actor1.is_running
480+
481+
# Cleanup: Stop Actor 2
482+
dispatch2_stop = Dispatch(replace(dispatch2_spec, active=False))
483+
# print(f"Sending stop for dispatch 2: {dispatch2_stop}")
484+
await test_env.running_status_sender.send(dispatch2_stop)
485+
await asyncio.sleep(0.1) # Allow ActorDispatcher to process the stop
486+
487+
# pylint: disable=protected-access
488+
assert (
489+
102 not in test_env.actors_service._actors
490+
), "Actor 2 should have been removed"
491+
# pylint: enable=protected-access
492+
assert not test_env.is_running(102), "Actor 2 should be stopped"

0 commit comments

Comments
 (0)