Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,28 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
- `frequenz.sdk.timseries`:
- The resample classes in the `frequenz.sdk.timseries` were removed. Use the new `frequenz.sdk.timseries.resampling.Resampler` class instead.
- The `ResamplingFunction` was moved to the new module `frequenz.sdk.timseries.resampling`.

- `frequenz.sdk.actor.ComponentMetricsResamplingActor`:
- The constructor now requires to pass all arguments as keywords.
- The following constructor arguments were renamed to make them more clear:
- `subscription_sender` -> `data_sourcing_request_sender`
- `subscription_receiver` -> `resampling_request_receiver`


## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- New in `frequenz.sdk.timeseries`:

- `Source` and `Sink` types to work generically with streaming timeseries.

- New `frequenz.sdk.timeseries.resampling` module with:
- `Resampler` class that works on timseries `Source` and `Sink`.
- `ResamplingFunction` (moved from `frequenz.sdk.timeseries`).
- `ResamplingError` and `SourceStoppedError` exceptions.
- `average` function (the default resampling function).

## Bug Fixes

Expand Down
80 changes: 59 additions & 21 deletions examples/resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
"""Frequenz Python SDK resampling example."""

import asyncio
from datetime import datetime, timezone

from frequenz.channels import Broadcast
from frequenz.channels.util import MergeNamed
from frequenz.channels.util import Merge

from frequenz.sdk import microgrid
from frequenz.sdk.actor import (
Expand All @@ -16,11 +17,29 @@
DataSourcingActor,
)
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
from frequenz.sdk.timeseries import Sample
from frequenz.sdk.timeseries._resampling import Resampler, Sink, Source

HOST = "microgrid.sandbox.api.frequenz.io"
PORT = 61060


async def _calculate_average(source: Source, sink: Sink) -> None:
avg: float = 0.0
count: int = 0
async for sample in source:
print(f"Received sample to average at {sample.timestamp}: {sample.value}")
count += 1
if sample.value is None:
continue
avg = avg * (count - 1) / count + sample.value / count
await sink(Sample(datetime.now(timezone.utc), avg))


async def _print_sample(sample: Sample) -> None:
print(f"\nResampled average at {sample.timestamp}: {sample.value}\n")


async def run() -> None: # pylint: disable=too-many-locals
"""Run main functions that initializes and creates everything."""
await microgrid.initialize(HOST, PORT)
Expand All @@ -32,9 +51,9 @@ async def run() -> None: # pylint: disable=too-many-locals
data_source_request_sender = data_source_request_channel.new_sender()
data_source_request_receiver = data_source_request_channel.new_receiver()

resampling_actor_request_channel = Broadcast[ComponentMetricRequest]("resample")
resampling_actor_request_sender = resampling_actor_request_channel.new_sender()
resampling_actor_request_receiver = resampling_actor_request_channel.new_receiver()
resampling_request_channel = Broadcast[ComponentMetricRequest]("resample")
resampling_request_sender = resampling_request_channel.new_sender()
resampling_request_receiver = resampling_request_channel.new_receiver()

# Instantiate a data sourcing actor
_data_sourcing_actor = DataSourcingActor(
Expand All @@ -44,9 +63,9 @@ async def run() -> None: # pylint: disable=too-many-locals
# Instantiate a resampling actor
_resampling_actor = ComponentMetricsResamplingActor(
channel_registry=channel_registry,
subscription_sender=data_source_request_sender,
subscription_receiver=resampling_actor_request_receiver,
resampling_period_s=1.0,
data_sourcing_request_sender=data_source_request_sender,
resampling_request_receiver=resampling_request_receiver,
resampling_period_s=1,
)

components = await microgrid.get().api_client.components()
Expand All @@ -56,7 +75,9 @@ async def run() -> None: # pylint: disable=too-many-locals
if comp.category == ComponentCategory.BATTERY
]

# Create subscription requests for each time series id
print(f"Found {len(battery_ids)} batteries: {battery_ids}")

# Create subscription requests for each battery's SoC
subscription_requests = [
ComponentMetricRequest(
namespace="resampling",
Expand All @@ -69,24 +90,41 @@ async def run() -> None: # pylint: disable=too-many-locals

# Send the subscription requests
await asyncio.gather(
*[
resampling_actor_request_sender.send(request)
for request in subscription_requests
]
*[resampling_request_sender.send(request) for request in subscription_requests]
)

# Merge sample receivers for each subscription into one receiver
merged_receiver = MergeNamed(
**{
req.get_channel_name(): channel_registry.new_receiver(
req.get_channel_name()
)
merged_receiver = Merge(
*[
channel_registry.new_receiver(req.get_channel_name())
for req in subscription_requests
}
]
)

async for channel_name, msg in merged_receiver:
print(f"{channel_name}: {msg}")
# Create a channel to calculate an average for all the data
average_chan = Broadcast[Sample]("average")

second_stage_resampler = Resampler(resampling_period_s=3.0)
second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample)

average_sender = average_chan.new_sender()
# Needed until channels Senders raises exceptions on errors
async def sink_adapter(sample: Sample) -> None:
assert await average_sender.send(sample)

print("Starting...")

try:
# This will run until it is interrupted (with Ctrl-C for example)
await asyncio.gather(
_calculate_average(merged_receiver, sink_adapter),
second_stage_resampler.resample(),
)
finally:
await second_stage_resampler.stop()


asyncio.run(run())
try:
asyncio.run(run())
except KeyboardInterrupt:
print("Bye!")
9 changes: 8 additions & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@

FMT_DEPS = ["black", "isort"]
DOCSTRING_DEPS = ["pydocstyle", "darglint"]
PYTEST_DEPS = ["pytest", "pytest-cov", "pytest-mock", "pytest-asyncio", "time-machine"]
PYTEST_DEPS = [
"pytest",
"pytest-cov",
"pytest-mock",
"pytest-asyncio",
"time-machine",
"async-solipsism",
]
MYPY_DEPS = ["mypy", "pandas-stubs", "grpc-stubs"]


Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@ src_paths = ["src", "examples", "tests"]
asyncio_mode = "auto"
required_plugins = [ "pytest-asyncio", "pytest-mock" ]

[[tools.mypy.overrides]]
[[tool.mypy.overrides]]
module = [
"grpc.aio",
"grpc.aio.*"
]
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "async_solipsism"
ignore_missing_imports = true
Loading