33
33
from libp2p .io .exceptions import (
34
34
IncompleteReadError ,
35
35
)
36
+ from libp2p .io .utils import (
37
+ read_exactly ,
38
+ )
36
39
from libp2p .network .connection .exceptions import (
37
40
RawConnError ,
38
41
)
@@ -127,20 +130,22 @@ async def write(self, data: bytes) -> None:
127
130
"Timed out waiting for window update after 5 seconds."
128
131
)
129
132
130
- if self .closed :
131
- raise MuxedStreamError ("Stream is closed" )
133
+ if self .closed :
134
+ raise MuxedStreamError ("Stream is closed" )
135
+
132
136
133
- # Calculate how much we can send now
137
+ # Calculate how much we can send now
138
+ async with self .window_lock :
134
139
to_send = min (self .send_window , total_len - sent )
135
140
chunk = data [sent : sent + to_send ]
136
141
self .send_window -= to_send
137
142
138
- # Send the data
139
- header = struct .pack (
140
- YAMUX_HEADER_FORMAT , 0 , TYPE_DATA , 0 , self .stream_id , len (chunk )
141
- )
142
- await self .conn .secured_conn .write (header + chunk )
143
- sent += to_send
143
+ # Send the data
144
+ header = struct .pack (
145
+ YAMUX_HEADER_FORMAT , 0 , TYPE_DATA , 0 , self .stream_id , len (chunk )
146
+ )
147
+ await self .conn .secured_conn .write (header + chunk )
148
+ sent += to_send
144
149
145
150
async def send_window_update (self , increment : int , skip_lock : bool = False ) -> None :
146
151
"""
@@ -397,7 +402,6 @@ async def close(self, error_code: int = GO_AWAY_NORMAL) -> None:
397
402
else :
398
403
if self .on_close is not None :
399
404
await self .on_close ()
400
- await trio .sleep (0.1 )
401
405
402
406
@property
403
407
def is_closed (self ) -> bool :
@@ -537,13 +541,17 @@ async def handle_incoming(self) -> None:
537
541
self .event_shutting_down .set ()
538
542
await self ._cleanup_on_error ()
539
543
break
544
+
545
+ # Debug: log raw header bytes
546
+ logging .debug (f"Raw header bytes: { header .hex ()} " )
547
+
540
548
version , typ , flags , stream_id , length = struct .unpack (
541
549
YAMUX_HEADER_FORMAT , header
542
550
)
543
551
logger .debug (
544
552
f"Received header for peer { self .peer_id } :"
545
- f"type= { typ } , flags= { flags } , stream_id= { stream_id } , "
546
- f"length={ length } "
553
+ f"version= { version } , type= { typ } , flags= { flags } , "
554
+ f"stream_id= { stream_id } , length={ length } "
547
555
)
548
556
if (typ == TYPE_DATA or typ == TYPE_WINDOW_UPDATE ) and flags & FLAG_SYN :
549
557
async with self .streams_lock :
@@ -668,6 +676,85 @@ async def handle_incoming(self) -> None:
668
676
f" increment: { increment } "
669
677
)
670
678
stream .send_window += increment
679
+ elif typ == TYPE_DATA :
680
+ async with self .streams_lock :
681
+ if stream_id in self .streams :
682
+ # Store data - ensure data is not None before extending
683
+ if data is not None and len (data ) > 0 :
684
+ self .stream_buffers [stream_id ].extend (data )
685
+ if stream_id in self .stream_events :
686
+ self .stream_events [stream_id ].set ()
687
+ # Handle flags
688
+ if flags & FLAG_SYN :
689
+ logging .debug (
690
+ f"Received late SYN for stream { stream_id } "
691
+ f"for peer { self .peer_id } "
692
+ )
693
+ if flags & FLAG_ACK :
694
+ logging .debug (
695
+ f"Received ACK for stream { stream_id } "
696
+ f"for peer { self .peer_id } "
697
+ )
698
+ if flags & FLAG_FIN :
699
+ logging .debug (
700
+ f"Received FIN for stream { self .peer_id } :"
701
+ f"{ stream_id } , marking recv_closed"
702
+ )
703
+ self .streams [stream_id ].recv_closed = True
704
+ if self .streams [stream_id ].send_closed :
705
+ self .streams [stream_id ].closed = True
706
+ if stream_id in self .stream_events :
707
+ self .stream_events [stream_id ].set ()
708
+ if flags & FLAG_RST :
709
+ logging .debug (
710
+ f"Resetting stream { stream_id } "
711
+ f"for peer { self .peer_id } "
712
+ )
713
+ self .streams [stream_id ].closed = True
714
+ self .streams [stream_id ].reset_received = True
715
+ if stream_id in self .stream_events :
716
+ self .stream_events [stream_id ].set ()
717
+ else :
718
+ if flags & FLAG_SYN :
719
+ if stream_id not in self .streams :
720
+ stream = YamuxStream (stream_id , self , False )
721
+ self .streams [stream_id ] = stream
722
+ # Initialize stream buffer
723
+ buffer = bytearray ()
724
+ if data is not None and len (data ) > 0 :
725
+ buffer .extend (data )
726
+ self .stream_buffers [stream_id ] = buffer
727
+ self .stream_events [stream_id ] = trio .Event ()
728
+ self .stream_events [stream_id ].set ()
729
+ ack_header = struct .pack (
730
+ YAMUX_HEADER_FORMAT ,
731
+ 0 ,
732
+ TYPE_DATA ,
733
+ FLAG_ACK ,
734
+ stream_id ,
735
+ 0 ,
736
+ )
737
+ await self .secured_conn .write (ack_header )
738
+ logging .debug (
739
+ f"Sending stream { stream_id } "
740
+ f"to channel for peer { self .peer_id } "
741
+ )
742
+ await self .new_stream_send_channel .send (stream )
743
+ else :
744
+ rst_header = struct .pack (
745
+ YAMUX_HEADER_FORMAT ,
746
+ 0 ,
747
+ TYPE_DATA ,
748
+ FLAG_RST ,
749
+ stream_id ,
750
+ 0 ,
751
+ )
752
+ await self .secured_conn .write (rst_header )
753
+ else :
754
+ logging .warning (
755
+ f"Received data for unknown stream { stream_id } "
756
+ f"from peer { self .peer_id } (length={ length } )"
757
+ )
671
758
except Exception as e :
672
759
# Special handling for expected IncompleteReadError on stream close
673
760
if isinstance (e , IncompleteReadError ):
@@ -687,7 +774,7 @@ async def handle_incoming(self) -> None:
687
774
else :
688
775
logger .error (
689
776
f"Error in handle_incoming for peer { self .peer_id } : "
690
- + f"{ type (e ).__name__ } : { str (e )} "
777
+ f"{ type (e ).__name__ } : { str (e )} "
691
778
)
692
779
else :
693
780
# Handle RawConnError with more nuance
@@ -717,8 +804,6 @@ async def handle_incoming(self) -> None:
717
804
):
718
805
await self ._cleanup_on_error ()
719
806
break
720
- # For other errors, log and continue
721
- await trio .sleep (0.01 )
722
807
723
808
async def _cleanup_on_error (self ) -> None :
724
809
# Set shutdown flag first to prevent other operations
@@ -760,6 +845,6 @@ async def _cleanup_on_error(self) -> None:
760
845
except Exception as callback_error :
761
846
logger .error (f"Error in on_close callback: { callback_error } " )
762
847
763
- # Cancel nursery tasks
848
+ # Cancel nursery tasks if available
764
849
if self ._nursery :
765
850
self ._nursery .cancel_scope .cancel ()
0 commit comments