Skip to content

Commit c4a5487

Browse files
committed
add error messages from syslog drains to the app log stream
add error messages from filtered binding fetcher to the app log stream rename LoggregatorEmitter to AppLogEmitter change format of NewFilteredBindingFetcher calls rename references to AppLogEmitter move spyLogClient to testhelper package use grpc port for ingress client from config add comment rearrange imports introduce AppLogEmitterFactory remove AppLogEmitter interface and refactor to struct rename factory rename NewAppLogEmitterFactory method
1 parent cd0efca commit c4a5487

16 files changed

+353
-162
lines changed

src/cmd/syslog-agent/app/syslog_agent.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,8 @@ func NewSyslogAgent(
5656
cfg Config,
5757
m Metrics,
5858
l *log.Logger,
59+
appLogEmitterFactory syslog.AppLogEmitterFactory,
5960
) *SyslogAgent {
60-
internalTlsConfig, externalTlsConfig := drainTLSConfig(cfg)
61-
writerFactory := syslog.NewWriterFactory(
62-
internalTlsConfig,
63-
externalTlsConfig,
64-
syslog.NetworkTimeoutConfig{
65-
Keepalive: 10 * time.Second,
66-
DialTimeout: 10 * time.Second,
67-
WriteTimeout: 10 * time.Second,
68-
},
69-
m,
70-
)
71-
7261
ingressTLSConfig, err := loggregator.NewIngressTLSConfig(
7362
cfg.GRPC.CAFile,
7463
cfg.GRPC.CertFile,
@@ -81,17 +70,25 @@ func NewSyslogAgent(
8170
logClient, err := loggregator.NewIngressClient(
8271
ingressTLSConfig,
8372
loggregator.WithLogger(log.New(os.Stderr, "", log.LstdFlags)),
73+
loggregator.WithAddr(fmt.Sprintf("127.0.0.1:%d", cfg.GRPC.Port)),
8474
)
8575
if err != nil {
8676
l.Panicf("failed to create log client for syslog connector: %q", err)
8777
}
8878

79+
internalTlsConfig, externalTlsConfig := drainTLSConfig(cfg)
80+
writerFactory := syslog.NewWriterFactory(internalTlsConfig, externalTlsConfig, syslog.NetworkTimeoutConfig{
81+
Keepalive: 10 * time.Second,
82+
DialTimeout: 10 * time.Second,
83+
WriteTimeout: 10 * time.Second,
84+
}, m)
85+
8986
connector := syslog.NewSyslogConnector(
9087
cfg.DrainSkipCertVerify,
9188
timeoutwaitgroup.New(time.Minute),
9289
writerFactory,
9390
m,
94-
syslog.WithLogClient(logClient, "syslog_agent"),
91+
syslog.WithAppLogEmitter(appLogEmitterFactory.NewAppLogEmitter(logClient, "syslog_agent")),
9592
)
9693

9794
var cacheClient *cache.CacheClient
@@ -112,6 +109,7 @@ func NewSyslogAgent(
112109
m,
113110
cfg.WarnOnInvalidDrains,
114111
l,
112+
appLogEmitterFactory.NewAppLogEmitter(logClient, "syslog_agent"),
115113
)
116114
cupsFetcher = bindings.NewDrainParamParser(cupsFetcher, cfg.DefaultDrainMetadata)
117115
}

src/cmd/syslog-agent/app/syslog_agent_mtls_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package app_test
22

33
import (
4+
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
45
"context"
56
"encoding/json"
67
"fmt"
@@ -154,7 +155,9 @@ var _ = Describe("SyslogAgent with mTLS", func() {
154155
agentCfg.Cache.PollingInterval = 10 * time.Millisecond
155156
}
156157

157-
agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr)
158+
factory := syslog.NewAppLogEmitterFactory()
159+
160+
agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, &factory)
158161
go agent.Run()
159162
})
160163

