Skip to content

Commit 1fe75cc

Browse files
committed
Hide the resampling module and Source/Sink from timeseries
We are not convinced about the current implementation using a callback (Sink), so we don't want to expose this just yet. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 46427cd commit 1fe75cc

File tree

6 files changed

+35
-52
lines changed

6 files changed

+35
-52
lines changed

examples/resampling.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
DataSourcingActor,
1818
)
1919
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
20-
from frequenz.sdk.timeseries import Sample, Sink, Source
21-
from frequenz.sdk.timeseries.resampling import Resampler
20+
from frequenz.sdk.timeseries import Sample
21+
from frequenz.sdk.timeseries._resampling import Resampler, Sink, Source
2222

2323
HOST = "microgrid.sandbox.api.frequenz.io"
2424
PORT = 61060

src/frequenz/sdk/actor/_resampling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from frequenz.sdk.util.asyncio import cancel_and_await
1515

1616
from ..timeseries import Sample
17-
from ..timeseries.resampling import (
17+
from ..timeseries._resampling import (
1818
Resampler,
1919
ResamplingError,
2020
ResamplingFunction,

src/frequenz/sdk/timeseries/__init__.py

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,10 @@
66
77
A timeseries is a stream (normally an async iterator) of
88
[`Sample`][frequenz.sdk.timeseries.Sample]s.
9-
10-
Timeseries can be consumed from
11-
[`Source`][frequenz.sdk.timeseries.Source]s and new timeseries can be
12-
generated by sending samples to
13-
a [`Sink`][frequenz.sdk.timeseries.Sink]
14-
15-
In general timeseries `Source`s don't necessarily come at periodic
16-
intervals. To normalize timeseries to produce `Sample`s at regular
17-
periodic intervals,
18-
a [`Resampler`][frequenz.sdk.timeseries.resampling.Resampler] can be used.
19-
20-
A `Resampler` uses
21-
a [`ResamplingFunction`][frequenz.sdk.timeseries.resampling.ResamplingFunction]
22-
to produce a new sample from samples received in the past. If there are no
23-
samples coming to a resampled timeseries for a while, eventually the
24-
`Resampler` will produce `Sample`s with `None` as value, meaning there is no
25-
way to produce meaningful samples with the available data.
269
"""
2710

28-
from ._base_types import Sample, Sink, Source
11+
from ._base_types import Sample
2912

3013
__all__ = [
3114
"Sample",
32-
"Sink",
33-
"Source",
3415
]

src/frequenz/sdk/timeseries/_base_types.py

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from dataclasses import dataclass
77
from datetime import datetime
8-
from typing import AsyncIterator, Callable, Coroutine, Optional
8+
from typing import Optional
99

1010

1111
@dataclass(frozen=True)
@@ -19,27 +19,3 @@ class Sample:
1919

2020
timestamp: datetime
2121
value: Optional[float] = None
22-
23-
24-
Source = AsyncIterator[Sample]
25-
"""A source for a timeseries.
26-
27-
A timeseries can be received sample by sample in a streaming way
28-
using a source.
29-
"""
30-
31-
Sink = Callable[[Sample], Coroutine[None, None, None]]
32-
"""A sink for a timeseries.
33-
34-
A new timeseries can be generated by sending samples to a sink.
35-
36-
This should be an `async` callable, for example:
37-
38-
``` python
39-
async some_sink(Sample) -> None:
40-
...
41-
```
42-
43-
Args:
44-
sample (Sample): A sample to be sent out.
45-
"""

src/frequenz/sdk/timeseries/resampling.py renamed to src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,40 @@
1010
import math
1111
from collections import deque
1212
from datetime import datetime, timedelta
13-
from typing import Callable, Sequence
13+
from typing import AsyncIterator, Callable, Coroutine, Sequence
1414

1515
from frequenz.channels.util import Timer
1616

1717
from ..util.asyncio import cancel_and_await
18-
from . import Sample, Sink, Source
18+
from . import Sample
1919

2020
_logger = logging.Logger(__name__)
2121

2222

23+
Source = AsyncIterator[Sample]
24+
"""A source for a timeseries.
25+
26+
A timeseries can be received sample by sample in a streaming way
27+
using a source.
28+
"""
29+
30+
Sink = Callable[[Sample], Coroutine[None, None, None]]
31+
"""A sink for a timeseries.
32+
33+
A new timeseries can be generated by sending samples to a sink.
34+
35+
This should be an `async` callable, for example:
36+
37+
``` python
38+
async some_sink(Sample) -> None:
39+
...
40+
```
41+
42+
Args:
43+
sample (Sample): A sample to be sent out.
44+
"""
45+
46+
2347
ResamplingFunction = Callable[[Sequence[Sample], float], float]
2448
"""Resampling function type.
2549

tests/timeseries/test_resampling.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
import time_machine
1515
from frequenz.channels import Broadcast
1616

17-
from frequenz.sdk.timeseries import Sample, Sink, Source
18-
from frequenz.sdk.timeseries.resampling import (
17+
from frequenz.sdk.timeseries import Sample
18+
from frequenz.sdk.timeseries._resampling import (
1919
Resampler,
2020
ResamplingError,
2121
ResamplingFunction,
22+
Sink,
23+
Source,
2224
SourceStoppedError,
2325
)
2426

0 commit comments

Comments
 (0)