Skip to content

Commit 089f8b6

Browse files
committed
add syslog receiver test
1 parent 8aa7324 commit 089f8b6

File tree

4 files changed

+186
-65
lines changed

4 files changed

+186
-65
lines changed

test/e2e/logforwarding/lokistack/forward_to_lokistack_test.go

Lines changed: 169 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
11
package lokistack
22

33
import (
4+
"encoding/json"
45
"fmt"
6+
"time"
57

8+
. "github.com/onsi/ginkgo/v2"
9+
. "github.com/onsi/gomega"
10+
obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
611
"github.com/openshift/cluster-logging-operator/internal/constants"
712
"github.com/openshift/cluster-logging-operator/internal/runtime"
813
obsruntime "github.com/openshift/cluster-logging-operator/internal/runtime/observability"
914
framework "github.com/openshift/cluster-logging-operator/test/framework/e2e"
10-
11-
. "github.com/onsi/ginkgo/v2"
12-
. "github.com/onsi/gomega"
13-
obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
1415
corev1 "k8s.io/api/core/v1"
1516
"k8s.io/apimachinery/pkg/api/resource"
1617
)
1718

1819
var _ = Describe("[ClusterLogForwarder] Forward to Lokistack", func() {
1920
const (
20-
forwarderName = "my-forwarder"
21-
logGenName = "log-generator"
22-
httpReceiverPort = 8080
23-
httpReceiver = "http-audit"
21+
forwarderName = "my-forwarder"
22+
logGenName = "log-generator"
23+
outputName = "lokistack-output"
2424
)
2525
var (
2626
err error
@@ -55,8 +55,7 @@ var _ = Describe("[ClusterLogForwarder] Forward to Lokistack", func() {
5555
Fail(err.Error())
5656
}
5757

58-
outputName := "lokistack-otlp"
59-
forwarder = obsruntime.NewClusterLogForwarder(deployNS, "my-forwarder", runtime.Initialize, func(clf *obs.ClusterLogForwarder) {
58+
forwarder = obsruntime.NewClusterLogForwarder(deployNS, forwarderName, runtime.Initialize, func(clf *obs.ClusterLogForwarder) {
6059
clf.Spec.ServiceAccount.Name = serviceAccount.Name
6160
clf.Annotations = map[string]string{constants.AnnotationOtlpOutputTechPreview: "true"}
6261
clf.Spec.Pipelines = append(clf.Spec.Pipelines, obs.PipelineSpec{
@@ -165,6 +164,130 @@ var _ = Describe("[ClusterLogForwarder] Forward to Lokistack", func() {
165164
Expect(found).To(BeTrue())
166165
})
167166

167+
It("should send logs to lokistack with HTTP receiver as audit logs", func() {
168+
const (
169+
httpReceiverPort = 8080
170+
httpReceiver = "http-audit"
171+
)
172+
173+
forwarder.Spec.Inputs = []obs.InputSpec{
174+
{
175+
Name: httpReceiver,
176+
Type: obs.InputTypeReceiver,
177+
Receiver: &obs.ReceiverSpec{
178+
Type: obs.ReceiverTypeHTTP,
179+
Port: httpReceiverPort,
180+
HTTP: &obs.HTTPReceiver{
181+
Format: obs.HTTPReceiverFormatKubeAPIAudit,
182+
},
183+
},
184+
},
185+
}
186+
187+
forwarder.Spec.Pipelines = []obs.PipelineSpec{
188+
{
189+
Name: "input-receiver-logs",
190+
OutputRefs: []string{outputName},
191+
InputRefs: []string{httpReceiver},
192+
},
193+
}
194+
195+
forwarder.Spec.Outputs = append(forwarder.Spec.Outputs, *lokiStackOut)
196+
197+
if err := e2e.CreateObservabilityClusterLogForwarder(forwarder); err != nil {
198+
Fail(fmt.Sprintf("Unable to create an instance of logforwarder: %v", err))
199+
}
200+
if err := e2e.WaitForDaemonSet(forwarder.Namespace, forwarder.Name); err != nil {
201+
Fail(err.Error())
202+
}
203+
204+
httpReceiverServiceName := fmt.Sprintf("%s-%s", forwarderName, httpReceiver)
205+
httpReceiverEndpoint := fmt.Sprintf("https://%s.%s.svc.cluster.local:%d", httpReceiverServiceName, deployNS, httpReceiverPort)
206+
207+
if err = e2e.DeployCURLLogGeneratorWithNamespaceAndEndpoint(deployNS, httpReceiverEndpoint); err != nil {
208+
Fail(fmt.Sprintf("unable to deploy log generator %v.", err))
209+
}
210+
211+
found, err := lokistackReceiver.HasAuditLogs(serviceAccount.Name, framework.DefaultWaitForLogsTimeout)
212+
Expect(err).To(BeNil())
213+
Expect(found).To(BeTrue())
214+
})
215+
216+
It("should send logs to lokistack with Syslog receiver as infrastructure logs", func() {
217+
const (
218+
syslogReceiver = "syslog-infra"
219+
syslogReceiverPort = 8443
220+
syslogLogGenerator = "syslog-log-generator"
221+
)
222+
223+
forwarder.Spec.Inputs = []obs.InputSpec{
224+
{
225+
Name: syslogReceiver,
226+
Type: obs.InputTypeReceiver,
227+
Receiver: &obs.ReceiverSpec{
228+
Port: syslogReceiverPort,
229+
Type: obs.ReceiverTypeSyslog,
230+
},
231+
},
232+
}
233+
234+
forwarder.Spec.Pipelines = []obs.PipelineSpec{
235+
{
236+
Name: "input-receiver-logs",
237+
OutputRefs: []string{outputName},
238+
InputRefs: []string{syslogReceiver},
239+
},
240+
}
241+
242+
forwarder.Spec.Outputs = append(forwarder.Spec.Outputs, *lokiStackOut)
243+
244+
if err := e2e.CreateObservabilityClusterLogForwarder(forwarder); err != nil {
245+
Fail(fmt.Sprintf("Unable to create an instance of logforwarder: %v", err))
246+
}
247+
if err := e2e.WaitForDaemonSet(forwarder.Namespace, forwarder.Name); err != nil {
248+
Fail(err.Error())
249+
}
250+
251+
if err = e2e.DeploySocat(forwarder.Namespace, syslogLogGenerator, forwarderName, syslogReceiver, framework.NewDefaultLogGeneratorOptions()); err != nil {
252+
Fail(fmt.Sprintf("unable to deploy log generator %v.", err))
253+
}
254+
255+
requiredApps := write2syslog(e2e, forwarder, syslogLogGenerator, syslogReceiverPort)
256+
requiredAppsChecklist := map[string]bool{}
257+
for _, app := range requiredApps {
258+
requiredAppsChecklist[app] = false
259+
}
260+
261+
type LogLineData struct {
262+
AppName string `json:"appname"`
263+
}
264+
265+
Eventually(func(g Gomega) {
266+
res, err := lokistackReceiver.InfrastructureLogs(serviceAccount.Name, 0, len(requiredApps))
267+
g.Expect(err).To(BeNil())
268+
for _, stream := range res {
269+
for _, valPair := range stream.Values {
270+
logLine := valPair[1]
271+
var data LogLineData
272+
err := json.Unmarshal([]byte(logLine), &data)
273+
if err != nil {
274+
GinkgoWriter.Printf("Failed to parse log line: %v\n", err)
275+
continue
276+
}
277+
appName := data.AppName
278+
if _, isRequired := requiredAppsChecklist[appName]; isRequired {
279+
requiredAppsChecklist[appName] = true
280+
}
281+
}
282+
}
283+
284+
for appName, found := range requiredAppsChecklist {
285+
g.Expect(found).To(BeTrue(), "Failed to find required app '%s' in log streams", appName)
286+
}
287+
288+
}).WithTimeout(framework.DefaultWaitForLogsTimeout).WithPolling(5 * time.Second).Should(Succeed())
289+
})
290+
168291
It("should send logs to lokistack with otel equivalent default labels when data model is viaq", func() {
169292
forwarder.Spec.Outputs = append(forwarder.Spec.Outputs, *lokiStackOut)
170293

@@ -238,52 +361,44 @@ var _ = Describe("[ClusterLogForwarder] Forward to Lokistack", func() {
238361
Expect(found).To(BeTrue())
239362
})
240363

241-
It("should send logs to lokistack with HTTP receiver as audit logs", func() {
242-
forwarder.Spec.Inputs = []obs.InputSpec{
243-
{
244-
Name: httpReceiver,
245-
Type: obs.InputTypeReceiver,
246-
Receiver: &obs.ReceiverSpec{
247-
Type: obs.ReceiverTypeHTTP,
248-
Port: httpReceiverPort,
249-
HTTP: &obs.HTTPReceiver{
250-
Format: obs.HTTPReceiverFormatKubeAPIAudit,
251-
},
252-
},
253-
},
254-
}
255-
256-
forwarder.Spec.Pipelines = []obs.PipelineSpec{
257-
{
258-
Name: "input-receiver-logs",
259-
OutputRefs: []string{"lokistack-otlp"},
260-
InputRefs: []string{httpReceiver},
261-
},
262-
}
263-
264-
forwarder.Spec.Outputs = append(forwarder.Spec.Outputs, *lokiStackOut)
265-
266-
if err := e2e.CreateObservabilityClusterLogForwarder(forwarder); err != nil {
267-
Fail(fmt.Sprintf("Unable to create an instance of logforwarder: %v", err))
268-
}
269-
if err := e2e.WaitForDaemonSet(forwarder.Namespace, forwarder.Name); err != nil {
270-
Fail(err.Error())
271-
}
272-
273-
httpReceiverServiceName := fmt.Sprintf("%s-%s", forwarderName, httpReceiver)
274-
httpReceiverEndpoint := fmt.Sprintf("https://%s.%s.svc.cluster.local:%d", httpReceiverServiceName, deployNS, httpReceiverPort)
275-
276-
if err = e2e.DeployCURLLogGeneratorWithNamespaceAndEndpoint(deployNS, httpReceiverEndpoint); err != nil {
277-
Fail(fmt.Sprintf("unable to deploy log generator %v.", err))
278-
}
279-
280-
found, err := lokistackReceiver.HasAuditLogs(serviceAccount.Name, framework.DefaultWaitForLogsTimeout)
281-
Expect(err).To(BeNil())
282-
Expect(found).To(BeTrue())
283-
})
284-
285364
AfterEach(func() {
286365
e2e.Cleanup()
287366
e2e.WaitForCleanupCompletion(logGenNS, []string{"test"})
288367
})
368+
289369
})
370+
371+
func write2syslog(e2e *framework.E2ETestFramework, fwd *obs.ClusterLogForwarder, logGenPodName string, port int32) []string {
372+
const (
373+
host = "acme.com"
374+
pid = 6868
375+
msg = "Choose Your Destiny"
376+
msgId = "ID7"
377+
caFile = "/etc/collector/syslog/tls.crt"
378+
keyFile = "/etc/collector/syslog/tls.key"
379+
)
380+
destinationHost := fmt.Sprintf("%s-syslog-infra.%s.svc.cluster.local", fwd.Name, fwd.Namespace)
381+
socatCmd := fmt.Sprintf("socat openssl-connect:%s:%d,verify=0,cafile=%s,cert=%s,key=%s -",
382+
destinationHost, port, caFile, caFile, keyFile)
383+
384+
now := time.Now()
385+
utcTime := now.UTC()
386+
rfc5425Date := utcTime.Format(time.RFC3339)
387+
rfc3164Date := utcTime.Format(time.Stamp)
388+
389+
rfc3164AppName := "app_rfc3164"
390+
rfc5425AppName := "app_rfc5425"
391+
392+
// RFC5424 format: <pri>ver timestamp hostname app-name procid msgid SD msg
393+
rfc5425 := fmt.Sprintf("<39>1 %s %s %s %d %s - %s", rfc5425Date, host, rfc5425AppName, pid, msgId, msg)
394+
// RFC3164 format: <pri>timestamp hostname app-name[procid]: msg
395+
rfc3164 := fmt.Sprintf("<30>%s %s %s[%d]: %s", rfc3164Date, host, rfc3164AppName, pid, msg)
396+
397+
cmd := fmt.Sprintf("echo %q | %s; echo %q | %s", rfc3164, socatCmd, rfc5425, socatCmd)
398+
399+
_, err := e2e.PodExec(fwd.Namespace, logGenPodName, logGenPodName, []string{"/bin/sh", "-c", cmd})
400+
if err != nil {
401+
Fail(fmt.Sprintf("Error execution write command: %v", err))
402+
}
403+
return []string{rfc5425AppName, rfc3164AppName}
404+
}

test/framework/e2e/framework.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ func (tc *E2ETestFramework) DeployLogGeneratorWithNamespaceName(namespace, name
145145
}
146146

147147
// DeploySocat will deploy pod with socat software
148-
func (tc *E2ETestFramework) DeploySocat(namespace, name, forwarderName string, options LogGeneratorOptions) error {
149-
pod := testruntime.NewSocatPod(namespace, name, forwarderName, options.Labels)
148+
func (tc *E2ETestFramework) DeploySocat(namespace, name, forwarderName, receiverName string, options LogGeneratorOptions) error {
149+
pod := testruntime.NewSocatPod(namespace, name, forwarderName, receiverName, options.Labels)
150150
if err := tc.WaitForResourceCondition(namespace, "serviceaccount", "default", "", "{}", 10, func(string) (bool, error) { return true, nil }); err != nil {
151151
return err
152152
}

test/framework/e2e/lokistack.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -447,19 +447,25 @@ func (ls LokistackLogStore) GetApplicationLogsWithPipeline(saName, expression st
447447

448448
func (ls LokistackLogStore) HasApplicationLogs(saName string, timeToWait time.Duration) (bool, error) {
449449
query := fmt.Sprintf(`{log_type=%q}`, obs.InputTypeApplication)
450-
result, err := ls.QueryUntil(query, "", string(obs.InputTypeApplication), saName, 1, defaultTimeout)
450+
result, err := ls.QueryUntil(query, "", string(obs.InputTypeApplication), saName, 1, timeToWait)
451451
return len(result) > 0, errors.Wrap(err, "error determining if logstore has application logs")
452452
}
453453

454454
func (ls LokistackLogStore) HasInfrastructureLogs(saName string, timeToWait time.Duration) (bool, error) {
455-
query := fmt.Sprintf(`{log_type=%q}`, obs.InputTypeInfrastructure)
456-
result, err := ls.QueryUntil(query, "", string(obs.InputTypeInfrastructure), saName, 1, defaultTimeout)
455+
result, err := ls.InfrastructureLogs(saName, timeToWait, 1)
457456
return len(result) > 0, errors.Wrap(err, "error determining if logstore has infrastructure logs")
458457
}
459458

459+
func (ls LokistackLogStore) InfrastructureLogs(saName string, timeToWait time.Duration, limit int) ([]lokitesthelper.StreamValues, error) {
460+
query := fmt.Sprintf(`{log_type=%q}`, obs.InputTypeInfrastructure)
461+
result, err := ls.QueryUntil(query, "", string(obs.InputTypeInfrastructure), saName, limit, timeToWait)
462+
clolog.V(3).Info("Loki Query", "result", test.JSONString(result))
463+
return result, errors.Wrap(err, "error determining if logstore has infrastructure logs")
464+
}
465+
460466
func (ls LokistackLogStore) HasAuditLogs(saName string, timeToWait time.Duration) (bool, error) {
461467
query := fmt.Sprintf(`{log_type=%q}`, obs.InputTypeAudit)
462-
result, err := ls.QueryUntil(query, "", string(obs.InputTypeAudit), saName, 1, defaultTimeout)
468+
result, err := ls.QueryUntil(query, "", string(obs.InputTypeAudit), saName, 1, timeToWait)
463469
return len(result) > 0, errors.Wrap(err, "error determining if logstore has audit logs")
464470
}
465471

test/runtime/log_generator.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,10 @@ func NewCURLLogGenerator(namespace, name, endpoint string, count int, delay time
9494
}
9595

9696
// NewSocatPod creates pods with socat software which allow advance network call e.g. syslog message
97-
func NewSocatPod(namespace, name, forwarderName string, labels map[string]string) *corev1.Pod {
97+
func NewSocatPod(namespace, name, forwarderName, receiverName string, labels map[string]string) *corev1.Pod {
9898
var containers []corev1.Container
9999
containerName := name
100-
100+
commonName := fmt.Sprintf("%s-%s", forwarderName, receiverName)
101101
containers = append(containers, corev1.Container{
102102
Name: containerName,
103103
Image: "quay.io/openshift-logging/alpine-socat:1.8.0.0",
@@ -109,7 +109,7 @@ func NewSocatPod(namespace, name, forwarderName string, labels map[string]string
109109
},
110110
},
111111
VolumeMounts: []corev1.VolumeMount{{
112-
Name: fmt.Sprintf("%s-syslog", forwarderName),
112+
Name: commonName,
113113
ReadOnly: true,
114114
MountPath: "/etc/collector/syslog",
115115
}},
@@ -127,9 +127,9 @@ func NewSocatPod(namespace, name, forwarderName string, labels map[string]string
127127
pod.Labels[k] = v
128128
}
129129
pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{
130-
Name: fmt.Sprintf("%s-syslog", forwarderName), VolumeSource: corev1.VolumeSource{
130+
Name: commonName, VolumeSource: corev1.VolumeSource{
131131
Secret: &corev1.SecretVolumeSource{
132-
SecretName: fmt.Sprintf("%s-syslog", forwarderName),
132+
SecretName: commonName,
133133
},
134134
},
135135
})

0 commit comments

Comments
 (0)