Skip to content
5 changes: 2 additions & 3 deletions libp2p/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,12 @@ async def reset(self) -> None:
"""

@abstractmethod
def set_deadline(self, ttl: int) -> bool:
def set_deadline(self, ttl: int) -> None:
"""
Set a deadline for the stream.

:param ttl: Time-to-live for the stream in seconds.
:return: True if the deadline was set successfully,
otherwise False.
:raises MuxedStreamError: if setting the deadline fails.
"""

@abstractmethod
Expand Down
25 changes: 15 additions & 10 deletions libp2p/host/autonat/autonat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from libp2p.custom_types import (
TProtocol,
)
from libp2p.host.exceptions import (
HostException,
)
from libp2p.host.autonat.pb.autonat_pb2 import (
DialResponse,
Message,
Expand Down Expand Up @@ -154,7 +157,11 @@ async def _handle_dial(self, message: Message) -> Message:
if peer_id in self.dial_results:
success = self.dial_results[peer_id]
else:
success = await self._try_dial(peer_id)
try:
await self._try_dial(peer_id)
success = True
except HostException:
success = False
self.dial_results[peer_id] = success

peer_info = PeerInfo()
Expand All @@ -166,7 +173,7 @@ async def _handle_dial(self, message: Message) -> Message:
response.dial_response.CopyFrom(dial_response)
return response

async def _try_dial(self, peer_id: ID) -> bool:
async def _try_dial(self, peer_id: ID) -> None:
"""
Attempt to establish a connection with a peer.

Expand All @@ -175,19 +182,17 @@ async def _try_dial(self, peer_id: ID) -> bool:
peer_id : ID
The identifier of the peer to attempt to dial.

Returns
-------
bool
True if the connection was successfully established,
False if the connection attempt failed.
Raises
------
HostException
If the connection attempt failed.

