Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions internal/pkg/otel/manager/execution_subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
close(healthCheckDone)
}()
currentStatus := aggregateStatus(componentstatus.StatusStarting, nil)
reportCollectorStatus(ctx, statusCh, currentStatus)
r.reportSubprocessCollectorStatus(ctx, statusCh, currentStatus)

// specify a max duration of not being able to get the status from the collector
const maxFailuresDuration = 130 * time.Second
Expand All @@ -168,21 +168,21 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
if err != nil {
switch {
case errors.Is(err, context.Canceled):
reportCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
r.reportSubprocessCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
return
}
} else {
maxFailuresTimer.Reset(maxFailuresDuration)

removeManagedHealthCheckExtensionStatus(statuses, r.healthCheckExtensionID)
if !compareStatuses(currentStatus, statuses) {
currentStatus = statuses
reportCollectorStatus(procCtx, statusCh, statuses)
r.reportSubprocessCollectorStatus(procCtx, statusCh, statuses)
}
}

select {
case <-procCtx.Done():
reportCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
r.reportSubprocessCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
return
case <-healthCheckPollTimer.C:
healthCheckPollTimer.Reset(healthCheckPollDuration)
Expand All @@ -193,7 +193,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
)
if !compareStatuses(currentStatus, failedToConnectStatuses) {
currentStatus = statuses
reportCollectorStatus(procCtx, statusCh, statuses)
r.reportSubprocessCollectorStatus(procCtx, statusCh, statuses)
}
}
}
Expand Down Expand Up @@ -223,6 +223,29 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
return ctl, nil
}

// cloneCollectorStatus creates a deep copy of the provided AggregateStatus.
func cloneCollectorStatus(aStatus *status.AggregateStatus) *status.AggregateStatus {
st := &status.AggregateStatus{
Event: aStatus.Event,
}

if len(aStatus.ComponentStatusMap) > 0 {
st.ComponentStatusMap = make(map[string]*status.AggregateStatus, len(aStatus.ComponentStatusMap))
for k, cs := range aStatus.ComponentStatusMap {
st.ComponentStatusMap[k] = cloneCollectorStatus(cs)
}
}

return st
}

func (r *subprocessExecution) reportSubprocessCollectorStatus(ctx context.Context, statusCh chan *status.AggregateStatus, collectorStatus *status.AggregateStatus) {
// we need to clone the status to prevent any mutation on the receiver side
// affecting the original ref
clonedStatus := cloneCollectorStatus(collectorStatus)
reportCollectorStatus(ctx, statusCh, clonedStatus)
}