@@ -216,6 +219,7 @@ var _ = Describe("SyslogAgent with mTLS", func() {
216219
})
217220

218221
It("will not be able to connect with those drains", func() {
222+
//todo check if pr check runs
219223
ctx, cancel := context.WithCancel(context.Background())
220224
emitLogs(ctx, appIDs, grpcPort, agentCerts)
221225
defer cancel()

src/cmd/syslog-agent/app/syslog_agent_test.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package app_test
22

33
import (
4+
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
45
"context"
56
"crypto/tls"
67
"fmt"
@@ -48,6 +49,8 @@ var _ = Describe("SyslogAgent", func() {
4849
agentMetrics *metricsHelpers.SpyMetricsRegistry
4950
agentLogr *log.Logger
5051
agent *app.SyslogAgent
52+
53+
factory syslog.AppLogEmitterFactory
5154
)
5255

5356
BeforeEach(func() {
@@ -134,7 +137,9 @@ var _ = Describe("SyslogAgent", func() {
134137
agentCfg.Cache.PollingInterval = 10 * time.Millisecond
135138
}
136139

137-
agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr)
140+
factory := syslog.NewAppLogEmitterFactory()
141+
142+
agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, &factory)
138143
go agent.Run()
139144
})
140145

@@ -238,6 +243,14 @@ var _ = Describe("SyslogAgent", func() {
238243
Eventually(agentMetrics.GetDebugMetricsEnabled).Should(BeFalse())
239244
})
240245

246+
It("configures appLogEmitter", func() {
247+
spyFactory := testhelper.SpyAppLogEmitterFactory{}
248+
app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, &spyFactory)
249+
250+
Expect(spyFactory.SourceIndex()).Should(Equal("syslog_agent"))
251+
Expect(spyFactory.LogClient()).ShouldNot(BeNil())
252+
})
253+
241254
Context("when debug configuration is enabled", func() {
242255
BeforeEach(func() {
243256
agentCfg.MetricsServer.DebugMetrics = true
@@ -423,7 +436,7 @@ var _ = Describe("SyslogAgent", func() {
423436
cfgCopy.GRPC.KeyFile = "invalid"
424437

425438
msg := `failed to configure client TLS: "failed to load keypair: open invalid: no such file or directory"`
426-
Expect(func() { app.NewSyslogAgent(cfgCopy, agentMetrics, agentLogr) }).To(PanicWith(msg))
439+
Expect(func() { app.NewSyslogAgent(cfgCopy, agentMetrics, agentLogr, factory) }).To(PanicWith(msg))
427440
})
428441
})
429442
})

src/cmd/syslog-agent/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
45
"log"
56
_ "net/http/pprof" //nolint:gosec
67
"os"
@@ -33,5 +34,7 @@ func main() {
3334
),
3435
)
3536

36-
app.NewSyslogAgent(cfg, m, logger).Run()
37+
factory := syslog.NewAppLogEmitterFactory()
38+
39+
app.NewSyslogAgent(cfg, m, logger, &factory).Run()
3740
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package testhelper
2+
3+
import "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
4+
5+
type SpyAppLogEmitterFactory struct {
6+
logClient syslog.LogClient
7+
sourceIndex string
8+
}
9+
10+
func (factory *SpyAppLogEmitterFactory) LogClient() syslog.LogClient {
11+
return factory.logClient
12+
}
13+
14+
func (factory *SpyAppLogEmitterFactory) SourceIndex() string {
15+
return factory.sourceIndex
16+
}
17+
18+
func (factory *SpyAppLogEmitterFactory) NewAppLogEmitter(logClient syslog.LogClient, sourceIndex string) syslog.AppLogEmitter {
19+
factory.logClient = logClient
20+
factory.sourceIndex = sourceIndex
21+
emitterFactory := syslog.NewAppLogEmitterFactory()
22+
return emitterFactory.NewAppLogEmitter(logClient, sourceIndex)
23+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package testhelper
2+
3+
import (
4+
"code.cloudfoundry.org/go-loggregator/v10"
5+
v2 "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
6+
"sync"
7+
)
8+
9+
type spyLogClient struct {
10+
mu sync.Mutex
11+
_message []string
12+
_appID []string
13+
14+
// We use maps to ensure that we can query the keys
15+
_sourceType map[string]struct{}
16+
_sourceInstance map[string]struct{}
17+
}
18+
19+
func NewSpyLogClient() *spyLogClient {
20+
return &spyLogClient{
21+
_sourceType: make(map[string]struct{}),
22+
_sourceInstance: make(map[string]struct{}),
23+
}
24+
}
25+
26+
func (s *spyLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) {
27+
s.mu.Lock()
28+
defer s.mu.Unlock()
29+
30+
env := &v2.Envelope{
31+
Tags: make(map[string]string),
32+
}
33+
34+
for _, o := range opts {
35+
o(env)
36+
}
37+
38+
s._message = append(s._message, message)
39+
s._appID = append(s._appID, env.SourceId)
40+
s._sourceType[env.GetTags()["source_type"]] = struct{}{}
41+
s._sourceInstance[env.GetInstanceId()] = struct{}{}
42+
}
43+
44+
func (s *spyLogClient) Message() []string {
45+
s.mu.Lock()
46+
defer s.mu.Unlock()
47+
48+
return s._message
49+
}
50+
51+
func (s *spyLogClient) AppID() []string {
52+
s.mu.Lock()
53+
defer s.mu.Unlock()
54+
55+
return s._appID
56+
}
57+
58+
func (s *spyLogClient) SourceType() map[string]struct{} {
59+
s.mu.Lock()
60+
defer s.mu.Unlock()
61+
62+
// Copy map so the orig does not escape the mutex and induce a race.
63+
m := make(map[string]struct{})
64+
for k := range s._sourceType {
65+
m[k] = struct{}{}
66+
}
67+
68+
return m
69+
}
70+
71+
func (s *spyLogClient) SourceInstance() map[string]struct{} {
72+
s.mu.Lock()
73+
defer s.mu.Unlock()
74+
75+
// Copy map so the orig does not escape the mutex and induce a race.
76+
m := make(map[string]struct{})
77+
for k := range s._sourceInstance {
78+
m[k] = struct{}{}
79+
}
80+
81+
return m
82+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package syslog
2+
3+
import (
4+
"code.cloudfoundry.org/go-loggregator/v10"
5+
)
6+
7+
// LogClient is used to emit logs.
8+
type LogClient interface {
9+
EmitLog(message string, opts ...loggregator.EmitLogOption)
10+
}
11+
12+
// AppLogEmitter abstracts the sending of a log to the application log stream.
13+
type AppLogEmitter struct {
14+
logClient LogClient
15+
sourceIndex string
16+
}
17+
18+
// EmitLog writes a message in the application log stream using a LogClient.
19+
func (appLogEmitter *AppLogEmitter) EmitLog(appID string, message string) {
20+
if appLogEmitter.logClient == nil || appID == "" {
21+
return
22+
}
23+
24+
option := loggregator.WithAppInfo(appID, "LGR", "")
25+
appLogEmitter.logClient.EmitLog(message, option)
26+
27+
option = loggregator.WithAppInfo(
28+
appID,
29+
"SYS",
30+
appLogEmitter.sourceIndex,
31+
)
32+
appLogEmitter.logClient.EmitLog(message, option)
33+
}
34+
35+
// AppLogEmitterFactory is used to create new instances of AppLogEmitter
36+
type AppLogEmitterFactory interface {
37+
NewAppLogEmitter(logClient LogClient, sourceIndex string) AppLogEmitter
38+
}
39+
40+
// DefaultAppLogEmitterFactory implementation of AppLogEmitterFactory to produce DefaultAppLogEmitter.
41+
type DefaultAppLogEmitterFactory struct {
42+
}
43+
44+
// NewAppLogEmitter creates a new AppLogEmitter.
45+
func (factory *DefaultAppLogEmitterFactory) NewAppLogEmitter(logClient LogClient, sourceIndex string) AppLogEmitter {
46+
return AppLogEmitter{
47+
logClient: logClient,
48+
sourceIndex: sourceIndex,
49+
}
50+
}
51+
52+
func NewAppLogEmitterFactory() DefaultAppLogEmitterFactory {
53+
return DefaultAppLogEmitterFactory{}
54+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package syslog_test
2+
3+
import (
4+
"code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper"
5+
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
6+
. "github.com/onsi/ginkgo/v2"
7+
. "github.com/onsi/gomega"
8+
)
9+
10+
var _ = Describe("Loggregator Emitter", func() {
11+
Describe("DefaultAppLogEmitter", func() {
12+
It("emits a log message", func() {
13+
logClient := testhelper.NewSpyLogClient()
14+
factory := syslog.NewAppLogEmitterFactory()
15+
emitter := factory.NewAppLogEmitter(logClient, "0")
16+
17+
emitter.EmitLog("app-id", "some-message")
18+
19+
messages := logClient.Message()
20+
appIDs := logClient.AppID()
21+
sourceTypes := logClient.SourceType()
22+
Expect(messages).To(HaveLen(2))
23+
Expect(messages[0]).To(Equal("some-message"))
24+
Expect(messages[1]).To(Equal("some-message"))
25+
Expect(appIDs[0]).To(Equal("app-id"))
26+
Expect(appIDs[1]).To(Equal("app-id"))
27+
Expect(sourceTypes).To(HaveKey("LGR"))
28+
Expect(sourceTypes).To(HaveKey("SYS"))
29+
})
30+
31+
It("does not emit a log message if the appID is empty", func() {
32+
logClient := testhelper.NewSpyLogClient()
33+
factory := syslog.NewAppLogEmitterFactory()
34+
emitter := factory.NewAppLogEmitter(logClient, "0")
35+
36+
emitter.EmitLog("", "some-message")
37+
38+
messages := logClient.Message()
39+
appIDs := logClient.AppID()
40+
sourceTypes := logClient.SourceType()
41+
Expect(messages).To(HaveLen(0))
42+
Expect(appIDs).To(HaveLen(0))
43+
Expect(sourceTypes).ToNot(HaveKey("LGR"))
44+
Expect(sourceTypes).ToNot(HaveKey("SYS"))
45+
})
46+
})
47+
48+
Describe("DefaultAppLogEmitterFactory", func() {
49+
It("produces a AppLogEmitter", func() {
50+
factory := syslog.NewAppLogEmitterFactory()
51+
logClient := testhelper.NewSpyLogClient()
52+
sourceIndex := "test-index"
53+
54+
emitter := factory.NewAppLogEmitter(logClient, sourceIndex)
55+
emitter.EmitLog("app-id", "some-message")
56+
57+
messages := logClient.Message()
58+
appIDs := logClient.AppID()
59+
sourceTypes := logClient.SourceType()
60+
sourceInstance := logClient.SourceInstance()
61+
Expect(messages).To(HaveLen(2))
62+
Expect(messages[0]).To(Equal("some-message"))
63+
Expect(messages[1]).To(Equal("some-message"))
64+
Expect(appIDs[0]).To(Equal("app-id"))
65+
Expect(appIDs[1]).To(Equal("app-id"))
66+
Expect(sourceTypes).To(HaveKey("LGR"))
67+
Expect(sourceTypes).To(HaveKey("SYS"))
68+
Expect(sourceInstance).To(HaveKey(""))
69+
Expect(sourceInstance).To(HaveKey("test-index"))
70+
})
71+
})
72+
})

0 commit comments

Comments
 (0)