"""
try:
stream = await self.host.new_stream(peer_id, [AUTONAT_PROTOCOL_ID])
await stream.close()
return True
except Exception:
return False
except Exception as e:
raise HostException(f"Failed to dial peer {peer_id}: {e}") from e

def get_status(self) -> int:
"""
Expand Down
14 changes: 13 additions & 1 deletion libp2p/host/basic_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
get_default_protocols,
)
from libp2p.host.exceptions import (
HostException,
StreamFailure,
)
from libp2p.peer.id import (
Expand Down Expand Up @@ -206,8 +207,19 @@ def set_stream_handler(

:param protocol_id: protocol id used on stream
:param stream_handler: a stream handler function
:raises HostException: if setting the stream handler fails
"""
self.multiselect.add_handler(protocol_id, stream_handler)
try:
if not protocol_id or (isinstance(protocol_id, str) and not protocol_id.strip()):
raise HostException("Protocol ID cannot be empty")
if not stream_handler:
raise HostException("Stream handler cannot be None")

self.multiselect.add_handler(protocol_id, stream_handler)
except HostException:
raise
except Exception as e:
raise HostException(f"Failed to set stream handler: {e}") from e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SuchitraSwain Thank you for adding the try-catch block and input validation to this function. However, I believe this PR may not be addressing the core issue described in #194.
Issue #194 is specifically about "changing return True/False pattern to try/catch pattern," but the True/False pattern has already been removed from py-libp2p in previous commits.
For reference, the earlier implementation at this commit did use the True/False return pattern, but the current codebase has already migrated away from this pattern.


async def new_stream(
self,
Expand Down
40 changes: 34 additions & 6 deletions libp2p/identity/identify_push/identify_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from libp2p.network.stream.exceptions import (
StreamClosed,
)
from libp2p.host.exceptions import (
HostException,
)
from libp2p.peer.envelope import consume_envelope
from libp2p.peer.id import (
ID,
Expand Down Expand Up @@ -172,7 +175,7 @@ async def push_identify_to_peer(
observed_multiaddr: Multiaddr | None = None,
limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT),
use_varint_format: bool = True,
) -> bool:
) -> None:
"""
Push an identify message to a specific peer.

Expand All @@ -186,8 +189,8 @@ async def push_identify_to_peer(
limit: Semaphore for concurrency control.
use_varint_format: True=length-prefixed, False=raw protobuf.

Returns:
bool: True if the push was successful, False otherwise.
Raises:
HostException: If the identify push fails.

"""
async with limit:
Expand Down Expand Up @@ -224,10 +227,35 @@ async def push_identify_to_peer(
await stream.close()

logger.debug("Successfully pushed identify to peer %s", peer_id)
return True
except Exception as e:
logger.error("Error pushing identify to peer %s: %s", peer_id, e)
return False
raise HostException(f"Failed to push identify to peer {peer_id}: {e}") from e


async def _safe_push_identify_to_peer(
host: IHost,
peer_id: ID,
observed_multiaddr: Multiaddr | None = None,
limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT),
use_varint_format: bool = True,
) -> None:
"""
Safely push identify information to a specific peer, catching and logging exceptions.

This is a wrapper around push_identify_to_peer that catches exceptions and logs them
instead of letting them propagate, which is useful when calling from a nursery.

Args:
host: The libp2p host.
peer_id: The peer ID to push to.
observed_multiaddr: The observed multiaddress (optional).
limit: Semaphore for concurrency control.
use_varint_format: True=length-prefixed, False=raw protobuf.
"""
try:
await push_identify_to_peer(host, peer_id, observed_multiaddr, limit, use_varint_format)
except Exception as e:
logger.debug("Failed to push identify to peer %s: %s", peer_id, e)


async def push_identify_to_peers(
Expand Down Expand Up @@ -260,7 +288,7 @@ async def push_identify_to_peers(
async with trio.open_nursery() as nursery:
for peer_id in peer_ids:
nursery.start_soon(
push_identify_to_peer,
_safe_push_identify_to_peer,
host,
peer_id,
observed_multiaddr,
Expand Down
44 changes: 30 additions & 14 deletions libp2p/stream_muxer/mplex/mplex_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)
from libp2p.stream_muxer.exceptions import (
MuxedConnUnavailable,
MuxedStreamError,
)

from .constants import (
Expand Down Expand Up @@ -309,33 +310,48 @@ async def reset(self) -> None:
self.muxed_conn.streams.pop(self.stream_id, None)

# TODO deadline not in use
def set_deadline(self, ttl: int) -> bool:
def set_deadline(self, ttl: int) -> None:
"""
Set deadline for muxed stream.

:return: True if successful
:param ttl: Time-to-live for the stream in seconds
:raises MuxedStreamError: if setting the deadline fails
"""
self.read_deadline = ttl
self.write_deadline = ttl
return True

def set_read_deadline(self, ttl: int) -> bool:
try:
if ttl < 0:
raise ValueError("Deadline cannot be negative")
self.read_deadline = ttl
self.write_deadline = ttl
except Exception as e:
raise MuxedStreamError(f"Failed to set deadline: {e}") from e

def set_read_deadline(self, ttl: int) -> None:
"""
Set read deadline for muxed stream.

:return: True if successful
:param ttl: Time-to-live for the read deadline in seconds
:raises MuxedStreamError: if setting the read deadline fails
"""
self.read_deadline = ttl
return True
try:
if ttl < 0:
raise ValueError("Read deadline cannot be negative")
self.read_deadline = ttl
except Exception as e:
raise MuxedStreamError(f"Failed to set read deadline: {e}") from e

def set_write_deadline(self, ttl: int) -> bool:
def set_write_deadline(self, ttl: int) -> None:
"""
Set write deadline for muxed stream.

:return: True if successful
:param ttl: Time-to-live for the write deadline in seconds
:raises MuxedStreamError: if setting the write deadline fails
"""
self.write_deadline = ttl
return True
try:
if ttl < 0:
raise ValueError("Write deadline cannot be negative")
self.write_deadline = ttl
except Exception as e:
raise MuxedStreamError(f"Failed to set write deadline: {e}") from e

def get_remote_address(self) -> tuple[str, int] | None:
"""Delegate to the parent Mplex connection."""
Expand Down
8 changes: 4 additions & 4 deletions libp2p/stream_muxer/yamux/yamux.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,15 @@ async def reset(self) -> None:
self.closed = True
self.reset_received = True # Mark as reset

def set_deadline(self, ttl: int) -> bool:
def set_deadline(self, ttl: int) -> None:
"""
Set a deadline for the stream. Yamux does not support deadlines natively,
so this method always returns False to indicate the operation is unsupported.
so this method raises an exception to indicate the operation is unsupported.

:param ttl: Time-to-live in seconds (ignored).
:return: False, as deadlines are not supported.
:raises NotImplementedError: as deadlines are not supported in Yamux.
"""
raise NotImplementedError("Yamux does not support setting read deadlines")
raise NotImplementedError("Yamux does not support setting deadlines")

def get_remote_address(self) -> tuple[str, int] | None:
"""
Expand Down
8 changes: 4 additions & 4 deletions libp2p/transport/quic/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,15 +632,15 @@ async def __aexit__(
logger.debug("Exiting the context and closing the stream")
await self.close()

def set_deadline(self, ttl: int) -> bool:
def set_deadline(self, ttl: int) -> None:
"""
Set a deadline for the stream. QUIC does not support deadlines natively,
so this method always returns False to indicate the operation is unsupported.
so this method raises an exception to indicate the operation is unsupported.

:param ttl: Time-to-live in seconds (ignored).
:return: False, as deadlines are not supported.
:raises NotImplementedError: as deadlines are not supported in QUIC.
"""
raise NotImplementedError("QUIC does not support setting read deadlines")
raise NotImplementedError("QUIC does not support setting deadlines")

# String representation for debugging

Expand Down
20 changes: 11 additions & 9 deletions libp2p/transport/tcp/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def __init__(self, handler_function: THandler) -> None:
self.handler = handler_function

# TODO: Get rid of `nursery`?
async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> bool:
async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> None:
"""
Put listener in listening mode and wait for incoming connections.

:param maddr: maddr of peer
:return: return True if successful
:raises OpenConnectionError: if listening fails
"""

async def serve_tcp(
Expand Down Expand Up @@ -76,17 +76,19 @@ async def handler(stream: trio.SocketStream) -> None:

tcp_port_str = maddr.value_for_protocol("tcp")
if tcp_port_str is None:
logger.error(f"Cannot listen: TCP port is missing in multiaddress {maddr}")
return False
error_msg = f"Cannot listen: TCP port is missing in multiaddress {maddr}"
logger.error(error_msg)
raise OpenConnectionError(error_msg)

try:
tcp_port = int(tcp_port_str)
except ValueError:
logger.error(
error_msg = (
f"Cannot listen: Invalid TCP port '{tcp_port_str}' "
f"in multiaddress {maddr}"
)
return False
logger.error(error_msg)
raise OpenConnectionError(error_msg)

ip4_host_str = maddr.value_for_protocol("ip4")
# For trio.serve_tcp, ip4_host_str (as host argument) can be None,
Expand All @@ -102,16 +104,16 @@ async def handler(stream: trio.SocketStream) -> None:
if started_listeners is None:
# This implies that task_status.started() was not called within serve_tcp,
# likely because trio.serve_tcp itself failed to start (e.g., port in use).
logger.error(
error_msg = (
f"Failed to start TCP listener for {maddr}: "
f"`nursery.start` returned None. "
"This might be due to issues like the port already "
"being in use or invalid host."
)
return False
logger.error(error_msg)
raise OpenConnectionError(error_msg)

self.listeners.extend(started_listeners)
return True

def get_addrs(self) -> tuple[Multiaddr, ...]:
"""
Expand Down
7 changes: 7 additions & 0 deletions newsfragments/194.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Replaced return True/False pattern with try/catch pattern for better error handling.

- Updated set_stream_handler method in BasicHost to raise HostException instead of returning boolean
- Enhanced exception handling in mplex stream deadline methods with proper error chaining
- Improved identify/push protocol error handling with comprehensive exception management
- Added proper exception documentation and error messages throughout affected methods

7 changes: 5 additions & 2 deletions tests/core/examples/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,11 @@ async def identify_push_demo(host_a, host_b):
logger.debug("Host A protocols before push: %s", host_a_protocols)

# Push identify information from host_a to host_b
success = await push_identify_to_peer(host_a, host_b.get_id())
assert success is True
try:
await push_identify_to_peer(host_a, host_b.get_id())
# If we get here, the push was successful
except Exception as e:
pytest.fail(f"push_identify_to_peer failed: {e}")

# Add a small delay to allow processing
await trio.sleep(0.1)
Expand Down
Loading