diff --git a/libp2p/stream_muxer/yamux/yamux.py b/libp2p/stream_muxer/yamux/yamux.py index b2711e1a8..82c2b7514 100644 --- a/libp2p/stream_muxer/yamux/yamux.py +++ b/libp2p/stream_muxer/yamux/yamux.py @@ -576,7 +576,7 @@ async def handle_incoming(self) -> None: 0, ) await self.secured_conn.write(rst_header) - elif typ == TYPE_DATA and flags & FLAG_RST: + elif (typ == TYPE_DATA or typ == TYPE_WINDOW_UPDATE) and flags & FLAG_RST: async with self.streams_lock: if stream_id in self.streams: logger.debug( @@ -661,13 +661,23 @@ async def handle_incoming(self) -> None: async with self.streams_lock: if stream_id in self.streams: stream = self.streams[stream_id] - async with stream.window_lock: + if flags & FLAG_FIN: logger.debug( - f"Received window update for stream" - f"{self.peer_id}:{stream_id}," - f" increment: {increment}" + f"Received FIN for stream {self.peer_id}:" + f"{stream_id}, marking recv_closed" ) - stream.send_window += increment + self.streams[stream_id].recv_closed = True + if self.streams[stream_id].send_closed: + self.streams[stream_id].closed = True + self.stream_events[stream_id].set() + else: + async with stream.window_lock: + logger.debug( + f"Received window update for stream" + f"{self.peer_id}:{stream_id}," + f" increment: {increment}" + ) + stream.send_window += increment except Exception as e: # Special handling for expected IncompleteReadError on stream close if isinstance(e, IncompleteReadError): diff --git a/newsfragments/931.bugfix.rst b/newsfragments/931.bugfix.rst new file mode 100644 index 000000000..da3fc0c55 --- /dev/null +++ b/newsfragments/931.bugfix.rst @@ -0,0 +1 @@ +Handle FLAG_FIN & FLAG_RST in TYPE_WINDOW_UPDATE frames