diff --git a/cmd/client/main.go b/cmd/client/main.go index efe6de2..bed9ac7 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -27,12 +27,15 @@ import ( "net/http" "net/url" "os" + "path/filepath" "strings" + "sync" "time" "github.com/Showmax/go-fqdn" "github.com/alecthomas/kingpin/v2" "github.com/cenkalti/backoff/v4" + "github.com/fsnotify/fsnotify" "github.com/prometheus-community/pushprox/util" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -41,13 +44,13 @@ import ( ) var ( - myFqdn = kingpin.Flag("fqdn", "FQDN to register with").Default(fqdn.Get()).String() - proxyURL = kingpin.Flag("proxy-url", "Push proxy to talk to.").Required().String() - caCertFile = kingpin.Flag("tls.cacert", " CA certificate to verify peer against").String() - tlsCert = kingpin.Flag("tls.cert", " Client certificate file").String() - tlsKey = kingpin.Flag("tls.key", " Private key file").String() - metricsAddr = kingpin.Flag("metrics-addr", "Serve Prometheus metrics at this address").Default(":9369").String() - + myFqdn = kingpin.Flag("fqdn", "FQDN to register with").Default(fqdn.Get()).String() + proxyURL = kingpin.Flag("proxy-url", "Push proxy to talk to.").Required().String() + caCertFile = kingpin.Flag("tls.cacert", " CA certificate to verify peer against").String() + tlsCert = kingpin.Flag("tls.cert", " Client certificate file").String() + tlsKey = kingpin.Flag("tls.key", " Private key file").String() + metricsAddr = kingpin.Flag("metrics-addr", "Serve Prometheus metrics at this address").Default(":9369").String() + bearerTokenPath = kingpin.Flag("bearer-token-path", " Path to file containing bearer token to authenticate requests").String() retryInitialWait = kingpin.Flag("proxy.retry.initial-wait", "Amount of time to wait after proxy failure").Default("1s").Duration() retryMaxWait = kingpin.Flag("proxy.retry.max-wait", "Maximum amount of time to wait between proxy poll retries").Default("5s").Duration() ) @@ -71,6 +74,10 @@ var ( Help: "Number of poll errors", }, ) + // bearerToken holds the current token string used for authentication. + // Access must be synchronized to avoid race conditions between the watcher and HTTP handlers. + bearerToken string + bearerTokenMutex sync.RWMutex ) func init() { @@ -114,6 +121,14 @@ func (c *Coordinator) doScrape(request *http.Request, client *http.Client) { c.handleErr(request, client, err) return } + bearerTokenMutex.RLock() + token := bearerToken + bearerTokenMutex.RUnlock() + + if token != "" { + request.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) + } + ctx, cancel := context.WithTimeout(request.Context(), timeout) defer cancel() request = request.WithContext(ctx) @@ -225,6 +240,56 @@ func (c *Coordinator) loop(bo backoff.BackOff, client *http.Client) { } } +// I need to automatically reload the bearer token if it changes +// i.e. if it a short lived kubernetes token one +func watchBearerTokenFile(path string, logger *slog.Logger) { + loadBearerToken := func() { + tokenBytes, err := os.ReadFile(path) + if err != nil { + logger.Error("Failed to read bearer token", "path", path, "err", err) + return + } + bearerTokenMutex.Lock() + bearerToken = strings.TrimSpace(string(tokenBytes)) // also trim spaces/newlines + bearerTokenMutex.Unlock() + logger.Info("Bearer token loaded from file", "path", path) + } + + loadBearerToken() // initial load + + watcher, err := fsnotify.NewWatcher() + if err != nil { + logger.Error("Failed to create fsnotify watcher", "err", err) + os.Exit(1) + } + defer watcher.Close() + + tokenDir := filepath.Dir(path) + if err := watcher.Add(tokenDir); err != nil { + logger.Error("Failed to watch token directory", "dir", tokenDir, "err", err) + os.Exit(1) + } + + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + if event.Name == path && + (event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create) { + logger.Info("Bearer token file changed, reloading", "event", event) + loadBearerToken() + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + logger.Warn("fsnotify error", "err", err) + } + } +} + func main() { promslogConfig := promslog.Config{} flag.AddFlags(kingpin.CommandLine, &promslogConfig) @@ -276,6 +341,11 @@ func main() { }() } + // Set bearer token based on path + if *bearerTokenPath != "" { + go watchBearerTokenFile(*bearerTokenPath, coordinator.logger) + } + transport := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{ diff --git a/cmd/client/main_test.go b/cmd/client/main_test.go index 2625461..185f4ce 100644 --- a/cmd/client/main_test.go +++ b/cmd/client/main_test.go @@ -14,35 +14,104 @@ package main import ( + "bytes" "errors" "fmt" + "io" "net/http" "net/http/httptest" + "net/url" + "os" + "strings" + "sync" "testing" + "time" + "github.com/cenkalti/backoff/v4" "github.com/prometheus/common/promslog" ) func prepareTest() (*httptest.Server, Coordinator) { + // This test server acts as the proxyURL ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - fmt.Fprintln(w, "GET /index.html HTTP/1.0\n\nOK") + switch r.URL.Path { + case "/poll": + // On /poll, respond with an HTTP request serialized in the body + var buf bytes.Buffer + req, _ := http.NewRequest("GET", fmt.Sprintf("http://%s/", *myFqdn), nil) + req.Header.Set("id", "test-scrape-id") + req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", "10") + req.Write(&buf) + w.WriteHeader(http.StatusOK) + _, _ = w.Write(buf.Bytes()) + case "/push": + // Accept pushed scrape results, just respond OK + io.Copy(io.Discard, r.Body) + w.WriteHeader(http.StatusOK) + default: + w.WriteHeader(http.StatusNotFound) + } })) + c := Coordinator{logger: promslog.NewNopLogger()} - *proxyURL = ts.URL + *proxyURL = ts.URL + "/" + *myFqdn = "test.local" // Set fqdn to test.local for matching hostnames + return ts, c } -func TestDoScrape(t *testing.T) { +func TestDoScrape_Success(t *testing.T) { ts, c := prepareTest() defer ts.Close() - req, err := http.NewRequest("GET", ts.URL, nil) + // Setup a test target server that will be scraped by doScrape + targetServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify Authorization header if set + auth := r.Header.Get("Authorization") + if auth != "" && auth != "Bearer dummy-token" { + t.Errorf("unexpected Authorization header: %s", auth) + } + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, "OK") + })) + defer targetServer.Close() + + // Override myFqdn to match targetServer hostname + u, err := url.Parse(targetServer.URL) + if err != nil { + t.Fatal(err) + } + *myFqdn = u.Hostname() + + // Prepare a scrape request targeting the test target server + req, err := http.NewRequest("GET", targetServer.URL, nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("id", "scrape-id-123") + req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", "10") + + // Set bearerToken for authorization testing + bearerTokenMutex.Lock() + bearerToken = "dummy-token" + bearerTokenMutex.Unlock() + + c.doScrape(req, targetServer.Client()) +} + +func TestDoScrape_FailWrongFQDN(t *testing.T) { + ts, c := prepareTest() + defer ts.Close() + + req, err := http.NewRequest("GET", "http://wronghost.local", nil) if err != nil { t.Fatal(err) } - req.Header.Add("X-Prometheus-Scrape-Timeout-Seconds", "10.0") - *myFqdn = ts.URL + req.Header.Set("id", "fail-id") + req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", "10") + + // This should cause handleErr due to fqdn mismatch c.doScrape(req, ts.Client()) } @@ -57,10 +126,140 @@ func TestHandleErr(t *testing.T) { c.handleErr(req, ts.Client(), errors.New("test error")) } -func TestLoop(t *testing.T) { +func TestDoPush_ErrorOnInvalidProxyURL(t *testing.T) { + c := Coordinator{logger: promslog.NewNopLogger()} + *proxyURL = "http://%41:8080" // invalid URL (percent-encoding issue) + + resp := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("test")), + Header: http.Header{}, + } + req, _ := http.NewRequest("GET", "http://example.com", nil) + err := c.doPush(resp, req, http.DefaultClient) + if err == nil { + t.Errorf("expected error on invalid proxy URL, got nil") + } +} + +func TestDoPoll(t *testing.T) { ts, c := prepareTest() defer ts.Close() - if err := c.doPoll(ts.Client()); err != nil { + + err := c.doPoll(ts.Client()) + if err != nil { + t.Fatalf("doPoll failed: %v", err) + } +} + +func TestLoopWithBackoff(t *testing.T) { + var count int + var mu sync.Mutex + done := make(chan struct{}) + var once sync.Once + + bo := backoffForTest(3) + + go func() { + err := backoff.RetryNotify(func() error { + mu.Lock() + defer mu.Unlock() + count++ + if count > 2 { + // safe close + once.Do(func() { close(done) }) + return errors.New("forced error to stop retry") + } + return errors.New("temporary error") + }, bo, func(err error, d time.Duration) { + // No-op + }) + + if err != nil { + // safe even if already closed + once.Do(func() { close(done) }) + } + }() + + select { + case <-done: + case <-time.After(1 * time.Second): + t.Fatal("loop test timed out") + } +} + +func backoffForTest(maxRetries int) backoff.BackOff { + b := backoff.NewExponentialBackOff() + b.InitialInterval = 1 * time.Millisecond + b.MaxInterval = 5 * time.Millisecond + b.MaxElapsedTime = 10 * time.Millisecond + return backoff.WithMaxRetries(b, uint64(maxRetries)) +} + +func TestWatchBearerTokenFile(t *testing.T) { + // This function is hard to test fully without fsnotify events, + // but we can test the initial loading of the token file. + + // Create a temporary file with a token + tmpfile := t.TempDir() + "/tokenfile" + tokenContent := "file-token\n" + if err := os.WriteFile(tmpfile, []byte(tokenContent), 0600); err != nil { t.Fatal(err) } + + logger := promslog.NewNopLogger() + + // Run watchBearerTokenFile in a goroutine; it will load token initially + go func() { + // This will block watching the directory, so we only wait shortly + watchBearerTokenFile(tmpfile, logger) + }() + + // Wait briefly for the token to load + time.Sleep(100 * time.Millisecond) + + bearerTokenMutex.RLock() + defer bearerTokenMutex.RUnlock() + if bearerToken != strings.TrimSpace(tokenContent) { + t.Errorf("expected bearer token %q, got %q", strings.TrimSpace(tokenContent), bearerToken) + } +} + +func TestBearerTokenHeader(t *testing.T) { + token := "dummy-token" + bearerTokenMutex.Lock() + bearerToken = token + bearerTokenMutex.Unlock() + + var receivedToken string + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedToken = r.Header.Get("Authorization") + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + // Ensure myFqdn matches the test server's hostname + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatal(err) + } + *myFqdn = u.Hostname() + + req, err := http.NewRequest("GET", ts.URL, nil) + if err != nil { + t.Fatal(err) + } + + // Set required headers for doScrape to accept this request + req.Header.Set("id", "token-test-id") + req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", "10") + + c := Coordinator{logger: promslog.NewNopLogger()} + c.doScrape(req, ts.Client()) + + expected := "Bearer dummy-token" + if receivedToken != expected { + t.Fatalf("expected %q, got %q", expected, receivedToken) + } } diff --git a/go.mod b/go.mod index 62f940f..f3b4fac 100644 --- a/go.mod +++ b/go.mod @@ -1,25 +1,25 @@ module github.com/prometheus-community/pushprox -go 1.22 +go 1.23.11 require ( github.com/Showmax/go-fqdn v1.0.0 github.com/alecthomas/kingpin/v2 v2.4.0 github.com/cenkalti/backoff/v4 v4.3.0 + github.com/fsnotify/fsnotify v1.9.0 github.com/google/uuid v1.6.0 - github.com/prometheus/client_golang v1.21.0 - github.com/prometheus/common v0.62.0 + github.com/prometheus/client_golang v1.22.0 + github.com/prometheus/common v0.65.0 ) require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/klauspost/compress v1.17.11 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/xhit/go-str2duration/v2 v2.1.0 // indirect - golang.org/x/sys v0.28.0 // indirect - google.golang.org/protobuf v1.36.1 // indirect + golang.org/x/sys v0.33.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect ) diff --git a/go.sum b/go.sum index fa43ba9..559bd46 100644 --- a/go.sum +++ b/go.sum @@ -13,24 +13,26 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.21.0 h1:DIsaGmiaBkSangBgMtWdNfxbMNdku5IK6iNhrEqWvdA= -github.com/prometheus/client_golang v1.21.0/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg= -github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= -github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= -github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= +github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= +github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2VzE= +github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -39,10 +41,10 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc= github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=