Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion internal/generator/vector/output/lokistack/init_lokiStack.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ func GenerateOutput(outSpec obs.OutputSpec, tenant string) obs.OutputSpec {

// GenerateLokiSpec generates and returns a Loki spec for the defined lokistack output
func GenerateLokiSpec(ls *obs.LokiStack, tenant string) *obs.Loki {
url := lokiStackURL(ls, tenant, false)
if url == "" {
panic("LokiStack output has no valid URL")
}
return &obs.Loki{
URLSpec: obs.URLSpec{
URL: lokiStackURL(ls, tenant, false),
URL: url,
},
Authentication: &obs.HTTPAuthentication{
Token: ls.Authentication.Token,
Expand Down
46 changes: 46 additions & 0 deletions internal/generator/vector/output/lokistack/init_lokiStack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,49 @@ var _ = Describe("#GenerateOutput", func() {
),
)
})

var _ = Describe("#GenerateLokiSpec", func() {
var (
lokiStack *obs.LokiStack
)

BeforeEach(func() {
lokiStack = &obs.LokiStack{
Target: obs.LokiStackTarget{
Name: "test-lokistack",
Namespace: constants.OpenshiftNS,
},
Authentication: &obs.LokiStackAuthentication{
Token: &obs.BearerToken{
From: obs.BearerTokenFromServiceAccount,
},
},
}
})

Context("when given a valid tenant", func() {
It("should return a valid Loki spec", func() {
var spec *obs.Loki
Expect(func() {
spec = GenerateLokiSpec(lokiStack, string(obs.InputTypeApplication))
}).ToNot(Panic())

Expect(spec).ToNot(BeNil())
Expect(spec.URL).To(Equal("https://test-lokistack-gateway-http.openshift-logging.svc:8080/api/logs/v1/application"))
Expect(spec.Authentication.Token).To(Equal(lokiStack.Authentication.Token))
})
})

Context("when given an invalid tenant", func() {
DescribeTable("should panic with a specific message",
func(tenant string) {
Expect(func() {
GenerateLokiSpec(lokiStack, tenant)
}).To(PanicWith("LokiStack output has no valid URL"))
},
Entry("with an empty tenant", ""),
Entry("with a non-reserved tenant name", "my-custom-logs"),
Entry("with the 'receiver' tenant name", "receiver"),
)
})
})
125 changes: 95 additions & 30 deletions internal/generator/vector/output/lokistack/lokistack.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,47 +19,112 @@ import (

// New creates generate elements that represent configuration to forward logs to Loki using OpenShift Logging tenancy model
func New(id string, o obs.OutputSpec, inputs []string, secrets observability.Secrets, strategy common.ConfigStrategy, op utils.Options) []framework.Element {
routeID := vectorhelpers.MakeID(id, "route")
routes := map[string]string{}
var clfSpec, _ = utils.GetOption(op, vectorhelpers.CLFSpec, observability.ClusterLogForwarderSpec{})
clfSpec, _ := utils.GetOption(op, vectorhelpers.CLFSpec, observability.ClusterLogForwarderSpec{})
if len(clfSpec.Inputs) == 0 || len(clfSpec.Pipelines) == 0 || len(clfSpec.Outputs) == 0 {
panic("ClusterLogForwarderSpec not found while generating LokiStack config")
}

inputSpecs := clfSpec.InputSpecsTo(o)
inputTypes := sets.NewString()
for _, inputSpec := range inputSpecs {
inputType := strings.ToLower(inputSpec.Type.String())
inputTypes.Insert(inputType)
routes[inputType] = fmt.Sprintf("'.log_type == \"%s\"'", inputType)
if inputSpec.Type == obs.InputTypeApplication && observability.IncludesInfraNamespace(inputSpec.Application) {
inputType = strings.ToLower(obs.InputTypeInfrastructure.String())
routes[inputType] = fmt.Sprintf("'.log_type == \"%s\"'", inputType)
inputTypes.Insert(inputType)
}
}
tenants := determineTenants(inputSpecs)

routeID := vectorhelpers.MakeID(id, "route")
confs := []framework.Element{
elements.Route{
ComponentID: routeID,
Inputs: vectorhelpers.MakeInputs(inputs...),
Routes: routes,
Routes: buildRoutes(tenants),
},
}
confs = append(confs, elements.NewUnmatched(routeID, op, map[string]string{"output_type": strings.ToLower(obs.OutputTypeLokiStack.String())}))
for _, inputType := range inputTypes.List() {
outputID := vectorhelpers.MakeID(id, inputType)
migratedOutput := GenerateOutput(o, inputType)
log.V(4).Info("migrated lokistack output", "spec", migratedOutput)
factory := loki.New
if migratedOutput.Type == obs.OutputTypeOTLP {
factory = otlp.New
}
inputSources := observability.Inputs(inputSpecs).InputSources(obs.InputType(inputType))
if len(inputSources) == 0 && obs.InputType(inputType) == obs.InputTypeInfrastructure {
inputSources = append(inputSources, observability.ReservedInfrastructureSources.List()...)
}
op[otlp.OtlpLogSourcesOption] = inputSources
confs = append(confs, factory(outputID, migratedOutput, []string{vectorhelpers.MakeRouteInputID(routeID, inputType)}, secrets, strategy, op)...)

confs = append(confs, elements.NewUnmatched(routeID, op, map[string]string{
"output_type": strings.ToLower(obs.OutputTypeLokiStack.String()),
}))

for _, inputType := range tenants.List() {
confs = append(confs, generateSinkForTenant(id, routeID, inputType, o, inputSpecs, secrets, strategy, op)...)
}

return confs
}

func determineTenants(inputSpecs []obs.InputSpec) *sets.String {
tenants := sets.NewString()

for _, inputSpec := range inputSpecs {
switch inputSpec.Type {
case obs.InputTypeApplication:
tenants.Insert(string(obs.InputTypeApplication))
if observability.IncludesInfraNamespace(inputSpec.Application) {
tenants.Insert(string(obs.InputTypeInfrastructure))
}
case obs.InputTypeAudit:
tenants.Insert(string(obs.InputTypeAudit))
case obs.InputTypeInfrastructure:
tenants.Insert(string(obs.InputTypeInfrastructure))
case obs.InputTypeReceiver:
tenants.Insert(getTenantForReceiver(inputSpec.Receiver.Type))
}
}

return tenants
}

func getTenantForReceiver(receiverType obs.ReceiverType) string {
if receiverType == obs.ReceiverTypeHTTP {
return string(obs.InputTypeAudit)
}
return string(obs.InputTypeInfrastructure)
}

func buildRoutes(tenants *sets.String) map[string]string {
routes := make(map[string]string, tenants.Len())
for _, tenant := range tenants.List() {
routes[tenant] = fmt.Sprintf("'.log_type == \"%s\"'", tenant)
}
return routes
}

func generateSinkForTenant(id, routeID, inputType string, o obs.OutputSpec, inputSpecs []obs.InputSpec,
secrets observability.Secrets, strategy common.ConfigStrategy, op utils.Options) []framework.Element {

outputID := vectorhelpers.MakeID(id, inputType)
migratedOutput := GenerateOutput(o, inputType)
log.V(4).Info("migrated lokistack output", "spec", migratedOutput)

factoryInput := vectorhelpers.MakeRouteInputID(routeID, inputType)

if migratedOutput.Type == obs.OutputTypeOTLP {
op[otlp.OtlpLogSourcesOption] = getInputSources(inputSpecs, obs.InputType(inputType))
return otlp.New(outputID, migratedOutput, []string{factoryInput}, secrets, strategy, op)
}

return loki.New(outputID, migratedOutput, []string{factoryInput}, secrets, strategy, op)
}

func getInputSources(inputSpecs []obs.InputSpec, inputType obs.InputType) []string {
inputSources := observability.Inputs(inputSpecs).InputSources(inputType)

if len(inputSources) == 0 && inputType == obs.InputTypeInfrastructure {
inputSources = append(inputSources, observability.ReservedInfrastructureSources.List()...)
}

addReceiverSources(&inputSources, inputSpecs, inputType)

return inputSources
}

func addReceiverSources(inputSources *[]string, inputSpecs []obs.InputSpec, inputType obs.InputType) {
for _, is := range inputSpecs {
if is.Type != obs.InputTypeReceiver {
continue
}

if inputType == obs.InputTypeAudit && is.Receiver.Type == obs.ReceiverTypeHTTP {
*inputSources = append(*inputSources, "receiver.http")
}

if inputType == obs.InputTypeInfrastructure && is.Receiver.Type == obs.ReceiverTypeSyslog {
*inputSources = append(*inputSources, "receiver.syslog")
}
}
}
34 changes: 34 additions & 0 deletions internal/generator/vector/output/lokistack/lokistack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,39 @@ var _ = Describe("Generate vector config", func() {
}),
}
}

initReceiverOptions = func() utils.Options {
output := initOutput()
return utils.Options{
framework.OptionServiceAccountTokenSecretName: saTokenSecretName,
helpers.CLFSpec: observability.ClusterLogForwarderSpec(obs.ClusterLogForwarderSpec{
Outputs: []obs.OutputSpec{output},
Pipelines: []obs.PipelineSpec{
{
Name: "lokistack-receivers",
InputRefs: []string{"http-receiver", "syslog-receiver"},
OutputRefs: []string{output.Name},
},
},
Inputs: []obs.InputSpec{
{
Name: "http-receiver",
Type: obs.InputTypeReceiver,
Receiver: &obs.ReceiverSpec{
Type: obs.ReceiverTypeHTTP,
},
},
{
Name: "syslog-receiver",
Type: obs.InputTypeReceiver,
Receiver: &obs.ReceiverSpec{
Type: obs.ReceiverTypeSyslog,
},
},
},
}),
}
}
)
DescribeTable("for LokiStack output", func(expFile string, op framework.Options, tune bool, visit func(spec *obs.OutputSpec)) {
exp, err := tomlContent.ReadFile(expFile)
Expand All @@ -118,5 +151,6 @@ var _ = Describe("Generate vector config", func() {
Entry("with Otel datamodel", "lokistack_otel.toml", initOptions(), false, func(spec *obs.OutputSpec) {
spec.LokiStack.DataModel = obs.LokiStackDataModelOpenTelemetry
}),
Entry("with ViaQ datamodel with receiver", "lokistack_viaq_receiver.toml", initReceiverOptions(), false, func(spec *obs.OutputSpec) {}),
)
})
Loading