Skip to content

Commit 81c5a71

Browse files
committed
Added a simple example for using TimeSeriesResamplingActor
Signed-off-by: Jakub Kozlowicz <[email protected]>
1 parent 00ca99a commit 81c5a71

File tree

1 file changed

+146
-0
lines changed

1 file changed

+146
-0
lines changed

examples/sdk_resampling_example.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
"""Frequenz Python SDK resampling example.
2+
3+
Copyright
4+
Copyright © 2022 Frequenz Energy-as-a-Service GmbH
5+
6+
License
7+
MIT
8+
"""
9+
10+
import asyncio
11+
import random
12+
from datetime import datetime
13+
from typing import Dict
14+
15+
import pytz
16+
from frequenz.channels import Broadcast, MergeNamed, Receiver, Select, Sender
17+
18+
from frequenz.sdk.core.timeseries import TimeSeriesId
19+
from frequenz.sdk.data_ingestion.resampling.component_metric_resampler import Sample
20+
from frequenz.sdk.data_ingestion.resampling.data_provider import DataProvider
21+
from frequenz.sdk.data_ingestion.resampling.subscriptions import (
22+
Subscription,
23+
SubscriptionRequest,
24+
SubscriptionResponse,
25+
)
26+
from frequenz.sdk.data_ingestion.resampling.timeseries_resampling_actor import (
27+
TimeSeriesResamplingActor,
28+
)
29+
30+
31+
async def data_sender(sender: Sender[Sample]) -> None:
32+
"""Send a message to the channel every 0.2 seconds.
33+
34+
Args:
35+
sender: channel to be used to send random component data
36+
"""
37+
while True:
38+
value = random.uniform(1.0, 100.0)
39+
await sender.send(Sample(timestamp=datetime.now(tz=pytz.UTC), value=value))
40+
await asyncio.sleep(random.uniform(0.1, 0.5))
41+
42+
43+
class DummyDataProvider(DataProvider):
44+
"""Dummy Data Provider for presentation purposes."""
45+
46+
def __init__(self) -> None:
47+
"""Create an instance and initialize receivers."""
48+
self._receivers: Dict[Subscription, Receiver[Sample]] = {}
49+
50+
async def subscribe(self, subscription: Subscription) -> SubscriptionResponse:
51+
"""Pass a subscription to the data source and return a subscription response.
52+
53+
Args:
54+
subscription: object of the subscription (time_series_id, start_time)
55+
56+
Returns:
57+
response containing a sample receiver for the time series id provided in
58+
the subscription object
59+
"""
60+
receiver = self._receivers.get(subscription)
61+
if receiver is None:
62+
channel = Broadcast[Sample](f"Source {subscription.time_series_id}")
63+
asyncio.create_task(data_sender(channel.get_sender()))
64+
receiver = channel.get_receiver()
65+
self._receivers[subscription] = receiver
66+
return SubscriptionResponse(receiver=receiver, subscription=subscription)
67+
68+
69+
async def run() -> None:
70+
"""Run main functions that initializes and creates everything."""
71+
data_provider: DataProvider = DummyDataProvider()
72+
73+
# Create a channel for receiving subscription responses from the resampling actor
74+
subscription_response_channel = Broadcast[SubscriptionResponse](
75+
"Subscription Response Channel"
76+
)
77+
subscription_response_sender = subscription_response_channel.get_sender()
78+
subscription_response_receiver = subscription_response_channel.get_receiver()
79+
80+
# Create a channel for sending subscription requests to the resampling actor
81+
subscription_request_channel = Broadcast[SubscriptionRequest](
82+
"Subscription Request Channel"
83+
)
84+
subscription_request_sender = subscription_request_channel.get_sender()
85+
86+
# Instantiate a resampling actor
87+
_resampling_actor = TimeSeriesResamplingActor(
88+
data_provider=data_provider,
89+
resampling_period_s=1.0,
90+
subscription_receiver=subscription_request_channel.get_receiver(),
91+
)
92+
93+
# Declare time series ids to subscribe to for data
94+
time_series_ids = [
95+
TimeSeriesId(component_id=123, metric_id="active_power"),
96+
TimeSeriesId(component_id=99, metric_id="active_power"),
97+
]
98+
99+
# Create subscription requests for each time series id and provide a sender that
100+
# the resampling actor should use to send back subscription responses
101+
subscription_requests = [
102+
SubscriptionRequest(
103+
sender=subscription_response_sender,
104+
subscription=Subscription(time_series_id=time_series_id),
105+
)
106+
for time_series_id in time_series_ids
107+
]
108+
109+
# Send the subscription requests
110+
await asyncio.gather(
111+
*[
112+
subscription_request_sender.send(request)
113+
for request in subscription_requests
114+
]
115+
)
116+
117+
# Store sample receivers for each subscription
118+
sample_receivers: Dict[Subscription, Receiver[Sample]] = {}
119+
120+
while True:
121+
select = Select(
122+
subscription_response_receiver=subscription_response_receiver,
123+
sample_receiver=MergeNamed(
124+
**{
125+
str(subscription.time_series_id): receiver
126+
for subscription, receiver in sample_receivers.items()
127+
}
128+
),
129+
)
130+
while await select.ready():
131+
# Whenever a subscription response is received from the resampling actor,
132+
# it contains a receiver used to consume data for that subscription.
133+
# The subscription needs to be stored and the inner while loop restarted
134+
# in order to regenerate data receivers passed to the Select
135+
if msg := select.subscription_response_receiver:
136+
response: SubscriptionResponse = msg.inner
137+
sample_receivers[response.subscription] = response.receiver
138+
break
139+
140+
if msg := select.sample_receiver:
141+
if msg.inner is not None:
142+
time_series_id, sample = msg.inner
143+
print(f"{time_series_id} => {sample}")
144+
145+
146+
asyncio.run(run())

0 commit comments

Comments
 (0)