Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
424 changes: 212 additions & 212 deletions NOTICE-fips.txt

Large diffs are not rendered by default.

424 changes: 212 additions & 212 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ require (
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.135.0
go.opentelemetry.io/collector/receiver/nopreceiver v0.135.0
go.opentelemetry.io/collector/receiver/otlpreceiver v0.135.0
go.opentelemetry.io/collector/service v0.135.0
go.opentelemetry.io/ebpf-profiler v0.0.202536
go.uber.org/zap v1.27.0
go.yaml.in/yaml/v3 v3.0.4
Expand Down Expand Up @@ -722,6 +721,7 @@ require (
go.opentelemetry.io/collector/scraper v0.135.0 // indirect
go.opentelemetry.io/collector/scraper/scraperhelper v0.135.0 // indirect
go.opentelemetry.io/collector/semconv v0.128.1-0.20250610090210-188191247685 // indirect
go.opentelemetry.io/collector/service v0.135.0 // indirect
go.opentelemetry.io/collector/service/hostcapabilities v0.135.0 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.36.0 // indirect
Expand Down
19 changes: 17 additions & 2 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,13 @@ func New(
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
}
monitor := componentmonitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, rawConfig.OTel, agentInfo, isOtelExecModeSubprocess)
monitor := componentmonitoring.New(
isMonitoringSupported,
cfg.Settings.DownloadConfig.OS(),
cfg.Settings.MonitoringConfig,
agentInfo,
isOtelExecModeSubprocess,
)

runtime, err := runtime.NewManager(
log,
Expand Down Expand Up @@ -245,7 +251,16 @@ func New(
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}

otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelExecMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
otelManager, err := otelmanager.NewOTelManager(
log.Named("otel_manager"),
logLevel, baseLogger,
otelExecMode,
agentInfo,
0, // TODO: make this configurable in a follow-up
0, // TODO: make this configurable in a follow-up
monitor.ComponentMonitoringConfig,
cfg.Settings.ProcessConfig.StopTimeout,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
}
Expand Down
47 changes: 11 additions & 36 deletions internal/pkg/agent/application/monitoring/component/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@ import (
"time"
"unicode"

"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/service"
koanfmaps "github.com/knadh/koanf/maps"

"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/utils"

koanfmaps "github.com/knadh/koanf/maps"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
Expand All @@ -38,6 +35,12 @@ import (
)

const (
// OtelCollectorMetricsPortEnvVarName is the name of the environment variable used to pass the collector metrics
// port to the managed EDOT collector. It exists because by default we use a random port for this, and we want to
// determine it as late as possible. However, the monitoring manager is instantiated early in the application
// startup process, so instead we rely on this variable. The OTel manager is required to set it whenever it starts
// a collector.
OtelCollectorMetricsPortEnvVarName = "EDOT_COLLECTOR_METRICS_PORT"
// args: data path, pipeline name, application name
logFileFormat = "%s/logs/%s"
// args: data path, install path, pipeline name, application name
Expand Down Expand Up @@ -93,7 +96,6 @@ var (
type BeatsMonitor struct {
enabled bool // feature flag disabling whole v1 monitoring story
config *monitoringConfig
otelConfig *confmap.Conf
operatingSystem string
agentInfo info.Agent
isOtelRuntimeSubprocess bool
Expand All @@ -115,13 +117,12 @@ type monitoringConfig struct {
}

// New creates a new BeatsMonitor instance.
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, otelCfg *confmap.Conf, agentInfo info.Agent, isOtelRuntimeSubprocess bool) *BeatsMonitor {
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent, isOtelRuntimeSubprocess bool) *BeatsMonitor {
return &BeatsMonitor{
enabled: enabled,
config: &monitoringConfig{
C: cfg,
},
otelConfig: otelCfg,
operatingSystem: operatingSystem,
agentInfo: agentInfo,
isOtelRuntimeSubprocess: isOtelRuntimeSubprocess,
Expand Down Expand Up @@ -149,7 +150,6 @@ func (b *BeatsMonitor) Reload(rawConfig *config.Config) error {
}

b.config = &newConfig
b.otelConfig = rawConfig.OTel
return nil
}

Expand Down Expand Up @@ -519,34 +519,9 @@ func (b *BeatsMonitor) monitoringNamespace() string {
}

func (b *BeatsMonitor) getCollectorTelemetryEndpoint() string {
if b.otelConfig != nil {
if serviceConfig, err := b.otelConfig.Sub("service"); err == nil {
var service service.Config
if serviceConfig.Unmarshal(&service, confmap.WithIgnoreUnused()) == nil {
for _, reader := range service.Telemetry.Metrics.Readers {
if reader.Pull == nil || reader.Pull.Exporter.Prometheus == nil {
continue
}
prometheus := *reader.Pull.Exporter.Prometheus
host := "localhost"
port := 8888

if prometheus.Host != nil {
host = *prometheus.Host
}
if prometheus.Port != nil {
port = *prometheus.Port
}
if prometheus.Host != nil || prometheus.Port != nil {
return host + ":" + strconv.Itoa(port)
}
}
}
}
}

// If there is no explicit configuration, the collector publishes its telemetry on port 8888.
return "localhost:8888"
// The OTel manager is required to set the environment variable. See comment at the constant definition for more
// information.
return fmt.Sprintf("localhost:${env:%s}", OtelCollectorMetricsPortEnvVarName)
}

// injectMetricsInput injects monitoring config for agent monitoring to the `cfg` object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1102,7 +1102,7 @@ func TestMonitorReload(t *testing.T) {
monitorcfg.MonitorLogs = false
monitorcfg.MonitorMetrics = false

beatsMonitor := New(true, "", monitorcfg, nil, nil, false)
beatsMonitor := New(true, "", monitorcfg, nil, false)
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func getMonitoringFn(ctx context.Context, logger *logger.Logger, cfg map[string]
}
otelExecMode := otelconfig.GetExecutionModeFromConfig(logger, config)
isOtelExecModeSubprocess := otelExecMode == manager.SubprocessExecutionMode
monitor := componentmonitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, otelCfg, agentInfo, isOtelExecModeSubprocess)
monitor := componentmonitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
return monitor.MonitoringConfig, nil
}

Expand Down
38 changes: 24 additions & 14 deletions internal/pkg/otel/manager/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package manager

import (
"context"
"errors"
"fmt"
"net"

Expand Down Expand Up @@ -52,21 +53,30 @@ func reportCollectorStatus(ctx context.Context, statusCh chan *status.AggregateS
}
}

// findRandomTCPPort finds a random available TCP port on the localhost interface.
func findRandomTCPPort() (int, error) {
l, err := netListen("tcp", "localhost:0")
if err != nil {
return 0, err
}
// findRandomTCPPorts finds count random available TCP ports on the localhost interface.
func findRandomTCPPorts(count int) (ports []int, err error) {
ports = make([]int, 0, count)
listeners := make([]net.Listener, 0, count)
defer func() {
for _, listener := range listeners {
if closeErr := listener.Close(); closeErr != nil {
err = errors.Join(err, fmt.Errorf("error closing listener: %w", closeErr))
}
}
}()
for range count {
l, err := netListen("tcp", "localhost:0")
if err != nil {
return nil, err
}
listeners = append(listeners, l)

port := l.Addr().(*net.TCPAddr).Port
err = l.Close()
if err != nil {
return 0, err
}
if port == 0 {
return 0, fmt.Errorf("failed to find random port")
port := l.Addr().(*net.TCPAddr).Port
if port == 0 {
return nil, fmt.Errorf("failed to find random port")
}
ports = append(ports, port)
}

return port, nil
return ports, err
}
17 changes: 13 additions & 4 deletions internal/pkg/otel/manager/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"errors"
"net"
"path/filepath"
"slices"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/pkg/component"
Expand All @@ -18,9 +21,15 @@ import (
)

func TestFindRandomPort(t *testing.T) {
port, err := findRandomTCPPort()
portCount := 2
ports, err := findRandomTCPPorts(portCount)
require.NoError(t, err)
require.NotEqual(t, 0, port)
require.Len(t, ports, portCount)
for _, port := range ports {
assert.NotEqual(t, 0, port)
}
slices.Sort(ports)
require.Len(t, slices.Compact(ports), portCount, "returned ports should be unique")

defer func() {
netListen = net.Listen
Expand All @@ -29,8 +38,8 @@ func TestFindRandomPort(t *testing.T) {
netListen = func(string, string) (net.Listener, error) {
return nil, errors.New("some error")
}
_, err = findRandomTCPPort()
require.Error(t, err, "failed to find random port")
_, err = findRandomTCPPorts(portCount)
assert.Error(t, err, "failed to find random port")
}

func testComponent(componentId string) component.Component {
Expand Down
43 changes: 41 additions & 2 deletions internal/pkg/otel/manager/execution_embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package manager

import (
"context"
"os"
"strconv"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
Expand All @@ -14,6 +16,8 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/otel"
"github.com/elastic/elastic-agent/internal/pkg/otel/agentprovider"
Expand All @@ -22,11 +26,14 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

func newExecutionEmbedded() *embeddedExecution {
return &embeddedExecution{}
// newExecutionEmbedded creates a new execution which runs the otel collector in a goroutine. A metricsPort of 0 will
// result in a random port being used.
func newExecutionEmbedded(metricsPort int) *embeddedExecution {
return &embeddedExecution{collectorMetricsPort: metricsPort}
}

type embeddedExecution struct {
collectorMetricsPort int
}

// startCollector starts the collector in a new goroutine.
Expand All @@ -41,6 +48,10 @@ func (r *embeddedExecution) startCollector(ctx context.Context, logger *logger.L
extConf := map[string]any{
"endpoint": paths.DiagnosticsExtensionSocket(),
}
collectorMetricsPort, err := r.getCollectorMetricsPort()
if err != nil {
return nil, err
}
// NewForceExtensionConverterFactory is used to ensure that the agent_status extension is always enabled.
// It is required for the Elastic Agent to extract the status out of the OTel collector.
settings := otel.NewSettings(
Expand All @@ -59,13 +70,41 @@ func (r *embeddedExecution) startCollector(ctx context.Context, logger *logger.L
return nil, err
}
go func() {
// Set the environment variable for the collector metrics port. See comment at the constant definition for more information.
setErr := os.Setenv(componentmonitoring.OtelCollectorMetricsPortEnvVarName, strconv.Itoa(collectorMetricsPort))
defer func() {
unsetErr := os.Unsetenv(componentmonitoring.OtelCollectorMetricsPortEnvVarName)
if unsetErr != nil {
logger.Errorf("couldn't unset environment variable %s: %v", componentmonitoring.OtelCollectorMetricsPortEnvVarName, unsetErr)
}
}()
if setErr != nil {
reportErr(ctx, errCh, setErr)
return
}
runErr := svc.Run(collectorCtx)
close(ctl.collectorDoneCh)
reportErr(ctx, errCh, runErr)
}()
return ctl, nil
}

// getCollectorPorts returns the metrics port used by the OTel collector. If the port set in the execution struct is 0,
// a random port is returned instead.
func (r *embeddedExecution) getCollectorMetricsPort() (metricsPort int, err error) {
// if the port is defined (non-zero), use it
if r.collectorMetricsPort > 0 {
return r.collectorMetricsPort, nil
}

// get a random port
ports, err := findRandomTCPPorts(1)
if err != nil {
return 0, err
}
return ports[0], nil
}

type ctxHandle struct {
collectorDoneCh chan struct{}
cancel context.CancelFunc
Expand Down
Loading