diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go
index c861ecfe937..b567cc2b122 100644
--- a/internal/pkg/otel/manager/execution_subprocess.go
+++ b/internal/pkg/otel/manager/execution_subprocess.go
@@ -152,7 +152,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
 		close(healthCheckDone)
 	}()
 	currentStatus := aggregateStatus(componentstatus.StatusStarting, nil)
-	reportCollectorStatus(ctx, statusCh, currentStatus)
+	r.reportSubprocessCollectorStatus(ctx, statusCh, currentStatus)
 
 	// specify a max duration of not being able to get the status from the collector
 	const maxFailuresDuration = 130 * time.Second
@@ -168,21 +168,21 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
 			if err != nil {
 				switch {
 				case errors.Is(err, context.Canceled):
-					reportCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
+					r.reportSubprocessCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
 					return
 				}
 			} else {
 				maxFailuresTimer.Reset(maxFailuresDuration)
-
+				removeManagedHealthCheckExtensionStatus(statuses, r.healthCheckExtensionID)
 				if !compareStatuses(currentStatus, statuses) {
 					currentStatus = statuses
-					reportCollectorStatus(procCtx, statusCh, statuses)
+					r.reportSubprocessCollectorStatus(procCtx, statusCh, statuses)
 				}
 			}
 
 			select {
 			case <-procCtx.Done():
-				reportCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
+				r.reportSubprocessCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
 				return
 			case <-healthCheckPollTimer.C:
 				healthCheckPollTimer.Reset(healthCheckPollDuration)
@@ -193,7 +193,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
 				)
 				if !compareStatuses(currentStatus, failedToConnectStatuses) {
 					currentStatus = statuses
-					reportCollectorStatus(procCtx, statusCh, statuses)
+					r.reportSubprocessCollectorStatus(procCtx, statusCh, statuses)
 				}
 			}
@@ -223,6 +223,29 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
 	return ctl, nil
 }
 
