From 432d140272a333db943d35565037c8611a9bd7f4 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 16 Jul 2025 17:18:32 +0200 Subject: [PATCH 1/7] Reset release notes Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 0e0dee1b..d4546f43 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,15 +6,12 @@ ## Upgrading -The `frequenz.dispatch.TargetComponents` type was removed, use `frequenz.client.dispatch.TargetComponents` instead. + ## New Features -* The dispatcher offers two new parameters to control the client's call and stream timeout: - - `call_timeout`: The maximum time to wait for a response from the client. - - `stream_timeout`: The maximum time to wait before restarting a stream. -* While the dispatch stream restarts we refresh our dispatch cache as well, to ensure we didn't miss any updates. + ## Bug Fixes -* Fixed that dispatches are never retried on failure, but instead an infinite loop of retry logs is triggered. + From 88304adc7e444a33635d8cc98f3d4652a389cf31 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 16 Jul 2025 10:56:03 +0200 Subject: [PATCH 2/7] Use proper logging paths Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 2 +- src/frequenz/dispatch/_merge_strategies.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d4546f43..66a9e668 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,4 +14,4 @@ ## Bug Fixes - +* The merge by type class now uses the correct logger path. diff --git a/src/frequenz/dispatch/_merge_strategies.py b/src/frequenz/dispatch/_merge_strategies.py index 0a955987..81e1ff4b 100644 --- a/src/frequenz/dispatch/_merge_strategies.py +++ b/src/frequenz/dispatch/_merge_strategies.py @@ -15,6 +15,8 @@ from ._bg_service import MergeStrategy from ._dispatch import Dispatch +_logger = logging.getLogger(__name__) + def _hash_positive(args: Any) -> int: """Make a positive hash.""" @@ -40,7 +42,7 @@ def filter( strategy's criteria are running. """ if dispatch.started: - logging.debug("Keeping start event %s", dispatch.id) + _logger.debug("Keeping start event %s", dispatch.id) return True other_dispatches_running = any( @@ -52,7 +54,7 @@ def filter( ) ) - logging.debug( + _logger.debug( "stop event %s because other_dispatches_running=%s", dispatch.id, other_dispatches_running, From 227da854dde37ce90aac267307bef68d3a5da88f Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 16 Jul 2025 13:43:35 +0200 Subject: [PATCH 3/7] Use the same `now` for all in merge check Requires v0.11.1 of `frequenz-client-dispatch` for the new `started_at` method. Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 1 + pyproject.toml | 2 +- src/frequenz/dispatch/_dispatch.py | 21 ++++++++++++++++++++- src/frequenz/dispatch/_merge_strategies.py | 7 +++++-- 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 66a9e668..294a6575 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -15,3 +15,4 @@ ## Bug Fixes * The merge by type class now uses the correct logger path. +* The merge by type was made more robust under heavy load, making sure to use the same `now` for all dispatches that are checked. diff --git a/pyproject.toml b/pyproject.toml index c287ad58..c36f5cfd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ dependencies = [ # plugins.mkdocstrings.handlers.python.import) "frequenz-sdk >= 1.0.0-rc2100, < 1.0.0-rc2200", "frequenz-channels >= 1.6.1, < 2.0.0", - "frequenz-client-dispatch >= 0.11.0, < 0.12.0", + "frequenz-client-dispatch >= 0.11.1, < 0.12.0", "frequenz-client-common >= 0.3.2, < 0.4.0", "frequenz-client-base >= 0.11.0, < 0.12.0", ] diff --git a/src/frequenz/dispatch/_dispatch.py b/src/frequenz/dispatch/_dispatch.py index 5d0d3a0a..bb4fbc89 100644 --- a/src/frequenz/dispatch/_dispatch.py +++ b/src/frequenz/dispatch/_dispatch.py @@ -43,10 +43,29 @@ def started(self) -> bool: Returns: True if the dispatch is started, False otherwise. """ + now = datetime.now(tz=timezone.utc) + return self.started_at(now) + + def started_at(self, now: datetime) -> bool: + """Check if the dispatch has started. + + A dispatch is considered started if the current time is after the start + time but before the end time. + + Recurring dispatches are considered started if the current time is after + the start time of the last occurrence but before the end time of the + last occurrence. + + Args: + now: time to use as now + + Returns: + True if the dispatch is started + """ if self.deleted: return False - return super().started + return super().started_at(now) # noqa is needed because of a bug in pydoclint that makes it think a `return` without a return # value needs documenting diff --git a/src/frequenz/dispatch/_merge_strategies.py b/src/frequenz/dispatch/_merge_strategies.py index 81e1ff4b..5c2059fd 100644 --- a/src/frequenz/dispatch/_merge_strategies.py +++ b/src/frequenz/dispatch/_merge_strategies.py @@ -5,6 +5,7 @@ import logging from collections.abc import Mapping +from datetime import datetime, timezone from sys import maxsize from typing import Any @@ -41,12 +42,14 @@ def filter( Keeps stop events only if no other dispatches matching the strategy's criteria are running. """ - if dispatch.started: + now = datetime.now(tz=timezone.utc) + + if dispatch.started_at(now): _logger.debug("Keeping start event %s", dispatch.id) return True other_dispatches_running = any( - existing_dispatch.started + existing_dispatch.started_at(now) for existing_dispatch in dispatches.values() if ( self.identity(existing_dispatch) == self.identity(dispatch) From 091cbe86dac02e8d1599c3cdef954d9351e3c9e5 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 16 Jul 2025 17:48:33 +0200 Subject: [PATCH 4/7] Adjust logging to be more informative and clear Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_actor_dispatcher.py | 4 +++- src/frequenz/dispatch/_merge_strategies.py | 23 +++++++++++++++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index fef96409..4750d33f 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -253,11 +253,13 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: identity = self._dispatch_identity(stopping_dispatch) if actor_and_channel := self._actors.pop(identity, None): + _logger.info("Stopping actor for dispatch type %r", stopping_dispatch.type) await actor_and_channel.actor.stop(msg) await actor_and_channel.channel.close() else: _logger.warning( - "Actor for dispatch type %r is not running", stopping_dispatch.type + "Actor for dispatch type %r is not running, ignoring stop request", + stopping_dispatch.type, ) async def _run(self) -> None: diff --git a/src/frequenz/dispatch/_merge_strategies.py b/src/frequenz/dispatch/_merge_strategies.py index 5c2059fd..29c5e7a4 100644 --- a/src/frequenz/dispatch/_merge_strategies.py +++ b/src/frequenz/dispatch/_merge_strategies.py @@ -48,20 +48,37 @@ def filter( _logger.debug("Keeping start event %s", dispatch.id) return True - other_dispatches_running = any( - existing_dispatch.started_at(now) + running_dispatch_list = [ + existing_dispatch for existing_dispatch in dispatches.values() if ( self.identity(existing_dispatch) == self.identity(dispatch) and existing_dispatch.id != dispatch.id ) + ] + + other_dispatches_running = any( + running_dispatch.started_at(now) + for running_dispatch in running_dispatch_list ) _logger.debug( - "stop event %s because other_dispatches_running=%s", + "%s stop event %s because other_dispatches_running=%s", + "Ignoring" if other_dispatches_running else "Allowing", dispatch.id, other_dispatches_running, ) + + if other_dispatches_running: + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Active other dispatches: %s", + list( + running_dispatch.id + for running_dispatch in running_dispatch_list + ), + ) + return not other_dispatches_running From 062cf1699fc70a3df8e154d261c903596b7f8c42 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Thu, 17 Jul 2025 19:20:24 +0200 Subject: [PATCH 5/7] Minor: Extend documentation phrasing Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_bg_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index c00ab3fb..7660baa7 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -337,7 +337,7 @@ async def _run(self) -> None: pass async def _execute_scheduled_event(self, dispatch: Dispatch, timer: Timer) -> None: - """Execute a scheduled event. + """Execute a scheduled event and schedules the next one if any. Args: dispatch: The dispatch to execute. From 77626024df4006330b8db06f27ac1e358219da5f Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Thu, 17 Jul 2025 19:19:03 +0200 Subject: [PATCH 6/7] Fix filter not seeing the latest dispatches after periodic fetch()'s By using a workflow that avoids reassigning the dispatches dict. When `fetch()`, which runs periodically, would assign a fresh map to `self._dispatches` the filter would not see the latest dispatches, as it would still reference the old map. Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 1 + src/frequenz/dispatch/_bg_service.py | 38 ++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 294a6575..f93d4746 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -16,3 +16,4 @@ * The merge by type class now uses the correct logger path. * The merge by type was made more robust under heavy load, making sure to use the same `now` for all dispatches that are checked. +* Fix that the merge filter was using an outdated dispatches dict once fetch() ran. diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index 7660baa7..9556c6cf 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -366,8 +366,7 @@ async def _fetch(self, timer: Timer) -> None: """ self._initial_fetch_event.clear() - old_dispatches = self._dispatches - self._dispatches = {} + new_dispatches = {} try: _logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id) @@ -381,9 +380,9 @@ async def _fetch(self, timer: Timer) -> None: continue dispatch = Dispatch(client_dispatch) - self._dispatches[dispatch.id] = dispatch - old_dispatch = old_dispatches.pop(dispatch.id, None) - if not old_dispatch: + new_dispatches[dispatch.id] = dispatch + old_dispatch = self._dispatches.get(dispatch.id, None) + if old_dispatch is None: _logger.debug("New dispatch: %s", dispatch) await self._update_dispatch_schedule_and_notify( dispatch, None, timer @@ -396,23 +395,40 @@ async def _fetch(self, timer: Timer) -> None: ) await self._lifecycle_events_tx.send(Updated(dispatch=dispatch)) - _logger.debug("Received %s dispatches", len(self._dispatches)) + _logger.debug("Received %s dispatches", len(new_dispatches)) except grpc.aio.AioRpcError as error: _logger.error("Error fetching dispatches: %s", error) - self._dispatches = old_dispatches return - for dispatch in old_dispatches.values(): + # We make a copy because we mutate self._dispatches.keys() inside the loop + for dispatch_id in frozenset(self._dispatches.keys() - new_dispatches.keys()): + # Use try/except as the `self._dispatches` cache can be mutated by + # stream delete events while we're iterating + try: + dispatch = self._dispatches.pop(dispatch_id) + except KeyError as error: + _logger.warning( + "Inconsistency in cache detected. " + + "Tried to delete non-existing dispatch %s (%s)", + dispatch_id, + error, + ) + continue + _logger.debug("Deleted dispatch: %s", dispatch) - await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch)) await self._update_dispatch_schedule_and_notify(None, dispatch, timer) - # Set deleted only here as it influences the result of dispatch.started - # which is used in above in _running_state_change + # Set deleted only here as it influences the result of + # dispatch.started, which is used in + # _update_dispatch_schedule_and_notify above. dispatch._set_deleted() # pylint: disable=protected-access await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch)) + # Update the dispatch list with the dispatches + self._dispatches.update(new_dispatches) + + # Set event to indicate fetch ran at least once self._initial_fetch_event.set() async def _update_dispatch_schedule_and_notify( From 8800b304840056685e860fd8199cdf777f5720df Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Thu, 17 Jul 2025 17:53:29 +0200 Subject: [PATCH 7/7] Add test for periodic fetch and merging Signed-off-by: Mathias L. Baumann --- tests/test_frequenz_dispatch.py | 83 ++++++++++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index c63049af..92860a4a 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -13,12 +13,13 @@ import pytest import time_machine from frequenz.channels import Receiver +from frequenz.channels.timer import SkipMissedAndResync, Timer from frequenz.client.common.microgrid import MicrogridId from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule from frequenz.client.dispatch.test.client import FakeClient, to_create_params from frequenz.client.dispatch.test.generator import DispatchGenerator from frequenz.client.dispatch.types import Dispatch as BaseDispatch -from frequenz.client.dispatch.types import TargetIds +from frequenz.client.dispatch.types import DispatchId, TargetIds from pytest import fixture from frequenz.dispatch import ( @@ -692,6 +693,86 @@ async def test_multiple_dispatches_sequential_intervals_merge( assert not stopped.started +async def test_sequential_overlapping_dispatches_between_fetch( + fake_time: time_machine.Coordinates, + generator: DispatchGenerator, +) -> None: + """Test that sequential overlapping dispatches are handled correctly.""" + microgrid_id = MicrogridId(randint(1, 100)) + client = FakeClient() + service = DispatchScheduler(microgrid_id=microgrid_id, client=client) + service.start() + + receiver = await service.new_running_state_event_receiver( + "TEST_TYPE", merge_strategy=MergeByType() + ) + + # Create two overlapping dispatches + dispatch1 = replace( + generator.generate_dispatch(), + active=True, + duration=timedelta(seconds=10), + target=TargetIds(1, 2), + start_time=_now() + timedelta(seconds=5), + recurrence=RecurrenceRule(), + type="TEST_TYPE", + ) + dispatch2 = replace( + generator.generate_dispatch(), + active=True, + duration=timedelta(seconds=10), + target=TargetIds(3, 4), + start_time=_now() + timedelta(seconds=8), # overlaps with dispatch1 + recurrence=RecurrenceRule(), + type="TEST_TYPE", + ) + await client.create(**to_create_params(microgrid_id, dispatch1)) + + timer = Timer(timedelta(seconds=100), SkipMissedAndResync(), auto_start=False) + await service._fetch(timer) # pylint: disable=protected-access + + await client.create(**to_create_params(microgrid_id, dispatch2)) + + # Move time forward to start first + fake_time.shift(timedelta(seconds=6)) + await asyncio.sleep(1) + import logging + + logging.debug("We see: %s", service._dispatches) + + started1 = await receiver.receive() + assert started1.id == DispatchId(1) + + # Move time to second dispatch + fake_time.shift(timedelta(seconds=6)) + await asyncio.sleep(1) + + started2 = await receiver.receive() + assert started2.id == DispatchId(2) + assert started2.started + assert started1.started + + # Now we move to when the first one ended + fake_time.shift(timedelta(seconds=5)) + await asyncio.sleep(1) + + with pytest.raises(asyncio.TimeoutError): + logging.debug("Wait for now starts %s", _now()) + started3 = await receiver.receive() + assert started3.id != started2.id, "Received unexpected event" + + assert not started1.started + assert started2.started + await asyncio.sleep(1) + + # Next we move to when all dispatches should have stopped + fake_time.shift(timedelta(seconds=4)) + started4 = await receiver.receive() + + # We only expect a message for dispatch2, dispatch1 should never send a stop + assert started4.id == DispatchId(2) + + @pytest.mark.parametrize("merge_strategy", [MergeByType(), MergeByTypeTarget()]) async def test_at_least_one_running_filter( fake_time: time_machine.Coordinates,