@leandro-lucarella-frequenz
Contributor

@leandro-lucarella-frequenz leandro-lucarella-frequenz commented Dec 4, 2022

First, we introduce the timeseries Source (an async iterator of Sample) and Sink (an async function taking a Sample) types. The idea is for all classes in the timeseries package to be based on these types.

One advantage of using a function as a Sink (instead of a channel) is that there is no added overhead in cases where we don't really need a channel, for example if we only need to store samples in a buffer (peak shaving could benefit from this).
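As a rough illustration of the two types and of a channel-free sink, here is a minimal sketch. The `Sample` dataclass, `counting_source`, `pump` and `collect_values` are hypothetical names invented for this example; they are not the SDK's actual definitions.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Callable, Coroutine, List


@dataclass(frozen=True)
class Sample:
    """Hypothetical stand-in for the SDK's Sample type."""

    value: float


# A source is an async iterator of samples.
Source = AsyncIterator[Sample]
# A sink is an async function that takes one sample.
Sink = Callable[[Sample], Coroutine[None, None, None]]


async def counting_source(count: int) -> AsyncIterator[Sample]:
    """Produce `count` samples with values 0.0, 1.0, ..."""
    for i in range(count):
        yield Sample(float(i))


async def pump(source: Source, sink: Sink) -> None:
    """Forward every sample from the source to the sink."""
    async for sample in source:
        await sink(sample)


def collect_values(count: int) -> List[float]:
    """Store samples in a plain list, with no channel in between."""
    buffer: List[float] = []

    async def buffer_sink(sample: Sample) -> None:
        buffer.append(sample.value)

    asyncio.run(pump(counting_source(count), buffer_sink))
    return buffer
```

Here the sink is just a closure appending to a list, so nothing is allocated or scheduled beyond the call itself.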

The existing GroupResampler and Resampler classes are not very reusable, or at least they require quite a bit of work from users, who need to take care of handling timers and calling methods at the right time for the classes to be useful.

The new Resampler class has its own task(s) to handle all the resampling; users only need to subscribe to new timeseries by passing a timeseries Source and Sink. The Sink will be called each time a new sample is produced.
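To make the shape of this design concrete, here is a heavily simplified sketch, not the code in this PR. `TinyResampler` and `resample_once()` are hypothetical names; samples are plain floats, the average is hard-coded, and the resampling step is triggered by hand where a real resampler would be driven by a timer.

```python
import asyncio
from collections import deque
from typing import AsyncIterator, Awaitable, Callable, Deque, Dict, List

# Simplification: samples are plain floats here.
Source = AsyncIterator[float]
Sink = Callable[[float], Awaitable[None]]


class TinyResampler:
    """Illustrative only: one receiving task per timeseries."""

    def __init__(self) -> None:
        self._buffers: Dict[Source, Deque[float]] = {}
        self._sinks: Dict[Source, Sink] = {}
        self._tasks: List["asyncio.Task[None]"] = []

    def add_timeseries(self, source: Source, sink: Sink) -> bool:
        """Start receiving from `source`; return False if already added."""
        if source in self._buffers:
            return False
        self._buffers[source] = deque()
        self._sinks[source] = sink
        self._tasks.append(asyncio.create_task(self._receive(source)))
        return True

    async def _receive(self, source: Source) -> None:
        """Buffer every value produced by the source."""
        async for value in source:
            self._buffers[source].append(value)

    async def resample_once(self) -> None:
        """Average each buffer and pass the result to its sink."""
        for source, buffer in self._buffers.items():
            if buffer:
                average = sum(buffer) / len(buffer)
                buffer.clear()
                await self._sinks[source](average)

    async def stop(self) -> None:
        """Cancel all receiving tasks and wait for them to finish."""
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo() -> List[float]:
    async def source() -> AsyncIterator[float]:
        for value in (1.0, 2.0, 3.0):
            yield value

    received: List[float] = []

    async def sink(value: float) -> None:
        received.append(value)

    resampler = TinyResampler()
    resampler.add_timeseries(source(), sink)
    await asyncio.sleep(0.01)  # Give the receiving task time to drain the source.
    await resampler.resample_once()
    await resampler.stop()
    return received
```

The caller never touches timers or tasks; it only hands over a source and a sink.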

A new frequenz.sdk.timeseries.resampling module is added because several classes are used only by the Resampler and might not be needed by anyone who just wants to use frequenz.sdk.timeseries.

This PR also introduces the use of async-solipsism (see #70) for testing, so timers can be mocked in tests: the tests don't need to actually wait for timers, and time comparisons can be done precisely. It works pretty well in combination with time_machine.

Fixes #56.

@leandro-lucarella-frequenz leandro-lucarella-frequenz added the type:enhancement New feature or enhancement visible to users label Dec 4, 2022
@leandro-lucarella-frequenz leandro-lucarella-frequenz added this to the v0.14.0 milestone Dec 4, 2022
@leandro-lucarella-frequenz leandro-lucarella-frequenz requested a review from a team as a code owner December 4, 2022 11:40
@github-actions github-actions bot added part:actor Affects an actor or the actors utilities (decorator, etc.) part:data-pipeline Affects the data pipeline part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests part:tooling Affects the development tooling (CI, deployment, dependency management, etc.) labels Dec 4, 2022
@leandro-lucarella-frequenz
Contributor Author

Based on #97, please ignore the first commit until that PR is merged and this PR is rebased.

@idlir-shkurti-frequenz please have a look too to see if the interface fits your needs.

@leandro-lucarella-frequenz
Contributor Author

Sometimes the tests stall after completing successfully. This might need some investigation. I had some issues before with the logical meter tests, but they went away after some rebasing and more editing. Those tests still throw a lot of warnings, so I think we should eventually have a look and simplify them so they don't use a gRPC server; we could simply mock the microgrid API. Using async-solipsism and time_machine there could also make things easier and faster. (FYI @sahas-subramanian-frequenz )

@leandro-lucarella-frequenz leandro-lucarella-frequenz changed the title from "Fix the disable_actor_auto_restart fixture" to "Rewrite the resampler class to be more reusable" Dec 4, 2022
@leandro-lucarella-frequenz
Contributor Author

One more observation: I wasn't sure whether to add the cancel_and_wait() function (and the util package in general), as it is a very small and trivial function. But since it needs error handling, it is quite verbose to write out everywhere, and every time one wants to cancel a task it is very likely that the exception catching is needed. So I welcome comments and suggestions about this case in particular, and about what to do in the future with similar cases.
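For readers unfamiliar with the pattern, a helper of this kind could look roughly like the sketch below. It is named cancel_and_await here to match the import that appears in the diff later in this thread; the body is an assumption about what such a helper does, not a copy of the SDK's implementation.

```python
import asyncio
from typing import Any


async def cancel_and_await(task: "asyncio.Task[Any]") -> None:
    """Cancel a task and wait until it has actually finished.

    The CancelledError raised by awaiting the cancelled task is swallowed,
    which is the boilerplate the helper is meant to avoid repeating.
    """
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


async def demo() -> bool:
    task = asyncio.create_task(asyncio.sleep(3600))
    await asyncio.sleep(0)  # Let the task start running.
    await cancel_and_await(task)
    return task.cancelled()
```

One caveat with this simple form: if the caller itself is cancelled while waiting, that CancelledError is swallowed as well, so a production version may need to tell the two cases apart.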

@leandro-lucarella-frequenz
Contributor Author

Updated:

  • Rebased
  • Added a second stage resampling to the resampling example
  • Moved the average function (default resampling function) to timeseries.resampling and made it the default for the Resampler class too, so users are not forced to provide a resampling function.

"""Helper class to compare two sequences without caring about the underlying type."""


from typing import Any, Sequence
Contributor

Contributor Author

About this, I have been thinking for a while that we probably also need a _compat module to deal with these differences between Python versions. In 3.8 typing.Sequence is not deprecated, and I'm not sure whether collections.abc.Sequence supports indexing in 3.8 (so that it can be used as a type hint). I will need to check.

Another thing I would put in the _compat module is anext() (although that would be in the channels repo, I think; I don't think we use it here).

Contributor Author

@leandro-lucarella-frequenz leandro-lucarella-frequenz Dec 6, 2022

Yeah, with Python 3.8 I get this error if I use collections.abc.Sequence:

tests/utils/_a_sequence.py:29:23: E1136: Value 'Sequence' is unsubscriptable (unsubscriptable-object)

Contributor

Could you please send the URL of the _compat module?
I can't find it on Google :)

Contributor Author

It doesn't exist. It is just an idea I had in mind. After this I actually tried to add one for Sequence, but it is not that simple for this case: even for the same Python version (3.8), you might need to import Sequence from different places depending on how you want to use it (as a type hint or as an ABC). So I couldn't decide how to name things. If it lives in compat, we only need to import it for type hinting, in which case it would make sense to call it compat.typing to make that clearer. But then it feels weird to use typing when more modern Python versions need the typing module less and less.

I know it is nitpicking, but I couldn't decide, and we are already importing Sequence from typing in other parts of the SDK code, so I figure we can leave it as it is, since we haven't had any issues with it so far.

Contributor Author

Oh really? That's cool! I'll try it out!

Contributor Author

I will try it out, but I have very little hope; annotations seems to be only about forward references: https://peps.python.org/pep-0563/

Contributor

Contributor Author

Then awesome if it means we don't have to import stuff from typing for built-in types either 🥳

Contributor Author

It is almost awesome. It does work for type hinting, but it doesn't work for other usages: type aliases like MyType = Sequence[int], or calls like cast(Sequence[int], s), will still fail if Sequence doesn't come from typing.

But well... it's better than nothing.
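The difference comes from when the expression is evaluated, which the following sketch tries to show in a way that runs on any Python 3 version. `LegacySequence` is a hypothetical stand-in for a class that cannot be subscripted (as collections.abc.Sequence cannot on Python 3.8), and the annotation is written as an explicit string, which is effectively what `from __future__ import annotations` does to every annotation in a module.

```python
class LegacySequence:
    """Hypothetical stand-in for a non-subscriptable class."""


def total(values: "LegacySequence[int]") -> int:
    # The annotation is stored as a string and never evaluated at
    # definition time, so the missing subscript support doesn't matter.
    return sum(values)


def alias_fails() -> bool:
    """Return True if building a type alias raises TypeError."""
    try:
        # A type alias (or a cast() argument) is an ordinary expression
        # evaluated at runtime, so the subscript is actually executed.
        LegacySequence[int]  # pylint: disable=pointless-statement
    except TypeError:
        return True
    return False
```

So postponed annotations help with function and variable annotations only; anything evaluated as a regular expression still needs a subscriptable type.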

"""Cancel all receiving tasks."""
await asyncio.gather(*[helper.stop() for helper in self._resamplers.values()])

def add_timeseries(self, source: Source, sink: Sink) -> bool:
Contributor

I'm not sure if providing any support for callbacks is a good idea, especially in interfaces that will be exposed to the users. Maybe one of these options is better:

  1. add_timeseries returns a MetricResampler instance, which provides an __anext__ method, that will produce a value whenever a call to Resampler.resample() finishes.
  2. Resampler.resample() returns a list of tuples each containing a name for a stream, and the latest Sample.

Contributor Author

@leandro-lucarella-frequenz leandro-lucarella-frequenz Dec 7, 2022

I was thinking about going for 2, but make it an AsyncIterator[Map[Source, Sample]], so users can do:

async for resampled_window in resampler.resample():
    sample_1 = resampled_window[source_1]
    sample_2 = resampled_window[source_2]

What do you think?

What I don't like that much about this is that users will then probably have to create a task for it. I guess this is why you created the channels Select. I think I now understand a bit better where this comes from; I didn't know you had so many issues with users abusing callbacks, and that this is why you want to avoid them at all costs.
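A toy version of that interface, only to show how the map-per-window iteration would look from the caller's side. `WindowResampler`, `max_windows` and `count_from` are hypothetical; each window here simply takes the next value from every source, whereas a real resampler would aggregate buffered samples on a timer.

```python
import asyncio
from typing import AsyncGenerator, AsyncIterator, Dict, List, Tuple

# Simplification: samples are plain floats here.
Source = AsyncIterator[float]


class WindowResampler:
    """Illustrative only: yields one source-to-value dict per window."""

    def __init__(self, max_windows: int) -> None:
        self._sources: List[Source] = []
        # Hypothetical limit so that the demo terminates.
        self._max_windows = max_windows

    def add_timeseries(self, source: Source) -> None:
        self._sources.append(source)

    async def resample(self) -> AsyncIterator[Dict[Source, float]]:
        for _ in range(self._max_windows):
            window: Dict[Source, float] = {}
            for source in self._sources:
                window[source] = await source.__anext__()
            yield window


async def count_from(start: float) -> AsyncGenerator[float, None]:
    """Endless source producing start, start + 1, ..."""
    value = start
    while True:
        yield value
        value += 1.0


async def demo() -> List[Tuple[float, float]]:
    source_1, source_2 = count_from(0.0), count_from(10.0)
    resampler = WindowResampler(max_windows=2)
    resampler.add_timeseries(source_1)
    resampler.add_timeseries(source_2)

    results: List[Tuple[float, float]] = []
    async for resampled_window in resampler.resample():
        results.append((resampled_window[source_1], resampled_window[source_2]))

    await source_1.aclose()
    await source_2.aclose()
    return results
```

The source objects double as dictionary keys, so the caller needs no separate naming scheme for the streams.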

Contributor

> I was thinking about going for 2, but make it an AsyncIterator[Map[Source, Sample]], so users can do

also possible with:

t1 = resampler.add_timeseries(...)
t2 = resampler.add_timeseries(...)

while await resampler.resample():
    t1.consume()

and there would be no wrapping up of values in a map just to be unwrapped in the next step. looks weirdly similar to some other parts of the code as well. :D

But just putting it out there. Maybe maps are better, especially in dynamic contexts, else people will have to have a second loop to go through the MetricResampler objects. Although that will be the same performance as the async for approach, it will be a few more lines of code.

Contributor

This would mean the resample method would no longer be long running I guess?

Contributor Author

I really don't like the dissociation between the loop and where the values are left; for me it makes the code harder to reason about (it is the same issue I currently have with Select). It will also make the Resampler implementation more complicated, as it has to create new wrapper classes for each timeseries. But it is true that it is better performance-wise.

For now I will go with the async iterator returning a map if there are no big objections; for me it is the most intuitive and easiest-to-use interface. If we see performance issues in the future we can revisit it, but if we reuse a dict, I think the extra wrapping should have a very minimal impact.

Contributor Author

For me that, besides being too unpythonic, is also not the whole story: if there is an error with, for example, t1, you probably won't want to consume it, so the code will still be littered with if statements to check for errors and decide how to handle them.

I think I will make ResamplingError also carry the samples, so if there is a problem, you get the exceptions but also the samples that were produced correctly. It still sucks a bit, as it makes error handling more complicated, but I guess if you are dealing with a lot of streams of data there is no real way around it.

Contributor Author

To add more context, I guess this approach makes sense because if you are resampling a group of metrics, you probably care about having a complete window; as long as that happens, you operate normally. If there is a problem, you need to inspect the situation, so you can get from the exception which metrics were calculated successfully and which weren't, and decide what to do based on that (abort the whole thing, use the ones that are available, etc.).
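A small synchronous sketch of that error-handling shape follows. It is simplified on purpose: timeseries are identified by string names instead of source objects, the resampling function is a hard-coded average, and `resample_window` and `handle` are hypothetical helpers.

```python
from typing import Dict, List, Tuple


class ResamplingError(RuntimeError):
    """Carries both the successful samples and the per-timeseries errors."""

    def __init__(
        self, samples: Dict[str, float], exceptions: Dict[str, Exception]
    ) -> None:
        super().__init__(f"Some errors were found while resampling: {exceptions}")
        self.samples = samples
        self.exceptions = exceptions


def resample_window(buffers: Dict[str, List[float]]) -> Dict[str, float]:
    """Average each buffer; raise ResamplingError if any of them fails."""
    samples: Dict[str, float] = {}
    exceptions: Dict[str, Exception] = {}
    for name, values in buffers.items():
        try:
            samples[name] = sum(values) / len(values)
        except Exception as error:  # pylint: disable=broad-except
            exceptions[name] = error
    if exceptions:
        raise ResamplingError(samples, exceptions)
    return samples


def handle(buffers: Dict[str, List[float]]) -> Tuple[Dict[str, float], List[str]]:
    """Return the usable samples and the names of the failing timeseries."""
    try:
        return resample_window(buffers), []
    except ResamplingError as error:
        # The caller decides: here we keep what worked and report the rest.
        return error.samples, sorted(error.exceptions)
```

In the normal case the caller gets a complete window; only in the exceptional case does it have to look at partial results.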

Contributor

> for example t1, you probably won't want consume it, so the code will be still littered with if to check for errors and how to handle them.

Maybe you missed the continue statement in the example? So consume would only be called if resample worked for all streams.

Btw, channels closing could cause the resampler to fail, what else could cause an exception in the resampler?

Contributor Author

@leandro-lucarella-frequenz leandro-lucarella-frequenz Dec 8, 2022

Ah, yeah, I missed it. But then, if you don't consume when there is an error, you lose the whole window of data; if you continue, the resampler will wait until the next timer tick.

And also I assume even if you don't call consume() then the samples of the current window should be discarded, otherwise different timeseries will get out of sync, or at least (if you remove the faulty ones) will be delayed, as now the new tick will send the samples from the old tick.

For example, the resampling actor will want to keep sending all the successful timeseries and remove the failing ones, not ignore the whole resampling window.

Still not so simple IMHO. But again, I'm more and more convinced that the complexity is inherent to the problem, there is no way to really take it away.

> Btw, channels closing could cause the resampler to fail, what else could cause an exception in the resampler?

The resampling function, I guess (hello callbacks :D). Not counting bugs, it is the only thing I can think of (the resampling function should also only fail if there are bugs, but it is not under our control, so we can't assume it won't raise).

Contributor

> And also I assume even if you don't call consume() then the samples of the current window should be discarded, otherwise different timeseries will get out of sync.

Yes, if some values are not consumed, they will get overwritten by the next run. I think that's fine. And we could add a boolean check, and log a warning saying that values are getting discarded. People trying to start a resampler and not using the output is too basic a problem that we can't solve with any approach.

> For example, the resampling actor will want to keep sending all the successful timeseries and remove the failing ones, not ignore the whole resampling window.

If that happens, we should not send any values out, because if we do, downstream code will go out of sync. The formula engine can be made to resynchronize, but it does that by dropping some values until all streams have caught up to the latest values. I think it is better to do that in the resampler instead.

@leandro-lucarella-frequenz
Contributor Author

@ela-kotulska-frequenz @sahas-subramanian-frequenz please let me know when you have finished the review so I can start working on the feedback.

@shsms
Contributor

shsms commented Dec 7, 2022

> @ela-kotulska-frequenz @sahas-subramanian-frequenz please let me know when you have finished the review so I can start working on the feedback.

I just finished reviewing. Other than the callback stuff, it all looks great!

@ela-kotulska-frequenz
Contributor

I finished the review, too.

@leandro-lucarella-frequenz
Contributor Author

I spent quite some time on this, and I'm not sure the individual-tasks design makes any sense at all if we really want to avoid callbacks at all costs, as it introduces a lot of unnecessary complexity. For example, one of the niceties of this approach is that the resampling actor becomes really simple, as it doesn't need to keep track of senders or receivers itself. Without the callbacks, it needs to start keeping track of the senders again, and once we do that, I'm not sure it brings any value compared to the previous approach of keeping track of both. On the contrary, it introduces some asynchrony, because now half is handled by the actor and half is handled deep in the Resampler class.

The whole point of having independent tasks handling each time series is to avoid the interaction that is being proposed if we want an "active driver" that needs to keep track of everything.

Removing the Sink is sadly turning into another whole rewrite, as it was at the heart of the new design. To be honest, I'm also still not convinced that such a strong no-callback policy is justified just because callbacks were misused in the past, but I guess that is a broader topic.

Since I have also started doing some other work based on this PR, I suggest merging it as is (if there are no other comments) and maybe doing another rewrite as a follow-up after I'm done with the work I based on top of it.

FYI, this is the approach I was taking, but which ends up in a big rewrite:

Resampler being used by the resampling actor
    async def _resample(self) -> None:
        """Do the resampling."""
        while True:
            try:
                async for samples in self._resampler.resample():
                    await asyncio.gather(
                        *[
                            self._senders[source].send(sample)
                            for source, sample in samples.items()
                        ]
                    )
            except ResamplingError as err:
                # Send whatever we got
                if err.samples:
                    await asyncio.gather(
                        *[
                            self._senders[source].send(sample)
                            for source, sample in err.samples.items()
                        ]
                    )
                # Remove failing timeseries
                for source, source_error in err.exceptions.items():
                    logger.exception(
                        "Error resampling source %s, removing source...",
                        source,
                        exc_info=source_error,
                    )
                    removed = self._resampler.remove_timeseries(source)
                    popped = self._senders.pop(source, None)
                    if not removed or not popped:
                        logger.warning(
                            "Got an exception from an unknown source: "
                            "source=%r, exception=%r resampler=%s, senders=%s",
                            source,
                            source_error,
                            removed,
                            popped,
                            exc_info=source_error,
                        )

    async def run(self) -> None:
        """Resample known component metrics and process resampling requests.

        If there is a resampling error while resampling some component metric,
        then that metric will be discarded and not resampled any more. Any
        other error will be propagated (most likely ending in the actor being
        restarted).

        Raises:
            RuntimeError: If there is some unexpected error while resampling or
                handling requests.

        # noqa: DAR401 error
        """
        tasks_to_cancel: set[asyncio.Task] = set()
        try:
            subscriptions_task = asyncio.create_task(
                self._process_resampling_requests()
            )
            tasks_to_cancel.add(subscriptions_task)

            while True:
                resampling_task = asyncio.create_task(self._resample())
                tasks_to_cancel.add(resampling_task)
                done, _ = await asyncio.wait(
                    [resampling_task, subscriptions_task],
                    return_when=asyncio.FIRST_COMPLETED,
                )

                if subscriptions_task in done:
                    tasks_to_cancel.remove(subscriptions_task)
                    raise RuntimeError(
                        "There was a problem with the subscriptions channel."
                    )

                if resampling_task in done:
                    tasks_to_cancel.remove(resampling_task)
                    # The resampler shouldn't end without an exception
                    error = resampling_task.exception()
                    assert (
                        error is not None
                    ), "The resample() function shouldn't exit without an exception."

                    # We don't know what to do with whatever error comes that
                    # wasn't handled in self._resample(), so propagate the
                    # exception if that is the case.
                    raise error
                    # The resampling_task will be re-created if we reached this point
        finally:
            await asyncio.gather(*[cancel_and_await(t) for t in tasks_to_cancel])
Diff of the changes (not updating all the tests which also need a lot of rewriting)
diff --git a/src/frequenz/sdk/actor/_resampling.py b/src/frequenz/sdk/actor/_resampling.py
index 10cd77b..c18e4aa 100644
--- a/src/frequenz/sdk/actor/_resampling.py
+++ b/src/frequenz/sdk/actor/_resampling.py
@@ -3,10 +3,12 @@
 
 """ComponentMetricsResamplingActor used to subscribe for resampled component metrics."""
 
+from __future__ import annotations
+
 import asyncio
 import dataclasses
 import logging
-from typing import Sequence, Set
+from collections.abc import AsyncIterator, Sequence
 
 from frequenz.channels import Receiver, Sender
 
@@ -142,11 +144,12 @@ class ComponentMetricsResamplingActor:
         self._resampling_period_s = resampling_period_s
         self._max_data_age_in_periods: float = max_data_age_in_periods
         self._resampling_function: ResamplingFunction = resampling_function
-        self._resampler = Resampler(
+        self._resampler: Resampler = Resampler(
             resampling_period_s=resampling_period_s,
             max_data_age_in_periods=max_data_age_in_periods,
             resampling_function=resampling_function,
         )
+        self._senders: dict[AsyncIterator[Sample], Sender[Sample]] = {}
 
     async def _subscribe(self, request: ComponentMetricRequest) -> None:
         """Subscribe for data for a specific time series.
@@ -160,22 +163,56 @@ class ComponentMetricsResamplingActor:
         data_source_channel_name = data_source_request.get_channel_name()
         await self._subscription_sender.send(data_source_request)
         receiver = self._channel_registry.new_receiver(data_source_channel_name)
-
-        # This is a temporary hack until the Sender implementation uses
-        # exceptions to report errors.
         sender = self._channel_registry.new_sender(request.get_channel_name())
 
-        async def sink_adapter(sample: Sample) -> None:
-            if not await sender.send(sample):
-                raise Exception(f"Error while sending with sender {sender}", sender)
-
-        self._resampler.add_timeseries(receiver, sink_adapter)
+        self._senders[receiver] = sender
+        self._resampler.add_timeseries(receiver)
 
     async def _process_resampling_requests(self) -> None:
         """Process resampling data requests."""
         async for request in self._subscription_receiver:
             await self._subscribe(request)
 
+    async def _resample(self) -> None:
+        """Do the resampling."""
+        while True:
+            try:
+                async for samples in self._resampler.resample():
+                    await asyncio.gather(
+                        *[
+                            self._senders[source].send(sample)
+                            for source, sample in samples.items()
+                        ]
+                    )
+            except ResamplingError as err:
+                # Send whatever we got
+                if err.samples:
+                    await asyncio.gather(
+                        *[
+                            self._senders[source].send(sample)
+                            for source, sample in err.samples.items()
+                        ]
+                    )
+                # Remove failing timeseries
+                for source, source_error in err.exceptions.items():
+                    logger.exception(
+                        "Error resampling source %s, removing source...",
+                        source,
+                        exc_info=source_error,
+                    )
+                    removed = self._resampler.remove_timeseries(source)
+                    popped = self._senders.pop(source, None)
+                    if not removed or not popped:
+                        logger.warning(
+                            "Got an exception from an unknown source: "
+                            "source=%r, exception=%r resampler=%s, senders=%s",
+                            source,
+                            source_error,
+                            removed,
+                            popped,
+                            exc_info=source_error,
+                        )
+
     async def run(self) -> None:
         """Resample known component metrics and process resampling requests.
 
@@ -190,7 +227,7 @@ class ComponentMetricsResamplingActor:
 
         # noqa: DAR401 error
         """
-        tasks_to_cancel: Set[asyncio.Task] = set()
+        tasks_to_cancel: set[asyncio.Task] = set()
         try:
             subscriptions_task = asyncio.create_task(
                 self._process_resampling_requests()
@@ -198,7 +235,7 @@ class ComponentMetricsResamplingActor:
             tasks_to_cancel.add(subscriptions_task)
 
             while True:
-                resampling_task = asyncio.create_task(self._resampler.resample())
+                resampling_task = asyncio.create_task(self._resample())
                 tasks_to_cancel.add(resampling_task)
                 done, _ = await asyncio.wait(
                     [resampling_task, subscriptions_task],
@@ -217,28 +254,12 @@ class ComponentMetricsResamplingActor:
                     error = resampling_task.exception()
                     assert (
                         error is not None
-                    ), "The resample() function shouldn't exit normally."
-
-                    # We don't know what to do with something other than
-                    # ResamplingError, so propagate the exception if that is the
-                    # case.
-                    if not isinstance(error, ResamplingError):
-                        raise error
-                    for source, source_error in error.exceptions.items():
-                        logger.exception(
-                            "Error resampling source %s, removing source...",
-                            source,
-                            exc_info=source_error,
-                        )
-                        removed = self._resampler.remove_timeseries(source)
-                        if not removed:
-                            logger.warning(
-                                "Got an exception from an unknown source: "
-                                "source=%r, exception=%r",
-                                source,
-                                source_error,
-                                exc_info=source_error,
-                            )
+                    ), "The resample() function shouldn't exit without an exception."
+
+                    # We don't know what to do with whatever error comes that
+                    # wasn't handled in self._resample(), so propagate the
+                    # exception if that is the case.
+                    raise error
                     # The resampling_task will be re-created if we reached this point
         finally:
             await asyncio.gather(*[cancel_and_await(t) for t in tasks_to_cancel])
diff --git a/src/frequenz/sdk/timeseries/resampling.py b/src/frequenz/sdk/timeseries/resampling.py
index 8c830da..88c6c4b 100644
--- a/src/frequenz/sdk/timeseries/resampling.py
+++ b/src/frequenz/sdk/timeseries/resampling.py
@@ -3,17 +3,20 @@
 
 """Timeseries resampler."""
 
+from __future__ import annotations
+
 import asyncio
 import logging
 import math
 from collections import deque
+from collections.abc import AsyncIterator
 from datetime import datetime, timedelta
-from typing import Callable, Deque, Dict, Sequence, Union, cast
+from typing import Callable, Deque, Dict, Sequence, Union
 
 from frequenz.channels.util import Timer
 
 from ..util.asyncio import cancel_and_await
-from . import Sample, Sink, Source
+from . import Sample
 
 _logger = logging.Logger(__name__)
 
@@ -45,7 +48,7 @@ Returns:
 class SourceStoppedError(RuntimeError):
     """A timeseries stopped producing samples."""
 
-    def __init__(self, source: Source) -> None:
+    def __init__(self, source: AsyncIterator[Sample]) -> None:
         """Create an instance.
 
         Args:
@@ -68,36 +71,36 @@ class SourceStoppedError(RuntimeError):
 class ResamplingError(RuntimeError):
     """An Error ocurred while resampling.
 
-    This error is a container for errors raised by the underlying sources and
-    or sinks.
+    This error provides information on which samples were successfully produced
+    and which failed. Successful samples are stored in `samples` and failing
+    timeseries have their exceptions reported in `exceptions`.
     """
 
     def __init__(
-        self, exceptions: Dict[Source, Union[Exception, asyncio.CancelledError]]
+        self,
+        samples: Dict[AsyncIterator[Sample], Sample],
+        exceptions: Dict[
+            AsyncIterator[Sample], Union[Exception, asyncio.CancelledError]
+        ],
     ) -> None:
         """Create an instance.
 
         Args:
-            exceptions: A mapping of timeseries source and the exception
-                encountered while resampling that timeseries. Note that the
-                error could be raised by the sink, while trying to send
-                a resampled data for this timeseries, the source key is only
-                used to identify the timeseries with the issue, it doesn't
-                necessarily mean that the error was raised by the source. The
-                underlying exception should provide information about what was
+            samples: The samples from the timeseries that could be successfully
+                calculated.
+            exceptions: The exceptions raised by failing timeseries.
+                encountered while resampling that timeseries.
                 the actual source of the exception.
         """
         super().__init__(f"Some error were found while resampling: {exceptions}")
-        self.exceptions = exceptions
-        """A mapping of timeseries source and the exception encountered.
-
-        Note that the error could be raised by the sink, while trying to send
-        a resampled data for this timeseries, the source key is only used to
-        identify the timeseries with the issue, it doesn't necessarily mean
-        that the error was raised by the source. The underlying exception
-        should provide information about what was the actual source of the
-        exception.
-        """
+
+        self.samples: Dict[AsyncIterator[Sample], Sample] = samples
+        """The samples from the timeseries that could be successfully calculated."""
+
+        self.exceptions: Dict[
+            AsyncIterator[Sample], Union[Exception, asyncio.CancelledError]
+        ] = exceptions
+        """The exceptions raised by failing timeseries."""
 
     def __repr__(self) -> str:
         """Return the representation of the instance.
@@ -105,15 +108,15 @@ class ResamplingError(RuntimeError):
         Returns:
             The representation of the instance.
         """
-        return f"{self.__class__.__name__}({self.exceptions=})"
+        return f"{self.__class__.__name__}({self.samples=}, {self.exceptions=})"
 
 
 class Resampler:
     """A timeseries resampler.
 
-    In general timeseries [`Source`][frequenz.sdk.timeseries.Source]s don't
-    necessarily come at periodic intervals. You can use this class to normalize
-    timeseries to produce `Sample`s at regular periodic intervals.
+    In general timeseries don't necessarily come at periodic intervals. You can
+    use this class to normalize timeseries to produce `Sample`s at regular
+    periodic intervals.
 
     This class uses
     a [`ResamplingFunction`][frequenz.sdk.timeseries.ResamplingFunction] to
@@ -149,14 +152,14 @@ class Resampler:
         self._resampling_period_s = resampling_period_s
         self._max_data_age_in_periods: float = max_data_age_in_periods
         self._resampling_function: ResamplingFunction = resampling_function
-        self._resamplers: Dict[Source, _StreamingHelper] = {}
+        self._resamplers: Dict[AsyncIterator[Sample], _StreamingHelper] = {}
         self._timer: Timer = Timer(self._resampling_period_s)
 
     async def stop(self) -> None:
         """Cancel all receiving tasks."""
         await asyncio.gather(*[helper.stop() for helper in self._resamplers.values()])
 
-    def add_timeseries(self, source: Source, sink: Sink) -> bool:
+    def add_timeseries(self, source: AsyncIterator[Sample]) -> bool:
         """Start resampling a new timeseries.
 
         Args:
@@ -178,12 +181,11 @@ class Resampler:
                 resampling_function=self._resampling_function,
             ),
             source,
-            sink,
         )
         self._resamplers[source] = resampler
         return True
 
-    def remove_timeseries(self, source: Source) -> bool:
+    def remove_timeseries(self, source: AsyncIterator[Sample]) -> bool:
         """Stop resampling the timeseries produced by `source`.
 
         Args:
@@ -200,16 +202,12 @@ class Resampler:
             return False
         return True
 
-    async def resample(self, *, one_shot: bool = False) -> None:
+    async def resample(self) -> AsyncIterator[Dict[AsyncIterator[Sample], Sample]]:
         """Start resampling all known timeseries.
 
         This method will run forever unless there is an error while receiving
         from a source or sending to a sink (or `one_shot` is used).
 
-        Args:
-            one_shot: Wether the resampling should run only for one resampling
-                period.
-
         Raises:
             ResamplingError: If some timeseries source or sink encounters any
                 errors while receiving or sending samples. In this case the
@@ -223,23 +221,28 @@ class Resampler:
                 *[r.resample(timer_timestamp) for r in self._resamplers.values()],
                 return_exceptions=True,
             )
-            # We need to cast this because Python is not smart enough to figure
-            # out the key value can't be None (not even adding another
-            # condition checking for None in the dict expression).
-            exceptions = cast(
-                Dict[Source, Union[Exception, asyncio.CancelledError]],
-                {
-                    source: results[i]
-                    for i, source in enumerate(self._resamplers)
-                    # CancelledError inherits from BaseException, but we don't want
-                    # to catch *all* BaseExceptions here.
-                    if isinstance(results[i], (Exception, asyncio.CancelledError))
-                },
-            )
+
+            samples: Dict[AsyncIterator[Sample], Sample] = {}
+            exceptions: Dict[
+                AsyncIterator[Sample], Union[Exception, asyncio.CancelledError]
+            ] = {}
+            for i, source in enumerate(self._resamplers):
+                if isinstance(results[i], (Exception, asyncio.CancelledError)):
+                    exceptions[source] = results[i]
+                elif isinstance(results[i], Sample):
+                    samples[source] = results[i]
+                elif isinstance(results[i], BaseException):
+                    raise results[i]
+                else:
+                    assert False, (
+                        f"Resampling result for source {source!r} "
+                        f"is not an exception or Sample: {results[i]!r}"
+                    )
+
             if exceptions:
-                raise ResamplingError(exceptions)
-            if one_shot:
-                break
+                raise ResamplingError(samples, exceptions)
+
+            yield samples
 
 
 class _ResamplingHelper:
@@ -342,8 +345,7 @@ class _StreamingHelper:
     def __init__(
         self,
         helper: _ResamplingHelper,
-        source: Source,
-        sink: Sink,
+        source: AsyncIterator[Sample],
     ) -> None:
         """Initialize an instance.
 
@@ -353,8 +355,7 @@ class _StreamingHelper:
             sink: The sink to use to send the resampled data.
         """
         self._helper: _ResamplingHelper = helper
-        self._source: Source = source
-        self._sink: Sink = sink
+        self._source: AsyncIterator[Sample] = source
         self._receiving_task: asyncio.Task = asyncio.create_task(
             self._receive_samples()
         )
@@ -373,7 +374,7 @@ class _StreamingHelper:
             if sample.value is not None and not math.isnan(sample.value):
                 self._helper.add_sample(sample)
 
-    async def resample(self, timestamp: datetime) -> None:
+    def resample(self, timestamp: datetime) -> Sample:
         """Calculate a new sample for the passed `timestamp` and send it.
 
         The helper is used to calculate the new sample and the sender is used
@@ -402,4 +403,4 @@ class _StreamingHelper:
                 raise recv_exception
             raise SourceStoppedError(self._source)
 
-        await self._sink(self._helper.resample(timestamp))
+        return self._helper.resample(timestamp)
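
The result-splitting step in the suggested `resample()` above can be tried in isolation. This is only a minimal sketch: `compute` and `resample_once` are hypothetical stand-ins (plain floats instead of `Sample`, strings instead of sources), not SDK code. It shows how `asyncio.gather(..., return_exceptions=True)` lets the successful results survive alongside the failures.

```python
import asyncio
from typing import Dict, List, Tuple


async def compute(name: str) -> float:
    """Stand-in for one per-timeseries resampling step (hypothetical)."""
    if name == "broken":
        raise ValueError(f"no data for {name}")
    return 1.0


async def resample_once(
    names: List[str],
) -> Tuple[Dict[str, float], Dict[str, BaseException]]:
    """Run every step concurrently and split results from errors."""
    results = await asyncio.gather(
        *[compute(name) for name in names], return_exceptions=True
    )
    samples: Dict[str, float] = {}
    exceptions: Dict[str, BaseException] = {}
    for name, result in zip(names, results):
        if isinstance(result, (Exception, asyncio.CancelledError)):
            # Failures are collected per timeseries instead of being raised.
            exceptions[name] = result
        elif isinstance(result, BaseException):
            # Things like KeyboardInterrupt are not ours to swallow.
            raise result
        else:
            samples[name] = result
    return samples, exceptions
```

Calling `asyncio.run(resample_once(["a", "broken"]))` returns one dict with the value for `"a"` and another with the `ValueError` for `"broken"`, which is the pair a `ResamplingError(samples, exceptions)` would carry.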

@leandro-lucarella-frequenz leandro-lucarella-frequenz force-pushed the reusable-resampler branch 2 times, most recently from f6e4174 to f9ec477 Compare December 8, 2022 15:58
@leandro-lucarella-frequenz
Contributor Author

Rebased on the current v0.x.x and changed imports to use collections.abc.Sequence instead of typing.Sequence.

from __future__ import annotations works great! We can even use union types with the | operator and all built-in types for type hinting!

@leandro-lucarella-frequenz
Contributor Author

As discussed with @sahas-subramanian-frequenz, I updated the PR to add a commit that hides the timeseries.resampling module and Source and Sink, so we can merge this PR, which has other improvements, and leave a potential rewrite to remove the callback for another PR.

@leandro-lucarella-frequenz
Contributor Author

I kept thinking about this, and if we go back to the previous approach, another issue I see with using Select is performance. Select creates a task for every receiver each time a message is received. I'm not sure how big the real performance impact is, but it is definitely more work than starting a task once and keeping it running forever.

I think it would be good to have a discussion about how bad having a callback really is.

@shsms
Contributor

shsms commented Dec 9, 2022

I kept thinking about this, and if we go back to the previous approach, another issue I see with using Select is performance.

Not sure why you're thinking about going back, I thought we agreed Select is a bad idea for the resampler.

@sahas-subramanian-frequenz
Contributor

I'm going to be commuting, please feel free to approve again and merge after rebase.

@leandro-lucarella-frequenz
Contributor Author

Not sure why you're thinking about going back, I thought we agreed Select is a bad idea for the resampler.

I'm not thinking about necessarily going back, but my impression is there are 2 main paradigms:

  1. Use tasks to work independently for different "sources": in this case callbacks make more sense, you have a task looping on something and using a callback to do the work when there is work to be done. The alternative I can see is to make everything use channels, even if there are no actors involved, which is not great in terms of complexity and performance. For example, with the resampler, where you probably just want to store the samples in a ring buffer, it doesn't make sense to have a channel to pass the generated samples to a buffer.

  2. Use a "driver" to consume from all the "sources". In this case things are simpler if we use Select, as you basically need to handle all the sources in one place.

So if we want to avoid callbacks, Select makes sense IMHO. But yeah, for the resampler it could be a mixed approach like the one I suggested in the thread above (and copied to #107). It is just that the code becomes more convoluted than with the callback, which was very lean and clean.
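
To make the two paradigms concrete, here is a minimal sketch in plain asyncio. None of this is SDK code: `numbers`, `pump` and `drive` are hypothetical names, and `drive` only approximates what a Select-style driver does, under the assumption stated above that it needs a fresh task per source after every received message.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List

_DONE = object()  # sentinel marking an exhausted source


async def numbers(start: int, count: int) -> AsyncIterator[int]:
    """Hypothetical source producing a few integers."""
    for value in range(start, start + count):
        await asyncio.sleep(0)
        yield value


# Paradigm 1: one long-running task per source, delivering via a callback.
async def pump(
    source: AsyncIterator[int], sink: Callable[[int], Awaitable[None]]
) -> None:
    async for value in source:
        await sink(value)


async def _next_or_done(source: AsyncIterator[int]) -> Any:
    try:
        return await source.__anext__()
    except StopAsyncIteration:
        return _DONE


# Paradigm 2: a single driver that waits on all sources in one place.
async def drive(sources: List[AsyncIterator[int]]) -> List[int]:
    received: List[int] = []
    pending: Dict[Any, AsyncIterator[int]] = {
        asyncio.ensure_future(_next_or_done(source)): source for source in sources
    }
    while pending:
        done, _ = await asyncio.wait(
            set(pending), return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            source = pending.pop(task)
            value = task.result()
            if value is _DONE:
                continue
            received.append(value)
            # A new task has to be created for every received message.
            pending[asyncio.ensure_future(_next_or_done(source))] = source
    return received
```

With `pump`, each source costs one task for its whole lifetime and the consumer is just an async function (for example one appending to a buffer). With `drive`, all handling lives in one loop, at the price of the bookkeeping and the task churn shown in the comments.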

llucax added 15 commits December 9, 2022 11:55
When removing outdated samples, samples falling exactly at the
resampling time are used in 2 different periods.

Even if this is not technically wrong, it is unexpected, as it would be
more natural that a moving window doesn't intersect with previous moving
windows (if using one period, or after moving the window
max_data_age_in_periods_s).

Signed-off-by: Leandro Lucarella <[email protected]>
This is useful if you want to check if some function was called with a
sequence, without caring about the concrete underlying type, so the
implementation can change and tests don't need to be updated as long
as the interface stays the same.

For example: mock.assert_called_once_with(a_sequence(1, 2, 3))

Signed-off-by: Leandro Lucarella <[email protected]>
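
A matcher like the `a_sequence` mentioned in the commit above could look roughly like this. It is only a sketch of the idea (compare by content, ignore the concrete sequence type); the real helper in the test suite may well be implemented differently.

```python
from collections.abc import Sequence
from typing import Any
from unittest import mock


class a_sequence:  # lowercase on purpose, it reads like a matcher in asserts
    """Compare equal to any sequence holding the same items (hypothetical)."""

    def __init__(self, *items: Any) -> None:
        self.items = items

    def __eq__(self, other: object) -> bool:
        # Strings are sequences too, but matching them here would be surprising.
        if not isinstance(other, Sequence) or isinstance(other, (str, bytes)):
            return False
        return list(other) == list(self.items)

    def __repr__(self) -> str:
        return f"a_sequence{self.items!r}"


# The mock was called with a list, but a tuple or deque would match as well.
function = mock.Mock()
function([1, 2, 3])
function.assert_called_once_with(a_sequence(1, 2, 3))
```
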
This is a very common pattern when cancelling a task.

This also introduces a new package `util` and a module `asyncio` inside
of it.

Signed-off-by: Leandro Lucarella <[email protected]>
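
The pattern the commit above refers to is usually "cancel the task, then wait until it has really finished". A minimal sketch follows; the function name `cancel_and_await` is an assumption, since the commit text does not spell it out.

```python
import asyncio


async def cancel_and_await(task: asyncio.Task) -> None:
    """Cancel `task` and wait for it to finish (hypothetical helper name)."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected: this is how the cancelled task reports that it is done.
        pass


async def main() -> bool:
    task = asyncio.create_task(asyncio.sleep(3600))
    await asyncio.sleep(0)  # give the task a chance to start
    await cancel_and_await(task)
    return task.cancelled()
```

Note that this simple version also swallows a `CancelledError` aimed at the caller itself, which may or may not be what a real helper wants.
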
This is to allow adding some other base types in future commits.

Signed-off-by: Leandro Lucarella <[email protected]>
All tools provided by this package should be steps in a pipeline that
takes Source as input and produces values in a Sink.

Signed-off-by: Leandro Lucarella <[email protected]>
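
Going by the PR description (a `Source` is an async iterator of `Sample`, a `Sink` is an async function taking a `Sample`), the two types and a pipeline step could be sketched as below. The `Sample` dataclass is a simplified stand-in, and `constant_source`, `run_step` and `buffer_sink` are hypothetical names for illustration only.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import AsyncIterator, Callable, Coroutine, List, Optional


@dataclass(frozen=True)
class Sample:
    """Simplified stand-in for the SDK's Sample type."""

    timestamp: datetime
    value: Optional[float] = None


Source = AsyncIterator[Sample]
Sink = Callable[[Sample], Coroutine[None, None, None]]


async def constant_source(value: float, count: int) -> Source:
    """Hypothetical source yielding `count` samples with the same value."""
    for _ in range(count):
        yield Sample(datetime.now(timezone.utc), value)


async def run_step(source: Source, sink: Sink) -> None:
    """A pipeline step: take samples from the source, hand them to the sink."""
    async for sample in source:
        await sink(sample)


buffer: List[Sample] = []


async def buffer_sink(sample: Sample) -> None:
    """A sink that just stores samples, no channel involved."""
    buffer.append(sample)
```

Running `asyncio.run(run_step(constant_source(2.0, 3), buffer_sink))` leaves three samples in `buffer`.
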
This also made pylint look (more?) into the tests in tests/timeseries
so this commit also adds some fixes for issues reported by pylint after
adding the tests/timeseries/__init__.py file.

Signed-off-by: Leandro Lucarella <[email protected]>
The `GroupResampler` and `Resampler` classes are not very reusable, or
at least require quite a bit of work from users to use them and they
need to take care of handling timers and calling methods at the right
time for them to be useful.

The new `Resampler` class has its own task(s) to handle all the
resampling, users only need to subscribe to new timeseries by passing a
timeseries `Source` and `Sink`. The `Sink` will be called each time a
new sample is produced.

A new `frequenz.sdk.timeseries.resampling` module is added because there
are more classes only used by the `Resampler` that might not be needed
by anyone just wanting to use `frequenz.sdk.timeseries`.

This commit also introduces the use of `async-solipsism` for testing, so
tests using timers can be mocked in a way that they don't really need to
wait for timers and time comparisons can be done precisely.

Signed-off-by: Leandro Lucarella <[email protected]>
This is to make them easier to understand. Also require keyword
arguments to avoid mixing up `resampling_period_s` and
`max_data_age_in_periods`.

Also remove the example from the init documentation as it was outdated
and we already provide an example in `examples/` that at least gets
actually checked for syntax and linted.

Signed-off-by: Leandro Lucarella <[email protected]>
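
Requiring keyword arguments, as the commit above describes, is done with a bare `*` in the signature. The class below is a hypothetical holder for the two settings, not the actual `Resampler` constructor, and the default value is an assumption.

```python
class ResamplerSettings:
    """Hypothetical holder for the two settings named in the commit."""

    def __init__(
        self,
        *,  # everything after the bare star must be passed by keyword
        resampling_period_s: float,
        max_data_age_in_periods: float = 3.0,  # default value is an assumption
    ) -> None:
        self.resampling_period_s = resampling_period_s
        self.max_data_age_in_periods = max_data_age_in_periods

    @property
    def max_data_age_s(self) -> float:
        """Maximum data age in seconds, derived from both settings."""
        return self.resampling_period_s * self.max_data_age_in_periods


settings = ResamplerSettings(resampling_period_s=0.2, max_data_age_in_periods=5.0)
```

A positional call such as `ResamplerSettings(0.2, 5.0)` raises `TypeError`, so the two floats cannot be swapped silently.
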
The default resampling function, `average`, is moved from the actor to
the `timeseries.resampling` module so the `Resampler` class can have a
default too.

Signed-off-by: Leandro Lucarella <[email protected]>
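
For reference, an `average` resampling function could be as small as the sketch below. The signature (a sequence of samples plus the resampling period, returning a float) is an assumption about `ResamplingFunction`, and `Sample` is again a simplified stand-in.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Sequence


@dataclass(frozen=True)
class Sample:
    """Simplified stand-in for the SDK's Sample type."""

    timestamp: datetime
    value: float


def average(samples: Sequence[Sample], resampling_period_s: float) -> float:
    """Return the plain mean of the sample values.

    `resampling_period_s` is unused here; it is only kept so the function
    fits the assumed `ResamplingFunction` signature.
    """
    assert len(samples) > 0, "average() needs at least one sample"
    return sum(sample.value for sample in samples) / len(samples)


now = datetime.now(timezone.utc)
result = average([Sample(now, 1.0), Sample(now, 3.0)], resampling_period_s=1.0)
```
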
Signed-off-by: Leandro Lucarella <[email protected]>
Use built-in types and collections.abc types for type hinting as it can
be done on old Python versions when using:
    from __future__ import annotations

Also it seems like the type checker works better, because a cast is not
necessary anymore.

Some imports from typing are still needed because this doesn't work for
type aliases; they still need stuff from typing.

Signed-off-by: Leandro Lucarella <[email protected]>
We are not convinced about the current implementation using a callback
(Sink), so we don't want to expose this just yet.

Signed-off-by: Leandro Lucarella <[email protected]>
@leandro-lucarella-frequenz
Contributor Author

Ah, damn, I forgot to revert the release notes, which talk about the resampler. 🤷‍♂️

@@ -1,171 +1,531 @@
-# License: MIT
+# tests/timeseries/test_resampling.py:93: License: MIT
Contributor

Is this a deliberate change or an accident?

Contributor Author

Very weird accident! Good catch!

Contributor Author

I sneaked a fix in #123. Thanks!

Labels

part:actor Affects an actor ot the actors utilities (decorator, etc.) part:data-pipeline Affects the data pipeline part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests part:tooling Affects the development tooling (CI, deployment, dependency management, etc.) type:enhancement New feature or enhancement visitble to users

Development

Successfully merging this pull request may close these issues.

Make the resampler class more re-usable

6 participants