Skip to content

Commit 05eea16

Browse files
LOG-7804: support custom headers for Elasticsearch output
1 parent 6cdb348 commit 05eea16

File tree

14 files changed

+333
-9
lines changed

14 files changed

+333
-9
lines changed

api/observability/v1/output_types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,12 @@ type Elasticsearch struct {
537537
// +kubebuilder:validation:Required
538538
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="ElasticSearch Version",xDescriptors={"urn:alm:descriptor:com.tectonic.ui:number"}
539539
Version int `json:"version"`
540+
541+
// Headers specify optional headers to be sent with the request
542+
//
543+
// +kubebuilder:validation:Optional
544+
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Headers"
545+
Headers map[string]string `json:"headers,omitempty"`
540546
}
541547

542548
// GoogleCloudLoggingAuthentication contains configuration for authenticating requests to a GoogleCloudLogging output.

api/observability/v1/zz_generated.deepcopy.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2193,6 +2193,12 @@ spec:
21932193
- secretName
21942194
type: object
21952195
type: object
2196+
headers:
2197+
additionalProperties:
2198+
type: string
2199+
description: Headers specify optional headers to be sent
2200+
with the request
2201+
type: object
21962202
index:
21972203
description: |-
21982204
Index is the index for the logs. This supports template syntax to allow dynamic per-event values.

docs/reference/operator/api_observability_v1.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2051,6 +2051,8 @@ The 'username@password' part of `url` is ignored.
20512051

20522052
|authentication|object| Authentication sets credentials for authenticating the requests.
20532053

2054+
|headers|object| Headers specify optional headers to be sent with the request
2055+
20542056
|index|string| Index is the index for the logs. This supports template syntax to allow dynamic per-event values.
20552057

20562058
The Index can be a combination of static and dynamic values consisting of field paths followed by `||` followed by another field path or a static value.
@@ -2160,6 +2162,10 @@ Type:: object
21602162

21612163
|======================
21622164

2165+
=== .spec.outputs[].elasticsearch.headers
2166+
2167+
Type:: object
2168+
21632169
=== .spec.outputs[].elasticsearch.tuning
21642170

21652171
Type:: object

internal/generator/vector/output/elasticsearch/elasticsearch.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ if exists(.kubernetes.event.metadata.uid) {
8181
common.NewAcknowledgments(id, strategy),
8282
common.NewBatch(id, strategy),
8383
common.NewBuffer(id, strategy),
84-
common.NewRequest(id, strategy),
84+
Request(id, o.Elasticsearch, strategy),
8585
tls.New(id, o.TLS, secrets, op, Option{Name: URL, Value: o.Elasticsearch.URL}),
8686
)
8787

@@ -110,3 +110,11 @@ func Output(id string, o obs.OutputSpec, inputs []string, index string, secrets
110110
}
111111
return &es
112112
}
113+
114+
func Request(id string, o *obs.Elasticsearch, strategy common.ConfigStrategy) *common.Request {
115+
req := common.NewRequest(id, strategy)
116+
if len(o.Headers) != 0 {
117+
req.SetHeaders(o.Headers)
118+
}
119+
return req
120+
}

internal/generator/vector/output/elasticsearch/elasticsearch_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,5 +145,10 @@ var _ = Describe("Generate Vector config", func() {
145145
BaseOutputTuningSpec: *baseTune,
146146
}
147147
}, true, framework.NoOptions, "es_with_tune.toml"),
148+
Entry("with headers", func(spec *obs.OutputSpec) {
149+
spec.Elasticsearch.Headers = map[string]string{
150+
"Key": "Value",
151+
}
152+
}, true, framework.NoOptions, "es_with_headers.toml"),
148153
)
149154
})
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Elasticsearch Index
2+
[transforms.es_1_index]
3+
type = "remap"
4+
inputs = ["application"]
5+
source = '''
6+
._internal.es_1_index = to_string!(._internal.log_type||"none")
7+
'''
8+
9+
[sinks.es_1]
10+
type = "elasticsearch"
11+
inputs = ["es_1_index"]
12+
endpoints = ["https://es.svc.infra.cluster:9200"]
13+
bulk.index = "{{ _internal.es_1_index }}"
14+
bulk.action = "create"
15+
api_version = "v8"
16+
17+
[sinks.es_1.encoding]
18+
except_fields = ["_internal"]
19+
20+
[sinks.es_1.request]
21+
headers = {"Key"="Value"}
22+
23+
[sinks.es_1.auth]
24+
strategy = "basic"
25+
user = "SECRET[kubernetes_secret.es-1/username]"
26+
password = "SECRET[kubernetes_secret.es-1/password]"

