From bde19cacd81df08bd5ac8d8396fe62ac0af2ef8c Mon Sep 17 00:00:00 2001 From: Mystical <125946525+mystical-prog@users.noreply.github.com> Date: Sun, 29 Jun 2025 12:20:24 +0530 Subject: [PATCH 1/5] added flood publishing --- libp2p/pubsub/gossipsub.py | 81 ++++++++++++++++++++++---------------- libp2p/tools/constants.py | 1 + tests/utils/factories.py | 3 ++ 3 files changed, 51 insertions(+), 34 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 839d67198..0d701e00e 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -100,6 +100,8 @@ class GossipSub(IPubsubRouter, Service): prune_back_off: int unsubscribe_back_off: int + flood_publish: bool + def __init__( self, protocols: Sequence[TProtocol], @@ -118,6 +120,7 @@ def __init__( px_peers_count: int = 16, prune_back_off: int = 60, unsubscribe_back_off: int = 10, + flood_publish: bool = False, ) -> None: self.protocols = list(protocols) self.pubsub = None @@ -158,6 +161,8 @@ def __init__( self.prune_back_off = prune_back_off self.unsubscribe_back_off = unsubscribe_back_off + self.flood_publish = flood_publish + async def run(self) -> None: self.manager.run_daemon_task(self.heartbeat) if len(self.direct_peers) > 0: @@ -294,42 +299,50 @@ def _get_peers_to_send( if topic not in self.pubsub.peer_topics: continue - # direct peers - _direct_peers: set[ID] = {_peer for _peer in self.direct_peers} - send_to.update(_direct_peers) - - # floodsub peers - floodsub_peers: set[ID] = { - peer_id - for peer_id in self.pubsub.peer_topics[topic] - if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID - } - send_to.update(floodsub_peers) - - # gossipsub peers - gossipsub_peers: set[ID] = set() - if topic in self.mesh: - gossipsub_peers = self.mesh[topic] + if self.flood_publish and msg_forwarder == self.pubsub.my_id: + for peer in self.pubsub.peer_topics[topic]: + # TODO: add score threshold check when peer scoring is implemented + # if direct peer then skip score check + send_to.add(peer) else: - # When we publish to a topic that we have not subscribe to, we randomly - # pick `self.degree` number of peers who have subscribed to the topic - # and add them as our `fanout` peers. - topic_in_fanout: bool = topic in self.fanout - fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set() - fanout_size = len(fanout_peers) - if not topic_in_fanout or ( - topic_in_fanout and fanout_size < self.degree - ): - if topic in self.pubsub.peer_topics: - # Combine fanout peers with selected peers - fanout_peers.update( - self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree - fanout_size, fanout_peers + # direct peers + direct_peers: set[ID] = {_peer for _peer in self.direct_peers} + send_to.update(direct_peers) + + # floodsub peers + floodsub_peers: set[ID] = { + peer_id + for peer_id in self.pubsub.peer_topics[topic] + if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID + } + send_to.update(floodsub_peers) + + # gossipsub peers + gossipsub_peers: set[ID] = set() + if topic in self.mesh: + gossipsub_peers = self.mesh[topic] + else: + # When we publish to a topic that we have not subscribe to, we + # randomly pick `self.degree` number of peers who have subscribed + # to the topic and add them as our `fanout` peers. 
+ topic_in_fanout: bool = topic in self.fanout + fanout_peers: set[ID] = ( + self.fanout[topic] if topic_in_fanout else set() + ) + fanout_size = len(fanout_peers) + if not topic_in_fanout or ( + topic_in_fanout and fanout_size < self.degree + ): + if topic in self.pubsub.peer_topics: + # Combine fanout peers with selected peers + fanout_peers.update( + self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree - fanout_size, fanout_peers + ) ) - ) - self.fanout[topic] = fanout_peers - gossipsub_peers = fanout_peers - send_to.update(gossipsub_peers) + self.fanout[topic] = fanout_peers + gossipsub_peers = fanout_peers + send_to.update(gossipsub_peers) # Excludes `msg_forwarder` and `origin` yield from send_to.difference([msg_forwarder, origin]) diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py index f7d367e70..4c495696b 100644 --- a/libp2p/tools/constants.py +++ b/libp2p/tools/constants.py @@ -45,6 +45,7 @@ class GossipsubParams(NamedTuple): px_peers_count: int = 16 prune_back_off: int = 60 unsubscribe_back_off: int = 10 + flood_publish: bool = False GOSSIPSUB_PARAMS = GossipsubParams() diff --git a/tests/utils/factories.py b/tests/utils/factories.py index 75639e369..b4419e462 100644 --- a/tests/utils/factories.py +++ b/tests/utils/factories.py @@ -576,6 +576,7 @@ async def create_batch_with_gossipsub( px_peers_count: int = GOSSIPSUB_PARAMS.px_peers_count, prune_back_off: int = GOSSIPSUB_PARAMS.prune_back_off, unsubscribe_back_off: int = GOSSIPSUB_PARAMS.unsubscribe_back_off, + flood_publish: bool = GOSSIPSUB_PARAMS.flood_publish, security_protocol: TProtocol | None = None, muxer_opt: TMuxerOptions | None = None, msg_id_constructor: None @@ -600,6 +601,7 @@ async def create_batch_with_gossipsub( px_peers_count=px_peers_count, prune_back_off=prune_back_off, unsubscribe_back_off=unsubscribe_back_off, + flood_publish=flood_publish, ) else: gossipsubs = GossipsubFactory.create_batch( @@ -618,6 +620,7 @@ async def create_batch_with_gossipsub( px_peers_count=px_peers_count, prune_back_off=prune_back_off, unsubscribe_back_off=unsubscribe_back_off, + flood_publish=flood_publish, ) async with cls._create_batch_with_router( From 75a3749af924adf57347665c0341cf2e06533f70 Mon Sep 17 00:00:00 2001 From: Khwahish29 Date: Mon, 30 Jun 2025 12:28:24 +0530 Subject: [PATCH 2/5] added tests for flood publising --- newsfragments/713.feature.rst | 1 + tests/core/pubsub/test_gossipsub.py | 43 +++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 newsfragments/713.feature.rst diff --git a/newsfragments/713.feature.rst b/newsfragments/713.feature.rst new file mode 100644 index 000000000..601911688 --- /dev/null +++ b/newsfragments/713.feature.rst @@ -0,0 +1 @@ +Added flood publishing. \ No newline at end of file diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 03276a781..ed8aff013 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -590,3 +590,46 @@ async def test_sparse_connect(): f"received the message. Ideally all nodes should receive it, but at " f"minimum {min_required} required for sparse network scalability." 
) + + +@pytest.mark.trio +async def test_flood_publish(): + async with PubsubFactory.create_batch_with_gossipsub( + 6, + degree=2, + degree_low=1, + degree_high=3, + flood_publish=False, + ) as pubsubs_gsub: + routers: list[GossipSub] = [] + for pubsub in pubsubs_gsub: + assert isinstance(pubsub.router, GossipSub) + routers.append(pubsub.router) + hosts = [ps.host for ps in pubsubs_gsub] + + topic = "flood_test_topic" + queues = [await pubsub.subscribe(topic) for pubsub in pubsubs_gsub] + + # connect host 0 to all other hosts + await one_to_all_connect(hosts, 0) + + # wait for connections to be established + await trio.sleep(1) + + # publish a message from the first host + msg_content = b"flood_msg" + await pubsubs_gsub[0].publish(topic, msg_content) + + # wait for messages to propagate + await trio.sleep(0.5) + + print(routers[0].mesh[topic]) + if routers[0].pubsub: + print(routers[0].pubsub.peer_topics) + + # verify all nodes received the message + for queue in queues: + msg = await queue.get() + assert msg.data == msg_content, ( + f"node did not receive expected message: {msg.data}" + ) From 47809042e6eda12f3235d1c123af753510e304d2 Mon Sep 17 00:00:00 2001 From: Khwahish29 Date: Mon, 30 Jun 2025 12:31:36 +0530 Subject: [PATCH 3/5] fix lint --- newsfragments/713.feature.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/newsfragments/713.feature.rst b/newsfragments/713.feature.rst index 601911688..6c0bb3bc0 100644 --- a/newsfragments/713.feature.rst +++ b/newsfragments/713.feature.rst @@ -1 +1 @@ -Added flood publishing. \ No newline at end of file +Added flood publishing. From ed673401aadf9669e38ddcd85681f03a443cc30b Mon Sep 17 00:00:00 2001 From: Khwahish29 Date: Tue, 8 Jul 2025 14:31:51 +0530 Subject: [PATCH 4/5] resolved merge conflicts --- tests/core/pubsub/test_gossipsub.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 6e369c359..35014cd25 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -634,6 +634,8 @@ async def test_flood_publish(): assert msg.data == msg_content, ( f"node did not receive expected message: {msg.data}" ) + + async def test_connect_some_with_fewer_hosts_than_degree(): """Test connect_some when there are fewer hosts than degree.""" # Create 3 hosts with degree=5 @@ -793,4 +795,4 @@ async def test_single_host(): connected_peers = len(pubsubs_fsub[0].peers) assert connected_peers == 0, ( f"Single host has {connected_peers} connections, expected 0" - ) \ No newline at end of file + ) From 70252b15e2cb33a7c73686e95a238986a0092c1d Mon Sep 17 00:00:00 2001 From: Suchitra Swain Date: Fri, 19 Sep 2025 12:08:34 +0200 Subject: [PATCH 5/5] [905]: Enhancement: Fix Kademila DHT --- examples/floodsub/basic_example.py | 122 +++++++++++++++++++ libp2p/kad_dht/kad_dht.py | 76 +++++++++++- libp2p/kad_dht/peer_routing.py | 25 ++++ tests/core/kad_dht/test_unit_peer_routing.py | 84 +++++++++++++ 4 files changed, 305 insertions(+), 2 deletions(-) create mode 100644 examples/floodsub/basic_example.py diff --git a/examples/floodsub/basic_example.py b/examples/floodsub/basic_example.py new file mode 100644 index 000000000..3e68c5670 --- /dev/null +++ b/examples/floodsub/basic_example.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +""" +Basic FloodSub Example + +This is a simple example that demonstrates FloodSub publishing and subscribing +without relying on test utilities. It shows the core functionality. 
+ +Run this example with: + python examples/floodsub/basic_example.py +""" + +import asyncio +import logging +import sys + +import trio + +from libp2p import new_host +from libp2p.crypto.secp256k1 import create_new_key_pair +from libp2p.pubsub.floodsub import FloodSub +from libp2p.pubsub.pubsub import Pubsub +from libp2p.tools.async_service import background_trio_service +from libp2p.tools.constants import FLOODSUB_PROTOCOL_ID + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("floodsub_basic") + + +async def main() -> None: + """Main function demonstrating basic FloodSub functionality.""" + logger.info("Starting basic FloodSub example...") + + # Create two hosts + key_pair1 = create_new_key_pair() + key_pair2 = create_new_key_pair() + + host1 = new_host( + key_pair=key_pair1, + listen_addrs=["/ip4/127.0.0.1/tcp/0"], + ) + + host2 = new_host( + key_pair=key_pair2, + listen_addrs=["/ip4/127.0.0.1/tcp/0"], + ) + + # Create FloodSub routers + floodsub1 = FloodSub(protocols=[FLOODSUB_PROTOCOL_ID]) + floodsub2 = FloodSub(protocols=[FLOODSUB_PROTOCOL_ID]) + + # Create Pubsub instances + pubsub1 = Pubsub( + host=host1, + router=floodsub1, + strict_signing=False, # Disable for simplicity + ) + + pubsub2 = Pubsub( + host=host2, + router=floodsub2, + strict_signing=False, # Disable for simplicity + ) + + # Start both pubsub services + async with background_trio_service(pubsub1): + async with background_trio_service(pubsub2): + await pubsub1.wait_until_ready() + await pubsub2.wait_until_ready() + + logger.info(f"Host 1 ID: {host1.get_id()}") + logger.info(f"Host 2 ID: {host2.get_id()}") + + # Start listening on both hosts + logger.info("Starting hosts...") + await host1.get_network().listen() + await host2.get_network().listen() + await trio.sleep(0.5) # Wait for hosts to start listening + + # Connect the hosts + logger.info("Connecting hosts...") + await host1.connect(host2.get_id(), host2.get_addrs()) + await trio.sleep(1) # Wait for connection + + # Subscribe to topic on host2 + topic = "test-topic" + logger.info(f"Subscribing to topic: {topic}") + subscription = await pubsub2.subscribe(topic) + await trio.sleep(0.5) # Wait for subscription to propagate + + # Publish messages from host1 + messages = [ + "Hello from FloodSub!", + "This is message number 2", + "FloodSub is working great!" 
+ ] + + for i, message in enumerate(messages): + logger.info(f"Publishing message {i+1}: {message}") + await pubsub1.publish(topic, message.encode()) + await trio.sleep(0.5) + + # Receive messages on host2 + logger.info("Receiving messages...") + for i in range(len(messages)): + message = await subscription.get() + logger.info(f"Received message {i+1}: {message.data.decode()}") + logger.info(f" From peer: {message.from_id.hex()}") + logger.info(f" Topics: {message.topicIDs}") + + logger.info("Basic FloodSub example completed successfully!") + + +if __name__ == "__main__": + try: + trio.run(main) + except KeyboardInterrupt: + logger.info("Example interrupted by user") + sys.exit(0) + except Exception as e: + logger.error(f"Example failed: {e}") + sys.exit(1) diff --git a/libp2p/kad_dht/kad_dht.py b/libp2p/kad_dht/kad_dht.py index 0d05aaf81..3ab8686bf 100644 --- a/libp2p/kad_dht/kad_dht.py +++ b/libp2p/kad_dht/kad_dht.py @@ -278,6 +278,20 @@ async def handle_stream(self, stream: INetStream) -> None: closest_peers = self.routing_table.find_local_closest_peers( target_key, 20 ) + + # Fallback to connected peers if routing table has insufficient peers + MIN_PEERS_THRESHOLD = 5 # Configurable minimum + if len(closest_peers) < MIN_PEERS_THRESHOLD: + logger.debug("Routing table has insufficient peers (%d < %d) for FIND_NODE in KadDHT, using connected peers as fallback", + len(closest_peers), MIN_PEERS_THRESHOLD) + connected_peers = self.host.get_connected_peers() + if connected_peers: + # Sort connected peers by distance to target and use as response + from .utils import sort_peer_ids_by_distance + fallback_peers = sort_peer_ids_by_distance(target_key, connected_peers)[:20] + closest_peers = fallback_peers + logger.debug("Using %d connected peers as fallback for FIND_NODE in KadDHT", len(closest_peers)) + logger.debug(f"Found {len(closest_peers)} peers close to target") # Consume the source signed_peer_record if sent @@ -459,6 +473,20 @@ async def handle_stream(self, stream: INetStream) -> None: closest_peers = self.routing_table.find_local_closest_peers( key, 20 ) + + # Fallback to connected peers if routing table has insufficient peers + MIN_PEERS_THRESHOLD = 5 # Configurable minimum + if len(closest_peers) < MIN_PEERS_THRESHOLD: + logger.debug("Routing table has insufficient peers (%d < %d) for provider response, using connected peers as fallback", + len(closest_peers), MIN_PEERS_THRESHOLD) + connected_peers = self.host.get_connected_peers() + if connected_peers: + # Sort connected peers by distance to target and use as response + from .utils import sort_peer_ids_by_distance + fallback_peers = sort_peer_ids_by_distance(key, connected_peers)[:20] + closest_peers = fallback_peers + logger.debug("Using %d connected peers as fallback for provider response", len(closest_peers)) + logger.debug( f"No providers found, including {len(closest_peers)}" "closest peers" @@ -550,6 +578,20 @@ async def handle_stream(self, stream: INetStream) -> None: closest_peers = self.routing_table.find_local_closest_peers( key, 20 ) + + # Fallback to connected peers if routing table has insufficient peers + MIN_PEERS_THRESHOLD = 5 # Configurable minimum + if len(closest_peers) < MIN_PEERS_THRESHOLD: + logger.debug("Routing table has insufficient peers (%d < %d) for GET_VALUE response, using connected peers as fallback", + len(closest_peers), MIN_PEERS_THRESHOLD) + connected_peers = self.host.get_connected_peers() + if connected_peers: + # Sort connected peers by distance to target and use as response + from .utils import 
sort_peer_ids_by_distance + fallback_peers = sort_peer_ids_by_distance(key, connected_peers)[:20] + closest_peers = fallback_peers + logger.debug("Using %d connected peers as fallback for GET_VALUE response", len(closest_peers)) + logger.debug( "No value found," f"including {len(closest_peers)} closest peers" @@ -677,9 +719,24 @@ async def put_value(self, key: bytes, value: bytes) -> None: ) # 2. Get closest peers, excluding self + routing_table_peers = self.routing_table.find_local_closest_peers(key) + + # Fallback to connected peers if routing table has insufficient peers + MIN_PEERS_THRESHOLD = 5 # Configurable minimum + if len(routing_table_peers) < MIN_PEERS_THRESHOLD: + logger.debug("Routing table has insufficient peers (%d < %d) for put_value, using connected peers as fallback", + len(routing_table_peers), MIN_PEERS_THRESHOLD) + connected_peers = self.host.get_connected_peers() + if connected_peers: + # Sort connected peers by distance to target and use as fallback + from .utils import sort_peer_ids_by_distance + fallback_peers = sort_peer_ids_by_distance(key, connected_peers) + routing_table_peers = fallback_peers + logger.debug("Using %d connected peers as fallback for put_value", len(routing_table_peers)) + closest_peers = [ peer - for peer in self.routing_table.find_local_closest_peers(key) + for peer in routing_table_peers if peer != self.local_peer_id ] logger.debug(f"Found {len(closest_peers)} peers to store value at") @@ -722,9 +779,24 @@ async def get_value(self, key: bytes) -> bytes | None: return value # 2. Get closest peers, excluding self + routing_table_peers = self.routing_table.find_local_closest_peers(key) + + # Fallback to connected peers if routing table has insufficient peers + MIN_PEERS_THRESHOLD = 5 # Configurable minimum + if len(routing_table_peers) < MIN_PEERS_THRESHOLD: + logger.debug("Routing table has insufficient peers (%d < %d) for get_value, using connected peers as fallback", + len(routing_table_peers), MIN_PEERS_THRESHOLD) + connected_peers = self.host.get_connected_peers() + if connected_peers: + # Sort connected peers by distance to target and use as fallback + from .utils import sort_peer_ids_by_distance + fallback_peers = sort_peer_ids_by_distance(key, connected_peers) + routing_table_peers = fallback_peers + logger.debug("Using %d connected peers as fallback for get_value", len(routing_table_peers)) + closest_peers = [ peer - for peer in self.routing_table.find_local_closest_peers(key) + for peer in routing_table_peers if peer != self.local_peer_id ] logger.debug(f"Searching {len(closest_peers)} peers for value") diff --git a/libp2p/kad_dht/peer_routing.py b/libp2p/kad_dht/peer_routing.py index f5313cb60..f2f27a5a0 100644 --- a/libp2p/kad_dht/peer_routing.py +++ b/libp2p/kad_dht/peer_routing.py @@ -168,6 +168,19 @@ async def find_closest_peers_network( # Start with closest peers from our routing table closest_peers = self.routing_table.find_local_closest_peers(target_key, count) logger.debug("Local closest peers: %d found", len(closest_peers)) + + # Fallback to connected peers if routing table has insufficient peers + MIN_PEERS_THRESHOLD = 5 # Configurable minimum + if len(closest_peers) < MIN_PEERS_THRESHOLD: + logger.debug("Routing table has insufficient peers (%d < %d), using connected peers as fallback", + len(closest_peers), MIN_PEERS_THRESHOLD) + connected_peers = self.host.get_connected_peers() + if connected_peers: + # Sort connected peers by distance to target and use as initial query targets + fallback_peers = 
sort_peer_ids_by_distance(target_key, connected_peers)[:count] + closest_peers = fallback_peers + logger.debug("Using %d connected peers as fallback", len(closest_peers)) + queried_peers: set[ID] = set() rounds = 0 @@ -387,6 +400,18 @@ async def _handle_kad_stream(self, stream: INetStream) -> None: closest_peers = self.routing_table.find_local_closest_peers( target_key, 20 ) + + # Fallback to connected peers if routing table has insufficient peers + MIN_PEERS_THRESHOLD = 5 # Configurable minimum + if len(closest_peers) < MIN_PEERS_THRESHOLD: + logger.debug("Routing table has insufficient peers (%d < %d) for FIND_NODE response, using connected peers as fallback", + len(closest_peers), MIN_PEERS_THRESHOLD) + connected_peers = self.host.get_connected_peers() + if connected_peers: + # Sort connected peers by distance to target and use as response + fallback_peers = sort_peer_ids_by_distance(target_key, connected_peers)[:20] + closest_peers = fallback_peers + logger.debug("Using %d connected peers as fallback for FIND_NODE response", len(closest_peers)) # Create protobuf response response = Message() diff --git a/tests/core/kad_dht/test_unit_peer_routing.py b/tests/core/kad_dht/test_unit_peer_routing.py index 6e15ce7ee..0c8a3a21c 100644 --- a/tests/core/kad_dht/test_unit_peer_routing.py +++ b/tests/core/kad_dht/test_unit_peer_routing.py @@ -63,6 +63,7 @@ def mock_host(self): host.get_private_key.return_value = key_pair.private_key host.get_addrs.return_value = [Multiaddr("/ip4/127.0.0.1/tcp/8000")] host.get_peerstore.return_value = Mock() + host.get_connected_peers.return_value = [] # Default to empty list host.new_stream = AsyncMock() host.connect = AsyncMock() return host @@ -459,3 +460,86 @@ def mock_query_side_effect(peer, key): # Should stop after max rounds, not infinite loop assert isinstance(result, list) + + @pytest.mark.trio + async def test_find_closest_peers_network_fallback_to_connected_peers(self, peer_routing, mock_host): + """Test that network search falls back to connected peers when routing table has insufficient peers.""" + target_key = b"target_key" + + # Create some connected peers + connected_peers = [create_valid_peer_id(f"connected{i}") for i in range(3)] + mock_host.get_connected_peers.return_value = connected_peers + + # Mock routing table to return insufficient peers (less than MIN_PEERS_THRESHOLD=5) + insufficient_peers = [create_valid_peer_id("insufficient")] + with patch.object( + peer_routing.routing_table, + "find_local_closest_peers", + return_value=insufficient_peers, + ): + # Mock _query_peer_for_closest to return empty results (no new peers found) + with patch.object(peer_routing, "_query_peer_for_closest", return_value=[]): + with patch( + "libp2p.kad_dht.peer_routing.sort_peer_ids_by_distance", + return_value=connected_peers, + ) as mock_sort: + result = await peer_routing.find_closest_peers_network(target_key) + + # Should use connected peers as fallback + mock_sort.assert_called_once_with(target_key, connected_peers) + assert result == connected_peers + + @pytest.mark.trio + async def test_find_closest_peers_network_no_fallback_when_sufficient_peers(self, peer_routing, mock_host): + """Test that network search does not fall back when routing table has sufficient peers.""" + target_key = b"target_key" + + # Create some connected peers + connected_peers = [create_valid_peer_id(f"connected{i}") for i in range(3)] + mock_host.get_connected_peers.return_value = connected_peers + + # Mock routing table to return sufficient peers (more than 
MIN_PEERS_THRESHOLD=5) + sufficient_peers = [create_valid_peer_id(f"sufficient{i}") for i in range(6)] + with patch.object( + peer_routing.routing_table, + "find_local_closest_peers", + return_value=sufficient_peers, + ): + # Mock _query_peer_for_closest to return empty results (no new peers found) + with patch.object(peer_routing, "_query_peer_for_closest", return_value=[]): + with patch( + "libp2p.kad_dht.peer_routing.sort_peer_ids_by_distance", + return_value=sufficient_peers, + ) as mock_sort: + result = await peer_routing.find_closest_peers_network(target_key) + + # Should not use connected peers as fallback, so sort_peer_ids_by_distance should not be called for fallback + # It will be called later in the method for sorting all candidates + assert result == sufficient_peers + + @pytest.mark.trio + async def test_find_closest_peers_network_fallback_with_no_connected_peers(self, peer_routing, mock_host): + """Test that network search handles case when no connected peers are available for fallback.""" + target_key = b"target_key" + + # Mock no connected peers + mock_host.get_connected_peers.return_value = [] + + # Mock routing table to return insufficient peers + insufficient_peers = [create_valid_peer_id("insufficient")] + with patch.object( + peer_routing.routing_table, + "find_local_closest_peers", + return_value=insufficient_peers, + ): + # Mock _query_peer_for_closest to return empty results (no new peers found) + with patch.object(peer_routing, "_query_peer_for_closest", return_value=[]): + with patch( + "libp2p.kad_dht.peer_routing.sort_peer_ids_by_distance", + return_value=insufficient_peers, + ) as mock_sort: + result = await peer_routing.find_closest_peers_network(target_key) + + # Should use the insufficient peers from routing table + # sort_peer_ids_by_distance will be called later in the method for sorting all candidates + assert result == insufficient_peers
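
Two follow-up notes on the series.

First, the new `test_flood_publish` in patch 2/5 constructs the batch with `flood_publish=False`, so it exercises the default mesh/fanout path rather than the new flood branch in `_get_peers_to_send`. A companion case with the flag enabled would cover the new code path. The sketch below is a suggestion, not part of the patch: the factory path mirrors `tests/utils/factories.py`, and `one_to_all_connect` is assumed to be the same helper already imported by `test_gossipsub.py`.

```python
import pytest
import trio

from tests.utils.factories import PubsubFactory
# one_to_all_connect: same connection helper used by the existing test_flood_publish


@pytest.mark.trio
async def test_flood_publish_enabled():
    async with PubsubFactory.create_batch_with_gossipsub(
        6,
        degree=2,
        degree_low=1,
        degree_high=3,
        flood_publish=True,  # exercise the new flood-publish branch
    ) as pubsubs_gsub:
        hosts = [ps.host for ps in pubsubs_gsub]
        topic = "flood_test_topic"
        queues = [await pubsub.subscribe(topic) for pubsub in pubsubs_gsub]

        # connect host 0 to all other hosts, as in test_flood_publish
        await one_to_all_connect(hosts, 0)
        await trio.sleep(1)

        msg_content = b"flood_msg"
        await pubsubs_gsub[0].publish(topic, msg_content)
        await trio.sleep(0.5)

        # with flood publishing enabled, the publisher sends to every peer
        # subscribed to the topic, not only mesh/fanout peers
        for queue in queues:
            msg = await queue.get()
            assert msg.data == msg_content
```

Second, patch 5/5 repeats the same connected-peer fallback in six places (the FIND_NODE, GET_VALUE, and provider handlers, plus `put_value`, `get_value`, and `find_closest_peers_network`). A shared helper would keep the threshold, sorting, and logging in one place. The rough sketch below uses only calls that already appear in the patch (`find_local_closest_peers`, `host.get_connected_peers`, `sort_peer_ids_by_distance`); the helper name and signature are hypothetical.

```python
from libp2p.kad_dht.utils import sort_peer_ids_by_distance

MIN_PEERS_THRESHOLD = 5  # same configurable minimum used throughout the patch


def closest_or_connected_peers(host, routing_table, key: bytes, count: int = 20):
    """Return the routing table's closest peers to `key`, falling back to the
    host's connected peers (sorted by XOR distance) when the table holds fewer
    than MIN_PEERS_THRESHOLD candidates."""
    closest = routing_table.find_local_closest_peers(key, count)
    if len(closest) >= MIN_PEERS_THRESHOLD:
        return closest
    connected = host.get_connected_peers()
    if not connected:
        return closest
    return sort_peer_ids_by_distance(key, connected)[:count]
```

Each call site in `kad_dht.py` and `peer_routing.py` could then reduce to a single call, which would also shorten the over-long log lines introduced by the repeated inline blocks.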