diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d4546f4..9c6a924 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,4 +14,5 @@ ## Bug Fixes - +* ActorDispatcher: Fixed a bug where every actor instance wrongly received all updates for its dispatch type. This only affects you if your actor can have more than one running instance at a time. + diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index e97a434..6e0e793 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -10,7 +10,7 @@ from datetime import timedelta from typing import Any, Awaitable -from frequenz.channels import Broadcast, Receiver, select +from frequenz.channels import Broadcast, Receiver, Sender, select from frequenz.client.dispatch.types import TargetComponents from frequenz.sdk.actor import Actor, BackgroundService @@ -189,6 +189,19 @@ async def _retry_after_delay(self, dispatch: Dispatch) -> None: _logger.info("Retrying dispatch %s now", dispatch.id) await self._sender.send(dispatch) + @dataclass(frozen=True, kw_only=True) + class ActorAndChannel: + """Actor and its sender.""" + + actor: Actor + """The actor.""" + + channel: Broadcast[DispatchInfo] + """The channel for dispatch updates.""" + + sender: Sender[DispatchInfo] + """The sender for dispatch updates.""" + def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments self, actor_factory: Callable[ @@ -215,11 +228,8 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen self._dispatch_rx = running_status_receiver self._actor_factory = actor_factory - self._actors: dict[int, Actor] = {} - self._updates_channel = Broadcast[DispatchInfo]( - name="dispatch_updates_channel", resend_latest=True - ) - self._updates_sender = self._updates_channel.new_sender() + self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {} + self._retrier = 
ActorDispatcher.FailedDispatchesRetrier(retry_interval) def start(self) -> None: @@ -236,24 +246,25 @@ async def _start_actor(self, dispatch: Dispatch) -> None: ) identity = self._dispatch_identity(dispatch) - actor: Actor | None = self._actors.get(identity) + actor_and_channel = self._actors.get(identity) - if actor: - sent_str = "" - if self._updates_sender is not None: - sent_str = ", sent a dispatch update instead of creating a new actor" - await self._updates_sender.send(dispatch_update) + if actor_and_channel: + await actor_and_channel.sender.send(dispatch_update) _logger.info( - "Actor for dispatch type %r is already running%s", + "Actor for dispatch type %r is already running, " + "sent a dispatch update instead of creating a new actor", dispatch.type, - sent_str, ) else: try: _logger.info("Starting actor for dispatch type %r", dispatch.type) + channel = Broadcast[DispatchInfo]( + name=f"dispatch_updates_channel_instance={identity}", + resend_latest=True, + ) actor = await self._actor_factory( dispatch_update, - self._updates_channel.new_receiver(limit=1, warn_on_overflow=False), + channel.new_receiver(limit=1, warn_on_overflow=False), ) actor.start() @@ -267,7 +278,9 @@ async def _start_actor(self, dispatch: Dispatch) -> None: self._retrier.retry(dispatch) else: # No exception occurred, so we can add the actor to the list - self._actors[identity] = actor + self._actors[identity] = ActorDispatcher.ActorAndChannel( + actor=actor, channel=channel, sender=channel.new_sender() + ) async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: """Stop all actors. 
@@ -278,8 +291,9 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: """ identity = self._dispatch_identity(stopping_dispatch) - if actor := self._actors.pop(identity, None): - await actor.stop(msg) + if actor_and_channel := self._actors.pop(identity, None): + await actor_and_channel.actor.stop(msg) + await actor_and_channel.channel.close() else: _logger.warning( "Actor for dispatch type %r is not running", stopping_dispatch.type diff --git a/tests/test_mananging_actor.py b/tests/test_managing_actor.py similarity index 73% rename from tests/test_mananging_actor.py rename to tests/test_managing_actor.py index 4ac2c3d..e8b3639 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_managing_actor.py @@ -15,7 +15,7 @@ import time_machine from frequenz.channels import Broadcast, Receiver, Sender from frequenz.client.dispatch import recurrence -from frequenz.client.dispatch.recurrence import Frequency +from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule from frequenz.client.dispatch.test.client import FakeClient from frequenz.client.dispatch.test.generator import DispatchGenerator from frequenz.sdk.actor import Actor @@ -104,9 +104,17 @@ def actor(self, identity: int) -> MockActor: """Return the actor.""" # pylint: disable=protected-access assert identity in self.actors_service._actors - return cast(MockActor, self.actors_service._actors[identity]) + return cast(MockActor, self.actors_service._actors[identity].actor) # pylint: enable=protected-access + def is_running(self, identity: int) -> bool: + """Return whether the actor is running.""" + # pylint: disable-next=protected-access + if identity not in self.actors_service._actors: + return False + + return self.actor(identity).is_running + @fixture async def test_env() -> AsyncIterator[_TestEnv]: @@ -383,3 +391,102 @@ async def new_mock_receiver( # Check if actor instance is created assert identity(dispatch) in actor_manager._actors + + +async def 
test_actor_dispatcher_update_isolation( + test_env: _TestEnv, + fake_time: time_machine.Coordinates, +) -> None: + """Test that updates for one dispatch don't affect other actors of the same type.""" + dispatch_type = "ISOLATION_TEST" + start_time = _now() + duration = timedelta(minutes=5) + + # Create first dispatch + dispatch1_spec = replace( + test_env.generator.generate_dispatch(), + id=101, # Unique ID + type=dispatch_type, + active=True, + dry_run=False, + start_time=start_time + timedelta(seconds=1), # Stagger start slightly + duration=duration, + payload={"instance": 1}, + recurrence=RecurrenceRule(), + ) + dispatch1 = Dispatch(dispatch1_spec) + + # Create second dispatch of the same type, different ID + dispatch2_spec = replace( + test_env.generator.generate_dispatch(), + id=102, # Unique ID + type=dispatch_type, # Same type + active=True, + dry_run=False, + start_time=start_time + timedelta(seconds=2), # Stagger start slightly + duration=duration, + payload={"instance": 2}, + recurrence=RecurrenceRule(), + ) + dispatch2 = Dispatch(dispatch2_spec) + + # Send dispatch 1 to start actor 1 + # print(f"Sending dispatch 1: {dispatch1}") + await test_env.running_status_sender.send(dispatch1) + fake_time.shift(timedelta(seconds=1.1)) # Move time past dispatch1 start + await asyncio.sleep(0.1) # Allow actor to start + + assert test_env.is_running(101), "Actor 1 should be running" + actor1 = test_env.actor(101) + assert actor1 is not None + # pylint: disable-next=protected-access + assert actor1.initial_dispatch._src.id == 101 + assert actor1.initial_dispatch.options == {"instance": 1} + assert not test_env.is_running(102), "Actor 2 should not be running yet" + + # Send dispatch 2 to start actor 2 + # print(f"Sending dispatch 2: {dispatch2}") + await test_env.running_status_sender.send(dispatch2) + fake_time.shift(timedelta(seconds=1)) # Move time past dispatch2 start + await asyncio.sleep(0.1) # Allow actor to start + + assert test_env.actor(101).is_running, "Actor 
1 should still be running" + assert test_env.actor(102).is_running, "Actor 2 should now be running" + actor2 = test_env.actor(102) + assert actor2 is not None + # pylint: disable-next=protected-access + assert actor2.initial_dispatch._src.id == 102 + assert actor2.initial_dispatch.options == {"instance": 2} + + # Now, send an update to stop dispatch 1 + dispatch1_stop = Dispatch( + replace(dispatch1_spec, duration=timedelta(seconds=1), active=False) + ) + # print(f"Sending stop for dispatch 1: {dispatch1_stop}") + await test_env.running_status_sender.send(dispatch1_stop) + await asyncio.sleep(0.1) # Allow ActorDispatcher to process the stop + + # THE CORE ASSERTION: Actor 1 should stop, Actor 2 should remain running + # pylint: disable=protected-access + assert ( + 101 not in test_env.actors_service._actors + ), "Actor 1 should have been removed" + # pylint: enable=protected-access + assert ( + test_env.actor(102).is_running is True + ), "Actor 2 should be running after Actor 1 stopped" + # Double check actor1 object state if needed (though removal is stronger check) + # assert not actor1.is_running + + # Cleanup: Stop Actor 2 + dispatch2_stop = Dispatch(replace(dispatch2_spec, active=False)) + # print(f"Sending stop for dispatch 2: {dispatch2_stop}") + await test_env.running_status_sender.send(dispatch2_stop) + await asyncio.sleep(0.1) # Allow ActorDispatcher to process the stop + + # pylint: disable=protected-access + assert ( + 102 not in test_env.actors_service._actors + ), "Actor 2 should have been removed" + # pylint: enable=protected-access + assert not test_env.is_running(102), "Actor 2 should be stopped"