diff --git a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py index 7f50c183..5db85e9e 100644 --- a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py +++ b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py @@ -141,7 +141,7 @@ def connect_to_upstream( def serialize_graph(graph: lg.Graph) -> SerializedGraph: """ - A function that returns a serialized version of the graph topology. + A function that returns a serialized version of the graph topology @params: name: The name of the graph @@ -200,7 +200,7 @@ def sub_pub_grouping_map(graph: lg.Graph) -> Dict[str, str]: @return: A dictionary where the key is a publisher grouping and the value is a dictionary of sets of topic paths subscribed and their groupings """ - sub_pub_grouping_map: Dict[str, str] = {} + sub_pub: Dict[str, str] = {} for stream in graph.__streams__.values(): difference = set(stream.topic_paths).difference(LabgraphMonitorNode.in_edges) if difference: @@ -208,26 +208,41 @@ def sub_pub_grouping_map(graph: lg.Graph) -> Dict[str, str]: for edge in stream.topic_paths: if edge != upstream_edge: # convert SERIALIZER/SERIALIZER_INPUT_1 to its grouping Serializer - edge_path = "/".join(edge.split("/")[:-1]) - edge_grouping = type(graph.__descendants__[edge_path]).__name__ + edge_grouping = get_grouping(edge, graph) + + # convert NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT to its grouping NoiseGenerator + upstream_edge_grouping = get_grouping(upstream_edge, graph) # convert SERIALIZER/SERIALIZER_INPUT_1 to its topic SERIALIZER_INPUT_1 topic_path = edge.split("/")[-1] - # convert NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT to its grouping NoiseGenerator - group_path = "/".join(upstream_edge.split("/")[:-1]) - grouping = type(graph.__descendants__[group_path]).__name__ - - if grouping in sub_pub_grouping_map: - sub_pub_grouping_map[grouping]["topics"].add(topic_path) - sub_pub_grouping_map[grouping]["subscribers"].add(edge_grouping) + if upstream_edge_grouping in sub_pub: + sub_pub[upstream_edge_grouping]["topics"].add(topic_path) + sub_pub[upstream_edge_grouping]["subscribers"].add(edge_grouping) else: - sub_pub_grouping_map[grouping] = { + sub_pub[upstream_edge_grouping] = { "topics": {topic_path}, "subscribers": {edge_grouping}, } - return sub_pub_grouping_map + return sub_pub + +def get_grouping(edge: str, graph: lg.Graph) -> str: + """ + A function that returns grouping of a given node + using Graph attributes + + @params: + edge: A str representing edge's group and topic combined + e.g. SERIALIZER/SERIALIZER_INPUT_1 + + @return: The name of the grouping following lg.Graph's attributes + e.g. Serializer + """ + path = "/".join(edge.split("/")[:-1]) + grouping = type(graph.__descendants__[path]).__name__ + + return grouping def generate_graph_topology(graph: lg.Graph) -> SerializedGraph: """ diff --git a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py index 5395bd3e..86c44513 100644 --- a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py +++ b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py @@ -55,7 +55,7 @@ def add_message_1(self, message: RandomMessage) -> None: self.state.data_1 = { "grouping": grouping, "timestamp": message.timestamp, - "numpy": list(message.data), + "data": list(message.data), } @lg.subscriber(SERIALIZER_INPUT_2) @@ -64,7 +64,7 @@ def add_message_2(self, message: RandomMessage) -> None: self.state.data_2 = { "grouping": grouping, "timestamp": message.timestamp, - "numpy": list(message.data), + "data": list(message.data), } @lg.subscriber(SERIALIZER_INPUT_3) @@ -73,7 +73,7 @@ def add_message_3(self, message: RandomMessage) -> None: self.state.data_3 = { "grouping": grouping, "timestamp": message.timestamp, - "numpy": list(message.data), + "data": list(message.data), } @lg.subscriber(SERIALIZER_INPUT_4) @@ -82,7 +82,7 @@ def add_message_4(self, message: RandomMessage) -> None: self.state.data_4 = { "grouping": grouping, "timestamp": message.timestamp, - "numpy": list(message.data), + "data": list(message.data), } def output(self, _in: Dict) -> Dict: @@ -96,7 +96,7 @@ def output(self, _in: Dict) -> Dict: for state in self.state.__dict__.values(): if state["grouping"] in value["upstreams"].keys(): value["upstreams"][state["grouping"]][0]["fields"]["timestamp"]["content"] = state["timestamp"] - value["upstreams"][state["grouping"]][0]["fields"]["data"]["content"] = state["numpy"] + value["upstreams"][state["grouping"]][0]["fields"]["data"]["content"] = state["data"] return _in