diff --git a/_nx_parallel/__init__.py b/_nx_parallel/__init__.py
index 449682ce..488a6a9d 100644
--- a/_nx_parallel/__init__.py
+++ b/_nx_parallel/__init__.py
@@ -84,7 +84,7 @@ def get_info():
             },
         },
         "betweenness_centrality": {
-            "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L20",
+            "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L19",
             "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing betweenness centrality for each chunk concurrently.",
             "additional_parameters": {
                 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
@@ -98,7 +98,7 @@ def get_info():
             },
         },
         "edge_betweenness_centrality": {
-            "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L96",
+            "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L99",
             "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.",
             "additional_parameters": {
                 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
diff --git a/nx_parallel/algorithms/centrality/betweenness.py b/nx_parallel/algorithms/centrality/betweenness.py
index 3f396b1f..140164cf 100644
--- a/nx_parallel/algorithms/centrality/betweenness.py
+++ b/nx_parallel/algorithms/centrality/betweenness.py
@@ -1,4 +1,3 @@
-from joblib import Parallel, delayed
 from networkx.algorithms.centrality.betweenness import (
     _accumulate_basic,
     _accumulate_endpoints,
@@ -38,31 +37,35 @@ def betweenness_centrality(
         iterable `node_chunks`. The default chunking is done by slicing the
         `nodes` into `n_jobs` number of chunks.
""" + if hasattr(G, "graph_object"): G = G.graph_object - if k is None: - nodes = G.nodes - else: - nodes = seed.sample(list(G.nodes), k) - - n_jobs = nxp.get_n_jobs() + def process_func(G, chunk, weight, endpoints): + return _betweenness_centrality_node_subset( + G, chunk, weight=weight, endpoints=endpoints + ) - if get_chunks == "chunks": - node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes) - else: - node_chunks = get_chunks(nodes) - - bt_cs = Parallel()( - delayed(_betweenness_centrality_node_subset)(G, chunk, weight, endpoints) - for chunk in node_chunks + def iterator_func(G): + if k is None: + return G.nodes + else: + return seed.sample(list(G.nodes), k) + + bt_cs = nxp.utils.chunk.execute_parallel( + G, + process_func=process_func, + iterator_func=iterator_func, + get_chunks=get_chunks, + weight=weight, + endpoints=endpoints, ) # Reducing partial solution - bt_c = bt_cs[0] - for bt in bt_cs[1:]: - for n in bt: - bt_c[n] += bt[n] + bt_c = {} + for bt in bt_cs: + for n, value in bt.items(): + bt_c[n] = bt_c.get(n, 0.0) + value betweenness = _rescale( bt_c, @@ -94,7 +97,12 @@ def _betweenness_centrality_node_subset(G, nodes, weight=None, endpoints=False): @nxp._configure_if_nx_active() @py_random_state(4) def edge_betweenness_centrality( - G, k=None, normalized=True, weight=None, seed=None, get_chunks="chunks" + G, + k=None, + normalized=True, + weight=None, + seed=None, + get_chunks="nodes", ): """The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently. @@ -111,28 +119,28 @@ def edge_betweenness_centrality( if hasattr(G, "graph_object"): G = G.graph_object - if k is None: - nodes = G.nodes - else: - nodes = seed.sample(list(G.nodes), k) + def process_func(G, chunk, weight): + return _edge_betweenness_centrality_node_subset(G, chunk, weight=weight) - n_jobs = nxp.get_n_jobs() - - if get_chunks == "chunks": - node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes) - else: - node_chunks = get_chunks(nodes) - - bt_cs = Parallel()( - delayed(_edge_betweenness_centrality_node_subset)(G, chunk, weight) - for chunk in node_chunks + def iterator_func(G): + if k is None: + return G.nodes + else: + return seed.sample(list(G.nodes), k) + + bt_cs = nxp.utils.chunk.execute_parallel( + G, + process_func=process_func, + iterator_func=iterator_func, + get_chunks=get_chunks, + weight=weight, ) # Reducing partial solution - bt_c = bt_cs[0] - for bt in bt_cs[1:]: - for e in bt: - bt_c[e] += bt[e] + bt_c = {} + for partial_bt in bt_cs: + for edge, value in partial_bt.items(): + bt_c[edge] = bt_c.get(edge, 0.0) + value for n in G: # remove nodes to only return edges del bt_c[n] diff --git a/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py b/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py index 408ba05e..9c50d8b3 100644 --- a/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py +++ b/nx_parallel/algorithms/centrality/tests/test_betweenness_centrality.py @@ -3,33 +3,41 @@ import math -def test_betweenness_centrality_get_chunks(): - def get_chunk(nodes): +def test_edge_betweenness_centrality_get_chunks(): + def get_chunk(edges): num_chunks = nxp.get_n_jobs() - nodes_ebc = {i: 0 for i in nodes} - for i in ebc: - nodes_ebc[i[0]] += ebc[i] - nodes_ebc[i[1]] += ebc[i] - sorted_nodes = sorted(nodes_ebc.items(), key=lambda x: x[1], reverse=True) + edges = list(edges) - chunks = [[] for _ in range(num_chunks)] - chunk_sums = [0] * num_chunks + # Split 
edges into chunks without relying on precomputed centrality + chunk_size = max(1, len(edges) // num_chunks) + chunks = [edges[i : i + chunk_size] for i in range(0, len(edges), chunk_size)] - for node, value in sorted_nodes: - min_chunk_index = chunk_sums.index(min(chunk_sums)) - chunks[min_chunk_index].append(node) - chunk_sums[min_chunk_index] += value + print(f"Chunks distribution: {chunks}") return chunks G = nx.fast_gnp_random_graph(100, 0.1, directed=False) H = nxp.ParallelGraph(G) - ebc = nx.edge_betweenness_centrality(G) - par_bc_chunk = nxp.betweenness_centrality(H, get_chunks=get_chunk) # smoke test - par_bc = nxp.betweenness_centrality(H) - - for i in range(len(G.nodes)): - assert math.isclose(par_bc[i], par_bc_chunk[i], abs_tol=1e-16) - # get_chunk is faster than default(for big graphs) - # G = nx.bipartite.random_graph(400, 700, 0.8, seed=5, directed=False) + + ebc = nx.edge_betweenness_centrality(G, normalized=True) + + print(f"NetworkX Edge Betweenness Centrality: {ebc}") + + backend = nxp.BackendInterface() + + # Smoke test for edge_betweenness_centrality with custom get_chunks + par_bc_chunk = backend.edge_betweenness_centrality( + H.graph_object, + get_chunks=get_chunk, + ) + + print(f"Parallel Computed Edge Betweenness Centrality: {par_bc_chunk}") + + # Compare with standard edge betweenness centrality + standard_bc = nx.edge_betweenness_centrality(G, normalized=True) + + for edge in standard_bc: + assert math.isclose( + par_bc_chunk[edge], standard_bc[edge], abs_tol=1e-6 + ), f"Edge {edge} mismatch: {par_bc_chunk[edge]} vs {standard_bc[edge]}" diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py index 38af8c73..0f05caa4 100644 --- a/nx_parallel/interface.py +++ b/nx_parallel/interface.py @@ -69,9 +69,9 @@ def __str__(self): def assign_algorithms(cls): """Class decorator to assign algorithms to the class attributes.""" for attr in ALGORITHMS: - # get the function name by parsing the module hierarchy - func_name = attr.rsplit(".", 1)[-1] - setattr(cls, func_name, attrgetter(attr)(algorithms)) + setattr( + cls, attr.rsplit(".", 1)[-1], staticmethod(attrgetter(attr)(algorithms)) + ) return cls diff --git a/nx_parallel/tests/test_get_chunks.py b/nx_parallel/tests/test_get_chunks.py index c8e63f9e..db6245a2 100644 --- a/nx_parallel/tests/test_get_chunks.py +++ b/nx_parallel/tests/test_get_chunks.py @@ -1,5 +1,3 @@ -# smoke tests for all functions supporting `get_chunks` kwarg - import inspect import importlib import random @@ -10,7 +8,7 @@ import nx_parallel as nxp -def get_all_functions(package_name="nx_parallel"): +def get_all_functions(package_name="nx_parallel.algorithms"): """Returns a dict keyed by function names to its arguments. 
    This function constructs a dictionary keyed by the function
@@ -32,8 +30,8 @@ def get_functions_with_get_chunks():
     """Returns a list of function names with the `get_chunks` kwarg."""
     all_funcs = get_all_functions()
     get_chunks_funcs = []
-    for func in all_funcs:
-        if "get_chunks" in all_funcs[func]["args"]:
+    for func, params in all_funcs.items():
+        if "get_chunks" in params["args"]:
             get_chunks_funcs.append(func)
     return get_chunks_funcs
 
@@ -62,19 +69,25 @@
     G = nx.fast_gnp_random_graph(50, 0.6, seed=42)
     H = nxp.ParallelGraph(G)
     for func in get_chunks_funcs:
-        print(func)
         if func not in ignore_funcs:
             if func in tournament_funcs:
                 G = nx.tournament.random_tournament(50, seed=42)
                 H = nxp.ParallelGraph(G)
                 c1 = getattr(nxp, func)(H)
                 c2 = getattr(nxp, func)(H, get_chunks=random_chunking)
                 assert c1 == c2
             else:
                 c1 = getattr(nxp, func)(H)
                 c2 = getattr(nxp, func)(H, get_chunks=random_chunking)
                 if isinstance(c1, types.GeneratorType):
                     c1, c2 = dict(c1), dict(c2)
                 if func in chk_dict_vals:
                     for i in range(len(G.nodes)):
                         assert math.isclose(c1[i], c2[i], abs_tol=1e-16)
diff --git a/nx_parallel/utils/chunk.py b/nx_parallel/utils/chunk.py
index 5a9052d7..c2ccb43d 100644
--- a/nx_parallel/utils/chunk.py
+++ b/nx_parallel/utils/chunk.py
@@ -1,9 +1,37 @@
 import itertools
 import os
+import threading
+from contextlib import contextmanager
 import networkx as nx
+import nx_parallel as nxp
+from joblib import Parallel, delayed
 
-__all__ = ["chunks", "get_n_jobs", "create_iterables"]
+__all__ = ["parallel_config", "chunks", "get_n_jobs", "execute_parallel"]
+
+# Thread-local storage ensures that parallel config overrides are thread-safe
+# and do not interfere with each other during concurrent executions.
+_joblib_config = threading.local()
+
+
+@contextmanager
+def parallel_config(**kwargs):
+    """Context manager to temporarily override joblib's Parallel configuration.
+
+    Parameters
+    ----------
+    **kwargs : dict
+        Keyword arguments corresponding to joblib's Parallel parameters
+        (e.g., backend, n_jobs, verbose). These overrides are temporary and
+        confined to the current thread.
+    """
+    original_kwargs = getattr(_joblib_config, "parallel_kwargs", {}).copy()
+    _joblib_config.parallel_kwargs = {**original_kwargs, **kwargs}
+
+    try:
+        yield
+    finally:
+        _joblib_config.parallel_kwargs = original_kwargs
 
 
 def chunks(iterable, n_chunks):
@@ -24,13 +52,17 @@ def get_n_jobs(n_jobs=None):
     active configuration system or modifying the passed-in value, similar
     to joblib's behavior.
 
-    - If running under pytest, it returns 2 jobs.
+    - If running under pytest, it returns 2 jobs when n_jobs is None.
     - If the `active` configuration in NetworkX's config is `True`,
       `n_jobs` is extracted from the NetworkX config.
     - Otherwise, `n_jobs` is obtained from joblib's active backend.
    - `ValueError` is raised if `n_jobs` is 0.
     """
-    if "PYTEST_CURRENT_TEST" in os.environ:
+    parallel_kwargs = getattr(_joblib_config, "parallel_kwargs", {})
+    if n_jobs is None and "n_jobs" in parallel_kwargs:
+        n_jobs = parallel_kwargs["n_jobs"]
+
+    if n_jobs is None and "PYTEST_CURRENT_TEST" in os.environ:
         return 2
 
     if n_jobs is None:
@@ -52,44 +84,76 @@
     return int(n_jobs)
 
 
-def create_iterables(G, iterator, n_cores, list_of_iterator=None):
-    """Create an iterable of function inputs for parallel computation
-    based on the provided iterator type.
+def execute_parallel(
+    G,
+    process_func,
+    iterator_func,
+    get_chunks="chunks",
+    **kwargs,
+):
+    """Execute a processing function in parallel over chunks of data.
 
     Parameters
     ----------
-    G : NetworkX graph
-        The NetworkX graph.
-    iterator : str
-        Type of iterator. Valid values are 'node', 'edge', 'isolate'
-    n_cores : int
-        The number of cores to use.
-    list_of_iterator : list, optional
-        A precomputed list of items to iterate over. If None, it will
-        be generated based on the iterator type.
+    G : networkx.Graph or ParallelGraph
+        The graph on which the algorithm operates.
+    process_func : callable
+        The function used to process each chunk. Should accept
+        ``(G, chunk, **kwargs)``.
+    iterator_func : callable
+        A function that takes G and returns an iterable of data to process.
+    get_chunks : str or callable, optional (default="chunks")
+        Determines how to chunk the data.
+        - If "chunks" or "nodes", the data is sliced automatically into
+          `n_jobs` number of chunks.
+        - If callable, it should take the data iterable and return an iterable
+          of chunks.
+    **kwargs : dict
+        Additional keyword arguments to pass to `process_func`.
 
     Returns
     -------
-    iterable : Iterable
-        An iterable of function inputs.
-
-    Raises
-    ------
-    ValueError
-        If the iterator type is not one of "node", "edge" or "isolate".
+    list
+        A list of results from the parallel execution, one per chunk.
     """
-
-    if not list_of_iterator:
-        if iterator == "node":
-            list_of_iterator = list(G.nodes)
-        elif iterator == "edge":
-            list_of_iterator = list(G.edges)
-        elif iterator == "isolate":
-            list_of_iterator = list(nx.isolates(G))
-        else:
-            raise ValueError(f"Invalid iterator type: {iterator}")
-
-    if not list_of_iterator:
-        return iter([])
-
-    return chunks(list_of_iterator, n_cores)
+    n_jobs = nxp.get_n_jobs()
+
+    if hasattr(G, "graph_object"):
+        G = G.graph_object
+
+    data = iterator_func(G)
+
+    if get_chunks in {"chunks", "nodes"}:
+        data = list(data)
+        # `chunks` expects the *number* of chunks, so slice the data into
+        # `n_jobs` chunks, one per job
+        data_chunks = nxp.chunks(data, n_jobs)
+    elif callable(get_chunks):
+        data_chunks = get_chunks(data)
+    else:
+        raise ValueError(
+            "get_chunks must be 'chunks', 'nodes', or a callable that returns an "
+            "iterable of chunks."
+        )
+
+    # Retrieve the global backend ParallelConfig instance
+    config = nx.config.backends.parallel
+
+    # Note: `inner_max_num_threads` is a backend-level option (set through
+    # joblib's own `parallel_config`/`parallel_backend`), not a `Parallel`
+    # constructor argument, so it is not forwarded here.
+    joblib_params = {
+        "backend": config.backend,
+        "n_jobs": n_jobs,
+        "verbose": config.verbose,
+        "temp_folder": config.temp_folder,
+        "max_nbytes": config.max_nbytes,
+        "mmap_mode": config.mmap_mode,
+        "prefer": config.prefer,
+        "require": config.require,
+    }
+
+    # Retrieve and apply thread-local overrides from `parallel_config`
+    parallel_kwargs = getattr(_joblib_config, "parallel_kwargs", {})
+    joblib_params.update(parallel_kwargs)
+
+    # Drop unset parameters so joblib's own defaults apply
+    joblib_params = {k: v for k, v in joblib_params.items() if v is not None}
+
+    return Parallel(**joblib_params)(
+        delayed(process_func)(G, chunk, **kwargs) for chunk in data_chunks
+    )
diff --git a/nx_parallel/utils/tests/test_chunk.py b/nx_parallel/utils/tests/test_chunk.py
index 45ef9ae4..480eae0e 100644
--- a/nx_parallel/utils/tests/test_chunk.py
+++ b/nx_parallel/utils/tests/test_chunk.py
@@ -1,4 +1,5 @@
 import os
+from unittest.mock import patch, call
 import pytest
 import networkx as nx
 import nx_parallel as nxp
@@ -9,6 +10,11 @@ def test_get_n_jobs():
     # Test with no n_jobs (default)
     with pytest.MonkeyPatch().context() as mp:
         mp.delitem(os.environ, "PYTEST_CURRENT_TEST", raising=False)
+
+        # Ensure that the parallel config is inactive
+        nx.config.backends.parallel.active = False
+        nx.config.backends.parallel.n_jobs = None
+
         assert nxp.get_n_jobs() == 1
 
     # Test with n_jobs set to positive value
@@ -16,19 +22,22 @@
     # Test with n_jobs set to negative value
     assert nxp.get_n_jobs(-1) == os.cpu_count()
 
-    nx.config.backends.parallel.active = False
-    from joblib import parallel_config
-
-    parallel_config(n_jobs=3)
-    assert nxp.get_n_jobs() == 3
+    # Mock joblib's active backend to return ('loky', 3)
+    with patch("joblib.parallel.get_active_backend", return_value=("loky", 3)):
+        assert nxp.get_n_jobs() == 3
+
+    # Test with n_jobs set in the NetworkX config
     nx.config.backends.parallel.active = True
     nx.config.backends.parallel.n_jobs = 5
     assert nxp.get_n_jobs() == 5
+
     # Test with n_jobs = 0 to raise a ValueError
-    try:
+    # Ensure the parallel config is inactive so it does not override n_jobs
+    nx.config.backends.parallel.active = False
+    nx.config.backends.parallel.n_jobs = None
+    with pytest.raises(ValueError, match="n_jobs == 0 in Parallel has no meaning"):
         nxp.get_n_jobs(0)
-    except ValueError as e:
-        assert str(e) == "n_jobs == 0 in Parallel has no meaning"
 
 
 def test_chunks():
@@ -44,18 +53,212 @@
     assert chunks_list == [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]
 
 
-def test_create_iterables():
-    """Test `create_iterables` for different iterator types."""
-    G = nx.fast_gnp_random_graph(50, 0.6, seed=42)
+def test_execute_parallel_basic():
+    """Basic test for `execute_parallel` to ensure it processes chunks correctly."""
+
+    G = nx.path_graph(10)
+    H = nxp.ParallelGraph(G)
+
+    def process_func(G, chunk, **kwargs):
+        return {node: G.degree(node) for node in chunk}
+
+    def iterator_func(G):
+        return list(G.nodes())
+
+    # Execute in parallel without overrides
+    results = nxp.execute_parallel(
+        G=H,
+        process_func=process_func,
+        iterator_func=iterator_func,
+        get_chunks="chunks",
+    )
+
+    combined_results = {}
+    for res in results:
+        combined_results.update(res)
+
+    assert combined_results == dict(G.degree())
+
+
+def test_execute_parallel_with_overrides():
+    """Test `execute_parallel` with overridden parallel configuration."""
+
+    G = nx.complete_graph(5)
+    H = nxp.ParallelGraph(G)
+
+    def process_func(G, chunk, **kwargs):
+        return list(chunk)
+
+    def iterator_func(G):
+        return list(G.nodes())
+
+    # Mock joblib.Parallel in the correct module
+    with patch("nx_parallel.utils.chunk.Parallel") as mock_parallel:
+        with nxp.parallel_config(n_jobs=2, backend="loky", verbose=5):
+            nxp.execute_parallel(
+                G=H,
+                process_func=process_func,
+                iterator_func=iterator_func,
+                get_chunks="chunks",
+            )
+
+    # Assert that Parallel was called with overridden parameters (excluding None values)
+    mock_parallel.assert_called_with(
+        backend="loky",
+        n_jobs=2,
+        verbose=5,
+        max_nbytes="1M",
+        mmap_mode="r",
+    )
+
+
+def test_execute_parallel_callable_chunks():
+    """Test `execute_parallel` with a custom callable for get_chunks."""
+
+    G = nx.cycle_graph(6)
+    H = nxp.ParallelGraph(G)
+
+    def process_func(G, chunk, **kwargs):
+        return sum(chunk)
+
+    def iterator_func(G):
+        return list(G.nodes())  # Convert NodeView to list
+
+    # Define a custom chunking function that groups nodes into chunks of size 2
+    def custom_chunking(data):
+        return [tuple(data[i : i + 2]) for i in range(0, len(data), 2)]
+
+    results = nxp.execute_parallel(
+        G=H,
+        process_func=process_func,
+        iterator_func=iterator_func,
+        get_chunks=custom_chunking,
+    )
+
+    # Expected sums: (0+1), (2+3), (4+5) => 1, 5, 9
+    expected_results = [1, 5, 9]
+
+    assert results == expected_results
+
+
+def test_parallel_config_override():
+    """Test that `parallel_config` correctly overrides config within its context."""
+
+    G = nx.complete_graph(5)
+    H = nxp.ParallelGraph(G)
+
+    def process_func(G, chunk, **kwargs):
+        return list(chunk)
+
+    def iterator_func(G):
+        return list(G.nodes())
+
+    # Mock joblib.Parallel to capture the parameters it's called with
+    with patch("nx_parallel.utils.chunk.Parallel") as mock_parallel:
+        with nxp.parallel_config(backend="threading", n_jobs=2, verbose=10):
+            nxp.execute_parallel(
+                G=H,
+                process_func=process_func,
+                iterator_func=iterator_func,
+                get_chunks="chunks",
+            )
+
+    # Assert that Parallel was called with overridden parameters
+    mock_parallel.assert_called_with(
+        backend="threading",
+        n_jobs=2,
+        verbose=10,
+        max_nbytes="1M",
+        mmap_mode="r",
+    )
+
+
+def test_parallel_config_nested_overrides():
+    """Test that nested `parallel_config` contexts correctly handle overrides."""
+
+    G = nx.complete_graph(5)
+    H = nxp.ParallelGraph(G)
+
+    def process_func(G, chunk, **kwargs):
+        return list(chunk)
+
+    def iterator_func(G):
+        return list(G.nodes())
+
+    # Mock joblib.Parallel in the correct module
+    with patch("nx_parallel.utils.chunk.Parallel") as mock_parallel:
+        with nxp.parallel_config(backend="threading", n_jobs=2, verbose=10):
+            with nxp.parallel_config(n_jobs=4, verbose=5):
+                nxp.execute_parallel(
+                    G=H,
+                    process_func=process_func,
+                    iterator_func=iterator_func,
+                    get_chunks="chunks",
+                )
+            nxp.execute_parallel(
+                G=H,
+                process_func=process_func,
+                iterator_func=iterator_func,
+                get_chunks="chunks",
+            )
+
+    # Extract only the instantiation calls to Parallel
+    instantiation_calls = [c for c in mock_parallel.call_args_list if c[0] or c[1]]
+
+    # Adjust expected calls to exclude parameters with None values
+    expected_calls = [
+        call(
+            backend="threading",
+            n_jobs=4,
+            verbose=5,
+            max_nbytes="1M",
+            mmap_mode="r",
+        ),
+        call(
+            backend="threading",
+            n_jobs=2,
+            verbose=10,
+            max_nbytes="1M",
+            mmap_mode="r",
+        ),
+    ]
+
+    assert instantiation_calls == expected_calls
+
+
+def test_parallel_config_thread_safety(monkeypatch):
+    """Test that `parallel_config` overrides are thread-safe."""
+
+    import threading
+
+    # Remove 'PYTEST_CURRENT_TEST' from os.environ so `get_n_jobs` does not
+    # short-circuit to 2 jobs
+    monkeypatch.delenv("PYTEST_CURRENT_TEST", raising=False)
+
+    # Define a function to run in a thread
+    def thread_func(backend, n_jobs, results, index):
+        with nxp.parallel_config(backend=backend, n_jobs=n_jobs):
+            # Simulate some operation that uses get_n_jobs
+            job_count = nxp.get_n_jobs()
+            # Retrieve the backend from thread-local storage
+            parallel_kwargs = getattr(
+                nxp.utils.chunk._joblib_config, "parallel_kwargs", {}
+            )
+            backend_used = parallel_kwargs.get(
+                "backend", nx.config.backends.parallel.backend
+            )
+            results[index] = (backend_used, job_count)
+
+    results = {}
 
-    # Test node iterator
-    iterable = nxp.create_iterables(G, "node", 4)
-    assert len(list(iterable)) == 4
+    # Start two threads with different configurations
+    t1 = threading.Thread(target=thread_func, args=("loky", 5, results, 1))
+    t2 = threading.Thread(target=thread_func, args=("threading", 3, results, 2))
 
-    # Test edge iterator
-    iterable = nxp.create_iterables(G, "edge", 4)
-    assert len(list(iterable)) == 4
+    t1.start()
+    t2.start()
+    t1.join()
+    t2.join()
 
-    # Test isolate iterator (G has no isolates, so this should be empty)
-    iterable = nxp.create_iterables(G, "isolate", 4)
-    assert len(list(iterable)) == 0
+    # Ensure that each thread received its own overrides
+    assert results[1] == ("loky", 5)
+    assert results[2] == ("threading", 3)
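
Taken together, `parallel_config` and `execute_parallel` compose as follows. This is a minimal usage sketch distilled from the tests above, assuming the patch is applied; the degree-counting `process_func` is illustrative only and not part of the diff:

```python
import networkx as nx
import nx_parallel as nxp

G = nx.path_graph(10)
H = nxp.ParallelGraph(G)


# Worker: compute the degree of every node in its chunk (illustrative).
def process_func(G, chunk, **kwargs):
    return {node: G.degree(node) for node in chunk}


# Produces the iterable that execute_parallel slices into chunks.
def iterator_func(G):
    return list(G.nodes())


# Thread-local override: use 4 "threading" workers for this block only.
with nxp.parallel_config(backend="threading", n_jobs=4):
    partial_results = nxp.execute_parallel(
        G=H,
        process_func=process_func,
        iterator_func=iterator_func,
        get_chunks="chunks",
    )

# Each worker returns a dict for its chunk; merge them into one result.
degrees = {}
for part in partial_results:
    degrees.update(part)
assert degrees == dict(G.degree())
```

Because the override lives in thread-local storage rather than in the global NetworkX config, two threads can run this block concurrently with different backends or job counts, which is exactly what `test_parallel_config_thread_safety` exercises.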