Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,70 @@
assert.True(t, cpuCalled, "CPU profile collector was not called.")
mockDiagProvider.AssertExpectations(t)
}
<<<<<<< HEAD

Check failure on line 398 in internal/pkg/agent/application/actions/handlers/handler_action_diagnostics_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

expected declaration, found '<<' (typecheck)
=======

func TestDiagnosticsHandlerWithEDOT(t *testing.T) {
tempAgentRoot := t.TempDir()
paths.SetTop(tempAgentRoot)
err := os.MkdirAll(path.Join(tempAgentRoot, "data"), 0755)
require.NoError(t, err)
called := false
s := NewMockServer(t, paths.DiagnosticsExtensionSocket(), &called, nil)
defer func() {
require.NoError(t, s.Shutdown(context.Background()))
}()
mockDiagProvider := newMockDiagnosticsProvider(t)
mockDiagProvider.EXPECT().DiagnosticHooks().Return([]diagnostics.Hook{hook1})
mockDiagProvider.EXPECT().PerformDiagnostics(mock.Anything, mock.Anything).Return([]runtime.ComponentUnitDiagnostic{mockUnitDiagnostic})
mockDiagProvider.EXPECT().PerformComponentDiagnostics(mock.Anything, mock.Anything).Return([]runtime.ComponentDiagnostic{mockComponentDiagnostic}, nil)

mockAcker := acker.NewMockAcker(t)

mockAcker.EXPECT().Ack(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, a fleetapi.Action) error {
require.IsType(t, new(fleetapi.ActionDiagnostics), a)
assert.NoError(t, a.(*fleetapi.ActionDiagnostics).Err)
return nil
})
mockAcker.EXPECT().Commit(mock.Anything).Return(nil)

mockUploader := NewMockUploader(t)
mockUploader.EXPECT().UploadDiagnostics(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s1, s2 string, i int64, r io.Reader) (string, error) {
expectedContent := map[string][]byte{
"components/ComponentID/mock_component_diag_file.yaml": []byte("hello: component\n"),
"components/ComponentID/UnitID/mock_unit_diag_file.yaml": []byte("hello: there\n"),
"mock_global.txt": []byte("This is a mock global diagnostic content"),
}
verifyZip(t, r, expectedContent)
return "upload-id", nil
})

testLogger, _ := loggertest.New("diagnostic-handler-test")
handler := NewDiagnostics(testLogger, tempAgentRoot, mockDiagProvider, defaultRateLimit, mockUploader)
handler.collectDiag(t.Context(), &fleetapi.ActionDiagnostics{}, mockAcker)
require.True(t, called, "expected the mock diagnostics server to be called")
}

