Skip to content

Commit f9501b4

Browse files
committed
Added a simple example for using TimeSeriesResamplingActor
Signed-off-by: Jakub Kozlowicz <[email protected]>
1 parent d8be314 commit f9501b4

File tree

1 file changed

+98
-0
lines changed

1 file changed

+98
-0
lines changed

examples/sdk_resampling_example.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""Frequenz Python SDK resampling example.
2+
3+
Copyright
4+
Copyright © 2022 Frequenz Energy-as-a-Service GmbH
5+
6+
License
7+
MIT
8+
"""
9+
10+
import asyncio
11+
12+
from frequenz.channels import Broadcast, MergeNamed
13+
14+
from frequenz.sdk.actor import ChannelRegistry
15+
from frequenz.sdk.actor.data_sourcing import DataSourcingActor
16+
from frequenz.sdk.data_ingestion.resampling.component_metrics_resampling_actor import (
17+
ComponentMetricsResamplingActor,
18+
)
19+
from frequenz.sdk.data_pipeline import ComponentMetricId, ComponentMetricRequest
20+
from frequenz.sdk.microgrid import ComponentCategory, microgrid_api
21+
22+
HOST = "microgrid.sandbox.api.frequenz.io"
23+
PORT = 61060
24+
25+
26+
async def run() -> None:
27+
"""Run main functions that initializes and creates everything."""
28+
await microgrid_api.initialize(HOST, PORT)
29+
30+
channel_registry = ChannelRegistry(name="Microgrid Channel Registry")
31+
32+
# Create a channels for sending/receiving subscription requests
33+
data_source_request_channel = Broadcast[ComponentMetricRequest](
34+
"Data Source Request Channel"
35+
)
36+
data_source_request_sender = data_source_request_channel.get_sender()
37+
data_source_request_receiver = data_source_request_channel.get_receiver()
38+
39+
resampling_actor_request_channel = Broadcast[ComponentMetricRequest](
40+
"Resampling Actor Request Channel"
41+
)
42+
resampling_actor_request_sender = resampling_actor_request_channel.get_sender()
43+
resampling_actor_request_receiver = resampling_actor_request_channel.get_receiver()
44+
45+
# Instantiate a data sourcing actor
46+
_data_sourcing_actor = DataSourcingActor(
47+
request_receiver=data_source_request_receiver, registry=channel_registry
48+
)
49+
50+
# Instantiate a resampling actor
51+
_resampling_actor = ComponentMetricsResamplingActor(
52+
channel_registry=channel_registry,
53+
subscription_sender=data_source_request_sender,
54+
subscription_receiver=resampling_actor_request_receiver,
55+
resampling_period_s=1.0,
56+
)
57+
58+
components = await microgrid_api.get().microgrid_api_client.components()
59+
battery_ids = [
60+
comp.component_id
61+
for comp in components
62+
if comp.category == ComponentCategory.BATTERY
63+
]
64+
65+
# Create subscription requests for each time series id
66+
subscription_requests = [
67+
ComponentMetricRequest(
68+
namespace="Resampling",
69+
component_id=component_id,
70+
metric_id=ComponentMetricId.SOC,
71+
start_time=None,
72+
)
73+
for component_id in battery_ids
74+
]
75+
76+
# Send the subscription requests
77+
await asyncio.gather(
78+
*[
79+
resampling_actor_request_sender.send(request)
80+
for request in subscription_requests
81+
]
82+
)
83+
84+
# Store sample receivers for each subscription
85+
sample_receiver = MergeNamed(
86+
**{
87+
channel_name: channel_registry.get_receiver(channel_name)
88+
for channel_name in map(
89+
lambda req: req.get_channel_name(), subscription_requests
90+
)
91+
}
92+
)
93+
94+
async for channel_name, msg in sample_receiver:
95+
print(msg)
96+
97+
98+
asyncio.run(run())

0 commit comments

Comments
 (0)