Skip to content
This repository was archived by the owner on Nov 1, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def connect_to_upstream(

def serialize_graph(graph: lg.Graph) -> SerializedGraph:
"""
A function that returns a serialized version of the graph topology.
A function that returns a serialized version of the graph topology

@params:
name: The name of the graph
Expand Down Expand Up @@ -200,34 +200,49 @@ def sub_pub_grouping_map(graph: lg.Graph) -> Dict[str, str]:
@return: A dictionary where the key is a publisher grouping
and the value is a dictionary of sets of topic paths subscribed and their groupings
"""
sub_pub_grouping_map: Dict[str, str] = {}
sub_pub: Dict[str, str] = {}
for stream in graph.__streams__.values():
difference = set(stream.topic_paths).difference(LabgraphMonitorNode.in_edges)
if difference:
upstream_edge = max(difference, key=len)
for edge in stream.topic_paths:
if edge != upstream_edge:
# convert SERIALIZER/SERIALIZER_INPUT_1 to its grouping Serializer
edge_path = "/".join(edge.split("/")[:-1])
edge_grouping = type(graph.__descendants__[edge_path]).__name__
edge_grouping = get_grouping(edge, graph)

# convert NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT to its grouping NoiseGenerator
upstream_edge_grouping = get_grouping(upstream_edge, graph)

# convert SERIALIZER/SERIALIZER_INPUT_1 to its topic SERIALIZER_INPUT_1
topic_path = edge.split("/")[-1]

# convert NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT to its grouping NoiseGenerator
group_path = "/".join(upstream_edge.split("/")[:-1])
grouping = type(graph.__descendants__[group_path]).__name__

if grouping in sub_pub_grouping_map:
sub_pub_grouping_map[grouping]["topics"].add(topic_path)
sub_pub_grouping_map[grouping]["subscribers"].add(edge_grouping)
if upstream_edge_grouping in sub_pub:
sub_pub[upstream_edge_grouping]["topics"].add(topic_path)
sub_pub[upstream_edge_grouping]["subscribers"].add(edge_grouping)
else:
sub_pub_grouping_map[grouping] = {
sub_pub[upstream_edge_grouping] = {
"topics": {topic_path},
"subscribers": {edge_grouping},
}

return sub_pub_grouping_map
return sub_pub

def get_grouping(edge: str, graph: lg.Graph) -> str:
"""
A function that returns grouping of a given node
using Graph attributes

@params:
edge: A str representing edge's group and topic combined
e.g. SERIALIZER/SERIALIZER_INPUT_1

@return: The name of the grouping following lg.Graph's attributes
e.g. Serializer
"""
path = "/".join(edge.split("/")[:-1])
grouping = type(graph.__descendants__[path]).__name__

return grouping

def generate_graph_topology(graph: lg.Graph) -> SerializedGraph:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def add_message_1(self, message: RandomMessage) -> None:
self.state.data_1 = {
"grouping": grouping,
"timestamp": message.timestamp,
"numpy": list(message.data),
"data": list(message.data),
}

@lg.subscriber(SERIALIZER_INPUT_2)
Expand All @@ -64,7 +64,7 @@ def add_message_2(self, message: RandomMessage) -> None:
self.state.data_2 = {
"grouping": grouping,
"timestamp": message.timestamp,
"numpy": list(message.data),
"data": list(message.data),
}

@lg.subscriber(SERIALIZER_INPUT_3)
Expand All @@ -73,7 +73,7 @@ def add_message_3(self, message: RandomMessage) -> None:
self.state.data_3 = {
"grouping": grouping,
"timestamp": message.timestamp,
"numpy": list(message.data),
"data": list(message.data),
}

@lg.subscriber(SERIALIZER_INPUT_4)
Expand All @@ -82,7 +82,7 @@ def add_message_4(self, message: RandomMessage) -> None:
self.state.data_4 = {
"grouping": grouping,
"timestamp": message.timestamp,
"numpy": list(message.data),
"data": list(message.data),
}

def output(self, _in: Dict) -> Dict:
Expand All @@ -96,7 +96,7 @@ def output(self, _in: Dict) -> Dict:
for state in self.state.__dict__.values():
if state["grouping"] in value["upstreams"].keys():
value["upstreams"][state["grouping"]][0]["fields"]["timestamp"]["content"] = state["timestamp"]
value["upstreams"][state["grouping"]][0]["fields"]["data"]["content"] = state["numpy"]
value["upstreams"][state["grouping"]][0]["fields"]["data"]["content"] = state["data"]

return _in

Expand Down