Skip to content

Commit 81d115f

Browse files
authored
Fix that every actor instance receives every update for their type (#135)
2 parents 77a581e + 8ec74c4 commit 81d115f

File tree

3 files changed

+143
-21
lines changed

3 files changed

+143
-21
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@
1414

1515
## Bug Fixes
1616

17-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
17+
* ActorDispatcher: Fix that every actor instance wrongly received all updates for their dispatch type. This is only relevant to you if your actor has more than one running instance at any time.
18+

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from datetime import timedelta
1111
from typing import Any, Awaitable
1212

13-
from frequenz.channels import Broadcast, Receiver, select
13+
from frequenz.channels import Broadcast, Receiver, Sender, select
1414
from frequenz.client.dispatch.types import TargetComponents
1515
from frequenz.sdk.actor import Actor, BackgroundService
1616

@@ -189,6 +189,19 @@ async def _retry_after_delay(self, dispatch: Dispatch) -> None:
189189
_logger.info("Retrying dispatch %s now", dispatch.id)
190190
await self._sender.send(dispatch)
191191

192+
@dataclass(frozen=True, kw_only=True)
193+
class ActorAndChannel:
194+
"""Actor and its sender."""
195+
196+
actor: Actor
197+
"""The actor."""
198+
199+
channel: Broadcast[DispatchInfo]
200+
"""The channel for dispatch updates."""
201+
202+
sender: Sender[DispatchInfo]
203+
"""The sender for dispatch updates."""
204+
192205
def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments
193206
self,
194207
actor_factory: Callable[
@@ -215,11 +228,8 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
215228

216229
self._dispatch_rx = running_status_receiver
217230
self._actor_factory = actor_factory
218-
self._actors: dict[int, Actor] = {}
219-
self._updates_channel = Broadcast[DispatchInfo](
220-
name="dispatch_updates_channel", resend_latest=True
221-
)
222-
self._updates_sender = self._updates_channel.new_sender()
231+
self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {}
232+
223233
self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval)
224234

225235
def start(self) -> None:
@@ -236,24 +246,25 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
236246
)
237247

238248
identity = self._dispatch_identity(dispatch)
239-
actor: Actor | None = self._actors.get(identity)
249+
actor_and_channel = self._actors.get(identity)
240250

241-
if actor:
242-
sent_str = ""
243-
if self._updates_sender is not None:
244-
sent_str = ", sent a dispatch update instead of creating a new actor"
245-
await self._updates_sender.send(dispatch_update)
251+
if actor_and_channel:
252+
await actor_and_channel.sender.send(dispatch_update)
246253
_logger.info(
247-
"Actor for dispatch type %r is already running%s",
254+
"Actor for dispatch type %r is already running, "
255+
"sent a dispatch update instead of creating a new actor",
248256
dispatch.type,
249-
sent_str,
250257
)
251258
else:
252259
try:
253260
_logger.info("Starting actor for dispatch type %r", dispatch.type)
261+
channel = Broadcast[DispatchInfo](
262+
name=f"dispatch_updates_channel_instance={identity}",
263+
resend_latest=True,
264+
)
254265
actor = await self._actor_factory(
255266
dispatch_update,
256-
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
267+
channel.new_receiver(limit=1, warn_on_overflow=False),
257268
)
258269

259270
actor.start()
@@ -267,7 +278,9 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
267278
self._retrier.retry(dispatch)
268279
else:
269280
# No exception occurred, so we can add the actor to the list
270-
self._actors[identity] = actor
281+
self._actors[identity] = ActorDispatcher.ActorAndChannel(
282+
actor=actor, channel=channel, sender=channel.new_sender()
283+
)
271284

272285
async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
273286
"""Stop all actors.
@@ -278,8 +291,9 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
278291
"""
279292
identity = self._dispatch_identity(stopping_dispatch)
280293

