Skip to content

fail any rpc call which blocks the runServer loop for more than 1s #1861

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/server/main-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@

func installSIGUSR1Handler() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGUSR1)

Check failure on line 91 in cmd/server/main-server.go

View workflow job for this annotation

GitHub Actions / Build for TestDriver.ai

undefined: syscall.SIGUSR1
go func() {
defer func() {
panichandler.PanicHandler("installSIGUSR1Handler", recover())
Expand Down Expand Up @@ -196,7 +196,7 @@
wshfs.RpcClient = rpc
wshutil.DefaultRouter.RegisterRoute(wshutil.DefaultRoute, rpc, true)
wps.Broker.SetClient(wshutil.DefaultRouter)
localConnWsh := wshutil.MakeWshRpc(nil, nil, wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, &wshremote.ServerImpl{})
localConnWsh := wshutil.MakeWshRpc(nil, nil, wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, &wshremote.ServerImpl{}, "conn:local")
go wshremote.RunSysInfoLoop(localConnWsh, wshrpc.LocalConnName)
wshutil.DefaultRouter.RegisterRoute(wshutil.MakeConnectionRouteId(wshrpc.LocalConnName), localConnWsh, true)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/wsh/cmd/wshcmd-connserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, jwtToken stri
}
inputCh := make(chan []byte, wshutil.DefaultInputChSize)
outputCh := make(chan []byte, wshutil.DefaultOutputChSize)
connServerClient := wshutil.MakeWshRpc(inputCh, outputCh, *rpcCtx, &wshremote.ServerImpl{LogWriter: os.Stdout})
connServerClient := wshutil.MakeWshRpc(inputCh, outputCh, *rpcCtx, &wshremote.ServerImpl{LogWriter: os.Stdout}, authRtn.RouteId)
connServerClient.SetAuthToken(authRtn.AuthToken)
router.RegisterRoute(authRtn.RouteId, connServerClient, false)
wshclient.RouteAnnounceCommand(connServerClient, nil)
Expand Down
6 changes: 3 additions & 3 deletions cmd/wsh/cmd/wshcmd-root.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func preRunSetupRpcClient(cmd *cobra.Command, args []string) error {
if jwtToken == "" {
wshutil.SetTermRawModeAndInstallShutdownHandlers(true)
UsingTermWshMode = true
RpcClient, WrappedStdin = wshutil.SetupTerminalRpcClient(nil)
RpcClient, WrappedStdin = wshutil.SetupTerminalRpcClient(nil, "wshcmd-termclient")
return nil
}
err := setupRpcClient(nil, jwtToken)
Expand Down Expand Up @@ -148,7 +148,7 @@ func setupRpcClientWithToken(swapTokenStr string) (wshrpc.CommandAuthenticateRtn
return rtn, fmt.Errorf("no rpccontext in token")
}
RpcContext = *token.RpcContext
RpcClient, err = wshutil.SetupDomainSocketRpcClient(token.SockName, nil)
RpcClient, err = wshutil.SetupDomainSocketRpcClient(token.SockName, nil, "wshcmd")
if err != nil {
return rtn, fmt.Errorf("error setting up domain socket rpc client: %w", err)
}
Expand All @@ -166,7 +166,7 @@ func setupRpcClient(serverImpl wshutil.ServerImpl, jwtToken string) error {
if err != nil {
return fmt.Errorf("error extracting socket name from %s: %v", wshutil.WaveJwtTokenVarName, err)
}
RpcClient, err = wshutil.SetupDomainSocketRpcClient(sockName, serverImpl)
RpcClient, err = wshutil.SetupDomainSocketRpcClient(sockName, serverImpl, "wshcmd")
if err != nil {
return fmt.Errorf("error setting up domain socket rpc client: %v", err)
}
Expand Down
24 changes: 24 additions & 0 deletions cmd/wsh/cmd/wshcmd-test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2025, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0

package cmd

import (
"github.com/spf13/cobra"
)

var testCmd = &cobra.Command{
Use: "test",
Hidden: true,
Short: "test command",
PreRunE: preRunSetupRpcClient,
RunE: runTestCmd,
}

func init() {
rootCmd.AddCommand(testCmd)
}

func runTestCmd(cmd *cobra.Command, args []string) error {
return nil
}
Comment on lines +22 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Implement the actual test logic or remove placeholder code.
Returning nil suggests no operation performed. If the command is intended for testing system functionality, consider adding meaningful actions or removing the command to avoid confusion.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.74.1
github.com/aws/smithy-go v1.22.2
github.com/creack/pty v1.1.21
github.com/emirpasic/gods v1.18.1
github.com/fsnotify/fsnotify v1.8.0
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/golang-migrate/migrate/v4 v4.18.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/ebitengine/purego v0.8.1 h1:sdRKd6plj7KYW33EH5As6YKfe8m9zbN9JMrOjNVF/BE=
github.com/ebitengine/purego v0.8.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
Expand Down
87 changes: 87 additions & 0 deletions pkg/util/ds/expmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2025, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0

package ds

import (
"sync"
"time"

"github.com/emirpasic/gods/trees/binaryheap"
)

// an ExpMap has "expiring" keys, which are automatically deleted after a certain time

type ExpMap[T any] struct {
lock *sync.Mutex
expHeap *binaryheap.Heap // heap of expEntries (sorted by time)
m map[string]expMapEntry[T]
}

type expMapEntry[T any] struct {
Val T
Exp time.Time
}

type expEntry struct {
Key string
Exp time.Time
}

func heapComparator(aArg, bArg any) int {
a := aArg.(expEntry)
b := bArg.(expEntry)
if a.Exp.Before(b.Exp) {
return -1
} else if a.Exp.After(b.Exp) {
return 1
}
return 0
}

func MakeExpMap[T any]() *ExpMap[T] {
return &ExpMap[T]{
lock: &sync.Mutex{},
expHeap: binaryheap.NewWith(heapComparator),
m: make(map[string]expMapEntry[T]),
}
}

func (em *ExpMap[T]) Set(key string, value T, exp time.Time) {
em.lock.Lock()
defer em.lock.Unlock()
oldEntry, ok := em.m[key]
em.m[key] = expMapEntry[T]{Val: value, Exp: exp}
if !ok || oldEntry.Exp != exp {
em.expHeap.Push(expEntry{Key: key, Exp: exp}) // this might create duplicates. that's ok.
}
}

func (em *ExpMap[T]) expireItems_nolock() {
// should already hold the lock
now := time.Now()
for {
if em.expHeap.Empty() {
break
}
// we know it isn't empty, so we ignore "ok"
topI, _ := em.expHeap.Peek()
top := topI.(expEntry)
if top.Exp.After(now) {
break
}
em.expHeap.Pop()
entry, ok := em.m[top.Key]
if ok && (entry.Exp.Before(now) || entry.Exp.Equal(now)) {
delete(em.m, top.Key)
}
}
}

func (em *ExpMap[T]) Get(key string) (T, bool) {
em.lock.Lock()
defer em.lock.Unlock()
em.expireItems_nolock()
v, ok := em.m[key]
return v.Val, ok
}
2 changes: 1 addition & 1 deletion pkg/waveapp/waveapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (client *Client) Connect() error {
if err != nil {
return fmt.Errorf("error extracting socket name from %s: %v", wshutil.WaveJwtTokenVarName, err)
}
rpcClient, err := wshutil.SetupDomainSocketRpcClient(sockName, client.ServerImpl)
rpcClient, err := wshutil.SetupDomainSocketRpcClient(sockName, client.ServerImpl, "vdomclient")
if err != nil {
return fmt.Errorf("error setting up domain socket rpc client: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/wshrpc/wshclient/barerpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func GetBareRpcClient() *wshutil.WshRpc {
waveSrvClient_Once.Do(func() {
inputCh := make(chan []byte, DefaultInputChSize)
outputCh := make(chan []byte, DefaultOutputChSize)
waveSrvClient_Singleton = wshutil.MakeWshRpc(inputCh, outputCh, wshrpc.RpcContext{}, &WshServerImpl)
waveSrvClient_Singleton = wshutil.MakeWshRpc(inputCh, outputCh, wshrpc.RpcContext{}, &WshServerImpl, "bare-client")
wshutil.DefaultRouter.RegisterRoute(BareClientRoute, waveSrvClient_Singleton, true)
wps.Broker.SetClient(wshutil.DefaultRouter)
})
Expand Down
16 changes: 16 additions & 0 deletions pkg/wshrpc/wshremote/wshremote.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,22 @@ func (impl *ServerImpl) MessageCommand(ctx context.Context, data wshrpc.CommandM
return nil
}

func (impl *ServerImpl) StreamTestCommand(ctx context.Context) chan wshrpc.RespOrErrorUnion[int] {
ch := make(chan wshrpc.RespOrErrorUnion[int], 16)
go func() {
defer close(ch)
idx := 0
for {
ch <- wshrpc.RespOrErrorUnion[int]{Response: idx}
idx++
if idx == 1000 {
break
}
}
}()
return ch
}

type ByteRangeType struct {
All bool
Start int64
Expand Down
2 changes: 1 addition & 1 deletion pkg/wshrpc/wshserver/wshserverutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func GetMainRpcClient() *wshutil.WshRpc {
waveSrvClient_Once.Do(func() {
inputCh := make(chan []byte, DefaultInputChSize)
outputCh := make(chan []byte, DefaultOutputChSize)
waveSrvClient_Singleton = wshutil.MakeWshRpc(inputCh, outputCh, wshrpc.RpcContext{}, &WshServerImpl)
waveSrvClient_Singleton = wshutil.MakeWshRpc(inputCh, outputCh, wshrpc.RpcContext{}, &WshServerImpl, "main-client")
})
return waveSrvClient_Singleton
}
5 changes: 0 additions & 5 deletions pkg/wshutil/wshrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,26 +196,21 @@ func (router *WshRouter) getAnnouncedRoute(routeId string) string {
func (router *WshRouter) sendRoutedMessage(msgBytes []byte, routeId string) bool {
rpc := router.GetRpc(routeId)
if rpc != nil {
// log.Printf("[router] sending message to %q via rpc\n", routeId)
rpc.SendRpcMessage(msgBytes)
return true
}
upstream := router.GetUpstreamClient()
if upstream != nil {
log.Printf("[router] sending message to %q via upstream\n", routeId)
upstream.SendRpcMessage(msgBytes)
return true
} else {
log.Printf("[router] sending message to %q via announced route\n", routeId)
// we are the upstream, so consult our announced routes map
localRouteId := router.getAnnouncedRoute(routeId)
log.Printf("[router] local route id: %q\n", localRouteId)
rpc := router.GetRpc(localRouteId)
if rpc == nil {
log.Printf("[router] no rpc for local route id %q\n", localRouteId)
return false
}
log.Printf("[router] sending message to %q via local route\n", localRouteId)
rpc.SendRpcMessage(msgBytes)
return true
}
Expand Down
Loading
Loading