
Implement a data sourcing actor #30

@tiyash-basu-frequenz

Description

What's needed?

We need to implement an actor that can expose data from different sources to the downstream layers.

Component data can come from different sources, e.g., the Microgrid API, the Reporting API, and parquet files in local storage.
The data format in these sources can be different.

There may be multiple downstream actors, each of them reading from multiple sources.
If each of them implements its own mechanism to convert data into one common format for further consumption, then:

  • each would need to understand how all the different data sources work, causing high coupling, and
  • they would very likely run similar conversions, wasting CPU cycles.

Proposed solution

The solution would be to create an actor that

  • can read multiple data sources, e.g., the Microgrid API, and parquet files in local storage,
  • can expose data in a common format that abstracts the underlying data source away,
  • can switch between input data streams seamlessly, based upon well-defined conditions (see below for details).

Note that in the first version (v1), this actor needs to consume data from the following sources only:

  • Microgrid API
  • parquet files in local storage

For the actor to identify the data channel and the data source(s), requests to the actor should consist of the following parameters:
a. the component ID
b. the metric ID
c. the start timestamp, i.e., the timestamp from which data should be read

If the start timestamp is in the past, the local parquet files should be read first. Once there is no more data in the parquet files, data from the live data source (the Microgrid API) should be streamed.
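
As a rough sketch of that dispatch logic (the `SourceKind` enum and `pick_source` helper are made up for illustration; the real actor may fold this into its request handling):

from datetime import datetime, timezone
from enum import Enum, auto

class SourceKind(Enum):
    HISTORICAL_THEN_LIVE = auto()  # replay parquet files, then switch to live
    LIVE = auto()                  # stream from the Microgrid API right away
    WAIT_THEN_LIVE = auto()        # sleep until start_ts, then stream live

def pick_source(start_ts: datetime) -> SourceKind:
    # Classify the request by comparing start_ts against the current time.
    now = datetime.now(timezone.utc)
    if start_ts < now:
        return SourceKind.HISTORICAL_THEN_LIVE
    if start_ts > now:
        return SourceKind.WAIT_THEN_LIVE
    return SourceKind.LIVE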

The top-level building blocks of the actor could be a set of classes as follows
(note that these are just pseudocode ideas to specify requirements; the actual implementation may vary):

from dataclasses import dataclass
from datetime import datetime

# This is the class that uniquely represents a metric channel.
# `frozen=True` makes instances hashable, so they can be used as dict keys.
@dataclass(frozen=True)
class ChannelKey:
    # An enum or a unique ID representing the metric.
    metric_id: str

    # The component ID.
    component_id: int

    # The time from which data should be consumed.
    start_ts: datetime

See ComponentMetricKey in issue #5 for more details on ChannelKey.
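
For example, a request for live data could be keyed as follows (the metric name and component ID are made up):

from datetime import datetime, timezone

live_key = ChannelKey(
    metric_id="active_power",  # made-up metric name
    component_id=7,            # made-up component ID
    start_ts=datetime.now(timezone.utc),  # consume live data from now on
)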

from typing import Optional

# This is the class that consumes and forwards data for one given channel.
# The actor can have a collection of objects of this class.
class DataIngestor:
    _channel: Optional[BroadcastChannel[float]]

    def __init__(self):
        # Initialize `_channel` as `None`.
        self._channel = None
        ...

    def get_channel(self, data_key: ChannelKey) -> BroadcastChannel[float]:
        # This method is the key piece here.
        #
        # Consume data from different sources, and stream it downstream via an
        # output channel. The user has to input the details of which metric
        # has to be consumed and from what time. The user gets a channel with
        # the desired data in return.
        #
        # If `data_key.start_ts` is `now()`, then read directly from the
        # Microgrid API.
        #
        # If `data_key.start_ts` is in the past, e.g., `now() - 30m`, then read
        # the parquet files until there is no more data there. Then read from
        # the Microgrid API.
        #
        # If `data_key.start_ts` is in the future, e.g., `now() + 30m`, then
        # wait until the given timestamp is reached. Then read from the
        # Microgrid API.

        # Return `self._channel` if it already exists, otherwise create a new
        # one.
        ...
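
As a minimal sketch of how the source switching inside `DataIngestor` could look, assuming an asynchronous iterator per source (`read_parquet_history`, `stream_microgrid_api`, and `self._send` are hypothetical stand-ins, not existing APIs):

import asyncio
from datetime import datetime, timezone

async def _ingest(self, data_key: ChannelKey) -> None:
    # Hypothetical DataIngestor method that feeds the output channel,
    # switching sources as they run out.
    now = datetime.now(timezone.utc)

    if data_key.start_ts > now:
        # Future start: wait until the requested timestamp is reached.
        await asyncio.sleep((data_key.start_ts - now).total_seconds())
    elif data_key.start_ts < now:
        # Past start: replay historical samples from the local parquet files
        # until they are exhausted. `read_parquet_history` is hypothetical.
        async for sample in read_parquet_history(data_key):
            await self._send(sample)  # hypothetical channel-send wrapper

    # Then (or immediately, for live requests) switch over to the live
    # stream. `stream_microgrid_api` is hypothetical.
    async for sample in stream_microgrid_api(data_key):
        await self._send(sample)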

The actor itself could be defined as follows:

@dataclass(frozen=True)
class DataRequestKey:
    # The details of the metric to read data for, and the timestamp from which
    # the data should be read.
    data_key: ChannelKey

    # Can have more elements as required.

from typing import Dict

class DataSourcingActor:
    # The idea here is to store previously returned channels, so that we reuse
    # them instead of creating new ones every time. The data structure can be
    # a `dict`, or can be some other custom data structure, depending upon the
    # following requirements:
    #
    # Two `ChannelKey` objects with `start_ts >= now()` are effectively the
    # same, since they consume live data from the Microgrid API.
    #
    # However, two `ChannelKey` objects that need historical data from the
    # same `start_ts` timestamp may not be the same. Assuming the two
    # `ChannelKey` objects are received at times 00:00:00 and 01:00:00, the
    # 2nd one will still need to start from the first historical data sample,
    # whereas the 1st one could be several data samples ahead, or reading live
    # data, by then.
    _data_ingestors: Dict[DataRequestKey, DataIngestor]

    def __init__(self):
        # Initialize the `_data_ingestors` object.
        self._data_ingestors = {}
        ...

    def get_channel(self, data_key: ChannelKey) -> BroadcastChannel[float]:
        data_req_key = DataRequestKey(data_key)

        # If a `DataIngestor` object has not yet been created for this
        # request, create it.
        if data_req_key not in self._data_ingestors:
            self._data_ingestors[data_req_key] = DataIngestor()

        # Return the channel from the `DataIngestor` instance.
        return self._data_ingestors[data_req_key].get_channel(data_key)
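
A hedged usage sketch, assuming the request/response flow above (the metric name, component ID, and actor construction are illustrative only):

from datetime import datetime, timedelta, timezone

actor = DataSourcingActor()

# Request data starting 30 minutes in the past: the parquet files are
# replayed first, and the stream then switches over to the Microgrid API.
channel = actor.get_channel(
    ChannelKey(
        metric_id="active_power",  # made-up metric name
        component_id=7,            # made-up component ID
        start_ts=datetime.now(timezone.utc) - timedelta(minutes=30),
    )
)

# A second request with an equivalent key would get the same channel back,
# instead of spawning a duplicate ingestor.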

Metadata

Labels

part:data-pipeline (Affects the data pipeline), priority:high (Address this as soon as possible), type:enhancement (New feature or enhancement visible to users)
