Skip to content

Commit f7bfc03

Browse files
VihasMakwanamergify[bot]
authored andcommitted
[edot][diagnostics] remove otel diagnostics from manager (#10415)
* chore: remove otel diagnostics from manager * test * more testing and refactor * fix tests * refactor (cherry picked from commit 7afb200) # Conflicts: # internal/pkg/agent/application/actions/handlers/handler_action_diagnostics_test.go # internal/pkg/agent/application/actions/handlers/mock_server.go # internal/pkg/otel/manager/diagnostics.go # internal/pkg/otel/manager/diagnostics_test.go # testing/integration/ess/diagnostics_test.go
1 parent 04e7fa4 commit f7bfc03

File tree

5 files changed

+709
-0
lines changed

5 files changed

+709
-0
lines changed

internal/pkg/agent/application/actions/handlers/handler_action_diagnostics_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,3 +395,70 @@ func TestDiagnosticHandlerWithCPUProfile(t *testing.T) {
395395
assert.True(t, cpuCalled, "CPU profile collector was not called.")
396396
mockDiagProvider.AssertExpectations(t)
397397
}
398+
<<<<<<< HEAD
399+
=======
400+
401+
func TestDiagnosticsHandlerWithEDOT(t *testing.T) {
402+
tempAgentRoot := t.TempDir()
403+
paths.SetTop(tempAgentRoot)
404+
err := os.MkdirAll(path.Join(tempAgentRoot, "data"), 0755)
405+
require.NoError(t, err)
406+
called := false
407+
s := NewMockServer(t, paths.DiagnosticsExtensionSocket(), &called, nil)
408+
defer func() {
409+
require.NoError(t, s.Shutdown(context.Background()))
410+
}()
411+
mockDiagProvider := newMockDiagnosticsProvider(t)
412+
mockDiagProvider.EXPECT().DiagnosticHooks().Return([]diagnostics.Hook{hook1})
413+
mockDiagProvider.EXPECT().PerformDiagnostics(mock.Anything, mock.Anything).Return([]runtime.ComponentUnitDiagnostic{mockUnitDiagnostic})
414+
mockDiagProvider.EXPECT().PerformComponentDiagnostics(mock.Anything, mock.Anything).Return([]runtime.ComponentDiagnostic{mockComponentDiagnostic}, nil)
415+
416+
mockAcker := acker.NewMockAcker(t)
417+
418+
mockAcker.EXPECT().Ack(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, a fleetapi.Action) error {
419+
require.IsType(t, new(fleetapi.ActionDiagnostics), a)
420+
assert.NoError(t, a.(*fleetapi.ActionDiagnostics).Err)
421+
return nil
422+
})
423+
mockAcker.EXPECT().Commit(mock.Anything).Return(nil)
424+
425+
mockUploader := NewMockUploader(t)
426+
mockUploader.EXPECT().UploadDiagnostics(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s1, s2 string, i int64, r io.Reader) (string, error) {
427+
expectedContent := map[string][]byte{
428+
"components/ComponentID/mock_component_diag_file.yaml": []byte("hello: component\n"),
429+
"components/ComponentID/UnitID/mock_unit_diag_file.yaml": []byte("hello: there\n"),
430+
"mock_global.txt": []byte("This is a mock global diagnostic content"),
431+
}
432+
verifyZip(t, r, expectedContent)
433+
return "upload-id", nil
434+
})
435+
436+
testLogger, _ := loggertest.New("diagnostic-handler-test")
437+
handler := NewDiagnostics(testLogger, tempAgentRoot, mockDiagProvider, defaultRateLimit, mockUploader)
438+
handler.collectDiag(t.Context(), &fleetapi.ActionDiagnostics{}, mockAcker)
439+
require.True(t, called, "expected the mock diagnostics server to be called")
440+
}
441+
442+
func verifyZip(t *testing.T, reader io.Reader, expectedContent map[string][]byte) {
443+
// Read all from io.Reader into a buffer
444+
buf, err := io.ReadAll(reader)
445+
require.NoErrorf(t, err, "failed to read the buffer: %v", err)
446+
447+
// Create a zip reader from the buffer
448+
zr, err := zip.NewReader(bytes.NewReader(buf), int64(len(buf)))
449+
require.NoErrorf(t, err, "got error while creating reader: %v", err)
450+
451+
foundContent := map[string][]byte{}
452+
for _, f := range zr.File {
453+
if _, ok := expectedContent[f.Name]; ok {
454+
rc, err := f.Open()
455+
require.NoErrorf(t, err, "failed to open the zip file at %v: %v", f.Name, err)
456+
defer rc.Close()
457+
content, err := io.ReadAll(rc)
458+
require.NoErrorf(t, err, "failed to read the zip file at %v: %v", f.Name, err)
459+
foundContent[f.Name] = content
460+
}
461+
}
462+
require.Equal(t, expectedContent, foundContent)
463+
}
464+
>>>>>>> 7afb20069 ([edot][diagnostics] remove otel diagnostics from manager (#10415))
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package handlers
6+
7+
import (
8+
"encoding/json"
9+
"net/http"
10+
"testing"
11+
12+
"github.com/stretchr/testify/require"
13+
14+
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
15+
"github.com/elastic/elastic-agent/internal/pkg/otel/extension/elasticdiagnostics"
16+
"github.com/elastic/elastic-agent/pkg/core/logger"
17+
"github.com/elastic/elastic-agent/pkg/ipc"
18+
)
19+
20+
func NewMockServer(t *testing.T, host string, called *bool, response *elasticdiagnostics.Response) *http.Server {
21+
mux := http.NewServeMux()
22+
mux.HandleFunc("/diagnostics", func(w http.ResponseWriter, r *http.Request) {
23+
if called != nil {
24+
*called = true
25+
}
26+
resp := elasticdiagnostics.Response{
27+
GlobalDiagnostics: []*proto.ActionDiagnosticUnitResult{
28+
{
29+
Description: "Mock Global Diagnostic",
30+
Filename: "mock_global.txt",
31+
ContentType: "text/plain",
32+
Content: []byte("This is a mock global diagnostic content"),
33+
},
34+
},
35+
}
36+
if response != nil {
37+
// overwrite default response
38+
resp = *response
39+
}
40+
err := json.NewEncoder(w).Encode(resp)
41+
require.NoError(t, err)
42+
w.Header().Set("Content-Type", "application/json")
43+
w.WriteHeader(http.StatusOK)
44+
})
45+
46+
l, err := ipc.CreateListener(logger.NewWithoutConfig(""), host)
47+
require.NoError(t, err)
48+
server := &http.Server{Handler: mux} //nolint:gosec // This is a test
49+
go func() {
50+
err := server.Serve(l)
51+
require.ErrorIs(t, err, http.ErrServerClosed)
52+
}()
53+
return server
54+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package manager
6+
7+
import (
8+
"context"
9+
"errors"
10+
"fmt"
11+
"strings"
12+
"syscall"
13+
14+
"github.com/elastic/elastic-agent/internal/pkg/otel"
15+
16+
"github.com/elastic/elastic-agent/pkg/component"
17+
"github.com/elastic/elastic-agent/pkg/component/runtime"
18+
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
19+
)
20+
21+
// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
22+
// it performs diagnostics for all current units. If a given unit does not exist in the manager, then a warning
23+
// is logged.
24+
func (m *OTelManager) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
25+
var diagnostics []runtime.ComponentUnitDiagnostic
26+
m.mx.RLock()
27+
currentComponents := m.components
28+
m.mx.RUnlock()
29+
30+
// if no request is provided, then perform diagnostics for all units
31+
if len(req) == 0 {
32+
for _, comp := range currentComponents {
33+
for _, unit := range comp.Units {
34+
diagnostics = append(diagnostics, runtime.ComponentUnitDiagnostic{
35+
Component: comp,
36+
Unit: unit,
37+
})
38+
}
39+
}
40+
return diagnostics
41+
}
42+
43+
// create a map of unit by component and unit id, this is used to filter out units that
44+
// do not exist in the manager
45+
unitByID := make(map[string]map[string]*component.Unit)
46+
for _, r := range req {
47+
if unitByID[r.Component.ID] == nil {
48+
unitByID[r.Component.ID] = make(map[string]*component.Unit)
49+
}
50+
unitByID[r.Component.ID][r.Unit.ID] = &r.Unit
51+
}
52+
53+
// create empty diagnostics for units that exist in the manager
54+
for _, existingComp := range currentComponents {
55+
inputComp, ok := unitByID[existingComp.ID]
56+
if !ok {
57+
m.logger.Warnf("requested diagnostics for component %s, but it does not exist in the manager", existingComp.ID)
58+
continue
59+
}
60+
for _, unit := range existingComp.Units {
61+
if _, ok := inputComp[unit.ID]; ok {
62+
diagnostics = append(diagnostics, runtime.ComponentUnitDiagnostic{
63+
Component: existingComp,
64+
Unit: unit,
65+
})
66+
} else {
67+
m.logger.Warnf("requested diagnostics for unit %s, but it does not exist in the manager", unit.ID)
68+
}
69+
}
70+
}
71+
72+
return diagnostics
73+
}
74+
75+
// PerformComponentDiagnostics executes the diagnostic action for the provided components. If no components are provided,
76+
// then it performs the diagnostics for all current components.
77+
func (m *OTelManager) PerformComponentDiagnostics(
78+
ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component,
79+
) ([]runtime.ComponentDiagnostic, error) {
80+
var diagnostics []runtime.ComponentDiagnostic
81+
m.mx.RLock()
82+
currentComponents := m.components
83+
m.mx.RUnlock()
84+
85+
// if no request is provided, then perform diagnostics for all components
86+
if len(req) == 0 {
87+
req = currentComponents
88+
}
89+
90+
// create a map of component by id, this is used to filter out components that do not exist in the manager
91+
compByID := make(map[string]component.Component)
92+
for _, comp := range req {
93+
compByID[comp.ID] = comp
94+
}
95+
96+
// create empty diagnostics for components that exist in the manager
97+
for _, existingComp := range currentComponents {
98+
if inputComp, ok := compByID[existingComp.ID]; ok {
99+
diagnostics = append(diagnostics, runtime.ComponentDiagnostic{
100+
Component: inputComp,
101+
})
102+
} else {
103+
m.logger.Warnf("requested diagnostics for component %s, but it does not exist in the manager", existingComp.ID)
104+
}
105+
}
106+
107+
extDiagnostics, err := otel.PerformDiagnosticsExt(ctx, false)
108+
109+
// We're not running the EDOT if:
110+
// 1. Either the socket doesn't exist
111+
// 2. It is refusing the connections.
112+
// Return error for any other scenario.
113+
if err != nil {
114+
m.logger.Debugf("Couldn't fetch diagnostics from EDOT: %v", err)
115+
if !errors.Is(err, syscall.ENOENT) && !errors.Is(err, syscall.ECONNREFUSED) {
116+
return nil, fmt.Errorf("error fetching otel diagnostics: %w", err)
117+
}
118+
}
119+
120+
for idx, diag := range diagnostics {
121+
found := false
122+
for _, extDiag := range extDiagnostics.ComponentDiagnostics {
123+
if strings.Contains(extDiag.Name, diag.Component.ID) {
124+
found = true
125+
diagnostics[idx].Results = append(diagnostics[idx].Results, extDiag)
126+
}
127+
}
128+
if !found {
129+
diagnostics[idx].Err = fmt.Errorf("failed to get diagnostics for %s", diag.Component.ID)
130+
}
131+
}
132+
133+
return diagnostics, nil
134+
}

0 commit comments

Comments
 (0)