281-
if actor := self._actors.pop(identity, None):
282-
await actor.stop(msg)
294+
if actor_and_channel := self._actors.pop(identity, None):
295+
await actor_and_channel.actor.stop(msg)
296+
await actor_and_channel.channel.close()
283297
else:
284298
_logger.warning(
285299
"Actor for dispatch type %r is not running", stopping_dispatch.type

tests/test_mananging_actor.py renamed to tests/test_managing_actor.py

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import time_machine
1616
from frequenz.channels import Broadcast, Receiver, Sender
1717
from frequenz.client.dispatch import recurrence
18-
from frequenz.client.dispatch.recurrence import Frequency
18+
from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule
1919
from frequenz.client.dispatch.test.client import FakeClient
2020
from frequenz.client.dispatch.test.generator import DispatchGenerator
2121
from frequenz.sdk.actor import Actor
@@ -104,9 +104,17 @@ def actor(self, identity: int) -> MockActor:
104104
"""Return the actor."""
105105
# pylint: disable=protected-access
106106
assert identity in self.actors_service._actors
107-
return cast(MockActor, self.actors_service._actors[identity])
107+
return cast(MockActor, self.actors_service._actors[identity].actor)
108108
# pylint: enable=protected-access
109109

110+
def is_running(self, identity: int) -> bool:
111+
"""Return whether the actor is running."""
112+
# pylint: disable-next=protected-access
113+
if identity not in self.actors_service._actors:
114+
return False
115+
116+
return self.actor(identity).is_running
117+
110118

111119
@fixture
112120
async def test_env() -> AsyncIterator[_TestEnv]:
@@ -383,3 +391,102 @@ async def new_mock_receiver(
383391

384392
# Check if actor instance is created
385393
assert identity(dispatch) in actor_manager._actors
394+
395+
396+
async def test_actor_dispatcher_update_isolation(
397+
test_env: _TestEnv,
398+
fake_time: time_machine.Coordinates,
399+
) -> None:
400+
"""Test that updates for one dispatch don't affect other actors of the same type."""
401+
dispatch_type = "ISOLATION_TEST"
402+
start_time = _now()
403+
duration = timedelta(minutes=5)
404+
405+
# Create first dispatch
406+
dispatch1_spec = replace(
407+
test_env.generator.generate_dispatch(),
408+
id=101, # Unique ID
409+
type=dispatch_type,
410+
active=True,
411+
dry_run=False,
412+
start_time=start_time + timedelta(seconds=1), # Stagger start slightly
413+
duration=duration,
414+
payload={"instance": 1},
415+
recurrence=RecurrenceRule(),
416+
)
417+
dispatch1 = Dispatch(dispatch1_spec)
418+
419+
# Create second dispatch of the same type, different ID
420+
dispatch2_spec = replace(
421+
test_env.generator.generate_dispatch(),
422+
id=102, # Unique ID
423+
type=dispatch_type, # Same type
424+
active=True,
425+
dry_run=False,
426+
start_time=start_time + timedelta(seconds=2), # Stagger start slightly
427+
duration=duration,
428+
payload={"instance": 2},
429+
recurrence=RecurrenceRule(),
430+
)
431+
dispatch2 = Dispatch(dispatch2_spec)
432+
433+
# Send dispatch 1 to start actor 1
434+
# print(f"Sending dispatch 1: {dispatch1}")
435+
await test_env.running_status_sender.send(dispatch1)
436+
fake_time.shift(timedelta(seconds=1.1)) # Move time past dispatch1 start
437+
await asyncio.sleep(0.1) # Allow actor to start
438+
439+
assert test_env.is_running(101), "Actor 1 should be running"
440+
actor1 = test_env.actor(101)
441+
assert actor1 is not None
442+
# pylint: disable-next=protected-access
443+
assert actor1.initial_dispatch._src.id == 101
444+
assert actor1.initial_dispatch.options == {"instance": 1}
445+
assert not test_env.is_running(102), "Actor 2 should not be running yet"
446+
447+
# Send dispatch 2 to start actor 2
448+
# print(f"Sending dispatch 2: {dispatch2}")
449+
await test_env.running_status_sender.send(dispatch2)
450+
fake_time.shift(timedelta(seconds=1)) # Move time past dispatch2 start
451+
await asyncio.sleep(0.1) # Allow actor to start
452+
453+
assert test_env.actor(101).is_running, "Actor 1 should still be running"
454+
assert test_env.actor(102).is_running, "Actor 2 should now be running"
455+
actor2 = test_env.actor(102)
456+
assert actor2 is not None
457+
# pylint: disable-next=protected-access
458+
assert actor2.initial_dispatch._src.id == 102
459+
assert actor2.initial_dispatch.options == {"instance": 2}
460+
461+
# Now, send an update to stop dispatch 1
462+
dispatch1_stop = Dispatch(
463+
replace(dispatch1_spec, duration=timedelta(seconds=1), active=False)
464+
)
465+
# print(f"Sending stop for dispatch 1: {dispatch1_stop}")
466+
await test_env.running_status_sender.send(dispatch1_stop)
467+
await asyncio.sleep(0.1) # Allow ActorDispatcher to process the stop
468+
469+
# THE CORE ASSERTION: Actor 1 should stop, Actor 2 should remain running
470+
# pylint: disable=protected-access
471+
assert (
472+
101 not in test_env.actors_service._actors
473+
), "Actor 1 should have been removed"
474+
# pylint: enable=protected-access
475+
assert (
476+
test_env.actor(102).is_running is True
477+
), "Actor 2 should be running after Actor 1 stopped"
478+
# Double check actor1 object state if needed (though removal is stronger check)
479+
# assert not actor1.is_running
480+
481+
# Cleanup: Stop Actor 2
482+
dispatch2_stop = Dispatch(replace(dispatch2_spec, active=False))
483+
# print(f"Sending stop for dispatch 2: {dispatch2_stop}")
484+
await test_env.running_status_sender.send(dispatch2_stop)
485+
await asyncio.sleep(0.1) # Allow ActorDispatcher to process the stop
486+
487+
# pylint: disable=protected-access
488+
assert (
489+
102 not in test_env.actors_service._actors
490+
), "Actor 2 should have been removed"
491+
# pylint: enable=protected-access
492+
assert not test_env.is_running(102), "Actor 2 should be stopped"

0 commit comments

Comments
 (0)