From 1882db28f7a8de7bcac7f2a26ca1d2cbd04bf9f0 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sun, 18 May 2025 19:53:30 +0530 Subject: [PATCH 01/14] interop utilities for mplex ping --- interop/README.md | 19 ++++++ interop/__init__.py | 0 interop/arch.py | 73 ++++++++++++++++++++++ interop/exec/config/mod.py | 57 +++++++++++++++++ interop/exec/native_ping.py | 33 ++++++++++ interop/lib.py | 112 +++++++++++++++++++++++++++++++++ setup.py | 121 ++++++++++++++++++++++++++++++++++++ 7 files changed, 415 insertions(+) create mode 100644 interop/README.md create mode 100644 interop/__init__.py create mode 100644 interop/arch.py create mode 100644 interop/exec/config/mod.py create mode 100644 interop/exec/native_ping.py create mode 100644 interop/lib.py create mode 100644 setup.py diff --git a/interop/README.md b/interop/README.md new file mode 100644 index 000000000..5ecff1f1c --- /dev/null +++ b/interop/README.md @@ -0,0 +1,19 @@ +These commands are to be run in `./interop/exec` + +## Redis + +```bash +docker run -p 6379:6379 -it redis:latest +``` + +## Listener + +```bash +transport=tcp ip=0.0.0.0 is_dialer=false redis_addr=6379 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py +``` + +## Dialer + +```bash +transport=tcp ip=0.0.0.0 is_dialer=true port=8001 redis_addr=6379 port=8001 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py +``` diff --git a/interop/__init__.py b/interop/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/interop/arch.py b/interop/arch.py new file mode 100644 index 000000000..5c79283d0 --- /dev/null +++ b/interop/arch.py @@ -0,0 +1,73 @@ +from dataclasses import ( + dataclass, +) + +import multiaddr +import redis +import trio + +from libp2p import ( + new_host, +) +from libp2p.crypto.keys import ( + KeyPair, +) +from libp2p.crypto.rsa import ( + create_new_key_pair, +) +from libp2p.custom_types import ( + TProtocol, +) +from libp2p.security.insecure.transport import ( + PLAINTEXT_PROTOCOL_ID, + InsecureTransport, +) +import libp2p.security.secio.transport as secio +from libp2p.stream_muxer.mplex.mplex import ( + MPLEX_PROTOCOL_ID, + Mplex, +) + + +def generate_new_rsa_identity() -> KeyPair: + return create_new_key_pair() + + +async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxer: str): + match (sec_protocol, muxer): + case ("insecure", "mplex"): + key_pair = create_new_key_pair() + host = new_host( + key_pair, + {MPLEX_PROTOCOL_ID: Mplex}, + { + TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair), + TProtocol(secio.ID): secio.Transport(key_pair), + }, + ) + muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}") + return (host, muladdr) + case _: + raise ValueError("Protocols not supported") + + +@dataclass +class RedisClient: + client: redis.Redis + + def brpop(self, key: str, timeout: float) -> list[str]: + result = self.client.brpop([key], timeout) + return [result[1]] if result else [] + + def rpush(self, key: str, value: str) -> None: + self.client.rpush(key, value) + + +async def main(): + client = RedisClient(redis.Redis(host="localhost", port=6379, db=0)) + client.rpush("test", "hello") + print(client.blpop("test", timeout=5)) + + +if __name__ == "__main__": + trio.run(main) diff --git a/interop/exec/config/mod.py b/interop/exec/config/mod.py new file mode 100644 index 000000000..9da19dcb8 --- /dev/null +++ b/interop/exec/config/mod.py @@ -0,0 +1,57 @@ +from dataclasses import ( + dataclass, +) +import os +from typing import ( + Optional, +) + + +def str_to_bool(val: str) -> bool: + return val.lower() in ("true", "1") + + +class ConfigError(Exception): + """Raised when the required environment variables are missing or invalid""" + + +@dataclass +class Config: + transport: str + sec_protocol: Optional[str] + muxer: Optional[str] + ip: str + is_dialer: bool + test_timeout: int + redis_addr: str + port: str + + @classmethod + def from_env(cls) -> "Config": + try: + transport = os.environ["transport"] + ip = os.environ["ip"] + except KeyError as e: + raise ConfigError(f"{e.args[0]} env variable not set") from None + + try: + is_dialer = str_to_bool(os.environ.get("is_dialer", "true")) + test_timeout = int(os.environ.get("test_timeout", "180")) + except ValueError as e: + raise ConfigError(f"Invalid value in env: {e}") from None + + redis_addr = os.environ.get("redis_addr", 6379) + sec_protocol = os.environ.get("security") + muxer = os.environ.get("muxer") + port = os.environ.get("port", "8000") + + return cls( + transport=transport, + sec_protocol=sec_protocol, + muxer=muxer, + ip=ip, + is_dialer=is_dialer, + test_timeout=test_timeout, + redis_addr=redis_addr, + port=port, + ) diff --git a/interop/exec/native_ping.py b/interop/exec/native_ping.py new file mode 100644 index 000000000..3578d0c60 --- /dev/null +++ b/interop/exec/native_ping.py @@ -0,0 +1,33 @@ +import trio + +from interop.exec.config.mod import ( + Config, + ConfigError, +) +from interop.lib import ( + run_test, +) + + +async def main() -> None: + try: + config = Config.from_env() + except ConfigError as e: + print(f"Config error: {e}") + return + + # Uncomment and implement when ready + _ = await run_test( + config.transport, + config.ip, + config.port, + config.is_dialer, + config.test_timeout, + config.redis_addr, + config.sec_protocol, + config.muxer, + ) + + +if __name__ == "__main__": + trio.run(main) diff --git a/interop/lib.py b/interop/lib.py new file mode 100644 index 000000000..8c884c3f3 --- /dev/null +++ b/interop/lib.py @@ -0,0 +1,112 @@ +from dataclasses import ( + dataclass, +) +import json +import time + +from loguru import ( + logger, +) +import multiaddr +import redis +import trio + +from interop.arch import ( + RedisClient, + build_host, +) +from libp2p.custom_types import ( + TProtocol, +) +from libp2p.network.stream.net_stream import ( + INetStream, +) +from libp2p.peer.peerinfo import ( + info_from_p2p_addr, +) + +PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0") +PING_LENGTH = 32 +RESP_TIMEOUT = 60 + + +async def handle_ping(stream: INetStream) -> None: + while True: + try: + payload = await stream.read(PING_LENGTH) + peer_id = stream.muxed_conn.peer_id + if payload is not None: + print(f"received ping from {peer_id}") + + await stream.write(payload) + print(f"responded with pong to {peer_id}") + + except Exception: + await stream.reset() + break + + +async def send_ping(stream: INetStream) -> None: + try: + payload = b"\x01" * PING_LENGTH + print(f"sending ping to {stream.muxed_conn.peer_id}") + + await stream.write(payload) + + with trio.fail_after(RESP_TIMEOUT): + response = await stream.read(PING_LENGTH) + + if response == payload: + print(f"received pong from {stream.muxed_conn.peer_id}") + + except Exception as e: + print(f"error occurred: {e}") + + +async def run_test( + transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer +): + logger.info("Starting run_test") + + redis_client = RedisClient( + redis.Redis(host="localhost", port=int(redis_addr), db=0) + ) + (host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer) + logger.info(f"Running ping test local_peer={host.get_id()}") + + async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + if not is_dialer: + host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) + ma = f"{listen_addr}/p2p/{host.get_id().pretty()}" + redis_client.rpush("listenerAddr", ma) + + logger.info(f"Test instance, listening: {ma}") + else: + redis_addr = redis_client.brpop("listenerAddr", timeout=5) + destination = redis_addr[0].decode() + maddr = multiaddr.Multiaddr(destination) + info = info_from_p2p_addr(maddr) + + handshake_start = time.perf_counter() + + await host.connect(info) + stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID]) + + logger.info("Remote conection established") + nursery.start_soon(send_ping, stream) + + handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0 + + logger.info(f"handshake time: {handshake_plus_ping:.2f}ms") + return + + await trio.sleep_forever() + + +@dataclass +class Report: + handshake_plus_one_rtt_millis: float + ping_rtt_millis: float + + def gen_report(self): + return json.dumps(self.__dict__) diff --git a/setup.py b/setup.py new file mode 100644 index 000000000..dd736318a --- /dev/null +++ b/setup.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python +import sys + +from setuptools import ( + find_packages, + setup, +) + +description = "libp2p: The Python implementation of the libp2p networking stack" + +# Platform-specific dependencies +if sys.platform == "win32": + crypto_requires = [] # We'll use coincurve instead of fastecdsa on Windows +else: + crypto_requires = ["fastecdsa==1.7.5"] + +extras_require = { + "dev": [ + "build>=0.9.0", + "bump_my_version>=0.19.0", + "ipython", + "mypy==1.10.0", + "pre-commit>=3.4.0", + "tox>=4.0.0", + "twine", + "wheel", + ], + "docs": [ + "sphinx>=6.0.0", + "sphinx_rtd_theme>=1.0.0", + "towncrier>=24,<25", + ], + "test": [ + "p2pclient==0.2.0", + "pytest>=7.0.0", + "pytest-xdist>=2.4.0", + "pytest-trio>=0.5.2", + "factory-boy>=2.12.0,<3.0.0", + ], + "interop": ["redis==6.1.0", "logging==0.4.9.6" "loguru==0.7.3"], +} + +extras_require["dev"] = ( + extras_require["dev"] + + extras_require["docs"] + + extras_require["test"] + + extras_require["interop"] +) + +try: + with open("./README.md", encoding="utf-8") as readme: + long_description = readme.read() +except FileNotFoundError: + long_description = description + +install_requires = [ + "base58>=1.0.3", + "coincurve>=10.0.0", + "exceptiongroup>=1.2.0; python_version < '3.11'", + "grpcio>=1.41.0", + "lru-dict>=1.1.6", + "multiaddr>=0.0.9", + "mypy-protobuf>=3.0.0", + "noiseprotocol>=0.3.0", + "protobuf>=6.30.1", + "pycryptodome>=3.9.2", + "pymultihash>=0.8.2", + "pynacl>=1.3.0", + "rpcudp>=3.0.0", + "trio-typing>=0.0.4", + "trio>=0.26.0", +] + +# Add platform-specific dependencies +install_requires.extend(crypto_requires) + +setup( + name="libp2p", + # *IMPORTANT*: Don't manually change the version here. See Contributing docs for the release process. + version="0.2.7", + description=description, + long_description=long_description, + long_description_content_type="text/markdown", + author="The Ethereum Foundation", + author_email="snakecharmers@ethereum.org", + url="https://github.com/libp2p/py-libp2p", + include_package_data=True, + install_requires=install_requires, + python_requires=">=3.9, <4", + extras_require=extras_require, + py_modules=["libp2p"], + license="MIT AND Apache-2.0", + license_files=("LICENSE-MIT", "LICENSE-APACHE"), + zip_safe=False, + keywords="libp2p p2p", + packages=find_packages(exclude=["scripts", "scripts.*", "tests", "tests.*"]), + package_data={"libp2p": ["py.typed"]}, + classifiers=[ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Natural Language :: English", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + ], + platforms=["unix", "linux", "osx", "win32"], + entry_points={ + "console_scripts": [ + "chat-demo=examples.chat.chat:main", + "echo-demo=examples.echo.echo:main", + "ping-demo=examples.ping.ping:main", + "identify-demo=examples.identify.identify:main", + "identify-push-demo=examples.identify_push.identify_push_demo:run_main", + "identify-push-listener-dialer-demo=examples.identify_push.identify_push_listener_dialer:main", + "pubsub-demo=examples.pubsub.pubsub:main", + ], + }, +) From d08aeb81c378cdceea3abd318945588041a064fe Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sat, 24 May 2025 00:30:19 +0530 Subject: [PATCH 02/14] added: logs --- interop/arch.py | 36 ++++++++++++++++++++- interop/lib.py | 8 +++++ libp2p/host/basic_host.py | 11 +++++-- libp2p/network/swarm.py | 6 ++-- libp2p/protocol_muxer/multiselect_client.py | 9 ++++++ 5 files changed, 64 insertions(+), 6 deletions(-) diff --git a/interop/arch.py b/interop/arch.py index 5c79283d0..135851993 100644 --- a/interop/arch.py +++ b/interop/arch.py @@ -15,6 +15,7 @@ from libp2p.crypto.rsa import ( create_new_key_pair, ) +from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair from libp2p.custom_types import ( TProtocol, ) @@ -22,11 +23,17 @@ PLAINTEXT_PROTOCOL_ID, InsecureTransport, ) +from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID +from libp2p.security.noise.transport import Transport as NoiseTransport import libp2p.security.secio.transport as secio from libp2p.stream_muxer.mplex.mplex import ( MPLEX_PROTOCOL_ID, Mplex, ) +from libp2p.stream_muxer.yamux.yamux import ( + Yamux, +) +from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID def generate_new_rsa_identity() -> KeyPair: @@ -39,7 +46,19 @@ async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxe key_pair = create_new_key_pair() host = new_host( key_pair, - {MPLEX_PROTOCOL_ID: Mplex}, + {TProtocol(MPLEX_PROTOCOL_ID): Mplex}, + { + TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair), + TProtocol(secio.ID): secio.Transport(key_pair), + }, + ) + muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}") + return (host, muladdr) + case ("insecure", "yamux"): + key_pair = create_new_key_pair() + host = new_host( + key_pair, + {TProtocol(YAMUX_PROTOCOL_ID): Yamux}, { TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair), TProtocol(secio.ID): secio.Transport(key_pair), @@ -47,6 +66,21 @@ async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxe ) muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}") return (host, muladdr) + case ("noise", "yamux"): + key_pair = create_new_key_pair() + noise_key_pair = create_new_x25519_key_pair() + + host = new_host( + key_pair, + {TProtocol(YAMUX_PROTOCOL_ID): Yamux}, + { + NOISE_PROTOCOL_ID: NoiseTransport( + key_pair, noise_privkey=noise_key_pair.private_key + ) + }, + ) + muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}") + return (host, muladdr) case _: raise ValueError("Protocols not supported") diff --git a/interop/lib.py b/interop/lib.py index 8c884c3f3..0c599a9ca 100644 --- a/interop/lib.py +++ b/interop/lib.py @@ -89,10 +89,18 @@ async def run_test( handshake_start = time.perf_counter() + logger.info("GETTING READY FOR CONNECTION") await host.connect(info) + logger.info("HOST CONNECTED") + + # TILL HERE EVERYTHING IS FINE + stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID]) + logger.info("CREATED NEW STREAM") + # DOES NOT MORE FORWARD FROM THIS logger.info("Remote conection established") + nursery.start_soon(send_ping, stream) handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0 diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index e370a3de1..6b928f401 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -6,7 +6,6 @@ AbstractAsyncContextManager, asynccontextmanager, ) -import logging from typing import ( TYPE_CHECKING, Optional, @@ -72,8 +71,11 @@ # telling it to listen on the given listen addresses. -logger = logging.getLogger("libp2p.network.basic_host") +# logger = logging.getLogger("libp2p.network.basic_host") DEFAULT_NEGOTIATE_TIMEOUT = 5 +from loguru import ( + logger, +) class BasicHost(IHost): @@ -221,14 +223,17 @@ async def new_stream( :return: stream: new stream created """ net_stream = await self._network.new_stream(peer_id) - + logger.info("INETSTREAM CHECKING IN") + logger.info(protocol_ids) # Perform protocol muxing to determine protocol to use try: + logger.debug("PROTOCOLS TRYING TO GET SENT") selected_protocol = await self.multiselect_client.select_one_of( list(protocol_ids), MultiselectCommunicator(net_stream), negotitate_timeout, ) + logger.info("PROTOCOLS GOT SENT") except MultiselectClientError as error: logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error) await net_stream.reset() diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index b182def2e..e80b60c64 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -6,6 +6,10 @@ import random from typing import cast +# logger = logging.getLogger("libp2p.network.swarm") +from loguru import ( + logger, +) from multiaddr import ( Multiaddr, ) @@ -62,8 +66,6 @@ SwarmException, ) -logger = logging.getLogger("libp2p.network.swarm") - def create_default_stream_handler(network: INetworkService) -> StreamHandlerFn: async def stream_handler(stream: INetStream) -> None: diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 90adb251d..d95974204 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -4,6 +4,10 @@ import trio +from loguru import ( + logger, +) + from libp2p.abc import ( IMultiselectClient, IMultiselectCommunicator, @@ -39,12 +43,16 @@ async def handshake(self, communicator: IMultiselectCommunicator) -> None: try: await communicator.write(MULTISELECT_PROTOCOL_ID) except MultiselectCommunicatorError as error: + logger.error("WROTE FAIL") raise MultiselectClientError() from error + logger.info(f"WROTE SUC, {MULTISELECT_PROTOCOL_ID}") try: handshake_contents = await communicator.read() + logger.info(f"READ SUC, {handshake_contents}") except MultiselectCommunicatorError as error: + logger.error(f"READ FAIL, {error}") raise MultiselectClientError() from error if not is_valid_handshake(handshake_contents): @@ -144,6 +152,7 @@ async def try_select( try: response = await communicator.read() + logger.info("Response: ", response) except MultiselectCommunicatorError as error: raise MultiselectClientError() from error From 98c1da6b4fa0b0068926471c4f8a442535e539ee Mon Sep 17 00:00:00 2001 From: lla-dane Date: Tue, 10 Jun 2025 15:04:45 +0530 Subject: [PATCH 03/14] removed loguru --- interop/arch.py | 8 +- interop/exec/config/mod.py | 7 +- interop/lib.py | 33 ++++-- libp2p/host/basic_host.py | 6 +- libp2p/network/swarm.py | 6 +- libp2p/protocol_muxer/multiselect_client.py | 3 - setup.py | 121 -------------------- 7 files changed, 32 insertions(+), 152 deletions(-) delete mode 100644 setup.py diff --git a/interop/arch.py b/interop/arch.py index 135851993..fafb90f72 100644 --- a/interop/arch.py +++ b/interop/arch.py @@ -23,17 +23,19 @@ PLAINTEXT_PROTOCOL_ID, InsecureTransport, ) -from libp2p.security.noise.transport import PROTOCOL_ID as NOISE_PROTOCOL_ID -from libp2p.security.noise.transport import Transport as NoiseTransport +from libp2p.security.noise.transport import ( + PROTOCOL_ID as NOISE_PROTOCOL_ID, + Transport as NoiseTransport, +) import libp2p.security.secio.transport as secio from libp2p.stream_muxer.mplex.mplex import ( MPLEX_PROTOCOL_ID, Mplex, ) from libp2p.stream_muxer.yamux.yamux import ( + PROTOCOL_ID as YAMUX_PROTOCOL_ID, Yamux, ) -from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID def generate_new_rsa_identity() -> KeyPair: diff --git a/interop/exec/config/mod.py b/interop/exec/config/mod.py index 9da19dcb8..aa1e93044 100644 --- a/interop/exec/config/mod.py +++ b/interop/exec/config/mod.py @@ -2,9 +2,6 @@ dataclass, ) import os -from typing import ( - Optional, -) def str_to_bool(val: str) -> bool: @@ -18,8 +15,8 @@ class ConfigError(Exception): @dataclass class Config: transport: str - sec_protocol: Optional[str] - muxer: Optional[str] + sec_protocol: str | None + muxer: str | None ip: str is_dialer: bool test_timeout: int diff --git a/interop/lib.py b/interop/lib.py index 0c599a9ca..d05cd8949 100644 --- a/interop/lib.py +++ b/interop/lib.py @@ -2,11 +2,8 @@ dataclass, ) import json -import time +import logging -from loguru import ( - logger, -) import multiaddr import redis import trio @@ -25,6 +22,16 @@ info_from_p2p_addr, ) +# Configure detailed logging +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"), + ], +) + PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0") PING_LENGTH = 32 RESP_TIMEOUT = 60 @@ -66,13 +73,15 @@ async def send_ping(stream: INetStream) -> None: async def run_test( transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer ): - logger.info("Starting run_test") + import time + + logging.info("Starting run_test") redis_client = RedisClient( redis.Redis(host="localhost", port=int(redis_addr), db=0) ) (host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer) - logger.info(f"Running ping test local_peer={host.get_id()}") + logging.info(f"Running ping test local_peer={host.get_id()}") async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: if not is_dialer: @@ -80,7 +89,7 @@ async def run_test( ma = f"{listen_addr}/p2p/{host.get_id().pretty()}" redis_client.rpush("listenerAddr", ma) - logger.info(f"Test instance, listening: {ma}") + logging.info(f"Test instance, listening: {ma}") else: redis_addr = redis_client.brpop("listenerAddr", timeout=5) destination = redis_addr[0].decode() @@ -89,23 +98,23 @@ async def run_test( handshake_start = time.perf_counter() - logger.info("GETTING READY FOR CONNECTION") + logging.info("GETTING READY FOR CONNECTION") await host.connect(info) - logger.info("HOST CONNECTED") + logging.info("HOST CONNECTED") # TILL HERE EVERYTHING IS FINE stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID]) - logger.info("CREATED NEW STREAM") + logging.info("CREATED NEW STREAM") # DOES NOT MORE FORWARD FROM THIS - logger.info("Remote conection established") + logging.info("Remote conection established") nursery.start_soon(send_ping, stream) handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0 - logger.info(f"handshake time: {handshake_plus_ping:.2f}ms") + logging.info(f"handshake time: {handshake_plus_ping:.2f}ms") return await trio.sleep_forever() diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 6b928f401..45920eaf7 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -6,6 +6,7 @@ AbstractAsyncContextManager, asynccontextmanager, ) +import logging from typing import ( TYPE_CHECKING, Optional, @@ -71,11 +72,8 @@ # telling it to listen on the given listen addresses. -# logger = logging.getLogger("libp2p.network.basic_host") +logger = logging.getLogger("libp2p.network.basic_host") DEFAULT_NEGOTIATE_TIMEOUT = 5 -from loguru import ( - logger, -) class BasicHost(IHost): diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index e80b60c64..b182def2e 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -6,10 +6,6 @@ import random from typing import cast -# logger = logging.getLogger("libp2p.network.swarm") -from loguru import ( - logger, -) from multiaddr import ( Multiaddr, ) @@ -66,6 +62,8 @@ SwarmException, ) +logger = logging.getLogger("libp2p.network.swarm") + def create_default_stream_handler(network: INetworkService) -> StreamHandlerFn: async def stream_handler(stream: INetStream) -> None: diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index d95974204..f75b01561 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -43,16 +43,13 @@ async def handshake(self, communicator: IMultiselectCommunicator) -> None: try: await communicator.write(MULTISELECT_PROTOCOL_ID) except MultiselectCommunicatorError as error: - logger.error("WROTE FAIL") raise MultiselectClientError() from error - logger.info(f"WROTE SUC, {MULTISELECT_PROTOCOL_ID}") try: handshake_contents = await communicator.read() logger.info(f"READ SUC, {handshake_contents}") except MultiselectCommunicatorError as error: - logger.error(f"READ FAIL, {error}") raise MultiselectClientError() from error if not is_valid_handshake(handshake_contents): diff --git a/setup.py b/setup.py deleted file mode 100644 index dd736318a..000000000 --- a/setup.py +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/env python -import sys - -from setuptools import ( - find_packages, - setup, -) - -description = "libp2p: The Python implementation of the libp2p networking stack" - -# Platform-specific dependencies -if sys.platform == "win32": - crypto_requires = [] # We'll use coincurve instead of fastecdsa on Windows -else: - crypto_requires = ["fastecdsa==1.7.5"] - -extras_require = { - "dev": [ - "build>=0.9.0", - "bump_my_version>=0.19.0", - "ipython", - "mypy==1.10.0", - "pre-commit>=3.4.0", - "tox>=4.0.0", - "twine", - "wheel", - ], - "docs": [ - "sphinx>=6.0.0", - "sphinx_rtd_theme>=1.0.0", - "towncrier>=24,<25", - ], - "test": [ - "p2pclient==0.2.0", - "pytest>=7.0.0", - "pytest-xdist>=2.4.0", - "pytest-trio>=0.5.2", - "factory-boy>=2.12.0,<3.0.0", - ], - "interop": ["redis==6.1.0", "logging==0.4.9.6" "loguru==0.7.3"], -} - -extras_require["dev"] = ( - extras_require["dev"] - + extras_require["docs"] - + extras_require["test"] - + extras_require["interop"] -) - -try: - with open("./README.md", encoding="utf-8") as readme: - long_description = readme.read() -except FileNotFoundError: - long_description = description - -install_requires = [ - "base58>=1.0.3", - "coincurve>=10.0.0", - "exceptiongroup>=1.2.0; python_version < '3.11'", - "grpcio>=1.41.0", - "lru-dict>=1.1.6", - "multiaddr>=0.0.9", - "mypy-protobuf>=3.0.0", - "noiseprotocol>=0.3.0", - "protobuf>=6.30.1", - "pycryptodome>=3.9.2", - "pymultihash>=0.8.2", - "pynacl>=1.3.0", - "rpcudp>=3.0.0", - "trio-typing>=0.0.4", - "trio>=0.26.0", -] - -# Add platform-specific dependencies -install_requires.extend(crypto_requires) - -setup( - name="libp2p", - # *IMPORTANT*: Don't manually change the version here. See Contributing docs for the release process. - version="0.2.7", - description=description, - long_description=long_description, - long_description_content_type="text/markdown", - author="The Ethereum Foundation", - author_email="snakecharmers@ethereum.org", - url="https://github.com/libp2p/py-libp2p", - include_package_data=True, - install_requires=install_requires, - python_requires=">=3.9, <4", - extras_require=extras_require, - py_modules=["libp2p"], - license="MIT AND Apache-2.0", - license_files=("LICENSE-MIT", "LICENSE-APACHE"), - zip_safe=False, - keywords="libp2p p2p", - packages=find_packages(exclude=["scripts", "scripts.*", "tests", "tests.*"]), - package_data={"libp2p": ["py.typed"]}, - classifiers=[ - "Development Status :: 4 - Beta", - "Intended Audience :: Developers", - "Natural Language :: English", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12", - "Programming Language :: Python :: 3.13", - ], - platforms=["unix", "linux", "osx", "win32"], - entry_points={ - "console_scripts": [ - "chat-demo=examples.chat.chat:main", - "echo-demo=examples.echo.echo:main", - "ping-demo=examples.ping.ping:main", - "identify-demo=examples.identify.identify:main", - "identify-push-demo=examples.identify_push.identify_push_demo:run_main", - "identify-push-listener-dialer-demo=examples.identify_push.identify_push_listener_dialer:main", - "pubsub-demo=examples.pubsub.pubsub:main", - ], - }, -) From 086a1356ac55084dbe58a599e42c16a51cb1a208 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Tue, 10 Jun 2025 15:10:39 +0530 Subject: [PATCH 04/14] removed extar logs --- libp2p/host/basic_host.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 45920eaf7..0452c91ad 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -221,17 +221,13 @@ async def new_stream( :return: stream: new stream created """ net_stream = await self._network.new_stream(peer_id) - logger.info("INETSTREAM CHECKING IN") - logger.info(protocol_ids) # Perform protocol muxing to determine protocol to use try: - logger.debug("PROTOCOLS TRYING TO GET SENT") selected_protocol = await self.multiselect_client.select_one_of( list(protocol_ids), MultiselectCommunicator(net_stream), negotitate_timeout, ) - logger.info("PROTOCOLS GOT SENT") except MultiselectClientError as error: logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error) await net_stream.reset() From 1151fd41e1e083497881040fac77a4693cc66c3b Mon Sep 17 00:00:00 2001 From: lla-dane Date: Tue, 10 Jun 2025 17:12:31 +0530 Subject: [PATCH 05/14] updated as per #664 --- interop/arch.py | 74 ++++++++++++--- interop/lib.py | 242 +++++++++++++++++++++++++++++++++++------------- 2 files changed, 240 insertions(+), 76 deletions(-) diff --git a/interop/arch.py b/interop/arch.py index fafb90f72..d28a860d6 100644 --- a/interop/arch.py +++ b/interop/arch.py @@ -1,7 +1,11 @@ from dataclasses import ( dataclass, ) +import logging +from cryptography.hazmat.primitives.asymmetric import ( + x25519, +) import multiaddr import redis import trio @@ -15,7 +19,6 @@ from libp2p.crypto.rsa import ( create_new_key_pair, ) -from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair from libp2p.custom_types import ( TProtocol, ) @@ -24,7 +27,6 @@ InsecureTransport, ) from libp2p.security.noise.transport import ( - PROTOCOL_ID as NOISE_PROTOCOL_ID, Transport as NoiseTransport, ) import libp2p.security.secio.transport as secio @@ -37,11 +39,52 @@ Yamux, ) +# Configure detailed logging +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler("ping_debug.log", mode="w", encoding="utf-8"), + ], +) + def generate_new_rsa_identity() -> KeyPair: return create_new_key_pair() +def create_noise_keypair(): + """Create a Noise protocol keypair for secure communication""" + try: + x25519_private_key = x25519.X25519PrivateKey.generate() + + class NoisePrivateKey: + def __init__(self, key): + self._key = key + + def to_bytes(self): + return self._key.private_bytes_raw() + + def public_key(self): + return NoisePublicKey(self._key.public_key()) + + def get_public_key(self): + return NoisePublicKey(self._key.public_key()) + + class NoisePublicKey: + def __init__(self, key): + self._key = key + + def to_bytes(self): + return self._key.public_bytes_raw() + + return NoisePrivateKey(x25519_private_key) + except Exception as e: + logging.error(f"Failed to create Noise keypair: {e}") + return None + + async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxer: str): match (sec_protocol, muxer): case ("insecure", "mplex"): @@ -69,18 +112,23 @@ async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxe muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}") return (host, muladdr) case ("noise", "yamux"): - key_pair = create_new_key_pair() - noise_key_pair = create_new_x25519_key_pair() + key_pair = generate_new_rsa_identity() + logging.debug("Generated RSA keypair") - host = new_host( - key_pair, - {TProtocol(YAMUX_PROTOCOL_ID): Yamux}, - { - NOISE_PROTOCOL_ID: NoiseTransport( - key_pair, noise_privkey=noise_key_pair.private_key - ) - }, - ) + noise_privkey = create_noise_keypair() + if not noise_privkey: + print("[ERROR] Failed to create Noise keypair") + return 1 + logging.debug("Generated Noise keypair") + + noise_transport = NoiseTransport(key_pair, noise_privkey) + logging.debug(f"Noise transport initialized: {noise_transport}") + sec_opt = {TProtocol("/noise"): noise_transport} + muxer_opt = {TProtocol(YAMUX_PROTOCOL_ID): Yamux} + + logging.info(f"Using muxer: {muxer_opt}") + + host = new_host(key_pair=key_pair, sec_opt=sec_opt, muxer_opt=muxer_opt) muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}") return (host, muladdr) case _: diff --git a/interop/lib.py b/interop/lib.py index d05cd8949..abd8275aa 100644 --- a/interop/lib.py +++ b/interop/lib.py @@ -1,7 +1,3 @@ -from dataclasses import ( - dataclass, -) -import json import logging import multiaddr @@ -38,92 +34,212 @@ async def handle_ping(stream: INetStream) -> None: - while True: - try: - payload = await stream.read(PING_LENGTH) - peer_id = stream.muxed_conn.peer_id - if payload is not None: - print(f"received ping from {peer_id}") + """Handle incoming ping requests from rust-libp2p clients""" + peer_id = stream.muxed_conn.peer_id + print(f"[INFO] New ping stream opened by {peer_id}") + logging.info(f"Ping handler called for peer {peer_id}") - await stream.write(payload) - print(f"responded with pong to {peer_id}") + ping_count = 0 - except Exception: - await stream.reset() - break + try: + while True: + try: + print(f"[INFO] Wailting for ping data from {peer_id}...") + logging.debug(f"Stream state: {stream}") + data = await stream.read(PING_LENGTH) + + if not data: + print( + f"[INFO] No data received,conneciton likely closed by {peer_id}" + ) + logging.debug("No data received, stream closed") + break + + if len(data) == 0: + print(f"[INFO] Empty data received, connection closed by {peer_id}") + logging.debug("Empty data received") + break + + ping_count += 1 + print( + f"[PING {ping_count}] Received ping from {peer_id}:" + f" {len(data)} bytes" + ) + logging.debug(f"Ping data: {data.hex()}") + + # Echo the data back (this is what ping protocol does) + await stream.write(data) + print(f"[PING {ping_count}] Echoed ping back to {peer_id}") + + except Exception as e: + print(f"[ERROR] Error in ping loop with {peer_id}: {e}") + logging.exception("Ping loop error") + break + except Exception as e: + print(f"[ERROR] Error handling ping from {peer_id}: {e}") + logging.exception("Ping handler error") + finally: + try: + print(f"[INFO] Closing ping stream with {peer_id}") + await stream.close() + except Exception as e: + logging.debug(f"Error closing stream: {e}") -async def send_ping(stream: INetStream) -> None: - try: - payload = b"\x01" * PING_LENGTH - print(f"sending ping to {stream.muxed_conn.peer_id}") + print(f"[INFO] Ping session completed with {peer_id} ({ping_count} pings)") - await stream.write(payload) - with trio.fail_after(RESP_TIMEOUT): - response = await stream.read(PING_LENGTH) +async def send_ping(stream: INetStream, count: int = 1) -> None: + """Send a sequence of pings compatible with rust-libp2p.""" + peer_id = stream.muxed_conn.peer_id + print(f"[INFO] Starting ping sequence to {peer_id} ({count} pings)") - if response == payload: - print(f"received pong from {stream.muxed_conn.peer_id}") + import os + import time - except Exception as e: - print(f"error occurred: {e}") + rtts = [] + + for i in range(1, count + 1): + try: + # Generate random 32-byte payload as per ping protcol spec + payload = os.urandom(PING_LENGTH) + print(f"[PING {i}/{count}] Sending ping to {peer_id}") + logging.debug(f"Sending payload: {payload.hex()}") + start_time = time.time() + + await stream.write(payload) + + with trio.fail_after(RESP_TIMEOUT): + response = await stream.read(PING_LENGTH) + + end_time = time.time() + rtt = (end_time - start_time) * 1000 + + if ( + response + and len(response) >= PING_LENGTH + and response[:PING_LENGTH] == payload + ): + rtts.append(rtt) + print(f"[PING {i}] Successful RTT: {rtt:.2f}ms") + else: + print(f"[ERROR] Ping {i} failed: response mismatch or incomplete") + if response: + logging.debug(f"Expecte: {payload.hex()}") + logging.debug(f"Received: {response.hex()}") + + if i < count: + await trio.sleep(1) + + except trio.TooSlowError: + print(f"[ERROR] Ping {i} timed out after {RESP_TIMEOUT}s") + except Exception as e: + print(f"[ERROR] Ping {i} failed: {e}") + logging.exception(f"Ping {i} error") + + # Print statistics + if rtts: + avg_rtt = sum(rtts) / len(rtts) + min_rtt = min(rtts) + max_rtt = max(rtts) # Fixed typo: was max_rtts + success_count = len(rtts) + loss_rate = ((count - success_count) / count) * 100 + + print("\n[STATS] Ping Statistics:") + print( + f" Packets: Sent={count}, Received={success_count}," + f" Lost={count - success_count}" + ) + print(f" Loss rate: {loss_rate:.1f}%") + print(f" RTT: min={min_rtt:.2f}ms, avg={avg_rtt:.2f}ms, max={max_rtt:.2f}ms") + else: + print(f"\n[STATS] All pings failed ({count} attempts)") async def run_test( transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer ): - import time - - logging.info("Starting run_test") - redis_client = RedisClient( redis.Redis(host="localhost", port=int(redis_addr), db=0) ) (host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer) - logging.info(f"Running ping test local_peer={host.get_id()}") - - async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: + async with host.run(listen_addrs=[listen_addr]): if not is_dialer: + print("[INFO] Starting py-libp2p ping server...") + + print(f"[INFO] Registering ping handler for protocol: {PING_PROTOCOL_ID}") host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) + + # Also register alternative protocol IDs for better compatibilty + alt_protcols = [ + TProtocol("/ping/1.0.0"), + TProtocol("/libp2p/ping/1.0.0"), + ] + + for alt_proto in alt_protcols: + print(f"[INFO] Also registering handler for: {alt_proto}") + host.set_stream_handler(alt_proto, handle_ping) + + print("[INFO] Server started successfully!") + print(f"[INFO] Peer ID: {host.get_id()}") + print(f"[INFO] Listening: /ip4/{ip}/tcp/{port}") + print(f"[INFO] Primary Protocol: {PING_PROTOCOL_ID}") + ma = f"{listen_addr}/p2p/{host.get_id().pretty()}" redis_client.rpush("listenerAddr", ma) - logging.info(f"Test instance, listening: {ma}") + print("[INFO] Pushed address to Redis database") + await trio.sleep_forever() else: + print("[INFO] Starting py-libp2p ping client...") + + print("[INFO] Fetching remote address from Redis database...") redis_addr = redis_client.brpop("listenerAddr", timeout=5) destination = redis_addr[0].decode() maddr = multiaddr.Multiaddr(destination) info = info_from_p2p_addr(maddr) + target_peer_id = info.peer_id - handshake_start = time.perf_counter() + print(f"[INFO] Our Peer ID: {host.get_id()}") + print(f"[INFO] Target: {destination}") + print(f"[INFO] Target Peer ID: {target_peer_id}") + print("[INFO] Connecting to peer...") - logging.info("GETTING READY FOR CONNECTION") await host.connect(info) - logging.info("HOST CONNECTED") - - # TILL HERE EVERYTHING IS FINE - - stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID]) - logging.info("CREATED NEW STREAM") - - # DOES NOT MORE FORWARD FROM THIS - logging.info("Remote conection established") - - nursery.start_soon(send_ping, stream) - - handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0 - - logging.info(f"handshake time: {handshake_plus_ping:.2f}ms") - return - - await trio.sleep_forever() - - -@dataclass -class Report: - handshake_plus_one_rtt_millis: float - ping_rtt_millis: float - - def gen_report(self): - return json.dumps(self.__dict__) + print("[INFO] Connection established!") + + # Try protocols in order of preference + # Start with the standard libp2p ping protocol + protocols_to_try = [ + PING_PROTOCOL_ID, # /ipfs/ping/1.0.0 - standard protocol + TProtocol("/ping/1.0.0"), # Alternative + TProtocol("/libp2p/ping/1.0.0"), # Another alternative + ] + + stream = None + + for proto in protocols_to_try: + try: + print(f"[INFO] Trying to open stream with protocol: {proto}") + stream = await host.new_stream(target_peer_id, [proto]) + print(f"[INFO] Stream opened with protocol: {proto}") + break + except Exception as e: + print(f"[ERROR] Failed to open stream with {proto}: {e}") + logging.debug(f"Protocol {proto} failed: {e}") + continue + + if not stream: + print("[ERROR] Failed to open stream with any ping protocol") + print("[ERROR] Ensure the target peer supports one of these protocols") + for proto in protocols_to_try: + print(f"[ERROR] - {proto}") + return 1 + + await send_ping(stream) + + await stream.close() + print("[INFO] Stream closed successfully") + + print("\n[INFO] Client stopped") + return 0 From d0e596f7493f9a0ec41bd9da3489b051f6760464 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Wed, 16 Jul 2025 13:33:32 +0530 Subject: [PATCH 06/14] remove loguru logs --- libp2p/protocol_muxer/multiselect_client.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index f75b01561..90adb251d 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -4,10 +4,6 @@ import trio -from loguru import ( - logger, -) - from libp2p.abc import ( IMultiselectClient, IMultiselectCommunicator, @@ -48,7 +44,6 @@ async def handshake(self, communicator: IMultiselectCommunicator) -> None: try: handshake_contents = await communicator.read() - logger.info(f"READ SUC, {handshake_contents}") except MultiselectCommunicatorError as error: raise MultiselectClientError() from error @@ -149,7 +144,6 @@ async def try_select( try: response = await communicator.read() - logger.info("Response: ", response) except MultiselectCommunicatorError as error: raise MultiselectClientError() from error From bdb4ece3a510401f6ef2fca3d559c4034aa8e5a0 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Wed, 16 Jul 2025 13:37:13 +0530 Subject: [PATCH 07/14] added redis in the pyproject.toml --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index b06d639cf..e52b95aa7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ dev = [ "tox>=4.0.0", "twine", "wheel", + "redis", "setuptools>=42", "sphinx>=6.0.0", "sphinx_rtd_theme>=1.0.0", From 7d40e81b5f8c642432a7bdd8098225cecca06686 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Wed, 16 Jul 2025 13:43:35 +0530 Subject: [PATCH 08/14] added redis in docs in pyproject.toml --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index e52b95aa7..c8100b01c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,6 +90,7 @@ docs = [ "sphinx_rtd_theme>=1.0.0", "towncrier>=24,<25", "tomli; python_version < '3.11'", + "redis" ] test = [ "factory-boy>=2.12.0,<3.0.0", From ae10e731e79c63cfec27e04cd893ee2c7f716f5c Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sat, 26 Jul 2025 14:20:40 +0530 Subject: [PATCH 09/14] py<->rust PING INTEROP successfull --- docs/conf.py | 2 +- docs/index.rst | 1 + docs/interop.rst | 29 +++ examples/ping/ping.py | 17 +- libp2p/stream_muxer/yamux/yamux.py | 279 +++++++++++++------------- pyproject.toml | 6 + tests/core/stream_muxer/test_yamux.py | 5 + tox.ini | 7 +- 8 files changed, 200 insertions(+), 146 deletions(-) create mode 100644 docs/interop.rst diff --git a/docs/conf.py b/docs/conf.py index 446252f1f..bd37d6ac1 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -290,7 +290,7 @@ ] # Prevent autodoc from trying to import module from tests.factories -autodoc_mock_imports = ["tests.factories"] +autodoc_mock_imports = ["tests.factories", "redis"] # Documents to append as an appendix to all manuals. # texinfo_appendices = [] diff --git a/docs/index.rst b/docs/index.rst index 3031f067a..66192aed1 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -18,6 +18,7 @@ The Python implementation of the libp2p networking stack Examples API + Interop .. toctree:: :maxdepth: 1 diff --git a/docs/interop.rst b/docs/interop.rst new file mode 100644 index 000000000..6485fc972 --- /dev/null +++ b/docs/interop.rst @@ -0,0 +1,29 @@ +interop package +=============== + +Submodules +---------- + +interop.arch module +------------------- + +.. automodule:: interop.arch + :members: + :show-inheritance: + :undoc-members: + +interop.lib module +------------------ + +.. automodule:: interop.lib + :members: + :show-inheritance: + :undoc-members: + +Module contents +--------------- + +.. automodule:: interop + :members: + :show-inheritance: + :undoc-members: diff --git a/examples/ping/ping.py b/examples/ping/ping.py index d1a5daae4..83e6e239d 100644 --- a/examples/ping/ping.py +++ b/examples/ping/ping.py @@ -1,4 +1,5 @@ import argparse +import logging import multiaddr import trio @@ -55,8 +56,8 @@ async def send_ping(stream: INetStream) -> None: async def run(port: int, destination: str) -> None: - listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") - host = new_host(listen_addrs=[listen_addr]) + listen_addr = multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{port}") + host = new_host() async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: # Start the peer-store cleanup task @@ -106,8 +107,20 @@ def main() -> None: type=str, help=f"destination multiaddr string, e.g. {example_maddr}", ) + parser.add_argument( + "-v", "--verbose", action="store_true", help="enable verbose logging" + ) + args = parser.parse_args() + if args.verbose: + # Enable even more detailed logging + logging.getLogger("libp2p").setLevel(logging.DEBUG) + logging.getLogger("libp2p.network").setLevel(logging.DEBUG) + logging.getLogger("libp2p.transport").setLevel(logging.DEBUG) + logging.getLogger("libp2p.security").setLevel(logging.DEBUG) + logging.getLogger("libp2p.stream_muxer").setLevel(logging.DEBUG) + try: trio.run(run, *(args.port, args.destination)) except KeyboardInterrupt: diff --git a/libp2p/stream_muxer/yamux/yamux.py b/libp2p/stream_muxer/yamux/yamux.py index b2711e1a8..f55f896ea 100644 --- a/libp2p/stream_muxer/yamux/yamux.py +++ b/libp2p/stream_muxer/yamux/yamux.py @@ -33,6 +33,9 @@ from libp2p.io.exceptions import ( IncompleteReadError, ) +from libp2p.io.utils import ( + read_exactly, +) from libp2p.network.connection.exceptions import ( RawConnError, ) @@ -103,44 +106,25 @@ async def write(self, data: bytes) -> None: sent = 0 logger.debug(f"Stream {self.stream_id}: Starts writing {total_len} bytes ") while sent < total_len: - # Wait for available window with timeout - timeout = False - async with self.window_lock: - if self.send_window == 0: - logger.debug( - f"Stream {self.stream_id}: Window is zero, waiting for update" - ) - # Release lock and wait with timeout - self.window_lock.release() - # To avoid re-acquiring the lock immediately, - with trio.move_on_after(5.0) as cancel_scope: - while self.send_window == 0 and not self.closed: - await trio.sleep(0.01) - # If we timed out, cancel the scope - timeout = cancel_scope.cancelled_caught - # Re-acquire lock - await self.window_lock.acquire() - - # If we timed out waiting for window update, raise an error - if timeout: - raise MuxedStreamError( - "Timed out waiting for window update after 5 seconds." - ) + # Wait for the available window + while self.send_window == 0 and not self.closed: + await trio.sleep(0.01) - if self.closed: - raise MuxedStreamError("Stream is closed") + if self.closed: + raise MuxedStreamError("Stream is closed") - # Calculate how much we can send now + # Calculate how much we can send now + async with self.window_lock: to_send = min(self.send_window, total_len - sent) chunk = data[sent : sent + to_send] self.send_window -= to_send - # Send the data - header = struct.pack( - YAMUX_HEADER_FORMAT, 0, TYPE_DATA, 0, self.stream_id, len(chunk) - ) - await self.conn.secured_conn.write(header + chunk) - sent += to_send + # Send the data + header = struct.pack( + YAMUX_HEADER_FORMAT, 0, TYPE_DATA, 0, self.stream_id, len(chunk) + ) + await self.conn.secured_conn.write(header + chunk) + sent += to_send async def send_window_update(self, increment: int, skip_lock: bool = False) -> None: """ @@ -397,7 +381,6 @@ async def close(self, error_code: int = GO_AWAY_NORMAL) -> None: else: if self.on_close is not None: await self.on_close() - await trio.sleep(0.1) @property def is_closed(self) -> bool: @@ -529,70 +512,52 @@ async def read_stream(self, stream_id: int, n: int = -1) -> bytes: async def handle_incoming(self) -> None: while not self.event_shutting_down.is_set(): try: - header = await self.secured_conn.read(HEADER_SIZE) - if not header or len(header) < HEADER_SIZE: - logger.debug( - f"Connection closed orincomplete header for peer {self.peer_id}" + try: + header = await read_exactly(self.secured_conn, HEADER_SIZE) + except IncompleteReadError: + logging.debug( + f"Connection closed or incomplete header for peer " + f"{self.peer_id}" ) self.event_shutting_down.set() await self._cleanup_on_error() break + + # Debug: log raw header bytes + logging.debug(f"Raw header bytes: {header.hex()}") + version, typ, flags, stream_id, length = struct.unpack( YAMUX_HEADER_FORMAT, header ) logger.debug( f"Received header for peer {self.peer_id}:" - f"type={typ}, flags={flags}, stream_id={stream_id}," - f"length={length}" + f"version={version}, type={typ}, flags={flags}, " + f"stream_id={stream_id}, length={length}" ) - if (typ == TYPE_DATA or typ == TYPE_WINDOW_UPDATE) and flags & FLAG_SYN: - async with self.streams_lock: - if stream_id not in self.streams: - stream = YamuxStream(stream_id, self, False) - self.streams[stream_id] = stream - self.stream_buffers[stream_id] = bytearray() - self.stream_events[stream_id] = trio.Event() - ack_header = struct.pack( - YAMUX_HEADER_FORMAT, - 0, - TYPE_DATA, - FLAG_ACK, - stream_id, - 0, - ) - await self.secured_conn.write(ack_header) - logger.debug( - f"Sending stream {stream_id}" - f"to channel for peer {self.peer_id}" - ) - await self.new_stream_send_channel.send(stream) - else: - rst_header = struct.pack( - YAMUX_HEADER_FORMAT, - 0, - TYPE_DATA, - FLAG_RST, - stream_id, - 0, - ) - await self.secured_conn.write(rst_header) - elif typ == TYPE_DATA and flags & FLAG_RST: - async with self.streams_lock: - if stream_id in self.streams: - logger.debug( - f"Resetting stream {stream_id} for peer {self.peer_id}" - ) - self.streams[stream_id].closed = True - self.streams[stream_id].reset_received = True - self.stream_events[stream_id].set() - elif typ == TYPE_DATA and flags & FLAG_ACK: - async with self.streams_lock: - if stream_id in self.streams: - logger.debug( - f"Received ACK for stream" - f"{stream_id} for peer {self.peer_id}" - ) - elif typ == TYPE_GO_AWAY: + + data = b"" + if typ == TYPE_DATA and length > 0: + try: + data = await read_exactly(self.secured_conn, length) + # Ensure data is never None + if data is None: + data = b"" + logging.debug( + f"Read {len(data)} bytes of data for stream {stream_id}" + ) + except Exception as e: + logging.error(f"Error reading data for stream {stream_id}: {e}") + data = b"" # Ensure data is never None + # Mark stream as closed on read error + async with self.streams_lock: + if stream_id in self.streams: + self.streams[stream_id].recv_closed = True + if self.streams[stream_id].send_closed: + self.streams[stream_id].closed = True + if stream_id in self.stream_events: + self.stream_events[stream_id].set() + + if typ == TYPE_GO_AWAY: error_code = length if error_code == GO_AWAY_NORMAL: logger.debug( @@ -630,32 +595,6 @@ async def handle_incoming(self) -> None: f"Received ping response with value" f"{length} for peer {self.peer_id}" ) - elif typ == TYPE_DATA: - try: - data = ( - await self.secured_conn.read(length) if length > 0 else b"" - ) - async with self.streams_lock: - if stream_id in self.streams: - self.stream_buffers[stream_id].extend(data) - self.stream_events[stream_id].set() - if flags & FLAG_FIN: - logger.debug( - f"Received FIN for stream {self.peer_id}:" - f"{stream_id}, marking recv_closed" - ) - self.streams[stream_id].recv_closed = True - if self.streams[stream_id].send_closed: - self.streams[stream_id].closed = True - except Exception as e: - logger.error(f"Error reading data for stream {stream_id}: {e}") - # Mark stream as closed on read error - async with self.streams_lock: - if stream_id in self.streams: - self.streams[stream_id].recv_closed = True - if self.streams[stream_id].send_closed: - self.streams[stream_id].closed = True - self.stream_events[stream_id].set() elif typ == TYPE_WINDOW_UPDATE: increment = length async with self.streams_lock: @@ -668,6 +607,85 @@ async def handle_incoming(self) -> None: f" increment: {increment}" ) stream.send_window += increment + elif typ == TYPE_DATA: + async with self.streams_lock: + if stream_id in self.streams: + # Store data - ensure data is not None before extending + if data is not None and len(data) > 0: + self.stream_buffers[stream_id].extend(data) + if stream_id in self.stream_events: + self.stream_events[stream_id].set() + # Handle flags + if flags & FLAG_SYN: + logging.debug( + f"Received late SYN for stream {stream_id} " + f"for peer {self.peer_id}" + ) + if flags & FLAG_ACK: + logging.debug( + f"Received ACK for stream {stream_id} " + f"for peer {self.peer_id}" + ) + if flags & FLAG_FIN: + logging.debug( + f"Received FIN for stream {self.peer_id}:" + f"{stream_id}, marking recv_closed" + ) + self.streams[stream_id].recv_closed = True + if self.streams[stream_id].send_closed: + self.streams[stream_id].closed = True + if stream_id in self.stream_events: + self.stream_events[stream_id].set() + if flags & FLAG_RST: + logging.debug( + f"Resetting stream {stream_id} " + f"for peer {self.peer_id}" + ) + self.streams[stream_id].closed = True + self.streams[stream_id].reset_received = True + if stream_id in self.stream_events: + self.stream_events[stream_id].set() + else: + if flags & FLAG_SYN: + if stream_id not in self.streams: + stream = YamuxStream(stream_id, self, False) + self.streams[stream_id] = stream + # Initialize stream buffer + buffer = bytearray() + if data is not None and len(data) > 0: + buffer.extend(data) + self.stream_buffers[stream_id] = buffer + self.stream_events[stream_id] = trio.Event() + self.stream_events[stream_id].set() + ack_header = struct.pack( + YAMUX_HEADER_FORMAT, + 0, + TYPE_DATA, + FLAG_ACK, + stream_id, + 0, + ) + await self.secured_conn.write(ack_header) + logging.debug( + f"Sending stream {stream_id}" + f"to channel for peer {self.peer_id}" + ) + await self.new_stream_send_channel.send(stream) + else: + rst_header = struct.pack( + YAMUX_HEADER_FORMAT, + 0, + TYPE_DATA, + FLAG_RST, + stream_id, + 0, + ) + await self.secured_conn.write(rst_header) + else: + logging.warning( + f"Received data for unknown stream {stream_id} " + f"from peer {self.peer_id} (length={length})" + ) except Exception as e: # Special handling for expected IncompleteReadError on stream close if isinstance(e, IncompleteReadError): @@ -677,7 +695,7 @@ async def handle_incoming(self) -> None: and details.get("requested_count") == 2 and details.get("received_count") == 0 ): - logger.info( + logging.info( f"Stream closed cleanly for peer {self.peer_id}" + f" (IncompleteReadError: {details})" ) @@ -685,40 +703,21 @@ async def handle_incoming(self) -> None: await self._cleanup_on_error() break else: - logger.error( + logging.error( f"Error in handle_incoming for peer {self.peer_id}: " - + f"{type(e).__name__}: {str(e)}" + f"{type(e).__name__}: {str(e)}" ) else: - # Handle RawConnError with more nuance - if isinstance(e, RawConnError): - error_msg = str(e) - # If RawConnError is empty, it's likely normal cleanup - if not error_msg.strip(): - logger.info( - f"RawConnError (empty) during cleanup for peer " - f"{self.peer_id} (normal connection shutdown)" - ) - else: - # Log non-empty RawConnError as warning - logger.warning( - f"RawConnError during connection handling for peer " - f"{self.peer_id}: {error_msg}" - ) - else: - # Log all other errors normally - logger.error( - f"Error in handle_incoming for peer {self.peer_id}: " - + f"{type(e).__name__}: {str(e)}" - ) + logging.error( + f"Error in handle_incoming for peer {self.peer_id}: " + f"{type(e).__name__}: {str(e)}" + ) # Don't crash the whole connection for temporary errors if self.event_shutting_down.is_set() or isinstance( e, (RawConnError, OSError) ): await self._cleanup_on_error() break - # For other errors, log and continue - await trio.sleep(0.01) async def _cleanup_on_error(self) -> None: # Set shutdown flag first to prevent other operations @@ -760,6 +759,6 @@ async def _cleanup_on_error(self) -> None: except Exception as callback_error: logger.error(f"Error in on_close callback: {callback_error}") - # Cancel nursery tasks + # Cancel nursery tasks if available if self._nursery: self._nursery.cancel_scope.cancel() diff --git a/pyproject.toml b/pyproject.toml index c8100b01c..eb6e8c676 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,8 @@ dependencies = [ "rpcudp>=3.0.0", "trio-typing>=0.0.4", "trio>=0.26.0", + "fastecdsa==2.3.2; sys_platform != 'win32'", + "cryptography>=42.0.0; sys_platform == 'win32'", # Alternative for Windows "zeroconf (>=0.147.0,<0.148.0)", ] classifiers = [ @@ -84,6 +86,7 @@ dev = [ "factory-boy>=2.12.0,<3.0.0", "ruff>=0.11.10", "pyrefly (>=0.17.1,<0.18.0)", + "pytest-timeout" ] docs = [ "sphinx>=6.0.0", @@ -99,6 +102,9 @@ test = [ "pytest-timeout>=2.4.0", "pytest-trio>=0.5.2", "pytest-xdist>=2.4.0", + "pytest-trio>=0.5.2", + "factory-boy>=2.12.0,<3.0.0", + "pytest-timeout", ] [tool.setuptools] diff --git a/tests/core/stream_muxer/test_yamux.py b/tests/core/stream_muxer/test_yamux.py index 444288518..7198549f7 100644 --- a/tests/core/stream_muxer/test_yamux.py +++ b/tests/core/stream_muxer/test_yamux.py @@ -51,6 +51,11 @@ async def read(self, n: int | None = None) -> bytes: data = await self.receive_stream.receive_some(n) logging.debug(f"Read {len(data)} bytes") return data + # Raise IncompleteReadError on timeout to simulate connection closed + logging.debug("Read timed out after 2 seconds, raising IncompleteReadError") + from libp2p.io.exceptions import IncompleteReadError + + raise IncompleteReadError({"requested_count": n, "received_count": 0}) async def close(self) -> None: logging.debug("Closing stream") diff --git a/tox.ini b/tox.ini index 44f74bab5..85d4df60b 100644 --- a/tox.ini +++ b/tox.ini @@ -63,13 +63,14 @@ deps= wheel build[virtualenv] allowlist_externals= - bash.exe + cmd.exe commands= python --version python -m pip install --upgrade pip - bash.exe -c "rm -rf build dist" + cmd.exe /c "if exist build rd /s /q build" + cmd.exe /c "if exist dist rd /s /q dist" python -m build - bash.exe -c 'python -m pip install --upgrade "$(ls dist/libp2p-*-py3-none-any.whl)" --progress-bar off' + cmd.exe /c "for %i in (dist\libp2p-*-py3-none-any.whl) do python -m pip install --upgrade "%i" --progress-bar off" python -c "import libp2p" skip_install=true From 785ffdc435afd885294677f6c24e0e357d2fe5de Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sat, 26 Jul 2025 21:51:30 +0530 Subject: [PATCH 10/14] add _wait_for_stream_event helper function --- libp2p/stream_muxer/yamux/yamux.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/libp2p/stream_muxer/yamux/yamux.py b/libp2p/stream_muxer/yamux/yamux.py index f55f896ea..3d4952667 100644 --- a/libp2p/stream_muxer/yamux/yamux.py +++ b/libp2p/stream_muxer/yamux/yamux.py @@ -501,14 +501,31 @@ async def read_stream(self, stream_id: int, n: int = -1) -> bytes: # Wait for data if stream is still open logger.debug(f"Waiting for data on stream {self.peer_id}:{stream_id}") try: - await self.stream_events[stream_id].wait() - self.stream_events[stream_id] = trio.Event() + await self._wait_for_stream_event(stream_id) except KeyError: raise MuxedStreamEOF("Stream was removed") # This line should never be reached, but satisfies the type checker raise MuxedStreamEOF("Unexpected end of read_stream") + async def _wait_for_stream_event(self, stream_id: int) -> None: + async with self.streams_lock: + if stream_id not in self.stream_events or self.event_shutting_down.is_set(): + return + event = self.stream_events[stream_id] + + try: + await event.wait() + except trio.Cancelled: + raise + + async with self.streams_lock: + if ( + stream_id in self.stream_events + and not self.event_shutting_down.is_set() + ): + self.stream_events[stream_id] = trio.Event() + async def handle_incoming(self) -> None: while not self.event_shutting_down.is_set(): try: From dfe5acf122bf8e834218862459debfa4f6b826eb Mon Sep 17 00:00:00 2001 From: acul71 Date: Wed, 6 Aug 2025 02:41:13 +0200 Subject: [PATCH 11/14] fix: add 300s timeout to tests --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 0d8ca81a2..a7b319fb4 100644 --- a/Makefile +++ b/Makefile @@ -46,7 +46,7 @@ typecheck: pre-commit run mypy-local --all-files && pre-commit run pyrefly-local --all-files test: - python -m pytest tests -n auto + python -m pytest tests -n auto --timeout=300 pr: clean fix lint typecheck test From 47a146bbd516093037b476d00bb9c0986c4721d1 Mon Sep 17 00:00:00 2001 From: acul71 Date: Wed, 6 Aug 2025 02:41:53 +0200 Subject: [PATCH 12/14] fix: fix test hang --- .../stream_muxer/test_yamux_interleaving.py | 30 ++++++++++++------- .../test_yamux_interleaving_EOF.py | 5 ++++ 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/tests/core/stream_muxer/test_yamux_interleaving.py b/tests/core/stream_muxer/test_yamux_interleaving.py index 1ce629523..89f4b7b87 100644 --- a/tests/core/stream_muxer/test_yamux_interleaving.py +++ b/tests/core/stream_muxer/test_yamux_interleaving.py @@ -21,6 +21,9 @@ YamuxStream, ) +# Configure logger for this test module +logger = logging.getLogger(__name__) + class TrioStreamAdapter(IRawConnection): """Adapter to make trio memory streams work with libp2p.""" @@ -31,21 +34,26 @@ def __init__(self, send_stream, receive_stream, is_initiator=False): self.is_initiator = is_initiator async def write(self, data: bytes) -> None: - logging.debug(f"Attempting to write {len(data)} bytes") + logger.debug(f"Attempting to write {len(data)} bytes") with trio.move_on_after(2): await self.send_stream.send_all(data) async def read(self, n: int | None = None) -> bytes: if n is None or n <= 0: raise ValueError("Reading unbounded or zero bytes not supported") - logging.debug(f"Attempting to read {n} bytes") + logger.debug(f"Attempting to read {n} bytes") with trio.move_on_after(2): data = await self.receive_stream.receive_some(n) - logging.debug(f"Read {len(data)} bytes") + logger.debug(f"Read {len(data)} bytes") return data + # Raise IncompleteReadError on timeout to simulate connection closed + logger.debug("Read timed out after 2 seconds, raising IncompleteReadError") + from libp2p.io.exceptions import IncompleteReadError + + raise IncompleteReadError({"requested_count": n, "received_count": 0}) async def close(self) -> None: - logging.debug("Closing stream") + logger.debug("Closing stream") await self.send_stream.aclose() await self.receive_stream.aclose() @@ -67,7 +75,7 @@ def peer_id(key_pair): @pytest.fixture async def secure_conn_pair(key_pair, peer_id): """Create a pair of secure connections for testing.""" - logging.debug("Setting up secure_conn_pair") + logger.debug("Setting up secure_conn_pair") client_send, server_receive = memory_stream_pair() server_send, client_receive = memory_stream_pair() @@ -79,13 +87,13 @@ async def secure_conn_pair(key_pair, peer_id): async def run_outbound(nursery_results): with trio.move_on_after(5): client_conn = await insecure_transport.secure_outbound(client_rw, peer_id) - logging.debug("Outbound handshake complete") + logger.debug("Outbound handshake complete") nursery_results["client"] = client_conn async def run_inbound(nursery_results): with trio.move_on_after(5): server_conn = await insecure_transport.secure_inbound(server_rw) - logging.debug("Inbound handshake complete") + logger.debug("Inbound handshake complete") nursery_results["server"] = server_conn nursery_results = {} @@ -100,14 +108,14 @@ async def run_inbound(nursery_results): if client_conn is None or server_conn is None: raise RuntimeError("Handshake failed: client_conn or server_conn is None") - logging.debug("secure_conn_pair setup complete") + logger.debug("secure_conn_pair setup complete") return client_conn, server_conn @pytest.fixture async def yamux_pair(secure_conn_pair, peer_id): """Create a pair of Yamux multiplexers for testing.""" - logging.debug("Setting up yamux_pair") + logger.debug("Setting up yamux_pair") client_conn, server_conn = secure_conn_pair client_yamux = Yamux(client_conn, peer_id, is_initiator=True) server_yamux = Yamux(server_conn, peer_id, is_initiator=False) @@ -116,9 +124,9 @@ async def yamux_pair(secure_conn_pair, peer_id): nursery.start_soon(client_yamux.start) nursery.start_soon(server_yamux.start) await trio.sleep(0.1) - logging.debug("yamux_pair started") + logger.debug("yamux_pair started") yield client_yamux, server_yamux - logging.debug("yamux_pair cleanup") + logger.debug("yamux_pair cleanup") @pytest.mark.trio diff --git a/tests/core/stream_muxer/test_yamux_interleaving_EOF.py b/tests/core/stream_muxer/test_yamux_interleaving_EOF.py index 23d2c2b4c..5ea2a94ac 100644 --- a/tests/core/stream_muxer/test_yamux_interleaving_EOF.py +++ b/tests/core/stream_muxer/test_yamux_interleaving_EOF.py @@ -44,6 +44,11 @@ async def read(self, n: int | None = None) -> bytes: data = await self.receive_stream.receive_some(n) logging.debug(f"Read {len(data)} bytes") return data + # Raise IncompleteReadError on timeout to simulate connection closed + logging.debug("Read timed out after 2 seconds, raising IncompleteReadError") + from libp2p.io.exceptions import IncompleteReadError + + raise IncompleteReadError({"requested_count": n, "received_count": 0}) async def close(self) -> None: logging.debug("Closing stream") From 647bb31f69b8a6522acb3253496cbe0cc55826b4 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sat, 30 Aug 2025 14:40:04 +0530 Subject: [PATCH 13/14] add running commands in readme --- interop/README.md | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/interop/README.md b/interop/README.md index 5ecff1f1c..1c927d872 100644 --- a/interop/README.md +++ b/interop/README.md @@ -9,11 +9,17 @@ docker run -p 6379:6379 -it redis:latest ## Listener ```bash -transport=tcp ip=0.0.0.0 is_dialer=false redis_addr=6379 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py +transport=tcp ip=127.0.0.1 redis_addr=6379 port=8001 test_timeout_seconds=180 security=noise muxer=yamux is_dialer=false python3 native_ping.py ``` ## Dialer ```bash -transport=tcp ip=0.0.0.0 is_dialer=true port=8001 redis_addr=6379 port=8001 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py +transport=tcp ip=127.0.0.1 redis_addr=6379 port=8001 test_timeout_seconds=180 security=noise muxer=yamux is_dialer=true python3 native_ping.py +``` + +## From the Rust-side (Listener) + +```bash +RUST_LOG=debug redis_addr=localhost:6379 ip="0.0.0.0" transport=tcp security=noise muxer=yamux is_dialer="false" cargo run --bin native_ping ``` From a9c4ce93cccafb945ea113e3d39722d149ad6f28 Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sat, 30 Aug 2025 14:42:17 +0530 Subject: [PATCH 14/14] update readme --- interop/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/interop/README.md b/interop/README.md index 1c927d872..ce03d2d5f 100644 --- a/interop/README.md +++ b/interop/README.md @@ -9,13 +9,13 @@ docker run -p 6379:6379 -it redis:latest ## Listener ```bash -transport=tcp ip=127.0.0.1 redis_addr=6379 port=8001 test_timeout_seconds=180 security=noise muxer=yamux is_dialer=false python3 native_ping.py +transport=tcp ip=127.0.0.1 redis_addr=6379 port=8001 test_timeout_seconds=180 security=noise muxer=yamux is_dialer=false python3 interop/exec/native_ping.py ``` ## Dialer ```bash -transport=tcp ip=127.0.0.1 redis_addr=6379 port=8001 test_timeout_seconds=180 security=noise muxer=yamux is_dialer=true python3 native_ping.py +transport=tcp ip=127.0.0.1 redis_addr=6379 port=8001 test_timeout_seconds=180 security=noise muxer=yamux is_dialer=true python3 interop/exec/native_ping.py ``` ## From the Rust-side (Listener)