Skip to content

Commit 71338b8

Browse files
added functional tests
1 parent 8b95f50 commit 71338b8

File tree

3 files changed

+168
-2
lines changed

3 files changed

+168
-2
lines changed

test/framework/functional/framework.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -427,8 +427,14 @@ func (f *CollectorFunctionalFramework) addOutputContainers(b *runtime.PodBuilder
427427
return err
428428
}
429429
case obs.OutputTypeElasticsearch:
430-
if err := f.AddESOutput(ElasticsearchVersion(output.Elasticsearch.Version), b, output, nil); err != nil {
431-
return err
430+
if len(output.Elasticsearch.Headers) == 0 {
431+
if err := f.AddESOutput(ElasticsearchVersion(output.Elasticsearch.Version), b, output, nil); err != nil {
432+
return err
433+
}
434+
} else {
435+
if err := f.AddVLOutput(b, output, nil); err != nil {
436+
return err
437+
}
432438
}
433439
case obs.OutputTypeHTTP:
434440
if err := f.AddVectorHttpOutput(b, output); err != nil {
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package functional
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"strings"
8+
9+
obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
10+
"k8s.io/apimachinery/pkg/util/wait"
11+
12+
"github.com/openshift/cluster-logging-operator/internal/generator/url"
13+
"github.com/openshift/cluster-logging-operator/internal/runtime"
14+
15+
log "github.com/ViaQ/logerr/v2/log/static"
16+
)
17+
18+
func (f *CollectorFunctionalFramework) AddVLOutput(b *runtime.PodBuilder, output obs.OutputSpec, args []string) error {
19+
log.V(2).Info("Adding output for victorialogs", "name", output.Name)
20+
name := strings.ToLower(output.Name)
21+
22+
esURL, err := url.Parse(output.Elasticsearch.URL)
23+
if err != nil {
24+
return err
25+
}
26+
transportPort, portError := determineTransportPort(esURL.Port())
27+
if portError != nil {
28+
return portError
29+
}
30+
31+
log.V(2).Info("Adding container", "name", name)
32+
log.V(2).Info("Adding VictoriaLogs output container", "name", obs.OutputTypeElasticsearch)
33+
34+
cmdArgs := []string{"-httpListenAddr=:" + transportPort}
35+
if len(args) > 0 {
36+
cmdArgs = append(cmdArgs, args...)
37+
}
38+
39+
b.AddContainer(name, "victoriametrics/victoria-logs:1.34.0").
40+
AddRunAsUser(2000).
41+
WithCmdArgs(cmdArgs).
42+
End().
43+
AddContainer(fmt.Sprintf("%s-sidecar", name), "curlimages/curl:8.16.0").
44+
AddRunAsUser(2000).
45+
End()
46+
return nil
47+
}
48+
49+
func (f *CollectorFunctionalFramework) GetLogsFromVL(outputName string, headers map[string]string, options ...Option) (results []string, err error) {
50+
port := "9428"
51+
if found, o := OptionsInclude("port", options); found {
52+
port = o.Value
53+
}
54+
55+
pod := runtime.NewPod(f.Test.NS.Name, outputName)
56+
if err = f.Test.Get(pod); err != nil {
57+
pod = f.Pod
58+
}
59+
60+
err = wait.PollUntilContextTimeout(context.TODO(), defaultRetryInterval, maxDuration, true, func(cxt context.Context) (done bool, err error) {
61+
curlArgs := `-X GET -H "Content-Type: application/json"`
62+
for k, v := range headers {
63+
curlArgs += fmt.Sprintf(` -H "%s: %s"`, k, v)
64+
}
65+
cmd := fmt.Sprintf(`"localhost:%s/select/logsql/query?query='*'" %s`, port, curlArgs)
66+
var result string
67+
result, err = f.RunCommandInPod(pod, fmt.Sprintf("%s-sidecar", outputName), cmd)
68+
if result != "" && err == nil {
69+
scanner := bufio.NewScanner(strings.NewReader(result))
70+
log.V(2).Info("results", "response", results)
71+
for scanner.Scan() {
72+
results = append(results, scanner.Text())
73+
}
74+
}
75+
log.V(4).Info("Polling from VictoriaLogs", "err", err, "result", result)
76+
return false, nil
77+
})
78+
log.V(4).Info("Returning", "logs", results)
79+
return results, err
80+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package elasticsearch
2+
3+
import (
4+
"sort"
5+
"time"
6+
7+
. "github.com/onsi/ginkgo/v2"
8+
. "github.com/onsi/gomega"
9+
10+
obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
11+
"github.com/openshift/cluster-logging-operator/test/framework/functional"
12+
13+
"github.com/openshift/cluster-logging-operator/test/helpers/types"
14+
obstestruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability"
15+
)
16+
17+
var _ = Describe("[Functional][Outputs][ElasticSearch] Logforwarding to VictoriaLogs", func() {
18+
19+
var (
20+
framework *functional.CollectorFunctionalFramework
21+
22+
// Template expected as output Log
23+
outputLogTemplate = functional.NewApplicationLogTemplate()
24+
)
25+
26+
Context("should write to victorialogs", func() {
27+
DescribeTable("with custom headers", func(headers map[string]string) {
28+
outputLogTemplate.ViaqIndexName = "app-write"
29+
framework = functional.NewCollectorFunctionalFramework()
30+
obstestruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
31+
FromInput(obs.InputTypeApplication).
32+
ToElasticSearchOutput(func(output *obs.OutputSpec) {
33+
output.Elasticsearch.Headers = headers
34+
output.Elasticsearch.URL = "http://0.0.0.0:9428/insert/elasticsearch/"
35+
})
36+
defer framework.Cleanup()
37+
Expect(framework.Deploy()).To(BeNil())
38+
timestamp := functional.CRIOTime(time.Now())
39+
ukr := "привіт "
40+
jp := "こんにちは "
41+
ch := "你好"
42+
msg := functional.NewCRIOLogMessage(timestamp, ukr+jp+ch, false)
43+
Expect(framework.WriteMessagesToApplicationLog(msg, 1)).To(BeNil())
44+
Expect(framework.WriteMessagesWithNotUTF8SymbolsToLog()).To(BeNil())
45+
requestHeaders := map[string]string{
46+
"AccountID": "0",
47+
"ProjectID": "0",
48+
}
49+
for headerName := range requestHeaders {
50+
if v, ok := headers[headerName]; ok {
51+
requestHeaders[headerName] = v
52+
}
53+
}
54+
raw, err := framework.GetLogsFromVL(string(obs.OutputTypeElasticsearch), requestHeaders)
55+
Expect(err).To(BeNil(), "Expected no errors reading the logs")
56+
Expect(raw).To(Not(BeEmpty()))
57+
// Parse log line
58+
var logs []types.ApplicationLog
59+
err = types.StrictlyParseLogsFromSlice(raw, &logs)
60+
Expect(err).To(BeNil(), "Expected no errors parsing the logs")
61+
Expect(len(logs)).To(Equal(2))
62+
//sort log by time before matching
63+
sort.Slice(logs, func(i, j int) bool {
64+
return logs[i].TimestampLegacy.Before(logs[j].TimestampLegacy)
65+
})
66+
67+
Expect(logs[0].Message).To(Equal(ukr + jp + ch))
68+
Expect(logs[1].Message).To(Equal("������������"))
69+
},
70+
Entry("Non-default tenant ID", map[string]string{
71+
"AccountID": "10",
72+
"ProjectID": "10",
73+
"VL-Msg-Field": "message",
74+
}),
75+
Entry("Add extra fields", map[string]string{
76+
"VL-Extra-Fields": "field_name=field_value",
77+
}),
78+
)
79+
})
80+
})

0 commit comments

Comments
 (0)