Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions libp2p/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,12 @@ async def reset(self) -> None:
"""

@abstractmethod
def set_deadline(self, ttl: int) -> bool:
def set_deadline(self, ttl: int) -> None:
"""
Set a deadline for the stream.

:param ttl: Time-to-live for the stream in seconds.
:return: True if the deadline was set successfully,
otherwise False.
:raises MuxedStreamError: if setting the deadline fails.
"""

@abstractmethod
Expand Down
25 changes: 15 additions & 10 deletions libp2p/host/autonat/autonat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from libp2p.custom_types import (
TProtocol,
)
from libp2p.host.exceptions import (
HostException,
)
from libp2p.host.autonat.pb.autonat_pb2 import (
DialResponse,
Message,
Expand Down Expand Up @@ -154,7 +157,11 @@ async def _handle_dial(self, message: Message) -> Message:
if peer_id in self.dial_results:
success = self.dial_results[peer_id]
else:
success = await self._try_dial(peer_id)
try:
await self._try_dial(peer_id)
success = True
except HostException:
success = False
self.dial_results[peer_id] = success

peer_info = PeerInfo()
Expand All @@ -166,7 +173,7 @@ async def _handle_dial(self, message: Message) -> Message:
response.dial_response.CopyFrom(dial_response)
return response

async def _try_dial(self, peer_id: ID) -> bool:
async def _try_dial(self, peer_id: ID) -> None:
"""
Attempt to establish a connection with a peer.

Expand All @@ -175,19 +182,17 @@ async def _try_dial(self, peer_id: ID) -> bool:
peer_id : ID
The identifier of the peer to attempt to dial.

Returns
-------
bool
True if the connection was successfully established,
False if the connection attempt failed.
Raises
------
HostException
If the connection attempt failed.

