diff --git a/internal/generator/vector/output/lokistack/init_lokiStack.go b/internal/generator/vector/output/lokistack/init_lokiStack.go
index 2a15badc5..f221bfd8c 100644
--- a/internal/generator/vector/output/lokistack/init_lokiStack.go
+++ b/internal/generator/vector/output/lokistack/init_lokiStack.go
@@ -33,9 +33,13 @@ func GenerateOutput(outSpec obs.OutputSpec, tenant string) obs.OutputSpec {
 
 // GenerateLokiSpec generates and returns a Loki spec for the defined lokistack output
 func GenerateLokiSpec(ls *obs.LokiStack, tenant string) *obs.Loki {
+	url := lokiStackURL(ls, tenant, false)
+	if url == "" {
+		panic("LokiStack output has no valid URL")
+	}
 	return &obs.Loki{
 		URLSpec: obs.URLSpec{
-			URL: lokiStackURL(ls, tenant, false),
+			URL: url,
 		},
 		Authentication: &obs.HTTPAuthentication{
 			Token: ls.Authentication.Token,
diff --git a/internal/generator/vector/output/lokistack/init_lokiStack_test.go b/internal/generator/vector/output/lokistack/init_lokiStack_test.go
index c599506a9..d2937c9cd 100644
--- a/internal/generator/vector/output/lokistack/init_lokiStack_test.go
+++ b/internal/generator/vector/output/lokistack/init_lokiStack_test.go
@@ -294,3 +294,49 @@ var _ = Describe("#GenerateOutput", func() {
 		),
 	)
 })
+
+var _ = Describe("#GenerateLokiSpec", func() {
+	var (
+		lokiStack *obs.LokiStack
+	)
+
+	BeforeEach(func() {
+		lokiStack = &obs.LokiStack{
+			Target: obs.LokiStackTarget{
+				Name:      "test-lokistack",
+				Namespace: constants.OpenshiftNS,
+			},
+			Authentication: &obs.LokiStackAuthentication{
+				Token: &obs.BearerToken{
+					From: obs.BearerTokenFromServiceAccount,
+				},
+			},
+		}
+	})
+
+	Context("when given a valid tenant", func() {
+		It("should return a valid Loki spec", func() {
+			var spec *obs.Loki
+			Expect(func() {
+				spec = GenerateLokiSpec(lokiStack, string(obs.InputTypeApplication))
+			}).ToNot(Panic())
+
+			Expect(spec).ToNot(BeNil())
+			Expect(spec.URL).To(Equal("https://test-lokistack-gateway-http.openshift-logging.svc:8080/api/logs/v1/application"))
+			Expect(spec.Authentication.Token).To(Equal(lokiStack.Authentication.Token))
+		})
+	})
+
+	Context("when given an invalid tenant", func() {
+		DescribeTable("should panic with a specific message",
+			func(tenant string) {
+				Expect(func() {
+					GenerateLokiSpec(lokiStack, tenant)
+				}).To(PanicWith("LokiStack output has no valid URL"))
+			},
+			Entry("with an empty tenant", ""),
+			Entry("with a non-reserved tenant name", "my-custom-logs"),
+			Entry("with the 'receiver' tenant name", "receiver"),
+		)
+	})
+})
diff --git a/internal/generator/vector/output/lokistack/lokistack.go b/internal/generator/vector/output/lokistack/lokistack.go
index 469de0342..0ed8f3e5d 100644
--- a/internal/generator/vector/output/lokistack/lokistack.go
+++ b/internal/generator/vector/output/lokistack/lokistack.go
@@ -19,47 +19,112 @@ import (
 
 // New creates generate elements that represent configuration to forward logs to Loki using OpenShift Logging tenancy model
 func New(id string, o obs.OutputSpec, inputs []string, secrets observability.Secrets, strategy common.ConfigStrategy, op utils.Options) []framework.Element {
-	routeID := vectorhelpers.MakeID(id, "route")
-	routes := map[string]string{}
-	var clfSpec, _ = utils.GetOption(op, vectorhelpers.CLFSpec, observability.ClusterLogForwarderSpec{})
+	clfSpec, _ := utils.GetOption(op, vectorhelpers.CLFSpec, observability.ClusterLogForwarderSpec{})
 	if len(clfSpec.Inputs) == 0 || len(clfSpec.Pipelines) == 0 || len(clfSpec.Outputs) == 0 {
 		panic("ClusterLogForwarderSpec not found while generating LokiStack config")
 	}
 
 	inputSpecs := clfSpec.InputSpecsTo(o)
-	inputTypes := sets.NewString()
-	for _, inputSpec := range inputSpecs {
-		inputType := strings.ToLower(inputSpec.Type.String())
-		inputTypes.Insert(inputType)
-		routes[inputType] = fmt.Sprintf("'.log_type == \"%s\"'", inputType)
-		if inputSpec.Type == obs.InputTypeApplication && observability.IncludesInfraNamespace(inputSpec.Application) {
-			inputType = strings.ToLower(obs.InputTypeInfrastructure.String())
-			routes[inputType] = fmt.Sprintf("'.log_type == \"%s\"'", inputType)
-			inputTypes.Insert(inputType)
-		}
-	}
+	tenants := determineTenants(inputSpecs)
+
+	routeID := vectorhelpers.MakeID(id, "route")
 	confs := []framework.Element{
 		elements.Route{
 			ComponentID: routeID,
 			Inputs:      vectorhelpers.MakeInputs(inputs...),
-			Routes:      routes,
+			Routes:      buildRoutes(tenants),
 		},
 	}
-	confs = append(confs, elements.NewUnmatched(routeID, op, map[string]string{"output_type": strings.ToLower(obs.OutputTypeLokiStack.String())}))
-	for _, inputType := range inputTypes.List() {
-		outputID := vectorhelpers.MakeID(id, inputType)
-		migratedOutput := GenerateOutput(o, inputType)
-		log.V(4).Info("migrated lokistack output", "spec", migratedOutput)
-		factory := loki.New
-		if migratedOutput.Type == obs.OutputTypeOTLP {
-			factory = otlp.New
-		}
-		inputSources := observability.Inputs(inputSpecs).InputSources(obs.InputType(inputType))
-		if len(inputSources) == 0 && obs.InputType(inputType) == obs.InputTypeInfrastructure {
-			inputSources = append(inputSources, observability.ReservedInfrastructureSources.List()...)
-		}
-		op[otlp.OtlpLogSourcesOption] = inputSources
-		confs = append(confs, factory(outputID, migratedOutput, []string{vectorhelpers.MakeRouteInputID(routeID, inputType)}, secrets, strategy, op)...)
+
+	confs = append(confs, elements.NewUnmatched(routeID, op, map[string]string{
+		"output_type": strings.ToLower(obs.OutputTypeLokiStack.String()),
+	}))
+
+	for _, inputType := range tenants.List() {
+		confs = append(confs, generateSinkForTenant(id, routeID, inputType, o, inputSpecs, secrets, strategy, op)...)
 	}
+
 	return confs
 }
+
+func determineTenants(inputSpecs []obs.InputSpec) *sets.String {
+	tenants := sets.NewString()
+
+	for _, inputSpec := range inputSpecs {
+		switch inputSpec.Type {
+		case obs.InputTypeApplication:
+			tenants.Insert(string(obs.InputTypeApplication))
+			if observability.IncludesInfraNamespace(inputSpec.Application) {
+				tenants.Insert(string(obs.InputTypeInfrastructure))
+			}
+		case obs.InputTypeAudit:
+			tenants.Insert(string(obs.InputTypeAudit))
+		case obs.InputTypeInfrastructure:
+			tenants.Insert(string(obs.InputTypeInfrastructure))
+		case obs.InputTypeReceiver:
+			tenants.Insert(getTenantForReceiver(inputSpec.Receiver.Type))
+		}
+	}
+
+	return tenants
+}
+
+func getTenantForReceiver(receiverType obs.ReceiverType) string {
+	if receiverType == obs.ReceiverTypeHTTP {
+		return string(obs.InputTypeAudit)
+	}
+	return string(obs.InputTypeInfrastructure)
+}
+
+func buildRoutes(tenants *sets.String) map[string]string {
+	routes := make(map[string]string, tenants.Len())
+	for _, tenant := range tenants.List() {
+		routes[tenant] = fmt.Sprintf("'.log_type == \"%s\"'", tenant)
+	}
+	return routes
+}
+
+func generateSinkForTenant(id, routeID, inputType string, o obs.OutputSpec, inputSpecs []obs.InputSpec,
+	secrets observability.Secrets, strategy common.ConfigStrategy, op utils.Options) []framework.Element {
+
+	outputID := vectorhelpers.MakeID(id, inputType)
+	migratedOutput := GenerateOutput(o, inputType)
+	log.V(4).Info("migrated lokistack output", "spec", migratedOutput)
+
+	factoryInput := vectorhelpers.MakeRouteInputID(routeID, inputType)
+
+	if migratedOutput.Type == obs.OutputTypeOTLP {
+		op[otlp.OtlpLogSourcesOption] = getInputSources(inputSpecs, obs.InputType(inputType))
+		return otlp.New(outputID, migratedOutput, []string{factoryInput}, secrets, strategy, op)
+	}
+
+	return loki.New(outputID, migratedOutput, []string{factoryInput}, secrets, strategy, op)
+}
+
+func getInputSources(inputSpecs []obs.InputSpec, inputType obs.InputType) []string {
+	inputSources := observability.Inputs(inputSpecs).InputSources(inputType)
+
+	if len(inputSources) == 0 && inputType == obs.InputTypeInfrastructure {
+		inputSources = append(inputSources, observability.ReservedInfrastructureSources.List()...)
+	}
+
+	addReceiverSources(&inputSources, inputSpecs, inputType)
+
+	return inputSources
+}
+
+func addReceiverSources(inputSources *[]string, inputSpecs []obs.InputSpec, inputType obs.InputType) {
+	for _, is := range inputSpecs {
+		if is.Type != obs.InputTypeReceiver {
+			continue
+		}
+
+		if inputType == obs.InputTypeAudit && is.Receiver.Type == obs.ReceiverTypeHTTP {
+			*inputSources = append(*inputSources, "receiver.http")
+		}
+
+		if inputType == obs.InputTypeInfrastructure && is.Receiver.Type == obs.ReceiverTypeSyslog {
+			*inputSources = append(*inputSources, "receiver.syslog")
+		}
+	}
+}
\ No newline at end of file
diff --git a/internal/generator/vector/output/lokistack/lokistack_test.go b/internal/generator/vector/output/lokistack/lokistack_test.go
index 21f61f34b..2995583c7 100644
--- a/internal/generator/vector/output/lokistack/lokistack_test.go
+++ b/internal/generator/vector/output/lokistack/lokistack_test.go
@@ -98,6 +98,39 @@ var _ = Describe("Generate vector config", func() {
 				}),
 			}
 		}
+
+		initReceiverOptions = func() utils.Options {
+			output := initOutput()
+			return utils.Options{
+				framework.OptionServiceAccountTokenSecretName: saTokenSecretName,
+				helpers.CLFSpec: observability.ClusterLogForwarderSpec(obs.ClusterLogForwarderSpec{
+					Outputs: []obs.OutputSpec{output},
+					Pipelines: []obs.PipelineSpec{
+						{
+							Name:       "lokistack-receivers",
+							InputRefs:  []string{"http-receiver", "syslog-receiver"},
+							OutputRefs: []string{output.Name},
+						},
+					},
+					Inputs: []obs.InputSpec{
+						{
+							Name: "http-receiver",
+							Type: obs.InputTypeReceiver,
+							Receiver: &obs.ReceiverSpec{
+								Type: obs.ReceiverTypeHTTP,
+							},
+						},
+						{
+							Name: "syslog-receiver",
+							Type: obs.InputTypeReceiver,
+							Receiver: &obs.ReceiverSpec{
+								Type: obs.ReceiverTypeSyslog,
+							},
+						},
+					},
+				}),
+			}
+		}
 	)
 	DescribeTable("for LokiStack output", func(expFile string, op framework.Options, tune bool, visit func(spec *obs.OutputSpec)) {
 		exp, err := tomlContent.ReadFile(expFile)
@@ -118,5 +151,6 @@ var _ = Describe("Generate vector config", func() {
 		Entry("with Otel datamodel", "lokistack_otel.toml", initOptions(), false, func(spec *obs.OutputSpec) {
 			spec.LokiStack.DataModel = obs.LokiStackDataModelOpenTelemetry
 		}),
+		Entry("with ViaQ datamodel with receiver", "lokistack_viaq_receiver.toml", initReceiverOptions(), false, func(spec *obs.OutputSpec) {}),
 	)
 })
diff --git a/internal/generator/vector/output/lokistack/lokistack_viaq_receiver.toml b/internal/generator/vector/output/lokistack/lokistack_viaq_receiver.toml
new file mode 100644
index 000000000..144b17742
--- /dev/null
+++ b/internal/generator/vector/output/lokistack/lokistack_viaq_receiver.toml
@@ -0,0 +1,143 @@
+[transforms.output_default_lokistack_route]
+type = "route"
+inputs = ["pipeline_fake"]
+route.audit = '.log_type == "audit"'
+route.infrastructure = '.log_type == "infrastructure"'
+
+
+
+
+
+[transforms.output_default_lokistack_route_unmatched]
+inputs = ["output_default_lokistack_route._unmatched"]
+type = "log_to_metric"
+
+[[transforms.output_default_lokistack_route_unmatched.metrics]]
+field = "message"
+kind = "incremental"
+name = "component_event_unmatched_count"
+namespace = "logcollector"
+tags = {component_id = "output_default_lokistack_route", log_source = "{{ log_source }}", log_type = "{{ log_type }}", output_type = "lokistack"}
+type = "counter"
+
+
+
+[transforms.output_default_lokistack_audit_remap]
+type = "remap"
+inputs = ["output_default_lokistack_route.audit"]
+source = '''
+  del(.tag)
+'''
+
+[transforms.output_default_lokistack_audit_remap_label]
+type = "remap"
+inputs = ["output_default_lokistack_audit_remap"]
+source = '''
+
+  if !exists(.kubernetes.namespace_name) {
+    .kubernetes.namespace_name = ""
+  }
+  if !exists(.kubernetes.pod_name) {
+    .kubernetes.pod_name = ""
+  }
+  if !exists(.kubernetes.container_name) {
+    .kubernetes.container_name = ""
+  }
+'''
+
+[sinks.output_default_lokistack_audit]
+type = "loki"
+inputs = ["output_default_lokistack_audit_remap_label"]
+endpoint = "https://logging-loki-gateway-http.openshift-logging.svc:8080/api/logs/v1/audit"
+out_of_order_action = "accept"
+healthcheck.enabled = false
+
+
+[sinks.output_default_lokistack_audit.encoding]
+codec = "json"
+
+except_fields = ["_internal"]
+
+
+
+
+
+[sinks.output_default_lokistack_audit.labels]
+k8s_container_name = "{{kubernetes.container_name}}"
+k8s_namespace_name = "{{kubernetes.namespace_name}}"
+k8s_node_name = "${VECTOR_SELF_NODE_NAME}"
+k8s_pod_name = "{{kubernetes.pod_name}}"
+kubernetes_container_name = "{{kubernetes.container_name}}"
+kubernetes_host = "${VECTOR_SELF_NODE_NAME}"
+kubernetes_namespace_name = "{{kubernetes.namespace_name}}"
+kubernetes_pod_name = "{{kubernetes.pod_name}}"
+log_type = "{{log_type}}"
+openshift_log_type = "{{log_type}}"
+
+[sinks.output_default_lokistack_audit.tls]
+
+ca_file = "/var/run/ocp-collector/config/openshift-service-ca.crt/ca-bundle.crt"
+
+[sinks.output_default_lokistack_audit.auth]
+strategy = "bearer"
+token = "SECRET[kubernetes_secret.test-sa-token/token]"
+
+[transforms.output_default_lokistack_infrastructure_remap]
+type = "remap"
+inputs = ["output_default_lokistack_route.infrastructure"]
+source = '''
+  del(.tag)
+'''
+
+[transforms.output_default_lokistack_infrastructure_remap_label]
+type = "remap"
+inputs = ["output_default_lokistack_infrastructure_remap"]
+source = '''
+
+  if !exists(.kubernetes.namespace_name) {
+    .kubernetes.namespace_name = ""
+  }
+  if !exists(.kubernetes.pod_name) {
+    .kubernetes.pod_name = ""
+  }
+  if !exists(.kubernetes.container_name) {
+    .kubernetes.container_name = ""
+  }
+'''
+
+[sinks.output_default_lokistack_infrastructure]
+type = "loki"
+inputs = ["output_default_lokistack_infrastructure_remap_label"]
+endpoint = "https://logging-loki-gateway-http.openshift-logging.svc:8080/api/logs/v1/infrastructure"
+out_of_order_action = "accept"
+healthcheck.enabled = false
+
+
+[sinks.output_default_lokistack_infrastructure.encoding]
+codec = "json"
+
+except_fields = ["_internal"]
+
+
+
+
+
+[sinks.output_default_lokistack_infrastructure.labels]
+k8s_container_name = "{{kubernetes.container_name}}"
+k8s_namespace_name = "{{kubernetes.namespace_name}}"
+k8s_node_name = "${VECTOR_SELF_NODE_NAME}"
+k8s_pod_name = "{{kubernetes.pod_name}}"
+kubernetes_container_name = "{{kubernetes.container_name}}"
+kubernetes_host = "${VECTOR_SELF_NODE_NAME}"
+kubernetes_namespace_name = "{{kubernetes.namespace_name}}"
+kubernetes_pod_name = "{{kubernetes.pod_name}}"
+log_type = "{{log_type}}"
+openshift_log_type = "{{log_type}}"
+
+[sinks.output_default_lokistack_infrastructure.tls]
+
+ca_file = "/var/run/ocp-collector/config/openshift-service-ca.crt/ca-bundle.crt"
+
+[sinks.output_default_lokistack_infrastructure.auth]
+strategy = "bearer"
+token = "SECRET[kubernetes_secret.test-sa-token/token]"
\ No newline at end of file
diff --git a/test/e2e/logforwarding/lokistack/forward_to_lokistack_test.go b/test/e2e/logforwarding/lokistack/forward_to_lokistack_test.go
index dcc843cd1..75b963277 100644
--- a/test/e2e/logforwarding/lokistack/forward_to_lokistack_test.go
+++ b/test/e2e/logforwarding/lokistack/forward_to_lokistack_test.go
@@ -1,16 +1,17 @@
 package lokistack
 
 import (
+	"encoding/json"
 	"fmt"
+	"time"
 
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
 	"github.com/openshift/cluster-logging-operator/internal/constants"
 	"github.com/openshift/cluster-logging-operator/internal/runtime"
 	obsruntime "github.com/openshift/cluster-logging-operator/internal/runtime/observability"
 	framework "github.com/openshift/cluster-logging-operator/test/framework/e2e"
-
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
-	obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 )
@@ -19,6 +20,7 @@ var _ = Describe("[ClusterLogForwarder] Forward to Lokistack", func() {
 	const (
 		forwarderName = "my-forwarder"
 		logGenName    = "log-generator"
+		outputName    = "lokistack-output"
 	)
 	var (
 		err error
@@ -53,8 +55,7 @@ var _ = Describe("[ClusterLogForwarder] Forward to Lokistack", func() {
 			Fail(err.Error())
 		}
 
-		outputName := "lokistack-otlp"
-		forwarder = obsruntime.NewClusterLogForwarder(deployNS, "my-forwarder", runtime.Initialize, func(clf *obs.ClusterLogForwarder) {
+		forwarder = obsruntime.NewClusterLogForwarder(deployNS, forwarderName, runtime.Initialize, func(clf *obs.ClusterLogForwarder) {
 			clf.Spec.ServiceAccount.Name = serviceAccount.Name
 			clf.Annotations = map[string]string{constants.AnnotationOtlpOutputTechPreview: "true"}
 			clf.Spec.Pipelines = append(clf.Spec.Pipelines, obs.PipelineSpec{
@@ -163,6 +164,130 @@ var _ = Describe("[ClusterLogForwarder] Forward to Lokistack", func() {
 		Expect(found).To(BeTrue())
 	})
 
+	It("should send logs to lokistack with HTTP receiver as audit logs", func() {
+		const (
+			httpReceiverPort = 8080
+			httpReceiver     = "http-audit"
+		)
+
+		forwarder.Spec.Inputs = []obs.InputSpec{
+			{
+				Name: httpReceiver,
+				Type: obs.InputTypeReceiver,
+				Receiver: &obs.ReceiverSpec{
+					Type: obs.ReceiverTypeHTTP,
+					Port: httpReceiverPort,
+					HTTP: &obs.HTTPReceiver{
+						Format: obs.HTTPReceiverFormatKubeAPIAudit,
+					},
+				},
+			},
+		}
+
+		forwarder.Spec.Pipelines = []obs.PipelineSpec{
+			{
+				Name:       "input-receiver-logs",
+				OutputRefs: []string{outputName},
+				InputRefs:  []string{httpReceiver},
+			},
+		}
+
+		forwarder.Spec.Outputs = append(forwarder.Spec.Outputs, *lokiStackOut)
+
+		if err := e2e.CreateObservabilityClusterLogForwarder(forwarder); err != nil {
+			Fail(fmt.Sprintf("Unable to create an instance of logforwarder: %v", err))
+		}
+		if err := e2e.WaitForDaemonSet(forwarder.Namespace, forwarder.Name); err != nil {
+			Fail(err.Error())
+		}
+
+		httpReceiverServiceName := fmt.Sprintf("%s-%s", forwarderName, httpReceiver)
+		httpReceiverEndpoint := fmt.Sprintf("https://%s.%s.svc.cluster.local:%d", httpReceiverServiceName, deployNS, httpReceiverPort)
+
+		if err = e2e.DeployCURLLogGeneratorWithNamespaceAndEndpoint(deployNS, httpReceiverEndpoint); err != nil {
+			Fail(fmt.Sprintf("unable to deploy log generator %v.", err))
+		}
+
+		found, err := lokistackReceiver.HasAuditLogs(serviceAccount.Name, framework.DefaultWaitForLogsTimeout)
+		Expect(err).To(BeNil())
+		Expect(found).To(BeTrue())
+	})
+
+	It("should send logs to lokistack with Syslog receiver as infrastructure logs", func() {
+		const (
+			syslogReceiver     = "syslog-infra"
+			syslogReceiverPort = 8443
+			syslogLogGenerator = "syslog-log-generator"
+		)
+
+		forwarder.Spec.Inputs = []obs.InputSpec{
+			{
+				Name: syslogReceiver,
+				Type: obs.InputTypeReceiver,
+				Receiver: &obs.ReceiverSpec{
+					Port: syslogReceiverPort,
+					Type: obs.ReceiverTypeSyslog,
+				},
+			},
+		}
+
+		forwarder.Spec.Pipelines = []obs.PipelineSpec{
+			{
+				Name:       "input-receiver-logs",
+				OutputRefs: []string{outputName},
+				InputRefs:  []string{syslogReceiver},
+			},
+		}
+
+		forwarder.Spec.Outputs = append(forwarder.Spec.Outputs, *lokiStackOut)
+
+		if err := e2e.CreateObservabilityClusterLogForwarder(forwarder); err != nil {
+			Fail(fmt.Sprintf("Unable to create an instance of logforwarder: %v", err))
+		}
+		if err := e2e.WaitForDaemonSet(forwarder.Namespace, forwarder.Name); err != nil {
+			Fail(err.Error())
+		}
+
+		if err = e2e.DeploySocat(forwarder.Namespace, syslogLogGenerator, forwarderName, syslogReceiver, framework.NewDefaultLogGeneratorOptions()); err != nil {
+			Fail(fmt.Sprintf("unable to deploy log generator %v.", err))
+		}
+
+		requiredApps := write2syslog(e2e, forwarder, syslogLogGenerator, syslogReceiverPort)
+		requiredAppsChecklist := map[string]bool{}
+		for _, app := range requiredApps {
+			requiredAppsChecklist[app] = false
+		}
+
+		type LogLineData struct {
+			AppName string `json:"appname"`
+		}
+
+		Eventually(func(g Gomega) {
+			res, err := lokistackReceiver.InfrastructureLogs(serviceAccount.Name, 0, len(requiredApps))
+			g.Expect(err).To(BeNil())
+			for _, stream := range res {
+				for _, valPair := range stream.Values {
+					logLine := valPair[1]
+					var data LogLineData
+					err := json.Unmarshal([]byte(logLine), &data)
+					if err != nil {
+						GinkgoWriter.Printf("Failed to parse log line: %v\n", err)
+						continue
+					}
+					appName := data.AppName
+					if _, isRequired := requiredAppsChecklist[appName]; isRequired {
+						requiredAppsChecklist[appName] = true
+					}
+				}
+			}
+
+			for appName, found := range requiredAppsChecklist {
+				g.Expect(found).To(BeTrue(), "Failed to find required app '%s' in log streams", appName)
+			}
+
+		}).WithTimeout(framework.DefaultWaitForLogsTimeout).WithPolling(5 * time.Second).Should(Succeed())
+	})
+
 	It("should send logs to lokistack with otel equivalent default labels when data model is viaq", func() {
 		forwarder.Spec.Outputs = append(forwarder.Spec.Outputs, *lokiStackOut)
 
@@ -240,4 +365,40 @@ var _ = Describe("[ClusterLogForwarder] Forward to Lokistack", func() {
 		e2e.Cleanup()
 		e2e.WaitForCleanupCompletion(logGenNS, []string{"test"})
 	})
+
 })
+
+func write2syslog(e2e *framework.E2ETestFramework, fwd *obs.ClusterLogForwarder, logGenPodName string, port int32) []string {
+	const (
+		host    = "acme.com"
+		pid     = 6868
+		msg     = "Choose Your Destiny"
+		msgId   = "ID7"
+		caFile  = "/etc/collector/syslog/tls.crt"
+		keyFile = "/etc/collector/syslog/tls.key"
+	)
+	destinationHost := fmt.Sprintf("%s-syslog-infra.%s.svc.cluster.local", fwd.Name, fwd.Namespace)
+	socatCmd := fmt.Sprintf("socat openssl-connect:%s:%d,verify=0,cafile=%s,cert=%s,key=%s -",
+		destinationHost, port, caFile, caFile, keyFile)
+
+	now := time.Now()
+	utcTime := now.UTC()
+	rfc5425Date := utcTime.Format(time.RFC3339)
+	rfc3164Date := utcTime.Format(time.Stamp)
+
+	rfc3164AppName := "app_rfc3164"
+	rfc5425AppName := "app_rfc5425"
+
+	// RFC5424 format: ver timestamp hostname app-name procid msgid SD msg
+	rfc5425 := fmt.Sprintf("<39>1 %s %s %s %d %s - %s", rfc5425Date, host, rfc5425AppName, pid, msgId, msg)
+	// RFC3164 format: timestamp hostname app-name[procid]: msg
+	rfc3164 := fmt.Sprintf("<30>%s %s %s[%d]: %s", rfc3164Date, host, rfc3164AppName, pid, msg)
+
+	cmd := fmt.Sprintf("echo %q | %s; echo %q | %s", rfc3164, socatCmd, rfc5425, socatCmd)
+
+	_, err := e2e.PodExec(fwd.Namespace, logGenPodName, logGenPodName, []string{"/bin/sh", "-c", cmd})
+	if err != nil {
+		Fail(fmt.Sprintf("Error execution write command: %v", err))
+	}
+	return []string{rfc5425AppName, rfc3164AppName}
+}
diff --git a/test/framework/e2e/framework.go b/test/framework/e2e/framework.go
index afadad368..f616f6baf 100644
--- a/test/framework/e2e/framework.go
+++ b/test/framework/e2e/framework.go
@@ -145,8 +145,8 @@ func (tc *E2ETestFramework) DeployLogGeneratorWithNamespaceName(namespace, name
 }
 
 // DeploySocat will deploy pod with socat software
-func (tc *E2ETestFramework) DeploySocat(namespace, name, forwarderName string, options LogGeneratorOptions) error {
-	pod := testruntime.NewSocatPod(namespace, name, forwarderName, options.Labels)
+func (tc *E2ETestFramework) DeploySocat(namespace, name, forwarderName, receiverName string, options LogGeneratorOptions) error {
+	pod := testruntime.NewSocatPod(namespace, name, forwarderName, receiverName, options.Labels)
 	if err := tc.WaitForResourceCondition(namespace, "serviceaccount", "default", "", "{}", 10, func(string) (bool, error) { return true, nil }); err != nil {
 		return err
 	}
diff --git a/test/framework/e2e/lokistack.go b/test/framework/e2e/lokistack.go
index 30f2745bc..6c464815a 100644
--- a/test/framework/e2e/lokistack.go
+++ b/test/framework/e2e/lokistack.go
@@ -447,19 +447,25 @@ func (ls LokistackLogStore) GetApplicationLogsWithPipeline(saName, expression st
 
 func (ls LokistackLogStore) HasApplicationLogs(saName string, timeToWait time.Duration) (bool, error) {
 	query := fmt.Sprintf(`{log_type=%q}`, obs.InputTypeApplication)
-	result, err := ls.QueryUntil(query, "", string(obs.InputTypeApplication), saName, 1, defaultTimeout)
+	result, err := ls.QueryUntil(query, "", string(obs.InputTypeApplication), saName, 1, timeToWait)
 	return len(result) > 0, errors.Wrap(err, "error determining if logstore has application logs")
 }
 
 func (ls LokistackLogStore) HasInfrastructureLogs(saName string, timeToWait time.Duration) (bool, error) {
-	query := fmt.Sprintf(`{log_type=%q}`, obs.InputTypeInfrastructure)
-	result, err := ls.QueryUntil(query, "", string(obs.InputTypeInfrastructure), saName, 1, defaultTimeout)
+	result, err := ls.InfrastructureLogs(saName, timeToWait, 1)
 	return len(result) > 0, errors.Wrap(err, "error determining if logstore has infrastructure logs")
 }
 
+func (ls LokistackLogStore) InfrastructureLogs(saName string, timeToWait time.Duration, limit int) ([]lokitesthelper.StreamValues, error) {
+	query := fmt.Sprintf(`{log_type=%q}`, obs.InputTypeInfrastructure)
+	result, err := ls.QueryUntil(query, "", string(obs.InputTypeInfrastructure), saName, limit, timeToWait)
+	clolog.V(3).Info("Loki Query", "result", test.JSONString(result))
+	return result, errors.Wrap(err, "error determining if logstore has infrastructure logs")
+}
+
 func (ls LokistackLogStore) HasAuditLogs(saName string, timeToWait time.Duration) (bool, error) {
 	query := fmt.Sprintf(`{log_type=%q}`, obs.InputTypeAudit)
-	result, err := ls.QueryUntil(query, "", string(obs.InputTypeAudit), saName, 1, defaultTimeout)
+	result, err := ls.QueryUntil(query, "", string(obs.InputTypeAudit), saName, 1, timeToWait)
 	return len(result) > 0, errors.Wrap(err, "error determining if logstore has audit logs")
 }
 
diff --git a/test/runtime/log_generator.go b/test/runtime/log_generator.go
index 2ae009e12..12a7e5f90 100644
--- a/test/runtime/log_generator.go
+++ b/test/runtime/log_generator.go
@@ -94,10 +94,10 @@ func NewCURLLogGenerator(namespace, name, endpoint string, count int, delay time
 }
 
 // NewSocatPod creates pods with socat software which allow advance network call e.g. syslog message
-func NewSocatPod(namespace, name, forwarderName string, labels map[string]string) *corev1.Pod {
+func NewSocatPod(namespace, name, forwarderName, receiverName string, labels map[string]string) *corev1.Pod {
 	var containers []corev1.Container
 	containerName := name
-
+	commonName := fmt.Sprintf("%s-%s", forwarderName, receiverName)
 	containers = append(containers, corev1.Container{
 		Name:  containerName,
 		Image: "quay.io/openshift-logging/alpine-socat:1.8.0.0",
@@ -109,7 +109,7 @@ func NewSocatPod(namespace, name, forwarderName string, labels map[string]string
 			},
 		},
 		VolumeMounts: []corev1.VolumeMount{{
-			Name:      fmt.Sprintf("%s-syslog", forwarderName),
+			Name:      commonName,
 			ReadOnly:  true,
 			MountPath: "/etc/collector/syslog",
 		}},
@@ -127,9 +127,9 @@ func NewSocatPod(namespace, name, forwarderName string, labels map[string]string
 		pod.Labels[k] = v
 	}
 	pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{
-		Name: fmt.Sprintf("%s-syslog", forwarderName), VolumeSource: corev1.VolumeSource{
+		Name: commonName, VolumeSource: corev1.VolumeSource{
 			Secret: &corev1.SecretVolumeSource{
-				SecretName: fmt.Sprintf("%s-syslog", forwarderName),
+				SecretName: commonName,
 			},
 		},
 	})