4 changes: 2 additions & 2 deletions _nx_parallel/__init__.py
@@ -84,7 +84,7 @@ def get_info():
         },
     },
     "betweenness_centrality": {
-        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L20",
+        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L19",
         "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing betweenness centrality for each chunk concurrently.",
         "additional_parameters": {
             'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
@@ -98,7 +98,7 @@ def get_info():
         },
     },
     "edge_betweenness_centrality": {
-        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L96",
+        "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L99",
         "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.",
         "additional_parameters": {
             'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
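Note on the `get_chunks` entries above: both functions document the same contract, a callable that receives the full node list and returns an iterable of node chunks. A minimal sketch of a custom chunker satisfying that contract (the even split below is illustrative, not the library's default slicing):

import networkx as nx
import nx_parallel as nxp

def two_chunks(nodes):
    # Any callable returning an iterable of node chunks is accepted.
    nodes = list(nodes)
    mid = len(nodes) // 2
    return [nodes[:mid], nodes[mid:]]

G = nx.fast_gnp_random_graph(50, 0.2, seed=42)
H = nxp.ParallelGraph(G)
bc = nxp.betweenness_centrality(H, get_chunks=two_chunks)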
84 changes: 46 additions & 38 deletions nx_parallel/algorithms/centrality/betweenness.py
@@ -1,4 +1,3 @@
-from joblib import Parallel, delayed
 from networkx.algorithms.centrality.betweenness import (
     _accumulate_basic,
     _accumulate_endpoints,
@@ -38,31 +37,35 @@ def betweenness_centrality(
         iterable `node_chunks`. The default chunking is done by slicing the
         `nodes` into `n_jobs` number of chunks.
     """
-
     if hasattr(G, "graph_object"):
         G = G.graph_object
 
-    if k is None:
-        nodes = G.nodes
-    else:
-        nodes = seed.sample(list(G.nodes), k)
-
-    n_jobs = nxp.get_n_jobs()
-
-    if get_chunks == "chunks":
-        node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes)
-    else:
-        node_chunks = get_chunks(nodes)
-
-    bt_cs = Parallel()(
-        delayed(_betweenness_centrality_node_subset)(G, chunk, weight, endpoints)
-        for chunk in node_chunks
+    def process_func(G, chunk, weight, endpoints):
+        return _betweenness_centrality_node_subset(
+            G, chunk, weight=weight, endpoints=endpoints
+        )
+
+    def iterator_func(G):
+        if k is None:
+            return G.nodes
+        else:
+            return seed.sample(list(G.nodes), k)
+
+    bt_cs = nxp.utils.chunk.execute_parallel(
+        G,
+        process_func=process_func,
+        iterator_func=iterator_func,
+        get_chunks=get_chunks,
+        weight=weight,
+        endpoints=endpoints,
     )
 
     # Reducing partial solution
-    bt_c = bt_cs[0]
-    for bt in bt_cs[1:]:
-        for n in bt:
-            bt_c[n] += bt[n]
+    bt_c = {}
+    for bt in bt_cs:
+        for n, value in bt.items():
+            bt_c[n] = bt_c.get(n, 0.0) + value
 
     betweenness = _rescale(
         bt_c,
@@ -94,7 +97,12 @@ def _betweenness_centrality_node_subset(G, nodes, weight=None, endpoints=False):
 @nxp._configure_if_nx_active()
 @py_random_state(4)
 def edge_betweenness_centrality(
-    G, k=None, normalized=True, weight=None, seed=None, get_chunks="chunks"
+    G,
+    k=None,
+    normalized=True,
+    weight=None,
+    seed=None,
+    get_chunks="nodes",
 ):
     """The parallel computation is implemented by dividing the nodes into chunks and
     computing edge betweenness centrality for each chunk concurrently.
@@ -111,28 +119,28 @@ def edge_betweenness_centrality(
     if hasattr(G, "graph_object"):
         G = G.graph_object
 
-    if k is None:
-        nodes = G.nodes
-    else:
-        nodes = seed.sample(list(G.nodes), k)
-
-    n_jobs = nxp.get_n_jobs()
-
-    if get_chunks == "chunks":
-        node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes)
-    else:
-        node_chunks = get_chunks(nodes)
-
-    bt_cs = Parallel()(
-        delayed(_edge_betweenness_centrality_node_subset)(G, chunk, weight)
-        for chunk in node_chunks
+    def process_func(G, chunk, weight):
+        return _edge_betweenness_centrality_node_subset(G, chunk, weight=weight)
+
+    def iterator_func(G):
+        if k is None:
+            return G.nodes
+        else:
+            return seed.sample(list(G.nodes), k)
+
+    bt_cs = nxp.utils.chunk.execute_parallel(
+        G,
+        process_func=process_func,
+        iterator_func=iterator_func,
+        get_chunks=get_chunks,
+        weight=weight,
    )
 
     # Reducing partial solution
-    bt_c = bt_cs[0]
-    for bt in bt_cs[1:]:
-        for e in bt:
-            bt_c[e] += bt[e]
+    bt_c = {}
+    for partial_bt in bt_cs:
+        for edge, value in partial_bt.items():
+            bt_c[edge] = bt_c.get(edge, 0.0) + value
 
     for n in G:  # remove nodes to only return edges
         del bt_c[n]
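Both rewrites above delegate chunking and dispatch to `nxp.utils.chunk.execute_parallel`, which this diff does not show. Judging only from the call sites and the joblib code it replaces, a plausible sketch of the helper (the internals below are assumptions, not the actual implementation):

from joblib import Parallel, delayed
import nx_parallel as nxp

def execute_parallel(G, process_func, iterator_func, get_chunks="chunks", **kwargs):
    # Sketch: build the iterable, chunk it, and fan the chunks out with joblib,
    # forwarding extra kwargs (e.g. weight, endpoints) to process_func.
    n_jobs = nxp.get_n_jobs()
    items = list(iterator_func(G))
    if get_chunks == "chunks":
        chunks = nxp.create_iterables(G, "node", n_jobs, items)
    else:
        chunks = get_chunks(items)
    return Parallel(n_jobs=n_jobs)(
        delayed(process_func)(G, chunk, **kwargs) for chunk in chunks
    )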
@@ -3,33 +3,41 @@
 import math
 
 
-def test_betweenness_centrality_get_chunks():
-    def get_chunk(nodes):
-        num_chunks = nxp.get_n_jobs()
-        nodes_ebc = {i: 0 for i in nodes}
-        for i in ebc:
-            nodes_ebc[i[0]] += ebc[i]
-            nodes_ebc[i[1]] += ebc[i]
-
-        sorted_nodes = sorted(nodes_ebc.items(), key=lambda x: x[1], reverse=True)
-
-        chunks = [[] for _ in range(num_chunks)]
-        chunk_sums = [0] * num_chunks
-
-        for node, value in sorted_nodes:
-            min_chunk_index = chunk_sums.index(min(chunk_sums))
-            chunks[min_chunk_index].append(node)
-            chunk_sums[min_chunk_index] += value
-
-        return chunks
-
-    G = nx.fast_gnp_random_graph(100, 0.1, directed=False)
-    H = nxp.ParallelGraph(G)
-    ebc = nx.edge_betweenness_centrality(G)
-    par_bc_chunk = nxp.betweenness_centrality(H, get_chunks=get_chunk)  # smoke test
-    par_bc = nxp.betweenness_centrality(H)
-
-    for i in range(len(G.nodes)):
-        assert math.isclose(par_bc[i], par_bc_chunk[i], abs_tol=1e-16)
+def test_edge_betweenness_centrality_get_chunks():
+    def get_chunk(edges):
+        num_chunks = nxp.get_n_jobs()
+        edges = list(edges)
+
+        # Split edges into chunks without relying on precomputed centrality
+        chunk_size = max(1, len(edges) // num_chunks)
+        chunks = [edges[i : i + chunk_size] for i in range(0, len(edges), chunk_size)]
+
+        print(f"Chunks distribution: {chunks}")
+
+        return chunks
+
+    G = nx.fast_gnp_random_graph(100, 0.1, directed=False)
+    H = nxp.ParallelGraph(G)
+
+    # get_chunk is faster than the default (for big graphs)
+    # G = nx.bipartite.random_graph(400, 700, 0.8, seed=5, directed=False)
+
+    ebc = nx.edge_betweenness_centrality(G, normalized=True)
+    print(f"NetworkX Edge Betweenness Centrality: {ebc}")
+
+    backend = nxp.BackendInterface()
+
+    # Smoke test for edge_betweenness_centrality with custom get_chunks
+    par_bc_chunk = backend.edge_betweenness_centrality(
+        H.graph_object,
+        get_chunks=get_chunk,
+    )
+    print(f"Parallel Computed Edge Betweenness Centrality: {par_bc_chunk}")
+
+    # Compare with standard edge betweenness centrality
+    standard_bc = nx.edge_betweenness_centrality(G, normalized=True)
+
+    for edge in standard_bc:
+        assert math.isclose(
+            par_bc_chunk[edge], standard_bc[edge], abs_tol=1e-6
+        ), f"Edge {edge} mismatch: {par_bc_chunk[edge]} vs {standard_bc[edge]}"
6 changes: 3 additions & 3 deletions nx_parallel/interface.py
@@ -69,9 +69,9 @@ def __str__(self):
 def assign_algorithms(cls):
     """Class decorator to assign algorithms to the class attributes."""
     for attr in ALGORITHMS:
-        # get the function name by parsing the module hierarchy
-        func_name = attr.rsplit(".", 1)[-1]
-        setattr(cls, func_name, attrgetter(attr)(algorithms))
+        setattr(
+            cls, attr.rsplit(".", 1)[-1], staticmethod(attrgetter(attr)(algorithms))
+        )
     return cls
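The `staticmethod` wrapper above matters because assigning a plain function to a class attribute turns it into a method: calls through an instance would silently pass the instance as the graph argument. A standalone illustration (not nx-parallel code):

def algo(G):
    return f"ran on {G!r}"

class Backend:
    pass

setattr(Backend, "algo_plain", algo)                 # becomes a bound method
setattr(Backend, "algo_static", staticmethod(algo))  # stays a plain function

b = Backend()
print(b.algo_static("G1"))  # ran on 'G1' -- G is the first positional argument
try:
    b.algo_plain("G1")      # actually calls algo(b, "G1")
except TypeError as err:
    print(err)              # algo() takes 1 positional argument but 2 were given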
35 changes: 24 additions & 11 deletions nx_parallel/tests/test_get_chunks.py
@@ -1,5 +1,3 @@
-# smoke tests for all functions supporting `get_chunks` kwarg
-
 import inspect
 import importlib
 import random
@@ -10,7 +8,7 @@
 import nx_parallel as nxp
 
 
-def get_all_functions(package_name="nx_parallel"):
+def get_all_functions(package_name="nx_parallel.algorithms"):
     """Returns a dict keyed by function names to its arguments.
 
     This function constructs a dictionary keyed by the function
@@ -32,8 +30,8 @@ def get_functions_with_get_chunks():
     """Returns a list of function names with the `get_chunks` kwarg."""
     all_funcs = get_all_functions()
     get_chunks_funcs = []
-    for func in all_funcs:
-        if "get_chunks" in all_funcs[func]["args"]:
+    for func, params in all_funcs.items():
+        if "get_chunks" in params["args"]:
             get_chunks_funcs.append(func)
     return get_chunks_funcs
@@ -47,6 +45,15 @@ def random_chunking(nodes):
         num_in_chunk = max(len(_nodes) // num_chunks, 1)
         return nxp.chunks(_nodes, num_in_chunk)
 
+    # Define a simple process_func for testing
+    def process_func(G, chunk, **kwargs):
+        # Example: return the degree of each node in the chunk
+        return {node: G.degree(node) for node in chunk}
+
+    # Define a simple iterator_func for testing
+    def iterator_func(G):
+        return G.nodes()
+
     get_chunks_funcs = get_functions_with_get_chunks()
     ignore_funcs = [
         "number_of_isolates",
@@ -62,19 +69,25 @@ def random_chunking(nodes):
     G = nx.fast_gnp_random_graph(50, 0.6, seed=42)
     H = nxp.ParallelGraph(G)
     for func in get_chunks_funcs:
-        print(func)
         if func not in ignore_funcs:
             if func in tournament_funcs:
                 G = nx.tournament.random_tournament(50, seed=42)
                 H = nxp.ParallelGraph(G)
-                c1 = getattr(nxp, func)(H)
-                c2 = getattr(nxp, func)(H, get_chunks=random_chunking)
+                c1 = getattr(nxp, func)(H, process_func, iterator_func)
+                c2 = getattr(nxp, func)(
+                    H, process_func, iterator_func, get_chunks=random_chunking
+                )
                 assert c1 == c2
             else:
-                c1 = getattr(nxp, func)(H)
-                c2 = getattr(nxp, func)(H, get_chunks=random_chunking)
+                c1 = getattr(nxp, func)(H, process_func, iterator_func)
+                c2 = getattr(nxp, func)(
+                    H, process_func, iterator_func, get_chunks=random_chunking
+                )
                 if isinstance(c1, types.GeneratorType):
-                    c1, c2 = dict(c1), dict(c2)
+                    c1, c2 = (
+                        list(c1),
+                        list(c2),
+                    )  # Convert generators to lists for comparison
                 if func in chk_dict_vals:
                     for i in range(len(G.nodes)):
                         assert math.isclose(c1[i], c2[i], abs_tol=1e-16)