From 5ae0c8efd006de9fc54ecd694eec4556bf4d3012 Mon Sep 17 00:00:00 2001 From: Suchitra Swain Date: Sat, 13 Sep 2025 14:35:51 +0200 Subject: [PATCH] [910] Pubsub service crashes on protocol negotiation failures --- libp2p/pubsub/pubsub.py | 6 ++++-- tests/core/pubsub/test_pubsub.py | 27 ++++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 2c605fc3a..05c95a08e 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -501,8 +501,10 @@ async def handle_peer_queue(self) -> None: async with self.peer_receive_channel: self.event_handle_peer_queue_started.set() async for peer_id in self.peer_receive_channel: - # Add Peer - self.manager.run_task(self._handle_new_peer, peer_id) + try: + self.manager.run_task(self._handle_new_peer, peer_id) + except Exception as e: + logger.info(f"Protocol negotiation failed for peer {peer_id}: {e}") async def handle_dead_peer_queue(self) -> None: """ diff --git a/tests/core/pubsub/test_pubsub.py b/tests/core/pubsub/test_pubsub.py index 9a09f34fe..29747ad5a 100644 --- a/tests/core/pubsub/test_pubsub.py +++ b/tests/core/pubsub/test_pubsub.py @@ -1239,5 +1239,30 @@ async def test_blacklist_tears_down_existing_connection(): if TESTING_TOPIC in pubsub0.peer_topics: assert pubsub1.my_id not in pubsub0.peer_topics[TESTING_TOPIC] else: - # It’s also fine if the entire topic entry was pruned + # It's also fine if the entire topic entry was pruned assert TESTING_TOPIC not in pubsub0.peer_topics + + +async def test_handle_peer_queue_exception_handling(): + """Test that handle_peer_queue gracefully handles exceptions from _handle_new_peer.""" + async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub: + pubsub = pubsubs_fsub[0] + + original_handle_new_peer = pubsub._handle_new_peer + + async def mock_handle_new_peer(peer_id): + raise Exception("Protocol negotiation failed") + + pubsub._handle_new_peer = mock_handle_new_peer + + test_peer = IDFactory() + + await pubsub.peer_receive_channel.send(test_peer) + + async with trio.open_nursery() as nursery: + nursery.start_soon(pubsub.handle_peer_queue) + + await trio.sleep(0.1) + assert pubsub.manager.is_running + + pubsub._handle_new_peer = original_handle_new_peer