From 0e4e11f5b207fdc4b7f5c44134e7d66399a23681 Mon Sep 17 00:00:00 2001 From: Tyler Date: Fri, 28 Jul 2023 17:23:33 -0700 Subject: [PATCH 1/6] add configurable option for wsMessageSizeLimit --- rpc/client_opt.go | 9 ++++++++ rpc/server_test.go | 2 +- rpc/testservice_test.go | 4 ++++ rpc/websocket.go | 13 ++++++++---- rpc/websocket_test.go | 46 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 69 insertions(+), 5 deletions(-) diff --git a/rpc/client_opt.go b/rpc/client_opt.go index 5bef08cca84..55ff5f7917c 100644 --- a/rpc/client_opt.go +++ b/rpc/client_opt.go @@ -35,6 +35,7 @@ type clientConfig struct { // WebSocket options wsDialer *websocket.Dialer + wsMessageSizeLimit *int64 // RPC handler options idgen func() ID @@ -66,6 +67,14 @@ func WithWebsocketDialer(dialer websocket.Dialer) ClientOption { }) } +// WithWebsocketMessageSizeLimit configures the websocket message size limit used by the RPC +// client. +func WithWebsocketMessageSizeLimit(messageSizeLimit int64) ClientOption { + return optionFunc(func(cfg *clientConfig) { + cfg.wsMessageSizeLimit = &messageSizeLimit + }) +} + // WithHeader configures HTTP headers set by the RPC client. Headers set using this option // will be used for both HTTP and WebSocket connections. func WithHeader(key, value string) ClientOption { diff --git a/rpc/server_test.go b/rpc/server_test.go index 5d3929dfdc6..47a15b610ae 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -45,7 +45,7 @@ func TestServerRegisterName(t *testing.T) { t.Fatalf("Expected service calc to be registered") } - wantCallbacks := 13 + wantCallbacks := 14 if len(svc.callbacks) != wantCallbacks { t.Errorf("Expected %d callbacks for service 'service', got %d", wantCallbacks, len(svc.callbacks)) } diff --git a/rpc/testservice_test.go b/rpc/testservice_test.go index eab67f1dd5d..7d873af6670 100644 --- a/rpc/testservice_test.go +++ b/rpc/testservice_test.go @@ -90,6 +90,10 @@ func (s *testService) EchoWithCtx(ctx context.Context, str string, i int, args * return echoResult{str, i, args} } +func (s *testService) Repeat(msg string, i int) string { + return strings.Repeat(msg, i) +} + func (s *testService) PeerInfo(ctx context.Context) PeerInfo { return PeerInfoFromContext(ctx) } diff --git a/rpc/websocket.go b/rpc/websocket.go index b1213fdfa66..0b35353866e 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -60,7 +60,8 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { log.Debug("WebSocket upgrade failed", "err", err) return } - codec := newWebsocketCodec(conn, r.Host, r.Header) + cfg := &clientConfig{} + codec := newWebsocketCodec(conn, cfg, r.Host, r.Header) s.ServeCodec(codec, 0) }) } @@ -251,7 +252,7 @@ func newClientTransportWS(endpoint string, cfg *clientConfig) (reconnectFunc, er } return nil, hErr } - return newWebsocketCodec(conn, dialURL, header), nil + return newWebsocketCodec(conn, cfg, dialURL, header), nil } return connect, nil } @@ -282,8 +283,12 @@ type websocketCodec struct { pingReset chan struct{} } -func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header) ServerCodec { - conn.SetReadLimit(wsMessageSizeLimit) +func newWebsocketCodec(conn *websocket.Conn, cfg *clientConfig, host string, req http.Header) ServerCodec { + if cfg.wsMessageSizeLimit != nil { + conn.SetReadLimit(*cfg.wsMessageSizeLimit) + } else { + conn.SetReadLimit(wsMessageSizeLimit) + } conn.SetPongHandler(func(appData string) error { conn.SetReadDeadline(time.Time{}) return nil diff --git a/rpc/websocket_test.go b/rpc/websocket_test.go index fb9357605b8..78899af7cad 100644 --- a/rpc/websocket_test.go +++ b/rpc/websocket_test.go @@ -113,6 +113,52 @@ func TestWebsocketLargeCall(t *testing.T) { } } +// This test checks whether the wsMessageSizeLimit option is obeyed. +func TestWebsocketLargeRead(t *testing.T) { + t.Parallel() + + var ( + srv = newTestServer() + httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"*"})) + wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") + ) + defer srv.Stop() + defer httpsrv.Close() + + testLimit := func(limit int64) { + opts := []ClientOption{} + if limit >= 0 { + opts = append(opts, WithWebsocketMessageSizeLimit(limit)) + } else { + limit = wsMessageSizeLimit + } + client, err := DialOptions(context.Background(), wsURL, opts...) + if err != nil { + t.Fatalf("can't dial: %v", err) + } + defer client.Close() + + // Remove some bytes for json encoding overhead. + underLimit := int(limit - 128) + var res string + err = client.Call(&res, "test_repeat", "A", underLimit) + if err != nil { + t.Fatalf("unexpected error with limit %d: %v", limit, err) + } + if len(res) != underLimit || strings.Count(res, "A") != underLimit { + t.Fatal("incorrect data") + } + + err = client.Call(&res, "test_repeat", "A", limit+1) + if err == nil || err != websocket.ErrReadLimit { + t.Fatalf("wrong error with limit %d: %v expecting %v", limit, err, websocket.ErrReadLimit) + } + } + + testLimit(-1) + testLimit(wsMessageSizeLimit * 2) +} + func TestWebsocketPeerInfo(t *testing.T) { var ( s = newTestServer() From 53703b29d68a1bdc3eae7c5d9569c187627d0f1c Mon Sep 17 00:00:00 2001 From: Tyler Date: Fri, 28 Jul 2023 17:30:49 -0700 Subject: [PATCH 2/6] fix formatting --- rpc/client_opt.go | 4 ++-- rpc/websocket.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/rpc/client_opt.go b/rpc/client_opt.go index 55ff5f7917c..6693992338f 100644 --- a/rpc/client_opt.go +++ b/rpc/client_opt.go @@ -34,8 +34,8 @@ type clientConfig struct { httpAuth HTTPAuth // WebSocket options - wsDialer *websocket.Dialer - wsMessageSizeLimit *int64 + wsDialer *websocket.Dialer + wsMessageSizeLimit *int64 // RPC handler options idgen func() ID diff --git a/rpc/websocket.go b/rpc/websocket.go index 0b35353866e..b20c88ba55d 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -60,7 +60,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { log.Debug("WebSocket upgrade failed", "err", err) return } - cfg := &clientConfig{} + cfg := &clientConfig{} codec := newWebsocketCodec(conn, cfg, r.Host, r.Header) s.ServeCodec(codec, 0) }) @@ -284,11 +284,11 @@ type websocketCodec struct { } func newWebsocketCodec(conn *websocket.Conn, cfg *clientConfig, host string, req http.Header) ServerCodec { - if cfg.wsMessageSizeLimit != nil { - conn.SetReadLimit(*cfg.wsMessageSizeLimit) - } else { - conn.SetReadLimit(wsMessageSizeLimit) - } + if cfg.wsMessageSizeLimit != nil { + conn.SetReadLimit(*cfg.wsMessageSizeLimit) + } else { + conn.SetReadLimit(wsMessageSizeLimit) + } conn.SetPongHandler(func(appData string) error { conn.SetReadDeadline(time.Time{}) return nil From add89f10884013b17a9a4d9c2b217cb81ca77f79 Mon Sep 17 00:00:00 2001 From: Tyler Date: Sun, 30 Jul 2023 12:57:59 -0700 Subject: [PATCH 3/6] don't use client config as argument for newWebsocketCodec --- rpc/websocket.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/rpc/websocket.go b/rpc/websocket.go index b20c88ba55d..333df6f7f8a 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -60,8 +60,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { log.Debug("WebSocket upgrade failed", "err", err) return } - cfg := &clientConfig{} - codec := newWebsocketCodec(conn, cfg, r.Host, r.Header) + codec := newWebsocketCodec(conn, r.Host, r.Header, wsMessageSizeLimit) s.ServeCodec(codec, 0) }) } @@ -252,7 +251,13 @@ func newClientTransportWS(endpoint string, cfg *clientConfig) (reconnectFunc, er } return nil, hErr } - return newWebsocketCodec(conn, cfg, dialURL, header), nil + var messageSizeLimit int64 + if cfg.wsMessageSizeLimit != nil { + messageSizeLimit = *cfg.wsMessageSizeLimit + } else { + messageSizeLimit = wsMessageSizeLimit + } + return newWebsocketCodec(conn, dialURL, header, messageSizeLimit), nil } return connect, nil } @@ -283,12 +288,8 @@ type websocketCodec struct { pingReset chan struct{} } -func newWebsocketCodec(conn *websocket.Conn, cfg *clientConfig, host string, req http.Header) ServerCodec { - if cfg.wsMessageSizeLimit != nil { - conn.SetReadLimit(*cfg.wsMessageSizeLimit) - } else { - conn.SetReadLimit(wsMessageSizeLimit) - } +func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, messageSizeLimit int64) ServerCodec { + conn.SetReadLimit(messageSizeLimit) conn.SetPongHandler(func(appData string) error { conn.SetReadDeadline(time.Time{}) return nil From c81497670e8620aa58ebddfdf0958cd775843a3a Mon Sep 17 00:00:00 2001 From: Tyler Date: Sat, 2 Sep 2023 12:51:42 -0700 Subject: [PATCH 4/6] fix formatting issue from merging with github web interface --- rpc/websocket.go | 1 - 1 file changed, 1 deletion(-) diff --git a/rpc/websocket.go b/rpc/websocket.go index 9cb9c368d45..54b1400bd64 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -289,7 +289,6 @@ type websocketCodec struct { pongReceived chan struct{} } - func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, messageSizeLimit int64) ServerCodec { conn.SetReadLimit(messageSizeLimit) encode := func(v interface{}, isErrorResponse bool) error { From e28e6816329aa7e3eb347cf4319e0417144f73f0 Mon Sep 17 00:00:00 2001 From: Tyler Date: Mon, 4 Sep 2023 21:22:20 -0700 Subject: [PATCH 5/6] add more info about behavior of 0 --- rpc/client_opt.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/client_opt.go b/rpc/client_opt.go index 6693992338f..a2f38b8456e 100644 --- a/rpc/client_opt.go +++ b/rpc/client_opt.go @@ -68,7 +68,7 @@ func WithWebsocketDialer(dialer websocket.Dialer) ClientOption { } // WithWebsocketMessageSizeLimit configures the websocket message size limit used by the RPC -// client. +// client. Passing a limit of 0 means no limit. func WithWebsocketMessageSizeLimit(messageSizeLimit int64) ClientOption { return optionFunc(func(cfg *clientConfig) { cfg.wsMessageSizeLimit = &messageSizeLimit From d6a104c56220a36341560dfaeaf28769b4e14524 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Tue, 5 Sep 2023 09:25:35 +0200 Subject: [PATCH 6/6] rpc: rename wsMessageSizeLimit (it's now a default), minor test-changes --- rpc/client_opt.go | 2 +- rpc/websocket.go | 14 ++++++------- rpc/websocket_test.go | 48 ++++++++++++++++++++++++++++--------------- 3 files changed, 38 insertions(+), 26 deletions(-) diff --git a/rpc/client_opt.go b/rpc/client_opt.go index a2f38b8456e..3fa045a9b9f 100644 --- a/rpc/client_opt.go +++ b/rpc/client_opt.go @@ -35,7 +35,7 @@ type clientConfig struct { // WebSocket options wsDialer *websocket.Dialer - wsMessageSizeLimit *int64 + wsMessageSizeLimit *int64 // wsMessageSizeLimit nil = default, 0 = no limit // RPC handler options idgen func() ID diff --git a/rpc/websocket.go b/rpc/websocket.go index 54b1400bd64..538e53a31b7 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -38,7 +38,7 @@ const ( wsPingInterval = 30 * time.Second wsPingWriteTimeout = 5 * time.Second wsPongTimeout = 30 * time.Second - wsMessageSizeLimit = 32 * 1024 * 1024 + wsDefaultReadLimit = 32 * 1024 * 1024 ) var wsBufferPool = new(sync.Pool) @@ -60,7 +60,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { log.Debug("WebSocket upgrade failed", "err", err) return } - codec := newWebsocketCodec(conn, r.Host, r.Header, wsMessageSizeLimit) + codec := newWebsocketCodec(conn, r.Host, r.Header, wsDefaultReadLimit) s.ServeCodec(codec, 0) }) } @@ -251,11 +251,9 @@ func newClientTransportWS(endpoint string, cfg *clientConfig) (reconnectFunc, er } return nil, hErr } - var messageSizeLimit int64 - if cfg.wsMessageSizeLimit != nil { + messageSizeLimit := int64(wsDefaultReadLimit) + if cfg.wsMessageSizeLimit != nil && *cfg.wsMessageSizeLimit >= 0 { messageSizeLimit = *cfg.wsMessageSizeLimit - } else { - messageSizeLimit = wsMessageSizeLimit } return newWebsocketCodec(conn, dialURL, header, messageSizeLimit), nil } @@ -289,8 +287,8 @@ type websocketCodec struct { pongReceived chan struct{} } -func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, messageSizeLimit int64) ServerCodec { - conn.SetReadLimit(messageSizeLimit) +func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readLimit int64) ServerCodec { + conn.SetReadLimit(readLimit) encode := func(v interface{}, isErrorResponse bool) error { return conn.WriteJSON(v) } diff --git a/rpc/websocket_test.go b/rpc/websocket_test.go index 78899af7cad..e4ac5c3fad3 100644 --- a/rpc/websocket_test.go +++ b/rpc/websocket_test.go @@ -125,38 +125,52 @@ func TestWebsocketLargeRead(t *testing.T) { defer srv.Stop() defer httpsrv.Close() - testLimit := func(limit int64) { + testLimit := func(limit *int64) { opts := []ClientOption{} - if limit >= 0 { - opts = append(opts, WithWebsocketMessageSizeLimit(limit)) - } else { - limit = wsMessageSizeLimit + expLimit := int64(wsDefaultReadLimit) + if limit != nil && *limit >= 0 { + opts = append(opts, WithWebsocketMessageSizeLimit(*limit)) + if *limit > 0 { + expLimit = *limit // 0 means infinite + } } client, err := DialOptions(context.Background(), wsURL, opts...) if err != nil { t.Fatalf("can't dial: %v", err) } defer client.Close() - // Remove some bytes for json encoding overhead. - underLimit := int(limit - 128) + underLimit := int(expLimit - 128) + overLimit := expLimit + 1 + if expLimit == wsDefaultReadLimit { + // No point trying the full 32MB in tests. Just sanity-check that + // it's not obviously limited. + underLimit = 1024 + overLimit = -1 + } var res string - err = client.Call(&res, "test_repeat", "A", underLimit) - if err != nil { - t.Fatalf("unexpected error with limit %d: %v", limit, err) + // Check under limit + if err = client.Call(&res, "test_repeat", "A", underLimit); err != nil { + t.Fatalf("unexpected error with limit %d: %v", expLimit, err) } if len(res) != underLimit || strings.Count(res, "A") != underLimit { t.Fatal("incorrect data") } - - err = client.Call(&res, "test_repeat", "A", limit+1) - if err == nil || err != websocket.ErrReadLimit { - t.Fatalf("wrong error with limit %d: %v expecting %v", limit, err, websocket.ErrReadLimit) + // Check over limit + if overLimit > 0 { + err = client.Call(&res, "test_repeat", "A", expLimit+1) + if err == nil || err != websocket.ErrReadLimit { + t.Fatalf("wrong error with limit %d: %v expecting %v", expLimit, err, websocket.ErrReadLimit) + } } } + ptr := func(v int64) *int64 { return &v } - testLimit(-1) - testLimit(wsMessageSizeLimit * 2) + testLimit(ptr(-1)) // Should be ignored (use default) + testLimit(ptr(0)) // Should be ignored (use default) + testLimit(nil) // Should be ignored (use default) + testLimit(ptr(200)) + testLimit(ptr(wsDefaultReadLimit * 2)) } func TestWebsocketPeerInfo(t *testing.T) { @@ -252,7 +266,7 @@ func TestClientWebsocketLargeMessage(t *testing.T) { defer srv.Stop() defer httpsrv.Close() - respLength := wsMessageSizeLimit - 50 + respLength := wsDefaultReadLimit - 50 srv.RegisterName("test", largeRespService{respLength}) c, err := DialWebsocket(context.Background(), wsURL, "")