|
1 |
| -from dataclasses import ( |
2 |
| - dataclass, |
3 |
| -) |
4 |
| -import json |
5 | 1 | import logging
|
6 | 2 |
|
7 | 3 | import multiaddr
|
|
38 | 34 |
|
39 | 35 |
|
40 | 36 | async def handle_ping(stream: INetStream) -> None:
|
41 |
| - while True: |
42 |
| - try: |
43 |
| - payload = await stream.read(PING_LENGTH) |
44 |
| - peer_id = stream.muxed_conn.peer_id |
45 |
| - if payload is not None: |
46 |
| - print(f"received ping from {peer_id}") |
| 37 | + """Handle incoming ping requests from rust-libp2p clients""" |
| 38 | + peer_id = stream.muxed_conn.peer_id |
| 39 | + print(f"[INFO] New ping stream opened by {peer_id}") |
| 40 | + logging.info(f"Ping handler called for peer {peer_id}") |
47 | 41 |
|
48 |
| - await stream.write(payload) |
49 |
| - print(f"responded with pong to {peer_id}") |
| 42 | + ping_count = 0 |
50 | 43 |
|
51 |
| - except Exception: |
52 |
| - await stream.reset() |
53 |
| - break |
| 44 | + try: |
| 45 | + while True: |
| 46 | + try: |
| 47 | + print(f"[INFO] Wailting for ping data from {peer_id}...") |
| 48 | + logging.debug(f"Stream state: {stream}") |
| 49 | + data = await stream.read(PING_LENGTH) |
| 50 | + |
| 51 | + if not data: |
| 52 | + print( |
| 53 | + f"[INFO] No data received,conneciton likely closed by {peer_id}" |
| 54 | + ) |
| 55 | + logging.debug("No data received, stream closed") |
| 56 | + break |
| 57 | + |
| 58 | + if len(data) == 0: |
| 59 | + print(f"[INFO] Empty data received, connection closed by {peer_id}") |
| 60 | + logging.debug("Empty data received") |
| 61 | + break |
| 62 | + |
| 63 | + ping_count += 1 |
| 64 | + print( |
| 65 | + f"[PING {ping_count}] Received ping from {peer_id}:" |
| 66 | + f" {len(data)} bytes" |
| 67 | + ) |
| 68 | + logging.debug(f"Ping data: {data.hex()}") |
| 69 | + |
| 70 | + # Echo the data back (this is what ping protocol does) |
| 71 | + await stream.write(data) |
| 72 | + print(f"[PING {ping_count}] Echoed ping back to {peer_id}") |
| 73 | + |
| 74 | + except Exception as e: |
| 75 | + print(f"[ERROR] Error in ping loop with {peer_id}: {e}") |
| 76 | + logging.exception("Ping loop error") |
| 77 | + break |
54 | 78 |
|
| 79 | + except Exception as e: |
| 80 | + print(f"[ERROR] Error handling ping from {peer_id}: {e}") |
| 81 | + logging.exception("Ping handler error") |
| 82 | + finally: |
| 83 | + try: |
| 84 | + print(f"[INFO] Closing ping stream with {peer_id}") |
| 85 | + await stream.close() |
| 86 | + except Exception as e: |
| 87 | + logging.debug(f"Error closing stream: {e}") |
55 | 88 |
|
56 |
| -async def send_ping(stream: INetStream) -> None: |
57 |
| - try: |
58 |
| - payload = b"\x01" * PING_LENGTH |
59 |
| - print(f"sending ping to {stream.muxed_conn.peer_id}") |
| 89 | + print(f"[INFO] Ping session completed with {peer_id} ({ping_count} pings)") |
60 | 90 |
|
61 |
| - await stream.write(payload) |
62 | 91 |
|
63 |
| - with trio.fail_after(RESP_TIMEOUT): |
64 |
| - response = await stream.read(PING_LENGTH) |
| 92 | +async def send_ping(stream: INetStream, count: int = 1) -> None: |
| 93 | + """Send a sequence of pings compatible with rust-libp2p.""" |
| 94 | + peer_id = stream.muxed_conn.peer_id |
| 95 | + print(f"[INFO] Starting ping sequence to {peer_id} ({count} pings)") |
65 | 96 |
|
66 |
| - if response == payload: |
67 |
| - print(f"received pong from {stream.muxed_conn.peer_id}") |
| 97 | + import os |
| 98 | + import time |
68 | 99 |
|
69 |
| - except Exception as e: |
70 |
| - print(f"error occurred: {e}") |
| 100 | + rtts = [] |
| 101 | + |
| 102 | + for i in range(1, count + 1): |
| 103 | + try: |
| 104 | + # Generate random 32-byte payload as per ping protcol spec |
| 105 | + payload = os.urandom(PING_LENGTH) |
| 106 | + print(f"[PING {i}/{count}] Sending ping to {peer_id}") |
| 107 | + logging.debug(f"Sending payload: {payload.hex()}") |
| 108 | + start_time = time.time() |
| 109 | + |
| 110 | + await stream.write(payload) |
| 111 | + |
| 112 | + with trio.fail_after(RESP_TIMEOUT): |
| 113 | + response = await stream.read(PING_LENGTH) |
| 114 | + |
| 115 | + end_time = time.time() |
| 116 | + rtt = (end_time - start_time) * 1000 |
| 117 | + |
| 118 | + if ( |
| 119 | + response |
| 120 | + and len(response) >= PING_LENGTH |
| 121 | + and response[:PING_LENGTH] == payload |
| 122 | + ): |
| 123 | + rtts.append(rtt) |
| 124 | + print(f"[PING {i}] Successful RTT: {rtt:.2f}ms") |
| 125 | + else: |
| 126 | + print(f"[ERROR] Ping {i} failed: response mismatch or incomplete") |
| 127 | + if response: |
| 128 | + logging.debug(f"Expecte: {payload.hex()}") |
| 129 | + logging.debug(f"Received: {response.hex()}") |
| 130 | + |
| 131 | + if i < count: |
| 132 | + await trio.sleep(1) |
| 133 | + |
| 134 | + except trio.TooSlowError: |
| 135 | + print(f"[ERROR] Ping {i} timed out after {RESP_TIMEOUT}s") |
| 136 | + except Exception as e: |
| 137 | + print(f"[ERROR] Ping {i} failed: {e}") |
| 138 | + logging.exception(f"Ping {i} error") |
| 139 | + |
| 140 | + # Print statistics |
| 141 | + if rtts: |
| 142 | + avg_rtt = sum(rtts) / len(rtts) |
| 143 | + min_rtt = min(rtts) |
| 144 | + max_rtt = max(rtts) # Fixed typo: was max_rtts |
| 145 | + success_count = len(rtts) |
| 146 | + loss_rate = ((count - success_count) / count) * 100 |
| 147 | + |
| 148 | + print("\n[STATS] Ping Statistics:") |
| 149 | + print( |
| 150 | + f" Packets: Sent={count}, Received={success_count}," |
| 151 | + f" Lost={count - success_count}" |
| 152 | + ) |
| 153 | + print(f" Loss rate: {loss_rate:.1f}%") |
| 154 | + print(f" RTT: min={min_rtt:.2f}ms, avg={avg_rtt:.2f}ms, max={max_rtt:.2f}ms") |
| 155 | + else: |
| 156 | + print(f"\n[STATS] All pings failed ({count} attempts)") |
71 | 157 |
|
72 | 158 |
|
73 | 159 | async def run_test(
|
74 | 160 | transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer
|
75 | 161 | ):
|
76 |
| - import time |
77 |
| - |
78 |
| - logging.info("Starting run_test") |
79 |
| - |
80 | 162 | redis_client = RedisClient(
|
81 | 163 | redis.Redis(host="localhost", port=int(redis_addr), db=0)
|
82 | 164 | )
|
83 | 165 | (host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer)
|
84 |
| - logging.info(f"Running ping test local_peer={host.get_id()}") |
85 |
| - |
86 |
| - async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: |
| 166 | + async with host.run(listen_addrs=[listen_addr]): |
87 | 167 | if not is_dialer:
|
| 168 | + print("[INFO] Starting py-libp2p ping server...") |
| 169 | + |
| 170 | + print(f"[INFO] Registering ping handler for protocol: {PING_PROTOCOL_ID}") |
88 | 171 | host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
|
| 172 | + |
| 173 | + # Also register alternative protocol IDs for better compatibilty |
| 174 | + alt_protcols = [ |
| 175 | + TProtocol("/ping/1.0.0"), |
| 176 | + TProtocol("/libp2p/ping/1.0.0"), |
| 177 | + ] |
| 178 | + |
| 179 | + for alt_proto in alt_protcols: |
| 180 | + print(f"[INFO] Also registering handler for: {alt_proto}") |
| 181 | + host.set_stream_handler(alt_proto, handle_ping) |
| 182 | + |
| 183 | + print("[INFO] Server started successfully!") |
| 184 | + print(f"[INFO] Peer ID: {host.get_id()}") |
| 185 | + print(f"[INFO] Listening: /ip4/{ip}/tcp/{port}") |
| 186 | + print(f"[INFO] Primary Protocol: {PING_PROTOCOL_ID}") |
| 187 | + |
89 | 188 | ma = f"{listen_addr}/p2p/{host.get_id().pretty()}"
|
90 | 189 | redis_client.rpush("listenerAddr", ma)
|
91 | 190 |
|
92 |
| - logging.info(f"Test instance, listening: {ma}") |
| 191 | + print("[INFO] Pushed address to Redis database") |
| 192 | + await trio.sleep_forever() |
93 | 193 | else:
|
| 194 | + print("[INFO] Starting py-libp2p ping client...") |
| 195 | + |
| 196 | + print("[INFO] Fetching remote address from Redis database...") |
94 | 197 | redis_addr = redis_client.brpop("listenerAddr", timeout=5)
|
95 | 198 | destination = redis_addr[0].decode()
|
96 | 199 | maddr = multiaddr.Multiaddr(destination)
|
97 | 200 | info = info_from_p2p_addr(maddr)
|
| 201 | + target_peer_id = info.peer_id |
98 | 202 |
|
99 |
| - handshake_start = time.perf_counter() |
| 203 | + print(f"[INFO] Our Peer ID: {host.get_id()}") |
| 204 | + print(f"[INFO] Target: {destination}") |
| 205 | + print(f"[INFO] Target Peer ID: {target_peer_id}") |
| 206 | + print("[INFO] Connecting to peer...") |
100 | 207 |
|
101 |
| - logging.info("GETTING READY FOR CONNECTION") |
102 | 208 | await host.connect(info)
|
103 |
| - logging.info("HOST CONNECTED") |
104 |
| - |
105 |
| - # TILL HERE EVERYTHING IS FINE |
106 |
| - |
107 |
| - stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID]) |
108 |
| - logging.info("CREATED NEW STREAM") |
109 |
| - |
110 |
| - # DOES NOT MORE FORWARD FROM THIS |
111 |
| - logging.info("Remote conection established") |
112 |
| - |
113 |
| - nursery.start_soon(send_ping, stream) |
114 |
| - |
115 |
| - handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0 |
116 |
| - |
117 |
| - logging.info(f"handshake time: {handshake_plus_ping:.2f}ms") |
118 |
| - return |
119 |
| - |
120 |
| - await trio.sleep_forever() |
121 |
| - |
122 |
| - |
123 |
| -@dataclass |
124 |
| -class Report: |
125 |
| - handshake_plus_one_rtt_millis: float |
126 |
| - ping_rtt_millis: float |
127 |
| - |
128 |
| - def gen_report(self): |
129 |
| - return json.dumps(self.__dict__) |
| 209 | + print("[INFO] Connection established!") |
| 210 | + |
| 211 | + # Try protocols in order of preference |
| 212 | + # Start with the standard libp2p ping protocol |
| 213 | + protocols_to_try = [ |
| 214 | + PING_PROTOCOL_ID, # /ipfs/ping/1.0.0 - standard protocol |
| 215 | + TProtocol("/ping/1.0.0"), # Alternative |
| 216 | + TProtocol("/libp2p/ping/1.0.0"), # Another alternative |
| 217 | + ] |
| 218 | + |
| 219 | + stream = None |
| 220 | + |
| 221 | + for proto in protocols_to_try: |
| 222 | + try: |
| 223 | + print(f"[INFO] Trying to open stream with protocol: {proto}") |
| 224 | + stream = await host.new_stream(target_peer_id, [proto]) |
| 225 | + print(f"[INFO] Stream opened with protocol: {proto}") |
| 226 | + break |
| 227 | + except Exception as e: |
| 228 | + print(f"[ERROR] Failed to open stream with {proto}: {e}") |
| 229 | + logging.debug(f"Protocol {proto} failed: {e}") |
| 230 | + continue |
| 231 | + |
| 232 | + if not stream: |
| 233 | + print("[ERROR] Failed to open stream with any ping protocol") |
| 234 | + print("[ERROR] Ensure the target peer supports one of these protocols") |
| 235 | + for proto in protocols_to_try: |
| 236 | + print(f"[ERROR] - {proto}") |
| 237 | + return 1 |
| 238 | + |
| 239 | + await send_ping(stream) |
| 240 | + |
| 241 | + await stream.close() |
| 242 | + print("[INFO] Stream closed successfully") |
| 243 | + |
| 244 | + print("\n[INFO] Client stopped") |
| 245 | + return 0 |
0 commit comments