internal/validations/observability/outputs/validate.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ func Validate(context internalcontext.ForwarderContext) {
2929
messages = append(messages, validateHttpContentTypeHeaders(out)...)
3030
case obs.OutputTypeLokiStack, obs.OutputTypeOTLP:
3131
messages = append(messages, ValidateTechPreviewAnnotation(out, context)...)
32+
case obs.OutputTypeElasticsearch:
33+
messages = append(messages, validateElasticsearchHeaders(out)...)
3234
}
3335
// Set condition
3436
if len(messages) > 0 {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package outputs
2+
3+
import (
4+
"fmt"
5+
log "github.com/ViaQ/logerr/v2/log/static"
6+
obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
7+
"strings"
8+
)
9+
10+
// validateElasticsearchHeaders will validate Elasticsearch custom headers
11+
// it's not allowed to pass "Authorization" and "Content-Type" headers
12+
func validateElasticsearchHeaders(output obs.OutputSpec) (results []string) {
13+
if output.Type == obs.OutputTypeElasticsearch && output.Elasticsearch != nil && len(output.Elasticsearch.Headers) > 0 {
14+
var invalidHeaders []string
15+
for headerName := range output.Elasticsearch.Headers {
16+
if strings.ToLower(headerName) == "authorization" || strings.ToLower(headerName) == "content-type" {
17+
invalidHeaders = append(invalidHeaders, headerName)
18+
}
19+
}
20+
if len(invalidHeaders) > 0 {
21+
log.V(3).Info("validateElasticsearchHeaders failed", "reason", "invalid headers found: ", strings.Join(invalidHeaders, ","))
22+
results = append(results, fmt.Sprintf("invalid headers found: %s", strings.Join(invalidHeaders, ",")))
23+
}
24+
}
25+
return results
26+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package outputs
2+
3+
import (
4+
. "github.com/onsi/ginkgo/v2"
5+
. "github.com/onsi/gomega"
6+
"github.com/openshift/cluster-logging-operator/api/observability/v1"
7+
)
8+
9+
var _ = Describe("[internal][validations] ClusterLogForwarder will validate headers in Elasticsearch Output", func() {
10+
var (
11+
es *v1.Elasticsearch
12+
spec v1.OutputSpec
13+
)
14+
BeforeEach(func() {
15+
es = &v1.Elasticsearch{}
16+
spec = v1.OutputSpec{
17+
Name: "esOutput",
18+
Type: v1.OutputTypeElasticsearch,
19+
Elasticsearch: es,
20+
}
21+
})
22+
23+
Context("#validateElasticsearchHeaders", func() {
24+
25+
It("should pass validation with empty headers", func() {
26+
Expect(validateElasticsearchHeaders(spec)).To(BeEmpty())
27+
})
28+
It("should pass validation when no invalid headers set", func() {
29+
spec.Elasticsearch.Headers = map[string]string{
30+
"Accept": "application/json",
31+
}
32+
Expect(validateElasticsearchHeaders(spec)).To(BeEmpty())
33+
})
34+
It("should fail validation when the Content-Type header is set", func() {
35+
spec.Elasticsearch.Headers = map[string]string{
36+
"Content-Type": "application/json",
37+
}
38+
Expect(validateElasticsearchHeaders(spec)).ToNot(BeEmpty())
39+
})
40+
It("should fail validation when the Authorization header is set", func() {
41+
spec.Elasticsearch.Headers = map[string]string{
42+
"Authorization": "test",
43+
}
44+
Expect(validateElasticsearchHeaders(spec)).ToNot(BeEmpty())
45+
})
46+
It("should pass validation when no Elasticsearch Output", func() {
47+
spec = v1.OutputSpec{
48+
Name: "esOutput",
49+
Type: v1.OutputTypeElasticsearch,
50+
Elasticsearch: &v1.Elasticsearch{},
51+
}
52+
Expect(validateElasticsearchHeaders(spec)).To(BeEmpty())
53+
})
54+
})
55+
})

0 commit comments

Comments
 (0)