// getCollectorPorts returns the ports used by the OTel collector. If the ports set in the execution struct are 0,
// random ports are returned instead.
func (r *subprocessExecution) getCollectorPorts() (healthCheckPort int, metricsPort int, err error) {
Expand Down Expand Up @@ -254,6 +277,16 @@ func (r *subprocessExecution) getCollectorPorts() (healthCheckPort int, metricsP
return healthCheckPort, metricsPort, nil
}

func removeManagedHealthCheckExtensionStatus(status *status.AggregateStatus, healthCheckExtensionID string) {
extensions, exists := status.ComponentStatusMap["extensions"]
if !exists {
return
}

extensionID := "extension:" + healthCheckExtensionID
delete(extensions.ComponentStatusMap, extensionID)
}

type procHandle struct {
processDoneCh chan struct{}
processInfo *process.Info
Expand Down
19 changes: 19 additions & 0 deletions internal/pkg/otel/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
"hash/fnv"
"os"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -478,6 +479,24 @@ func (m *OTelManager) MergedOtelConfig() *confmap.Conf {
// and prepares component state updates for distribution to watchers.
// Returns component state updates and any error encountered during processing.
func (m *OTelManager) handleOtelStatusUpdate(otelStatus *status.AggregateStatus) ([]runtime.ComponentComponentState, error) {
// Remove agent managed extensions from the status report
if otelStatus != nil {
if extensionsMap, exists := otelStatus.ComponentStatusMap["extensions"]; exists {
for extensionKey := range extensionsMap.ComponentStatusMap {
switch {
case strings.HasPrefix(extensionKey, "extension:beatsauth"):
delete(extensionsMap.ComponentStatusMap, extensionKey)
case strings.HasPrefix(extensionKey, "extension:elastic_diagnostics"):
delete(extensionsMap.ComponentStatusMap, extensionKey)
}
}

if len(extensionsMap.ComponentStatusMap) == 0 {
delete(otelStatus.ComponentStatusMap, "extensions")
}
}
}

// Extract component states from otel status
componentStates, err := translate.GetAllComponentStates(otelStatus, m.components)
if err != nil {
Expand Down
79 changes: 64 additions & 15 deletions internal/pkg/otel/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@ import (
"testing"
"time"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
"github.com/elastic/elastic-agent/version"

"github.com/gofrs/uuid/v5"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
otelComponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/confmap"
"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
"github.com/elastic/elastic-agent/version"

"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"
Expand Down Expand Up @@ -376,8 +377,9 @@ func TestOTelManager_Run(t *testing.T) {

// stop it, this should be restarted by the manager
updateTime = time.Now()
require.NotNil(t, exec.handle, "execModeFn handle should not be nil")
exec.handle.Stop(waitTimeForStop)
execHandle := exec.getProcessHandle()
require.NotNil(t, execHandle, "execModeFn handle should not be nil")
execHandle.Stop(waitTimeForStop)
e.EnsureHealthy(t, updateTime)

// no configuration should stop the runner
Expand All @@ -402,10 +404,11 @@ func TestOTelManager_Run(t *testing.T) {

// stop it, this should be restarted by the manager
updateTime = time.Now()
require.NotNil(t, exec.handle, "execModeFn handle should not be nil")
exec.handle.Stop(waitTimeForStop)
execHandle := exec.getProcessHandle()
require.NotNil(t, execHandle, "execModeFn handle should not be nil")
execHandle.Stop(waitTimeForStop)
e.EnsureHealthy(t, updateTime)
assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1")
assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 0")

// no configuration should stop the runner
updateTime = time.Now()
Expand All @@ -427,7 +430,7 @@ func TestOTelManager_Run(t *testing.T) {
updateTime := time.Now()
m.Update(cfg, nil)
e.EnsureHealthy(t, updateTime)
assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1")
assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 0")

var oldPHandle *procHandle
// repeatedly kill the collector
Expand All @@ -445,7 +448,7 @@ func TestOTelManager_Run(t *testing.T) {
// the collector should restart and report healthy
updateTime = time.Now()
e.EnsureHealthy(t, updateTime)
assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1")
assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 0")
}

seenRecoveredTimes := m.recoveryRetries.Load()
Expand Down Expand Up @@ -600,6 +603,41 @@ func TestOTelManager_Run(t *testing.T) {
}
},
},
{
name: "subprocess user has healthcheck extension",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
},
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {

subprocessExec, ok := exec.exec.(*subprocessExecution)
require.True(t, ok, "execution mode isn't subprocess")

cfg := confmap.NewFromStringMap(testConfig)

nsUUID, err := uuid.NewV4()
require.NoError(t, err, "failed to create a uuid")

componentType, err := otelComponent.NewType(healthCheckExtensionName)
require.NoError(t, err, "failed to create component type")

healthCheckExtensionID := otelComponent.NewIDWithName(componentType, nsUUID.String()).String()

ports, err := findRandomTCPPorts(3)
require.NoError(t, err, "failed to find random tcp ports")
subprocessExec.collectorHealthCheckPort = ports[0]
subprocessExec.collectorMetricsPort = ports[1]
err = injectHeathCheckV2Extension(cfg, healthCheckExtensionID, ports[2])
require.NoError(t, err, "failed to inject user health extension")

updateTime := time.Now()
m.Update(cfg, nil)
e.EnsureHealthy(t, updateTime)

assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1")
},
},
{
name: "embedded collector invalid config",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
Expand Down Expand Up @@ -1198,6 +1236,17 @@ func TestOTelManager_handleOtelStatusUpdate(t *testing.T) {
"pipeline:logs": {
Event: componentstatus.NewEvent(componentstatus.StatusOK),
},
"extensions": {
Event: componentstatus.NewEvent(componentstatus.StatusOK),
ComponentStatusMap: map[string]*status.AggregateStatus{
"extension:beatsauth/test": {
Event: componentstatus.NewEvent(componentstatus.StatusOK),
},
"extension:elastic_diagnostics/test": {
Event: componentstatus.NewEvent(componentstatus.StatusOK),
},
},
},
},
},
expectedCollectorStatus: &status.AggregateStatus{
Expand Down
8 changes: 3 additions & 5 deletions internal/pkg/remote/client_fips_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,9 @@ func TestClientWithCertificate(t *testing.T) {
require.Contains(t, err.Error(), test.expectedHandshakeErr)
}

require.Eventually(
t,
func() bool {
return assert.Contains(t, serverLog.String(), test.expectedServerLog)
},
require.EventuallyWithT(t, func(c *assert.CollectT) {
require.Contains(c, serverLog.String(), test.expectedServerLog)
},
100*time.Millisecond, 10*time.Millisecond,
)
})
Expand Down