diff --git a/libp2p/relay/circuit_v2/config.py b/libp2p/relay/circuit_v2/config.py index 3315c74f1..70046c6a6 100644 --- a/libp2p/relay/circuit_v2/config.py +++ b/libp2p/relay/circuit_v2/config.py @@ -9,6 +9,7 @@ dataclass, field, ) +from enum import Flag, auto from libp2p.peer.peerinfo import ( PeerInfo, @@ -18,29 +19,95 @@ RelayLimits, ) +DEFAULT_MIN_RELAYS = 3 +DEFAULT_MAX_RELAYS = 20 +DEFAULT_DISCOVERY_INTERVAL = 300 # seconds +DEFAULT_RESERVATION_TTL = 3600 # seconds +DEFAULT_MAX_CIRCUIT_DURATION = 3600 # seconds +DEFAULT_MAX_CIRCUIT_BYTES = 1024 * 1024 * 1024 # 1GB + +DEFAULT_MAX_CIRCUIT_CONNS = 8 +DEFAULT_MAX_RESERVATIONS = 4 + +MAX_RESERVATIONS_PER_IP = 8 +MAX_CIRCUITS_PER_IP = 16 +RESERVATION_RATE_PER_IP = 4 # per minute +CIRCUIT_RATE_PER_IP = 8 # per minute +MAX_CIRCUITS_TOTAL = 64 +MAX_RESERVATIONS_TOTAL = 32 +MAX_BANDWIDTH_PER_CIRCUIT = 1024 * 1024 # 1MB/s +MAX_BANDWIDTH_TOTAL = 10 * 1024 * 1024 # 10MB/s + +MIN_RELAY_SCORE = 0.5 +MAX_RELAY_LATENCY = 1.0 # seconds +ENABLE_AUTO_RELAY = True +AUTO_RELAY_TIMEOUT = 30 # seconds +MAX_AUTO_RELAY_ATTEMPTS = 3 +RESERVATION_REFRESH_THRESHOLD = 0.8 # Refresh at 80% of TTL +MAX_CONCURRENT_RESERVATIONS = 2 + +# Shared timeout constants (used across modules) +STREAM_READ_TIMEOUT = 15 # seconds +STREAM_WRITE_TIMEOUT = 15 # seconds +STREAM_CLOSE_TIMEOUT = 10 # seconds +DIAL_TIMEOUT = 10 # seconds + +# NAT reachability timeout +REACHABILITY_TIMEOUT = 10 # seconds + +# Relay roles enum ----------------------------------------------------------- + + +class RelayRole(Flag): + """ + Bit-flag enum that captures the three possible relay capabilities. + + A node can combine multiple roles using bit-wise OR, for example:: + + RelayRole.HOP | RelayRole.STOP + """ + + HOP = auto() # Act as a relay for others ("hop") + STOP = auto() # Accept relayed connections ("stop") + CLIENT = auto() # Dial through existing relays ("client") + -@dataclass class RelayConfig: """Configuration for Circuit Relay v2.""" - # Role configuration - enable_hop: bool = False # Whether to act as a relay (hop) - enable_stop: bool = True # Whether to accept relayed connections (stop) - enable_client: bool = True # Whether to use relays for dialing + # Role configuration (bit-flags) + roles: RelayRole = RelayRole.STOP | RelayRole.CLIENT # Resource limits limits: RelayLimits | None = None # Discovery configuration bootstrap_relays: list[PeerInfo] = field(default_factory=list) - min_relays: int = 3 - max_relays: int = 20 - discovery_interval: int = 300 # seconds + min_relays: int = DEFAULT_MIN_RELAYS + max_relays: int = DEFAULT_MAX_RELAYS + discovery_interval: int = DEFAULT_DISCOVERY_INTERVAL # Connection configuration - reservation_ttl: int = 3600 # seconds - max_circuit_duration: int = 3600 # seconds - max_circuit_bytes: int = 1024 * 1024 * 1024 # 1GB + reservation_ttl: int = DEFAULT_RESERVATION_TTL + max_circuit_duration: int = DEFAULT_MAX_CIRCUIT_DURATION + max_circuit_bytes: int = DEFAULT_MAX_CIRCUIT_BYTES + + # --------------------------------------------------------------------- + # Backwards-compat boolean helpers. Existing code that still accesses + # ``cfg.enable_hop, cfg.enable_stop, cfg.enable_client`` will continue to work. + # --------------------------------------------------------------------- + + @property + def enable_hop(self) -> bool: # pragma: no cover – helper + return bool(self.roles & RelayRole.HOP) + + @property + def enable_stop(self) -> bool: # pragma: no cover – helper + return bool(self.roles & RelayRole.STOP) + + @property + def enable_client(self) -> bool: # pragma: no cover – helper + return bool(self.roles & RelayRole.CLIENT) def __post_init__(self) -> None: """Initialize default values.""" @@ -48,8 +115,8 @@ def __post_init__(self) -> None: self.limits = RelayLimits( duration=self.max_circuit_duration, data=self.max_circuit_bytes, - max_circuit_conns=8, - max_reservations=4, + max_circuit_conns=DEFAULT_MAX_CIRCUIT_CONNS, + max_reservations=DEFAULT_MAX_RESERVATIONS, ) @@ -58,20 +125,20 @@ class HopConfig: """Configuration specific to relay (hop) nodes.""" # Resource limits per IP - max_reservations_per_ip: int = 8 - max_circuits_per_ip: int = 16 + max_reservations_per_ip: int = MAX_RESERVATIONS_PER_IP + max_circuits_per_ip: int = MAX_CIRCUITS_PER_IP # Rate limiting - reservation_rate_per_ip: int = 4 # per minute - circuit_rate_per_ip: int = 8 # per minute + reservation_rate_per_ip: int = RESERVATION_RATE_PER_IP + circuit_rate_per_ip: int = CIRCUIT_RATE_PER_IP # Resource quotas - max_circuits_total: int = 64 - max_reservations_total: int = 32 + max_circuits_total: int = MAX_CIRCUITS_TOTAL + max_reservations_total: int = MAX_RESERVATIONS_TOTAL # Bandwidth limits - max_bandwidth_per_circuit: int = 1024 * 1024 # 1MB/s - max_bandwidth_total: int = 10 * 1024 * 1024 # 10MB/s + max_bandwidth_per_circuit: int = MAX_BANDWIDTH_PER_CIRCUIT + max_bandwidth_total: int = MAX_BANDWIDTH_TOTAL @dataclass @@ -79,14 +146,14 @@ class ClientConfig: """Configuration specific to relay clients.""" # Relay selection - min_relay_score: float = 0.5 - max_relay_latency: float = 1.0 # seconds + min_relay_score: float = MIN_RELAY_SCORE + max_relay_latency: float = MAX_RELAY_LATENCY # Auto-relay settings - enable_auto_relay: bool = True - auto_relay_timeout: int = 30 # seconds - max_auto_relay_attempts: int = 3 + enable_auto_relay: bool = ENABLE_AUTO_RELAY + auto_relay_timeout: int = AUTO_RELAY_TIMEOUT + max_auto_relay_attempts: int = MAX_AUTO_RELAY_ATTEMPTS # Reservation management - reservation_refresh_threshold: float = 0.8 # Refresh at 80% of TTL - max_concurrent_reservations: int = 2 + reservation_refresh_threshold: float = RESERVATION_REFRESH_THRESHOLD + max_concurrent_reservations: int = MAX_CONCURRENT_RESERVATIONS diff --git a/libp2p/relay/circuit_v2/dcutr.py b/libp2p/relay/circuit_v2/dcutr.py index 2cece5d25..a67ddd5ee 100644 --- a/libp2p/relay/circuit_v2/dcutr.py +++ b/libp2p/relay/circuit_v2/dcutr.py @@ -39,6 +39,12 @@ Service, ) +from .config import ( + DIAL_TIMEOUT, + STREAM_READ_TIMEOUT, + STREAM_WRITE_TIMEOUT, +) + logger = logging.getLogger(__name__) # Protocol ID for DCUtR @@ -47,11 +53,6 @@ # Maximum message size for DCUtR (4KiB as per spec) MAX_MESSAGE_SIZE = 4 * 1024 -# Timeouts -STREAM_READ_TIMEOUT = 30 # seconds -STREAM_WRITE_TIMEOUT = 30 # seconds -DIAL_TIMEOUT = 10 # seconds - # Maximum number of hole punch attempts per peer MAX_HOLE_PUNCH_ATTEMPTS = 5 diff --git a/libp2p/relay/circuit_v2/discovery.py b/libp2p/relay/circuit_v2/discovery.py index a35eacdcd..798eaa3e7 100644 --- a/libp2p/relay/circuit_v2/discovery.py +++ b/libp2p/relay/circuit_v2/discovery.py @@ -31,6 +31,9 @@ Service, ) +from .config import ( + DEFAULT_DISCOVERY_INTERVAL as CFG_DISCOVERY_INTERVAL, +) from .pb.circuit_pb2 import ( HopMessage, ) @@ -43,10 +46,11 @@ logger = logging.getLogger("libp2p.relay.circuit_v2.discovery") -# Constants -MAX_RELAYS_TO_TRACK = 10 -DEFAULT_DISCOVERY_INTERVAL = 60 # seconds +# Constants (single-source-of-truth) +DEFAULT_DISCOVERY_INTERVAL = CFG_DISCOVERY_INTERVAL +MAX_RELAYS_TO_TRACK = 10 # Still discovery-specific STREAM_TIMEOUT = 10 # seconds +PEER_PROTOCOL_TIMEOUT = 5 # seconds # Extended interfaces for type checking @@ -165,20 +169,20 @@ async def discover_relays(self) -> None: self._discovered_relays[peer_id].last_seen = time.time() continue - # Check if peer supports the relay protocol - with trio.move_on_after(5): # Don't wait too long for protocol info + # Don't wait too long for protocol info + with trio.move_on_after(PEER_PROTOCOL_TIMEOUT): if await self._supports_relay_protocol(peer_id): await self._add_relay(peer_id) # Limit number of relays we track - if len(self._discovered_relays) > self.max_relays: + if len(self._discovered_relays) > MAX_RELAYS_TO_TRACK: # Sort by last seen time and keep only the most recent ones sorted_relays = sorted( self._discovered_relays.items(), key=lambda x: x[1].last_seen, reverse=True, ) - to_remove = sorted_relays[self.max_relays :] + to_remove = sorted_relays[MAX_RELAYS_TO_TRACK:] for peer_id, _ in to_remove: del self._discovered_relays[peer_id] @@ -463,7 +467,7 @@ async def _cleanup_expired(self) -> None: for peer_id, relay_info in self._discovered_relays.items(): # Check if relay hasn't been seen in a while (3x discovery interval) - if now - relay_info.last_seen > self.discovery_interval * 3: + if now - relay_info.last_seen > DEFAULT_DISCOVERY_INTERVAL * 3: to_remove.append(peer_id) continue diff --git a/libp2p/relay/circuit_v2/nat.py b/libp2p/relay/circuit_v2/nat.py index d4e8b3c83..6ab9714c4 100644 --- a/libp2p/relay/circuit_v2/nat.py +++ b/libp2p/relay/circuit_v2/nat.py @@ -19,10 +19,9 @@ ID, ) -logger = logging.getLogger("libp2p.relay.circuit_v2.nat") +# REACHABILITY_TIMEOUT now imported from config -# Timeout for reachability checks -REACHABILITY_TIMEOUT = 10 # seconds +logger = logging.getLogger("libp2p.relay.circuit_v2.nat") # Define private IP ranges PRIVATE_IP_RANGES = [ diff --git a/libp2p/relay/circuit_v2/protocol.py b/libp2p/relay/circuit_v2/protocol.py index 1cf76efa8..ae852a1f3 100644 --- a/libp2p/relay/circuit_v2/protocol.py +++ b/libp2p/relay/circuit_v2/protocol.py @@ -5,6 +5,7 @@ https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md """ +from enum import Enum, auto import logging import time from typing import ( @@ -37,6 +38,15 @@ Service, ) +from .config import ( + DEFAULT_MAX_CIRCUIT_BYTES, + DEFAULT_MAX_CIRCUIT_CONNS, + DEFAULT_MAX_CIRCUIT_DURATION, + DEFAULT_MAX_RESERVATIONS, + STREAM_CLOSE_TIMEOUT, + STREAM_READ_TIMEOUT, + STREAM_WRITE_TIMEOUT, +) from .pb.circuit_pb2 import ( HopMessage, Limit, @@ -58,18 +68,21 @@ PROTOCOL_ID = TProtocol("/libp2p/circuit/relay/2.0.0") STOP_PROTOCOL_ID = TProtocol("/libp2p/circuit/relay/2.0.0/stop") -# Default limits for relay resources + +# Direction enum for data piping +class Pipe(Enum): + SRC_TO_DST = auto() + DST_TO_SRC = auto() + + +# Default limits for relay resources (single source of truth) DEFAULT_RELAY_LIMITS = RelayLimits( - duration=60 * 60, # 1 hour - data=1024 * 1024 * 1024, # 1GB - max_circuit_conns=8, - max_reservations=4, + duration=DEFAULT_MAX_CIRCUIT_DURATION, + data=DEFAULT_MAX_CIRCUIT_BYTES, + max_circuit_conns=DEFAULT_MAX_CIRCUIT_CONNS, + max_reservations=DEFAULT_MAX_RESERVATIONS, ) -# Stream operation timeouts -STREAM_READ_TIMEOUT = 15 # seconds -STREAM_WRITE_TIMEOUT = 15 # seconds -STREAM_CLOSE_TIMEOUT = 10 # seconds MAX_READ_RETRIES = 5 # Maximum number of read retries @@ -458,8 +471,20 @@ async def _handle_stop_stream(self, stream: INetStream) -> None: # Start relaying data async with trio.open_nursery() as nursery: - nursery.start_soon(self._relay_data, src_stream, stream, peer_id) - nursery.start_soon(self._relay_data, stream, src_stream, peer_id) + nursery.start_soon( + self._relay_data, + src_stream, + stream, + peer_id, + Pipe.SRC_TO_DST, + ) + nursery.start_soon( + self._relay_data, + stream, + src_stream, + peer_id, + Pipe.DST_TO_SRC, + ) except trio.TooSlowError: logger.error("Timeout reading from stop stream") @@ -648,8 +673,20 @@ async def _handle_connect(self, stream: INetStream, msg: Any) -> None: # Start relaying data async with trio.open_nursery() as nursery: - nursery.start_soon(self._relay_data, stream, dst_stream, peer_id) - nursery.start_soon(self._relay_data, dst_stream, stream, peer_id) + nursery.start_soon( + self._relay_data, + stream, + dst_stream, + peer_id, + Pipe.SRC_TO_DST, + ) + nursery.start_soon( + self._relay_data, + dst_stream, + stream, + peer_id, + Pipe.DST_TO_SRC, + ) except (trio.TooSlowError, ConnectionError) as e: logger.error("Error establishing relay connection: %s", str(e)) @@ -685,6 +722,7 @@ async def _relay_data( src_stream: INetStream, dst_stream: INetStream, peer_id: ID, + direction: Pipe, ) -> None: """ Relay data between two streams. @@ -698,13 +736,16 @@ async def _relay_data( peer_id : ID ID of the peer being relayed + direction : Pipe + Direction of data flow (``Pipe.SRC_TO_DST`` or ``Pipe.DST_TO_SRC``) + """ try: while True: # Read data with retries data = await self._read_stream_with_retry(src_stream) if not data: - logger.info("Source stream closed/reset") + logger.info("%s closed/reset", direction.name) break # Write data with timeout @@ -712,10 +753,10 @@ async def _relay_data( with trio.fail_after(STREAM_WRITE_TIMEOUT): await dst_stream.write(data) except trio.TooSlowError: - logger.error("Timeout writing to destination stream") + logger.error("Timeout writing in %s", direction.name) break except Exception as e: - logger.error("Error writing to destination stream: %s", str(e)) + logger.error("Error writing in %s: %s", direction.name, str(e)) break # Update resource usage diff --git a/libp2p/relay/circuit_v2/resources.py b/libp2p/relay/circuit_v2/resources.py index 4da67ec6a..bd5d5fe09 100644 --- a/libp2p/relay/circuit_v2/resources.py +++ b/libp2p/relay/circuit_v2/resources.py @@ -8,6 +8,7 @@ from dataclasses import ( dataclass, ) +from enum import Enum, auto import hashlib import os import time @@ -19,6 +20,18 @@ # Import the protobuf definitions from .pb.circuit_pb2 import Reservation as PbReservation +RANDOM_BYTES_LENGTH = 16 # 128 bits of randomness +TIMESTAMP_MULTIPLIER = 1000000 # To convert seconds to microseconds + + +# Reservation status enum +class ReservationStatus(Enum): + """Lifecycle status of a relay reservation.""" + + ACTIVE = auto() + EXPIRED = auto() + REJECTED = auto() + @dataclass class RelayLimits: @@ -68,8 +81,8 @@ def _generate_voucher(self) -> bytes: # - Peer ID to bind it to the specific peer # - Timestamp for uniqueness # - Hash everything for a fixed size output - random_bytes = os.urandom(16) # 128 bits of randomness - timestamp = str(int(self.created_at * 1000000)).encode() + random_bytes = os.urandom(RANDOM_BYTES_LENGTH) + timestamp = str(int(self.created_at * TIMESTAMP_MULTIPLIER)).encode() peer_bytes = self.peer_id.to_bytes() # Combine all elements and hash them @@ -84,6 +97,15 @@ def is_expired(self) -> bool: """Check if the reservation has expired.""" return time.time() > self.expires_at + # Expose a friendly status enum -------------------------------------- + + @property + def status(self) -> ReservationStatus: + """Return the current status as a ``ReservationStatus`` enum.""" + return ( + ReservationStatus.EXPIRED if self.is_expired() else ReservationStatus.ACTIVE + ) + def can_accept_connection(self) -> bool: """Check if a new connection can be accepted.""" return (