Skip to content

Fix that every actor instance receives every update for their type #135

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
* ActorDispatcher: Fix a bug where every actor instance of the same dispatch type received every dispatch update, instead of only the updates meant for it.
50 changes: 32 additions & 18 deletions src/frequenz/dispatch/_actor_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from datetime import timedelta
from typing import Any, Awaitable

from frequenz.channels import Broadcast, Receiver, select
from frequenz.channels import Broadcast, Receiver, Sender, select
from frequenz.client.dispatch.types import TargetComponents
from frequenz.sdk.actor import Actor, BackgroundService

Expand Down Expand Up @@ -189,6 +189,19 @@ async def _retry_after_delay(self, dispatch: Dispatch) -> None:
_logger.info("Retrying dispatch %s now", dispatch.id)
await self._sender.send(dispatch)

@dataclass(frozen=True, kw_only=True)
class ActorAndChannel:
    """A running actor bundled with the channel and sender used to send it dispatch updates."""

    # One record is stored per dispatch identity in the dispatcher's actor map.
    actor: Actor
    """The actor."""

    # Kept alongside the sender so the channel itself can be closed later.
    channel: Broadcast[DispatchInfo]
    """The channel for dispatch updates."""

    # Used to push dispatch updates to the actor.
    sender: Sender[DispatchInfo]
    """The sender for dispatch updates."""

def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments
self,
actor_factory: Callable[
Expand All @@ -215,11 +228,8 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen

self._dispatch_rx = running_status_receiver
self._actor_factory = actor_factory
self._actors: dict[int, Actor] = {}
self._updates_channel = Broadcast[DispatchInfo](
name="dispatch_updates_channel", resend_latest=True
)
self._updates_sender = self._updates_channel.new_sender()
self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {}

self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval)

def start(self) -> None:
Expand All @@ -236,24 +246,25 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
)

identity = self._dispatch_identity(dispatch)
actor: Actor | None = self._actors.get(identity)
actor_and_channel = self._actors.get(identity)

if actor:
sent_str = ""
if self._updates_sender is not None:
sent_str = ", sent a dispatch update instead of creating a new actor"
await self._updates_sender.send(dispatch_update)
if actor_and_channel:
await actor_and_channel.sender.send(dispatch_update)
_logger.info(
"Actor for dispatch type %r is already running%s",
"Actor for dispatch type %r is already running, "
"sent a dispatch update instead of creating a new actor",
dispatch.type,
sent_str,
)
else:
try:
_logger.info("Starting actor for dispatch type %r", dispatch.type)
channel = Broadcast[DispatchInfo](
name=f"dispatch_updates_channel_instance={identity}",
resend_latest=True,
)
actor = await self._actor_factory(
dispatch_update,
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
channel.new_receiver(limit=1, warn_on_overflow=False),
)

actor.start()
Expand All @@ -267,7 +278,9 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
self._retrier.retry(dispatch)
else:
# No exception occurred, so we can add the actor to the list
self._actors[identity] = actor
self._actors[identity] = ActorDispatcher.ActorAndChannel(
actor=actor, channel=channel, sender=channel.new_sender()
)

async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
"""Stop all actors.
Expand All @@ -278,8 +291,9 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
"""
identity = self._dispatch_identity(stopping_dispatch)

