Skip to content

Commit 0c0a78d

Browse files
committed
pkg/portfwdserver: Close stream by returning from the handler method
The documentation says: https://pkg.go.dev/google.golang.org/grpc#BidiStreamingServer > To terminate the stream, return from the handler method and return an error from the status package, or use nil to indicate an OK status code. - Changed to calling `proxy.HandleConn()`/`bicopy.Bicopy()` as goroutines, then wait `GRPCServerRW.closeCh` in `TunnelServer.Start()` for returning from handler before `HandleConn()`/`bicopy.Bicopy()` finishes. - Added `CloseRead()` and `CloseWrite()` to `GRPCServerRW`. As a result, `GRPCServerRW` may be expected to pass the test added by #3708 with inetaf/tcpproxy's `tcpproxy.DialProxy()` or `bicopy.Bicopy()`. Signed-off-by: Norio Nomura <[email protected]>
1 parent fd04bd2 commit 0c0a78d

File tree

2 files changed

+36
-5
lines changed

2 files changed

+36
-5
lines changed

pkg/portfwd/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
func HandleTCPConnection(_ context.Context, dialContext func(ctx context.Context, network string, addr string) (net.Conn, error), conn net.Conn, guestAddr string) {
2222
proxy := tcpproxy.DialProxy{Addr: guestAddr, DialContext: dialContext}
2323
proxy.HandleConn(conn)
24+
logrus.Debugf("tcp proxy for guestAddr: %s closed", guestAddr)
2425
}
2526

2627
func HandleUDPConnection(ctx context.Context, dialContext func(ctx context.Context, network string, addr string) (net.Conn, error), conn net.PacketConn, guestAddr string) {
@@ -39,6 +40,7 @@ func HandleUDPConnection(ctx context.Context, dialContext func(ctx context.Conte
3940
}
4041
}()
4142
proxy.Run()
43+
logrus.Debugf("udp proxy for guestAddr: %s closed", guestAddr)
4244
}
4345

4446
func DialContextToGRPCTunnel(client *guestagentclient.GuestAgentClient) func(ctx context.Context, network, addr string) (net.Conn, error) {
@@ -97,6 +99,7 @@ func (g *GrpcClientRW) Read(p []byte) (n int, err error) {
9799
}
98100

99101
func (g *GrpcClientRW) Close() error {
102+
logrus.Debugf("closing GrpcClientRW for id: %s", g.id)
100103
return g.stream.CloseSend()
101104
}
102105

pkg/portfwdserver/server.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
"github.com/containers/gvisor-tap-vsock/pkg/tcpproxy"
16+
"github.com/sirupsen/logrus"
1617

1718
"github.com/lima-vm/lima/v2/pkg/bicopy"
1819
"github.com/lima-vm/lima/v2/pkg/guestagent/api"
@@ -41,7 +42,11 @@ func (s *TunnelServer) Start(stream api.GuestService_TunnelServer) error {
4142
if err != nil {
4243
return err
4344
}
44-
rw := &GRPCServerRW{stream: stream, id: in.Id}
45+
rw := &GRPCServerRW{stream: stream, id: in.Id, closeCh: make(chan any, 1)}
46+
go func() {
47+
<-ctx.Done()
48+
rw.Close()
49+
}()
4550

4651
// FIXME: consolidate bicopy and tcpproxy into one
4752
//
@@ -51,20 +56,26 @@ func (s *TunnelServer) Start(stream api.GuestService_TunnelServer) error {
5156
// However, the tcpproxy package can't pass the CI for WSL2 (experimental):
5257
// https://github.com/lima-vm/lima/pull/3686#issuecomment-3034842616
5358
if wsl2, _ := seemsWSL2(); wsl2 {
54-
bicopy.Bicopy(rw, conn, nil)
59+
go bicopy.Bicopy(rw, conn, nil)
5560
} else {
5661
proxy := tcpproxy.DialProxy{DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
5762
return conn, nil
5863
}}
59-
proxy.HandleConn(rw)
64+
go proxy.HandleConn(rw)
6065
}
66+
// The stream will be closed when this function returns.
67+
// Wait here until rw.Close(), rw.CloseRead(), or rw.CloseWrite() is called.
68+
// We can't close rw.closeCh since the calling order of Close* methods is not guaranteed.
69+
<-rw.closeCh
70+
logrus.Debugf("closed GRPCServerRW for id: %s", in.Id)
6171

6272
return nil
6373
}
6474

6575
type GRPCServerRW struct {
66-
id string
67-
stream api.GuestService_TunnelServer
76+
id string
77+
stream api.GuestService_TunnelServer
78+
closeCh chan any
6879
}
6980

7081
var _ net.Conn = (*GRPCServerRW)(nil)
@@ -84,6 +95,23 @@ func (g *GRPCServerRW) Read(p []byte) (n int, err error) {
8495
}
8596

8697
func (g *GRPCServerRW) Close() error {
98+
logrus.Debugf("closing GRPCServerRW for id: %s", g.id)
99+
g.closeCh <- struct{}{}
100+
return nil
101+
}
102+
103+
// By adding CloseRead and CloseWrite methods, GRPCServerRW can work with
104+
// other than containers/gvisor-tap-vsock/pkg/tcpproxy, e.g., inetaf/tcpproxy, bicopy.Bicopy.
105+
106+
func (g *GRPCServerRW) CloseRead() error {
107+
logrus.Debugf("closing read GRPCServerRW for id: %s", g.id)
108+
g.closeCh <- struct{}{}
109+
return nil
110+
}
111+
112+
func (g *GRPCServerRW) CloseWrite() error {
113+
logrus.Debugf("closing write GRPCServerRW for id: %s", g.id)
114+
g.closeCh <- struct{}{}
87115
return nil
88116
}
89117

0 commit comments

Comments
 (0)