Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
kind: bug-fix
summary: Fix failing upgrade command when gRPC server interrupts connection
component: "elastic-agent"
pr: https://github.com/elastic/elastic-agent/pull/4519
issue: https://github.com/elastic/elastic-agent/issues/3890
18 changes: 16 additions & 2 deletions internal/pkg/agent/cmd/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"context"
"fmt"
"os"
"strings"

"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/elastic/elastic-agent/pkg/control"
"github.com/elastic/elastic-agent/pkg/control/v2/client"
Expand Down Expand Up @@ -55,10 +58,14 @@ func newUpgradeCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Comman
}

// upgradeCmd is the entry point used by the cobra command: it builds a
// control client via client.New and hands everything over to
// upgradeCmdWithClient, returning whatever error that call produces.
func upgradeCmd(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
	return upgradeCmdWithClient(streams, cmd, args, client.New())
}

func upgradeCmdWithClient(streams *cli.IOStreams, cmd *cobra.Command, args []string, c client.Client) error {
version := args[0]
sourceURI, _ := cmd.Flags().GetString(flagSourceURI)

c := client.New()
err := c.Connect(context.Background())
if err != nil {
return errors.New(err, "Failed communicating to running daemon", errors.TypeNetwork, errors.M("socket", control.Address()))
Expand Down Expand Up @@ -106,7 +113,14 @@ func upgradeCmd(streams *cli.IOStreams, cmd *cobra.Command, args []string) error
skipDefaultPgp, _ := cmd.Flags().GetBool(flagSkipDefaultPgp)
version, err = c.Upgrade(context.Background(), version, sourceURI, skipVerification, skipDefaultPgp, pgpChecks...)
if err != nil {
return errors.New(err, "Failed trigger upgrade of daemon")
s, ok := status.FromError(err)
// Sometimes the gRPC server shuts down before replying to the command, which is expected.
// We can detect this case by the EOF error coming from the server.
// If the server is simply unavailable/not running, we must not report success.
isConnectionInterrupted := ok && s.Code() == codes.Unavailable && strings.Contains(s.Message(), "EOF")
if !isConnectionInterrupted {
return errors.New(err, "Failed trigger upgrade of daemon")
}
}
fmt.Fprintf(streams.Out, "Upgrade triggered to version %s, Elastic Agent is currently restarting\n", version)
return nil
Expand Down
90 changes: 90 additions & 0 deletions internal/pkg/agent/cmd/upgrade_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cmd

import (
"context"
"net"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/elastic/elastic-agent/internal/pkg/cli"
"github.com/elastic/elastic-agent/pkg/control/v2/client"
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
)

// TestUpgradeCmd exercises upgradeCmdWithClient against a mock control server
// that is reachable over a local TCP listener.
func TestUpgradeCmd(t *testing.T) {
	t.Run("no error when connection gets interrupted", func(t *testing.T) {
		// Upper bound for every wait in this test, so that a regression fails
		// quickly instead of hanging until the global `go test` timeout.
		const waitTimeout = 5 * time.Second

		tcpServer, err := net.Listen("tcp", "127.0.0.1:")
		require.NoError(t, err)
		defer tcpServer.Close()

		s := grpc.NewServer()
		defer s.Stop()

		upgradeCh := make(chan struct{})
		mock := &mockServer{upgradeStop: upgradeCh}
		cproto.RegisterElasticAgentControlServer(s, mock)
		go func() {
			serveErr := s.Serve(tcpServer)
			assert.NoError(t, serveErr)
		}()

		clientCh := make(chan struct{})
		// use HTTP prefix for the dialer to use TCP, otherwise it's a unix socket/named pipe
		c := client.New(client.WithAddress("http://" + tcpServer.Addr().String()))
		// NOTE(review): upgradeCmdWithClient reads the version from args[0], so the
		// version requested here appears to be "--skip-verify" rather than "8.13.0".
		// The test does not depend on the value; confirm whether this is intended.
		args := []string{"--skip-verify", "8.13.0"}
		streams := cli.NewIOStreams()
		cmd := newUpgradeCommandWithArgs(args, streams)

		// the upgrade command will hang until the server shuts down
		go func() {
			// unblock the further test execution, whatever happens below
			defer close(clientCh)

			// keep the result in a goroutine-local variable instead of writing
			// to the `err` captured from the enclosing test function
			cmdErr := upgradeCmdWithClient(streams, cmd, args, c)
			assert.NoError(t, cmdErr)
			// verify that we actually talked to the server
			counter := atomic.LoadInt32(&mock.upgrades)
			assert.Equal(t, int32(1), counter, "server should have handled one upgrade")
		}()

		// we will know that the client reached the server watching the `mock.upgrades` counter
		require.Eventually(t, func() bool {
			return atomic.LoadInt32(&mock.upgrades) > 0
		}, waitTimeout, 100*time.Millisecond)

		// then we stop the gRPC server which is supposed to interrupt the connection
		s.Stop()
		// this releases the mock server's Upgrade handler
		close(upgradeCh)

		// this makes sure all client assertions are done, without waiting forever
		select {
		case <-clientCh:
		case <-time.After(waitTimeout):
			t.Fatal("upgrade command did not return after the server was stopped")
		}
	})
}

// mockServer is the control-server test double registered with the gRPC
// server in TestUpgradeCmd. It embeds cproto.ElasticAgentControlServer and
// this file defines the Upgrade and State methods on it.
type mockServer struct {
cproto.ElasticAgentControlServer
// upgradeStop is received from inside Upgrade: the handler blocks on it
// until a value arrives or the channel is closed.
upgradeStop <-chan struct{}
// upgrades counts Upgrade invocations; it is incremented and read through
// sync/atomic functions.
upgrades int32
}

// Upgrade records that an upgrade request reached the server and then blocks,
// simulating a server that never manages to reply.
//
// The handler returns (nil, nil) once upgradeStop yields, and (nil, ctx.Err())
// if the request context is done first. Watching the context ensures the
// handler goroutine is not left blocked forever when the server is stopped
// but upgradeStop is never closed (for example when a test aborts early).
func (s *mockServer) Upgrade(ctx context.Context, r *cproto.UpgradeRequest) (resp *cproto.UpgradeResponse, err error) {
	// count the call first so observers can tell the request arrived
	atomic.AddInt32(&s.upgrades, 1)

	select {
	case <-s.upgradeStop:
		return nil, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// State always answers with a healthy agent state carrying an empty
// StateAgentInfo and a nil error.
func (s *mockServer) State(ctx context.Context, r *cproto.Empty) (resp *cproto.StateResponse, err error) {
	resp = &cproto.StateResponse{
		State: cproto.State_HEALTHY,
		Info:  &cproto.StateAgentInfo{},
	}
	return resp, nil
}
10 changes: 9 additions & 1 deletion testing/upgradetest/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"path/filepath"
"runtime"
"strings"
"time"

"github.com/otiai10/copy"
Expand Down Expand Up @@ -325,7 +326,14 @@ func PerformUpgrade(

upgradeOutput, err := startFixture.Exec(ctx, upgradeCmdArgs)
if err != nil {
return fmt.Errorf("failed to start agent upgrade to version %q: %w\n%s", endVersionInfo.Binary.Version, err, upgradeOutput)
// Sometimes the gRPC server shuts down before replying to the command, which is expected.
// We can detect this case by the EOF error coming from the server.
// If the server is simply unavailable/not running, we must not report success.
// Starting with version 8.13.2, this is handled by the upgrade command itself.
isConnectionInterrupted := strings.Contains(err.Error(), "Unavailable") && strings.Contains(err.Error(), "EOF")
if !isConnectionInterrupted {
return fmt.Errorf("failed to start agent upgrade to version %q: %w\n%s", endVersionInfo.Binary.Version, err, upgradeOutput)
}
}

// wait for the watcher to show up
Expand Down