if actor := self._actors.pop(identity, None):
await actor.stop(msg)
if actor_and_channel := self._actors.pop(identity, None):
await actor_and_channel.actor.stop(msg)
await actor_and_channel.channel.close()
else:
_logger.warning(
"Actor for dispatch type %r is not running", stopping_dispatch.type
Expand Down
111 changes: 109 additions & 2 deletions tests/test_mananging_actor.py → tests/test_managing_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import time_machine
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.client.dispatch import recurrence
from frequenz.client.dispatch.recurrence import Frequency
from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule
from frequenz.client.dispatch.test.client import FakeClient
from frequenz.client.dispatch.test.generator import DispatchGenerator
from frequenz.sdk.actor import Actor
Expand Down Expand Up @@ -104,9 +104,17 @@ def actor(self, identity: int) -> MockActor:
"""Return the actor."""
# pylint: disable=protected-access
assert identity in self.actors_service._actors
return cast(MockActor, self.actors_service._actors[identity])
return cast(MockActor, self.actors_service._actors[identity].actor)
# pylint: enable=protected-access

def is_running(self, identity: int) -> bool:
    """Tell whether an actor is registered for `identity` and is running.

    Args:
        identity: The dispatch identity to look up.

    Returns:
        `False` if no actor is registered under `identity`, otherwise the
        `is_running` state of the registered actor.
    """
    # pylint: disable-next=protected-access
    registered = identity in self.actors_service._actors
    # Short-circuit so `self.actor()` is only consulted for known identities.
    return registered and self.actor(identity).is_running


@fixture
async def test_env() -> AsyncIterator[_TestEnv]:
Expand Down Expand Up @@ -383,3 +391,102 @@ async def new_mock_receiver(

# Check if actor instance is created
assert identity(dispatch) in actor_manager._actors


async def test_actor_dispatcher_update_isolation(
    test_env: _TestEnv,
    fake_time: time_machine.Coordinates,
) -> None:
    """Test that updates for one dispatch don't affect other actors of the same type.

    Two dispatches sharing a type but with different IDs are started one after
    the other. The first is then stopped and only its actor may be removed;
    the second actor must keep running until its own stop update arrives.
    """
    dispatch_type = "ISOLATION_TEST"
    start_time = _now()
    duration = timedelta(minutes=5)

    # Both dispatches share the same type but carry distinct IDs and payloads,
    # so each actor can be matched to the dispatch that created it. The start
    # times are staggered so the actors come up one after the other.
    dispatch1_spec = replace(
        test_env.generator.generate_dispatch(),
        id=101,
        type=dispatch_type,
        active=True,
        dry_run=False,
        start_time=start_time + timedelta(seconds=1),
        duration=duration,
        payload={"instance": 1},
        recurrence=RecurrenceRule(),
    )
    dispatch1 = Dispatch(dispatch1_spec)

    dispatch2_spec = replace(
        test_env.generator.generate_dispatch(),
        id=102,
        type=dispatch_type,
        active=True,
        dry_run=False,
        start_time=start_time + timedelta(seconds=2),
        duration=duration,
        payload={"instance": 2},
        recurrence=RecurrenceRule(),
    )
    dispatch2 = Dispatch(dispatch2_spec)

    # Send dispatch 1 to start actor 1
    await test_env.running_status_sender.send(dispatch1)
    fake_time.shift(timedelta(seconds=1.1))  # Move time past dispatch1 start
    await asyncio.sleep(0.1)  # Allow actor to start

    assert test_env.is_running(101), "Actor 1 should be running"
    actor1 = test_env.actor(101)
    # pylint: disable-next=protected-access
    assert actor1.initial_dispatch._src.id == 101
    assert actor1.initial_dispatch.options == {"instance": 1}
    assert not test_env.is_running(102), "Actor 2 should not be running yet"

    # Send dispatch 2 to start actor 2
    await test_env.running_status_sender.send(dispatch2)
    fake_time.shift(timedelta(seconds=1))  # Move time past dispatch2 start
    await asyncio.sleep(0.1)  # Allow actor to start

    # Use the `is_running` helper so a missing actor fails with the message
    # below instead of the helper-internal assertion in `test_env.actor()`.
    assert test_env.is_running(101), "Actor 1 should still be running"
    assert test_env.is_running(102), "Actor 2 should now be running"
    actor2 = test_env.actor(102)
    # pylint: disable-next=protected-access
    assert actor2.initial_dispatch._src.id == 102
    assert actor2.initial_dispatch.options == {"instance": 2}

    # Now, send an update to stop dispatch 1
    dispatch1_stop = Dispatch(
        replace(dispatch1_spec, duration=timedelta(seconds=1), active=False)
    )
    await test_env.running_status_sender.send(dispatch1_stop)
    await asyncio.sleep(0.1)  # Allow ActorDispatcher to process the stop

    # THE CORE ASSERTION: Actor 1 should stop, Actor 2 should remain running
    # pylint: disable=protected-access
    assert (
        101 not in test_env.actors_service._actors
    ), "Actor 1 should have been removed"
    # pylint: enable=protected-access
    assert test_env.is_running(
        102
    ), "Actor 2 should be running after Actor 1 stopped"

    # Cleanup: Stop Actor 2
    dispatch2_stop = Dispatch(replace(dispatch2_spec, active=False))
    await test_env.running_status_sender.send(dispatch2_stop)
    await asyncio.sleep(0.1)  # Allow ActorDispatcher to process the stop

    # pylint: disable=protected-access
    assert (
        102 not in test_env.actors_service._actors
    ), "Actor 2 should have been removed"
    # pylint: enable=protected-access
    assert not test_env.is_running(102), "Actor 2 should be stopped"
Loading