func verifyZip(t *testing.T, reader io.Reader, expectedContent map[string][]byte) {
// Read all from io.Reader into a buffer
buf, err := io.ReadAll(reader)
require.NoErrorf(t, err, "failed to read the buffer: %v", err)

// Create a zip reader from the buffer
zr, err := zip.NewReader(bytes.NewReader(buf), int64(len(buf)))
require.NoErrorf(t, err, "got error while creating reader: %v", err)

foundContent := map[string][]byte{}
for _, f := range zr.File {
if _, ok := expectedContent[f.Name]; ok {
rc, err := f.Open()
require.NoErrorf(t, err, "failed to open the zip file at %v: %v", f.Name, err)
defer rc.Close()
content, err := io.ReadAll(rc)
require.NoErrorf(t, err, "failed to read the zip file at %v: %v", f.Name, err)
foundContent[f.Name] = content
}
}
require.Equal(t, expectedContent, foundContent)
}
>>>>>>> 7afb20069 ([edot][diagnostics] remove otel diagnostics from manager (#10415))

Check failure on line 464 in internal/pkg/agent/application/actions/handlers/handler_action_diagnostics_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

illegal character U+0023 '#' (typecheck)
54 changes: 54 additions & 0 deletions internal/pkg/agent/application/actions/handlers/mock_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package handlers

import (
"encoding/json"
"net/http"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent/internal/pkg/otel/extension/elasticdiagnostics"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/ipc"
)

func NewMockServer(t *testing.T, host string, called *bool, response *elasticdiagnostics.Response) *http.Server {
mux := http.NewServeMux()
mux.HandleFunc("/diagnostics", func(w http.ResponseWriter, r *http.Request) {
if called != nil {
*called = true
}
resp := elasticdiagnostics.Response{
GlobalDiagnostics: []*proto.ActionDiagnosticUnitResult{
{
Description: "Mock Global Diagnostic",
Filename: "mock_global.txt",
ContentType: "text/plain",
Content: []byte("This is a mock global diagnostic content"),
},
},
}
if response != nil {
// overwrite default response
resp = *response
}
err := json.NewEncoder(w).Encode(resp)
require.NoError(t, err)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
})

l, err := ipc.CreateListener(logger.NewWithoutConfig(""), host)
require.NoError(t, err)
server := &http.Server{Handler: mux} //nolint:gosec // This is a test
go func() {
err := server.Serve(l)
require.ErrorIs(t, err, http.ErrServerClosed)
}()
return server
}
134 changes: 134 additions & 0 deletions internal/pkg/otel/manager/diagnostics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package manager

import (
"context"
"errors"
"fmt"
"strings"
"syscall"

"github.com/elastic/elastic-agent/internal/pkg/otel"

"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
)

// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
// it performs diagnostics for all current units. If a given unit does not exist in the manager, then a warning
// is logged.
func (m *OTelManager) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
var diagnostics []runtime.ComponentUnitDiagnostic
m.mx.RLock()

Check failure on line 26 in internal/pkg/otel/manager/diagnostics.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

m.mx undefined (type *OTelManager has no field or method mx)
currentComponents := m.components

Check failure on line 27 in internal/pkg/otel/manager/diagnostics.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

m.components undefined (type *OTelManager has no field or method components)
m.mx.RUnlock()

Check failure on line 28 in internal/pkg/otel/manager/diagnostics.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

m.mx undefined (type *OTelManager has no field or method mx)

// if no request is provided, then perform diagnostics for all units
if len(req) == 0 {
for _, comp := range currentComponents {
for _, unit := range comp.Units {
diagnostics = append(diagnostics, runtime.ComponentUnitDiagnostic{
Component: comp,
Unit: unit,
})
}
}
return diagnostics
}

// create a map of unit by component and unit id, this is used to filter out units that
// do not exist in the manager
unitByID := make(map[string]map[string]*component.Unit)
for _, r := range req {
if unitByID[r.Component.ID] == nil {
unitByID[r.Component.ID] = make(map[string]*component.Unit)
}
unitByID[r.Component.ID][r.Unit.ID] = &r.Unit
}

// create empty diagnostics for units that exist in the manager
for _, existingComp := range currentComponents {
inputComp, ok := unitByID[existingComp.ID]
if !ok {
m.logger.Warnf("requested diagnostics for component %s, but it does not exist in the manager", existingComp.ID)
continue
}
for _, unit := range existingComp.Units {
if _, ok := inputComp[unit.ID]; ok {
diagnostics = append(diagnostics, runtime.ComponentUnitDiagnostic{
Component: existingComp,
Unit: unit,
})
} else {
m.logger.Warnf("requested diagnostics for unit %s, but it does not exist in the manager", unit.ID)
}
}
}

return diagnostics
}

// PerformComponentDiagnostics executes the diagnostic action for the provided components. If no components are provided,
// then it performs the diagnostics for all current components.
func (m *OTelManager) PerformComponentDiagnostics(
ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component,
) ([]runtime.ComponentDiagnostic, error) {
var diagnostics []runtime.ComponentDiagnostic
m.mx.RLock()

Check failure on line 81 in internal/pkg/otel/manager/diagnostics.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

m.mx undefined (type *OTelManager has no field or method mx)
currentComponents := m.components

Check failure on line 82 in internal/pkg/otel/manager/diagnostics.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

m.components undefined (type *OTelManager has no field or method components)
m.mx.RUnlock()

Check failure on line 83 in internal/pkg/otel/manager/diagnostics.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

m.mx undefined (type *OTelManager has no field or method mx)

// if no request is provided, then perform diagnostics for all components
if len(req) == 0 {
req = currentComponents
}

// create a map of component by id, this is used to filter out components that do not exist in the manager
compByID := make(map[string]component.Component)
for _, comp := range req {
compByID[comp.ID] = comp
}

// create empty diagnostics for components that exist in the manager
for _, existingComp := range currentComponents {
if inputComp, ok := compByID[existingComp.ID]; ok {
diagnostics = append(diagnostics, runtime.ComponentDiagnostic{
Component: inputComp,
})
} else {
m.logger.Warnf("requested diagnostics for component %s, but it does not exist in the manager", existingComp.ID)
}
}

extDiagnostics, err := otel.PerformDiagnosticsExt(ctx, false)

Check failure on line 107 in internal/pkg/otel/manager/diagnostics.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

undefined: otel.PerformDiagnosticsExt) (typecheck)

// We're not running the EDOT if:
// 1. Either the socket doesn't exist
// 2. It is refusing the connections.
// Return error for any other scenario.
if err != nil {
m.logger.Debugf("Couldn't fetch diagnostics from EDOT: %v", err)
if !errors.Is(err, syscall.ENOENT) && !errors.Is(err, syscall.ECONNREFUSED) {
return nil, fmt.Errorf("error fetching otel diagnostics: %w", err)
}
}

for idx, diag := range diagnostics {
found := false
for _, extDiag := range extDiagnostics.ComponentDiagnostics {
if strings.Contains(extDiag.Name, diag.Component.ID) {
found = true
diagnostics[idx].Results = append(diagnostics[idx].Results, extDiag)
}
}
if !found {
diagnostics[idx].Err = fmt.Errorf("failed to get diagnostics for %s", diag.Component.ID)
}
}

return diagnostics, nil
}
Loading
Loading