+// cloneCollectorStatus creates a deep copy of the provided AggregateStatus.
+func cloneCollectorStatus(aStatus *status.AggregateStatus) *status.AggregateStatus {
+	st := &status.AggregateStatus{
+		Event: aStatus.Event,
+	}
+
+	if len(aStatus.ComponentStatusMap) > 0 {
+		st.ComponentStatusMap = make(map[string]*status.AggregateStatus, len(aStatus.ComponentStatusMap))
+		for k, cs := range aStatus.ComponentStatusMap {
+			st.ComponentStatusMap[k] = cloneCollectorStatus(cs)
+		}
+	}
+
+	return st
+}
+
+func (r *subprocessExecution) reportSubprocessCollectorStatus(ctx context.Context, statusCh chan *status.AggregateStatus, collectorStatus *status.AggregateStatus) {
+	// we need to clone the status to prevent any mutation on the receiver side
+	// affecting the original ref
+	clonedStatus := cloneCollectorStatus(collectorStatus)
+	reportCollectorStatus(ctx, statusCh, clonedStatus)
+}
+
 // getCollectorPorts returns the ports used by the OTel collector. If the ports set in the execution struct are 0,
 // random ports are returned instead.
 func (r *subprocessExecution) getCollectorPorts() (healthCheckPort int, metricsPort int, err error) {
@@ -254,6 +277,16 @@ func (r *subprocessExecution) getCollectorPorts() (healthCheckPort int, metricsP
 	return healthCheckPort, metricsPort, nil
 }
 
+func removeManagedHealthCheckExtensionStatus(status *status.AggregateStatus, healthCheckExtensionID string) {
+	extensions, exists := status.ComponentStatusMap["extensions"]
+	if !exists {
+		return
+	}
+
+	extensionID := "extension:" + healthCheckExtensionID
+	delete(extensions.ComponentStatusMap, extensionID)
+}
+
 type procHandle struct {
 	processDoneCh chan struct{}
 	processInfo   *process.Info
diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go
index c4fc7b1b807..2b5622ba761 100644
--- a/internal/pkg/otel/manager/manager.go
+++ b/internal/pkg/otel/manager/manager.go
@@ -12,6 +12,7 @@ import (
 	"fmt"
 	"hash/fnv"
 	"os"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -478,6 +479,24 @@ func (m *OTelManager) MergedOtelConfig() *confmap.Conf {
 // and prepares component state updates for distribution to watchers.
 // Returns component state updates and any error encountered during processing.
 func (m *OTelManager) handleOtelStatusUpdate(otelStatus *status.AggregateStatus) ([]runtime.ComponentComponentState, error) {
+	// Remove agent managed extensions from the status report
+	if otelStatus != nil {
+		if extensionsMap, exists := otelStatus.ComponentStatusMap["extensions"]; exists {
+			for extensionKey := range extensionsMap.ComponentStatusMap {
+				switch {
+				case strings.HasPrefix(extensionKey, "extension:beatsauth"):
+					delete(extensionsMap.ComponentStatusMap, extensionKey)
+				case strings.HasPrefix(extensionKey, "extension:elastic_diagnostics"):
+					delete(extensionsMap.ComponentStatusMap, extensionKey)
+				}
+			}
+
+			if len(extensionsMap.ComponentStatusMap) == 0 {
+				delete(otelStatus.ComponentStatusMap, "extensions")
+			}
+		}
+	}
+
 	// Extract component states from otel status
 	componentStates, err := translate.GetAllComponentStates(otelStatus, m.components)
 	if err != nil {
diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go
index bf97167cd23..bf9f0f8197a 100644
--- a/internal/pkg/otel/manager/manager_test.go
+++ b/internal/pkg/otel/manager/manager_test.go
@@ -19,23 +19,24 @@ import (
 	"testing"
 	"time"
 
-	"github.com/elastic/elastic-agent-client/v7/pkg/client"
-	"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
-	componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
-	"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
-	"github.com/elastic/elastic-agent/pkg/component"
-	"github.com/elastic/elastic-agent/pkg/component/runtime"
-	"github.com/elastic/elastic-agent/version"
-
+	"github.com/gofrs/uuid/v5"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	otelComponent "go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componentstatus"
 	"go.opentelemetry.io/collector/confmap"
 	"gopkg.in/yaml.v2"
 
+	"github.com/elastic/elastic-agent-client/v7/pkg/client"
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/logp/logptest"
+	"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
+	componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
+	"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
+	"github.com/elastic/elastic-agent/pkg/component"
+	"github.com/elastic/elastic-agent/pkg/component/runtime"
+	"github.com/elastic/elastic-agent/version"
 	"github.com/elastic/elastic-agent/pkg/core/logger"
 	"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"
@@ -376,8 +377,9 @@ func TestOTelManager_Run(t *testing.T) {
 
 				// stop it, this should be restarted by the manager
 				updateTime = time.Now()
-				require.NotNil(t, exec.handle, "execModeFn handle should not be nil")
-				exec.handle.Stop(waitTimeForStop)
+				execHandle := exec.getProcessHandle()
+				require.NotNil(t, execHandle, "execModeFn handle should not be nil")
+				execHandle.Stop(waitTimeForStop)
 				e.EnsureHealthy(t, updateTime)
 
 				// no configuration should stop the runner
@@ -402,10 +404,11 @@ func TestOTelManager_Run(t *testing.T) {
 
 				// stop it, this should be restarted by the manager
 				updateTime = time.Now()
-				require.NotNil(t, exec.handle, "execModeFn handle should not be nil")
-				exec.handle.Stop(waitTimeForStop)
+				execHandle := exec.getProcessHandle()
+				require.NotNil(t, execHandle, "execModeFn handle should not be nil")
+				execHandle.Stop(waitTimeForStop)
 				e.EnsureHealthy(t, updateTime)
-				assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1")
+				assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 0")
 
 				// no configuration should stop the runner
 				updateTime = time.Now()
@@ -427,7 +430,7 @@ func TestOTelManager_Run(t *testing.T) {
 				updateTime := time.Now()
 				m.Update(cfg, nil)
 				e.EnsureHealthy(t, updateTime)
-				assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1")
+				assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 0")
 
 				var oldPHandle *procHandle
 				// repeatedly kill the collector
@@ -445,7 +448,7 @@ func TestOTelManager_Run(t *testing.T) {
 					// the collector should restart and report healthy
 					updateTime = time.Now()
 					e.EnsureHealthy(t, updateTime)
-					assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1")
+					assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 0")
 				}
 
 				seenRecoveredTimes := m.recoveryRetries.Load()
@@ -600,6 +603,41 @@ func TestOTelManager_Run(t *testing.T) {
 				}
 			},
 		},
+		{
+			name: "subprocess user has healthcheck extension",
+			execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
+				return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
+			},
+			restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
+			testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
+
+				subprocessExec, ok := exec.exec.(*subprocessExecution)
+				require.True(t, ok, "execution mode isn't subprocess")
+
+				cfg := confmap.NewFromStringMap(testConfig)
+
+				nsUUID, err := uuid.NewV4()
+				require.NoError(t, err, "failed to create a uuid")
+
+				componentType, err := otelComponent.NewType(healthCheckExtensionName)
+				require.NoError(t, err, "failed to create component type")
+
+				healthCheckExtensionID := otelComponent.NewIDWithName(componentType, nsUUID.String()).String()
+
+				ports, err := findRandomTCPPorts(3)
+				require.NoError(t, err, "failed to find random tcp ports")
+				subprocessExec.collectorHealthCheckPort = ports[0]
+				subprocessExec.collectorMetricsPort = ports[1]
+				err = injectHeathCheckV2Extension(cfg, healthCheckExtensionID, ports[2])
+				require.NoError(t, err, "failed to inject user health extension")
+
+				updateTime := time.Now()
+				m.Update(cfg, nil)
+				e.EnsureHealthy(t, updateTime)
+
+				assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1")
+			},
+		},
 		{
 			name: "embedded collector invalid config",
 			execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
@@ -1198,6 +1236,17 @@ func TestOTelManager_handleOtelStatusUpdate(t *testing.T) {
 					"pipeline:logs": {
 						Event: componentstatus.NewEvent(componentstatus.StatusOK),
 					},
+					"extensions": {
+						Event: componentstatus.NewEvent(componentstatus.StatusOK),
+						ComponentStatusMap: map[string]*status.AggregateStatus{
+							"extension:beatsauth/test": {
+								Event: componentstatus.NewEvent(componentstatus.StatusOK),
+							},
+							"extension:elastic_diagnostics/test": {
+								Event: componentstatus.NewEvent(componentstatus.StatusOK),
+							},
+						},
+					},
 				},
 			},
 			expectedCollectorStatus: &status.AggregateStatus{
diff --git a/internal/pkg/remote/client_fips_test.go b/internal/pkg/remote/client_fips_test.go
index ffb14a7b4e3..c0fe98219e0 100644
--- a/internal/pkg/remote/client_fips_test.go
+++ b/internal/pkg/remote/client_fips_test.go
@@ -186,11 +186,9 @@ func TestClientWithCertificate(t *testing.T) {
 				require.Contains(t, err.Error(), test.expectedHandshakeErr)
 			}
-			require.Eventually(
-				t,
-				func() bool {
-					return assert.Contains(t, serverLog.String(), test.expectedServerLog)
-				},
+			require.EventuallyWithT(t, func(c *assert.CollectT) {
+				require.Contains(c, serverLog.String(), test.expectedServerLog)
+			},
 				100*time.Millisecond,
 				10*time.Millisecond,
 			)
 		})