Skip to content

Commit 3e80641

Browse files
feat: utilise continue_on_err in beatsauthextension (#10343)
* feat: rework elasticsearch output translation to otel config to exclude validation errors * ci: add integration test (cherry picked from commit 0c0dada) # Conflicts: # internal/pkg/otel/translate/otelconfig.go
1 parent 4f6c242 commit 3e80641

File tree

5 files changed

+786
-4
lines changed

5 files changed

+786
-4
lines changed

internal/pkg/otel/translate/otelconfig.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ import (
1111
"slices"
1212
"strings"
1313

14+
<<<<<<< HEAD
1415
"github.com/elastic/elastic-agent-libs/logp"
1516

17+
=======
18+
"github.com/go-viper/mapstructure/v2"
19+
>>>>>>> 0c0dada00 (feat: utilise continue_on_err in beatsauthextension (#10343))
1620
koanfmaps "github.com/knadh/koanf/maps"
1721

1822
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
@@ -22,7 +26,6 @@ import (
2226
"go.opentelemetry.io/collector/pipeline"
2327
"golang.org/x/exp/maps"
2428

25-
elasticsearchtranslate "github.com/elastic/beats/v7/libbeat/otelbeat/oteltranslate/outputs/elasticsearch"
2629
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
2730
"github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver"
2831
"github.com/elastic/beats/v7/x-pack/libbeat/management"
@@ -500,7 +503,7 @@ func getDefaultDatastreamTypeForComponent(comp *component.Component) (string, er
500503

501504
// translateEsOutputToExporter translates an elasticsearch output configuration to an elasticsearch exporter configuration.
502505
func translateEsOutputToExporter(cfg *config.C, logger *logp.Logger) (map[string]any, error) {
503-
esConfig, err := elasticsearchtranslate.ToOTelConfig(cfg, logger)
506+
esConfig, err := ToOTelConfig(cfg, logger)
504507
if err != nil {
505508
return nil, err
506509
}
@@ -520,13 +523,28 @@ func BeatDataPath(componentId string) string {
520523

521524
// getBeatsAuthExtensionConfig sets http transport settings on beatsauth
522525
// currently this is only supported for elasticsearch output
523-
func getBeatsAuthExtensionConfig(cfg *config.C) (map[string]any, error) {
526+
func getBeatsAuthExtensionConfig(outputCfg *config.C) (map[string]any, error) {
524527
defaultTransportSettings := elasticsearch.ESDefaultTransportSettings()
525-
err := cfg.Unpack(&defaultTransportSettings)
528+
529+
var resultMap map[string]any
530+
if err := outputCfg.Unpack(&resultMap); err != nil {
531+
return nil, err
532+
}
533+
534+
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
535+
Result: &defaultTransportSettings,
536+
TagName: "config",
537+
SquashTagOption: "inline",
538+
DecodeHook: cfgDecodeHookFunc(),
539+
})
526540
if err != nil {
527541
return nil, err
528542
}
529543

544+
if err = decoder.Decode(&resultMap); err != nil {
545+
return nil, err
546+
}
547+
530548
newConfig, err := config.NewConfigFrom(defaultTransportSettings)
531549
if err != nil {
532550
return nil, err
@@ -538,5 +556,9 @@ func getBeatsAuthExtensionConfig(cfg *config.C) (map[string]any, error) {
538556
return nil, err
539557
}
540558

559+
// required to make the extension not cause the collector to fail and exit
560+
// on startup
561+
newMap["continue_on_error"] = true
562+
541563
return newMap, nil
542564
}

internal/pkg/otel/translate/otelconfig_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ func TestGetOtelConfig(t *testing.T) {
236236

237237
expectedExtensionConfig := func(extra ...extraParams) map[string]any {
238238
finalOutput := map[string]any{
239+
"continue_on_error": true,
239240
"idle_connection_timeout": "3s",
240241
"proxy_disable": false,
241242
"ssl": map[string]interface{}{
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package translate
6+
7+
import (
8+
"encoding/base64"
9+
"errors"
10+
"fmt"
11+
"reflect"
12+
"strings"
13+
"time"
14+
15+
"github.com/go-viper/mapstructure/v2"
16+
17+
"github.com/elastic/beats/v7/libbeat/common"
18+
"github.com/elastic/beats/v7/libbeat/outputs"
19+
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
20+
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
21+
"github.com/elastic/elastic-agent-libs/config"
22+
"github.com/elastic/elastic-agent-libs/logp"
23+
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
24+
)
25+
26+
type esToOTelOptions struct {
27+
elasticsearch.ElasticsearchConfig `config:",inline"`
28+
outputs.HostWorkerCfg `config:",inline"`
29+
30+
Index string `config:"index"`
31+
Pipeline string `config:"pipeline"`
32+
Preset string `config:"preset"`
33+
}
34+
35+
var defaultOptions = esToOTelOptions{
36+
ElasticsearchConfig: elasticsearch.DefaultConfig(),
37+
38+
Index: "", // Dynamic routing is disabled if index is set
39+
Pipeline: "",
40+
Preset: "custom", // default is custom if not set
41+
HostWorkerCfg: outputs.HostWorkerCfg{
42+
Workers: 1,
43+
},
44+
}
45+
46+
// ToOTelConfig converts a Beat config into OTel elasticsearch exporter config
47+
// Ensure cloudid is handled before calling this method
48+
// Note: This method may override output queue settings defined by user.
49+
func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error) {
50+
escfg := defaultOptions
51+
52+
// check for unsupported config
53+
err := checkUnsupportedConfig(output, logger)
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
// apply preset here
59+
// It is important to apply preset before unpacking the config, as preset can override output fields
60+
preset, err := output.String("preset", -1)
61+
if err == nil {
62+
// Performance preset is present, apply it and log any fields that
63+
// were overridden
64+
overriddenFields, presetConfig, err := elasticsearch.ApplyPreset(preset, output)
65+
if err != nil {
66+
return nil, err
67+
}
68+
logger.Infof("Applying performance preset '%v': %v",
69+
preset, config.DebugString(presetConfig, false))
70+
logger.Warnf("Performance preset '%v' overrides user setting for field(s): %s",
71+
preset, strings.Join(overriddenFields, ","))
72+
}
73+
74+
unpackedMap := make(map[string]any)
75+
// unpack and validate ES config
76+
if err := output.Unpack(&unpackedMap); err != nil {
77+
return nil, fmt.Errorf("failed unpacking config. %w", err)
78+
}
79+
80+
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
81+
Result: &escfg,
82+
TagName: "config",
83+
SquashTagOption: "inline",
84+
DecodeHook: cfgDecodeHookFunc(),
85+
})
86+
if err != nil {
87+
return nil, fmt.Errorf("failed creating decoder. %w", err)
88+
}
89+
90+
err = decoder.Decode(&unpackedMap)
91+
if err != nil {
92+
return nil, fmt.Errorf("failed decoding config. %w", err)
93+
}
94+
95+
if err := escfg.Validate(); err != nil {
96+
return nil, err
97+
}
98+
99+
// Create url using host name, protocol and path
100+
hosts := []string{}
101+
for _, h := range escfg.Hosts {
102+
esURL, err := common.MakeURL(escfg.Protocol, escfg.Path, h, 9200)
103+
if err != nil {
104+
return nil, fmt.Errorf("cannot generate ES URL from host %w", err)
105+
}
106+
hosts = append(hosts, esURL)
107+
}
108+
109+
otelYAMLCfg := map[string]any{
110+
"endpoints": hosts, // hosts, protocol, path, port
111+
112+
// max_conns_per_host is a "hard" limit on number of open connections.
113+
// Ideally, escfg.NumWorkers() should map to num_consumer, but we had a bug in upstream
114+
// where it could spin as many goroutines as it liked.
115+
// Given that batcher implementation can change and it has a history of such changes,
116+
// let's keep max_conns_per_host setting for now and remove it once exporterhelper is stable.
117+
"max_conns_per_host": escfg.NumWorkers(),
118+
119+
// Retry
120+
"retry": map[string]any{
121+
"enabled": true,
122+
"initial_interval": escfg.Backoff.Init, // backoff.init
123+
"max_interval": escfg.Backoff.Max, // backoff.max
124+
"max_retries": escfg.MaxRetries, // max_retries
125+
},
126+
127+
"sending_queue": map[string]any{
128+
"batch": map[string]any{
129+
"flush_timeout": "10s",
130+
"max_size": escfg.BulkMaxSize, // bulk_max_size
131+
"min_size": 0, // 0 means immediately trigger a flush
132+
"sizer": "items",
133+
},
134+
"enabled": true,
135+
"queue_size": getQueueSize(logger, output),
136+
"block_on_overflow": true,
137+
"wait_for_result": true,
138+
"num_consumers": escfg.NumWorkers(),
139+
},
140+
141+
"mapping": map[string]any{
142+
"mode": "bodymap",
143+
},
144+
}
145+
146+
// Compression
147+
otelYAMLCfg["compression"] = "none"
148+
if escfg.CompressionLevel > 0 {
149+
otelYAMLCfg["compression"] = "gzip"
150+
otelYAMLCfg["compression_params"] = map[string]any{
151+
"level": escfg.CompressionLevel,
152+
}
153+
}
154+
155+
// Authentication
156+
setIfNotNil(otelYAMLCfg, "user", escfg.Username) // username
157+
setIfNotNil(otelYAMLCfg, "password", escfg.Password) // password
158+
setIfNotNil(otelYAMLCfg, "api_key", base64.StdEncoding.EncodeToString([]byte(escfg.APIKey))) // api_key
159+
160+
setIfNotNil(otelYAMLCfg, "headers", escfg.Headers) // headers
161+
setIfNotNil(otelYAMLCfg, "pipeline", escfg.Pipeline) // pipeline
162+
// Dynamic routing is disabled if output.elasticsearch.index is set
163+
setIfNotNil(otelYAMLCfg, "logs_index", escfg.Index) // index
164+
165+
// idle_connection_timeout, timeout, ssl block,
166+
// proxy_url, proxy_headers, proxy_disable are handled by beatsauthextension https://github.com/elastic/opentelemetry-collector-components/tree/main/extension/beatsauthextension
167+
// caller of this method should take care of integrating the extension
168+
169+
return otelYAMLCfg, nil
170+
}
171+
172+
// log warning for unsupported config
173+
func checkUnsupportedConfig(cfg *config.C, logger *logp.Logger) error {
174+
if cfg.HasField("indices") {
175+
return fmt.Errorf("indices is currently not supported: %w", errors.ErrUnsupported)
176+
} else if cfg.HasField("pipelines") {
177+
return fmt.Errorf("pipelines is currently not supported: %w", errors.ErrUnsupported)
178+
} else if cfg.HasField("parameters") {
179+
return fmt.Errorf("parameters is currently not supported: %w", errors.ErrUnsupported)
180+
} else if value, err := cfg.Bool("allow_older_versions", -1); err == nil && !value {
181+
return fmt.Errorf("allow_older_versions:false is currently not supported: %w", errors.ErrUnsupported)
182+
} else if cfg.HasField("loadbalance") {
183+
return fmt.Errorf("loadbalance is currently not supported: %w", errors.ErrUnsupported)
184+
} else if cfg.HasField("non_indexable_policy") {
185+
return fmt.Errorf("non_indexable_policy is currently not supported: %w", errors.ErrUnsupported)
186+
} else if cfg.HasField("escape_html") {
187+
return fmt.Errorf("escape_html is currently not supported: %w", errors.ErrUnsupported)
188+
} else if cfg.HasField("kerberos") {
189+
return fmt.Errorf("kerberos is currently not supported: %w", errors.ErrUnsupported)
190+
}
191+
192+
return nil
193+
}
194+
195+
// Helper function to check if a struct is empty
196+
func isStructEmpty(s any) bool {
197+
return reflect.DeepEqual(s, reflect.Zero(reflect.TypeOf(s)).Interface())
198+
}
199+
200+
// Helper function to conditionally add fields to the map
201+
func setIfNotNil(m map[string]any, key string, value any) {
202+
if value == nil {
203+
return
204+
}
205+
206+
v := reflect.ValueOf(value)
207+
208+
switch v.Kind() {
209+
case reflect.String:
210+
if v.String() != "" {
211+
m[key] = value
212+
}
213+
case reflect.Map, reflect.Slice:
214+
if v.Len() > 0 {
215+
m[key] = value
216+
}
217+
case reflect.Struct:
218+
if !isStructEmpty(value) {
219+
m[key] = value
220+
}
221+
default:
222+
m[key] = value
223+
}
224+
}
225+
226+
func getQueueSize(logger *logp.Logger, output *config.C) int {
227+
size, err := output.Int("queue.mem.events", -1)
228+
if err != nil {
229+
logger.Debugf("Failed to get queue size: %v", err)
230+
return memqueue.DefaultEvents // return default queue.mem.events for sending_queue in case of an errr
231+
}
232+
return int(size)
233+
}
234+
235+
func cfgDecodeHookFunc() mapstructure.DecodeHookFunc {
236+
return func(
237+
f reflect.Type,
238+
t reflect.Type,
239+
data any,
240+
) (any, error) {
241+
if f.Kind() != reflect.String {
242+
return data, nil
243+
}
244+
245+
switch {
246+
case t == reflect.TypeOf(time.Duration(5)):
247+
d, err := time.ParseDuration(data.(string))
248+
if err != nil {
249+
return d, fmt.Errorf("failed parsing duration: %w", err)
250+
} else {
251+
return d, nil
252+
}
253+
case t == reflect.TypeOf(tlscommon.TLSVerificationMode(0)):
254+
verificationMode := tlscommon.TLSVerificationMode(0)
255+
if err := verificationMode.Unpack(data); err != nil {
256+
return nil, fmt.Errorf("failed parsing TLS verification mode: %w", err)
257+
}
258+
return verificationMode, nil
259+
default:
260+
return data, nil
261+
}
262+
}
263+
}

0 commit comments

Comments
 (0)