"""
try:
stream = await self.host.new_stream(peer_id, [AUTONAT_PROTOCOL_ID])
await stream.close()
return True
except Exception:
return False
except Exception as e:
raise HostException(f"Failed to dial peer {peer_id}: {e}") from e

def get_status(self) -> int:
"""
Expand Down
14 changes: 13 additions & 1 deletion libp2p/host/basic_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
get_default_protocols,
)
from libp2p.host.exceptions import (
HostException,
StreamFailure,
)
from libp2p.peer.id import (
Expand Down Expand Up @@ -206,8 +207,19 @@ def set_stream_handler(

:param protocol_id: protocol id used on stream
:param stream_handler: a stream handler function
:raises HostException: if setting the stream handler fails
"""
self.multiselect.add_handler(protocol_id, stream_handler)
try:
if not protocol_id or (isinstance(protocol_id, str) and not protocol_id.strip()):
raise HostException("Protocol ID cannot be empty")
if not stream_handler:
raise HostException("Stream handler cannot be None")

self.multiselect.add_handler(protocol_id, stream_handler)
except HostException:
raise
except Exception as e:
raise HostException(f"Failed to set stream handler: {e}") from e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SuchitraSwain Thank you for adding the try-catch block and input validation to this function. However, I believe this PR may not be addressing the core issue described in #194.
Issue #194 is specifically about "changing return True/False pattern to try/catch pattern," but the True/False pattern has already been removed from py-libp2p in previous commits.
For reference, the earlier implementation at this commit did use the True/False return pattern, but the current codebase has already migrated away from this pattern.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SuchitraSwain
Change the PR title


async def new_stream(
self,
Expand Down
40 changes: 34 additions & 6 deletions libp2p/identity/identify_push/identify_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from libp2p.network.stream.exceptions import (
StreamClosed,
)
from libp2p.host.exceptions import (
HostException,
)
from libp2p.peer.envelope import consume_envelope
from libp2p.peer.id import (
ID,
Expand Down Expand Up @@ -172,7 +175,7 @@ async def push_identify_to_peer(
observed_multiaddr: Multiaddr | None = None,
limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT),
use_varint_format: bool = True,
) -> bool:
) -> None:
"""
Push an identify message to a specific peer.

Expand All @@ -186,8 +189,8 @@ async def push_identify_to_peer(
limit: Semaphore for concurrency control.
use_varint_format: True=length-prefixed, False=raw protobuf.

Returns:
bool: True if the push was successful, False otherwise.
Raises:
HostException: If the identify push fails.

"""
async with limit:
Expand Down Expand Up @@ -224,10 +227,35 @@ async def push_identify_to_peer(
await stream.close()

logger.debug("Successfully pushed identify to peer %s", peer_id)
return True
except Exception as e:
logger.error("Error pushing identify to peer %s: %s", peer_id, e)
return False
raise HostException(f"Failed to push identify to peer {peer_id}: {e}") from e


async def _safe_push_identify_to_peer(
host: IHost,
peer_id: ID,
observed_multiaddr: Multiaddr | None = None,
limit: trio.Semaphore = trio.Semaphore(CONCURRENCY_LIMIT),
use_varint_format: bool = True,
) -> None:
"""
Safely push identify information to a specific peer, catching and logging exceptions.

This is a wrapper around push_identify_to_peer that catches exceptions and logs them
instead of letting them propagate, which is useful when calling from a nursery.

Args:
host: The libp2p host.
peer_id: The peer ID to push to.
observed_multiaddr: The observed multiaddress (optional).
limit: Semaphore for concurrency control.
use_varint_format: True=length-prefixed, False=raw protobuf.
"""
try:
await push_identify_to_peer(host, peer_id, observed_multiaddr, limit, use_varint_format)
except Exception as e:
logger.debug("Failed to push identify to peer %s: %s", peer_id, e)


async def push_identify_to_peers(
Expand Down Expand Up @@ -260,7 +288,7 @@ async def push_identify_to_peers(
async with trio.open_nursery() as nursery:
for peer_id in peer_ids:
nursery.start_soon(
push_identify_to_peer,
_safe_push_identify_to_peer,
host,
peer_id,
observed_multiaddr,
Expand Down
76 changes: 56 additions & 20 deletions libp2p/stream_muxer/mplex/mplex_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
)
from libp2p.stream_muxer.exceptions import (
MuxedConnUnavailable,
MuxedStreamError,
)
from libp2p.stream_muxer.rw_lock import ReadWriteLock

Expand Down Expand Up @@ -90,8 +91,13 @@ def is_initiator(self) -> bool:
return self.stream_id.is_initiator

async def _read_until_eof(self) -> bytes:
async for data in self.incoming_data_channel:
self._buf.extend(data)
if self.read_deadline is not None:
with trio.fail_after(self.read_deadline):
async for data in self.incoming_data_channel:
self._buf.extend(data)
else:
async for data in self.incoming_data_channel:
self._buf.extend(data)
payload = self._buf
self._buf = self._buf[len(payload) :]
return bytes(payload)
Expand Down Expand Up @@ -138,8 +144,16 @@ async def read(self, n: int | None = None) -> bytes:
# We know `receive` will be blocked here. Wait for data here with
# `receive` and catch all kinds of errors here.
try:
data = await self.incoming_data_channel.receive()
self._buf.extend(data)
# Apply read deadline if set
if self.read_deadline is not None:
with trio.fail_after(self.read_deadline):
data = await self.incoming_data_channel.receive()
self._buf.extend(data)
else:
data = await self.incoming_data_channel.receive()
self._buf.extend(data)
except trio.TooSlowError:
raise MuxedStreamError("Read operation timed out")
except trio.EndOfChannel:
if self.event_reset.is_set():
raise MplexStreamReset
Expand Down Expand Up @@ -173,7 +187,15 @@ async def write(self, data: bytes) -> None:
if self.is_initiator
else HeaderTags.MessageReceiver
)
await self.muxed_conn.send_message(flag, data, self.stream_id)
try:
# Apply write deadline if set
if self.write_deadline is not None:
with trio.fail_after(self.write_deadline):
await self.muxed_conn.send_message(flag, data, self.stream_id)
else:
await self.muxed_conn.send_message(flag, data, self.stream_id)
except trio.TooSlowError:
raise MuxedStreamError("Write operation timed out")

async def close(self) -> None:
"""
Expand Down Expand Up @@ -241,34 +263,48 @@ async def reset(self) -> None:
if self.muxed_conn.streams is not None:
self.muxed_conn.streams.pop(self.stream_id, None)

# TODO deadline not in use
def set_deadline(self, ttl: int) -> bool:
def set_deadline(self, ttl: int) -> None:
"""
Set deadline for muxed stream.

:return: True if successful
:param ttl: Time-to-live for the stream in seconds
:raises MuxedStreamError: if setting the deadline fails
"""
self.read_deadline = ttl
self.write_deadline = ttl
return True

def set_read_deadline(self, ttl: int) -> bool:
try:
if ttl < 0:
raise ValueError("Deadline cannot be negative")
self.read_deadline = ttl
self.write_deadline = ttl
except Exception as e:
raise MuxedStreamError(f"Failed to set deadline: {e}") from e

def set_read_deadline(self, ttl: int) -> None:
"""
Set read deadline for muxed stream.

:return: True if successful
:param ttl: Time-to-live for the read deadline in seconds
:raises MuxedStreamError: if setting the read deadline fails
"""
self.read_deadline = ttl
return True
try:
if ttl < 0:
raise ValueError("Read deadline cannot be negative")
self.read_deadline = ttl
except Exception as e:
raise MuxedStreamError(f"Failed to set read deadline: {e}") from e

def set_write_deadline(self, ttl: int) -> bool:
def set_write_deadline(self, ttl: int) -> None:
"""
Set write deadline for muxed stream.

:return: True if successful
:param ttl: Time-to-live for the write deadline in seconds
:raises MuxedStreamError: if setting the write deadline fails
"""
self.write_deadline = ttl
return True
try:
if ttl < 0:
raise ValueError("Write deadline cannot be negative")
self.write_deadline = ttl
except Exception as e:
raise MuxedStreamError(f"Failed to set write deadline: {e}") from e

def get_remote_address(self) -> tuple[str, int] | None:
"""Delegate to the parent Mplex connection."""
Expand Down
8 changes: 4 additions & 4 deletions libp2p/stream_muxer/yamux/yamux.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,15 @@ async def reset(self) -> None:
self.closed = True
self.reset_received = True # Mark as reset

def set_deadline(self, ttl: int) -> bool:
def set_deadline(self, ttl: int) -> None:
"""
Set a deadline for the stream. Yamux does not support deadlines natively,
so this method always returns False to indicate the operation is unsupported.
so this method raises an exception to indicate the operation is unsupported.

:param ttl: Time-to-live in seconds (ignored).
:return: False, as deadlines are not supported.
:raises NotImplementedError: as deadlines are not supported in Yamux.
"""
raise NotImplementedError("Yamux does not support setting read deadlines")
raise NotImplementedError("Yamux does not support setting deadlines")

def get_remote_address(self) -> tuple[str, int] | None:
"""
Expand Down
8 changes: 4 additions & 4 deletions libp2p/transport/quic/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,15 +632,15 @@ async def __aexit__(
logger.debug("Exiting the context and closing the stream")
await self.close()

def set_deadline(self, ttl: int) -> bool:
def set_deadline(self, ttl: int) -> None:
"""
Set a deadline for the stream. QUIC does not support deadlines natively,
so this method always returns False to indicate the operation is unsupported.
so this method raises an exception to indicate the operation is unsupported.

:param ttl: Time-to-live in seconds (ignored).
:return: False, as deadlines are not supported.
:raises NotImplementedError: as deadlines are not supported in QUIC.
"""
raise NotImplementedError("QUIC does not support setting read deadlines")
raise NotImplementedError("QUIC does not support setting deadlines")

# String representation for debugging

Expand Down
Loading
Loading