From 8ea5d6c64c954ee9b1ed04b96a6f61444692aa23 Mon Sep 17 00:00:00 2001 From: gunishmatta Date: Mon, 15 Aug 2022 05:28:56 +0000 Subject: [PATCH 1/2] Add support for blocking HTTP connections Signed-off-by: Gunish Matta Signed-off-by: gunishmatta --- controllers/event_handling_test.go | 70 +++++++++++++++++++++++++++++- internal/server/event_handlers.go | 17 ++++++++ internal/server/event_server.go | 4 +- main.go | 4 +- 4 files changed, 92 insertions(+), 3 deletions(-) diff --git a/controllers/event_handling_test.go b/controllers/event_handling_test.go index 8890b98cf..70826aa96 100644 --- a/controllers/event_handling_test.go +++ b/controllers/event_handling_test.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/url" "testing" "time" @@ -52,7 +53,69 @@ func TestEventHandler(t *testing.T) { t.Fatalf("failed to create memory storage") } - eventServer := server.NewEventServer("127.0.0.1:56789", logf.Log, k8sClient, true) + httpScheme := "http" + + eventServerTests := []struct { + name string + isHttpEnabled bool + url string + }{ + { + name: "http scheme is enabled", + isHttpEnabled: true, + }, { + name: "http scheme is disabled", + isHttpEnabled: false, + }, + } + for _, eventServerTest := range eventServerTests { + t.Run(eventServerTest.name, func(t *testing.T) { + + eventServer := server.NewEventServer("127.0.0.1:56789", logf.Log, k8sClient, true, eventServerTest.isHttpEnabled) + + stopCh := make(chan struct{}) + go eventServer.ListenAndServe(stopCh, eventMdlw, store) + requestsReceived := 0 + rcvServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestsReceived = requestsReceived + 1 + req = r + w.WriteHeader(200) + })) + defer rcvServer.Close() + defer close(stopCh) + + providerKey := types.NamespacedName{ + Name: fmt.Sprintf("provider-%s", randStringRunes(5)), + Namespace: namespace, + } + provider = ¬ifyv1.Provider{ + ObjectMeta: metav1.ObjectMeta{ + Name: providerKey.Name, + Namespace: providerKey.Namespace, + }, + Spec: notifyv1.ProviderSpec{ + Type: "generic", + Address: rcvServer.URL, + }, + } + + webhook_url, err := url.Parse(provider.Spec.Address) + + g.Expect(err).ToNot(HaveOccurred()) + + if eventServerTest.isHttpEnabled { + g.Expect(webhook_url.Scheme).To(Equal(httpScheme)) + g.Expect(requestsReceived).To(Equal(1)) + } else { + g.Expect(webhook_url.Scheme).ToNot(Equal(httpScheme)) + g.Expect(requestsReceived).To(Equal(0)) + } + + }) + } + + eventServer := server.NewEventServer("127.0.0.1:56789", logf.Log, k8sClient, true, true) + stopCh := make(chan struct{}) go eventServer.ListenAndServe(stopCh, eventMdlw, store) @@ -77,6 +140,9 @@ func TestEventHandler(t *testing.T) { Address: rcvServer.URL, }, } + + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(k8sClient.Create(context.Background(), provider)).To(Succeed()) g.Eventually(func() bool { var obj notifyv1.Provider @@ -173,6 +239,7 @@ func TestEventHandler(t *testing.T) { res, err := http.Post("http://localhost:56789/", "application/json", buf) g.Expect(err).ToNot(HaveOccurred()) g.Expect(res.StatusCode).To(Equal(202)) // event_server responds with 202 Accepted + } testForwarded := func() { @@ -294,4 +361,5 @@ func TestEventHandler(t *testing.T) { req = nil }) } + } diff --git a/internal/server/event_handlers.go b/internal/server/event_handlers.go index 3f94c77d0..cbf0ea768 100644 --- a/internal/server/event_handlers.go +++ b/internal/server/event_handlers.go @@ -24,6 +24,7 @@ import ( "fmt" "io" "net/http" + "net/url" "regexp" "strings" "time" @@ -243,6 +244,22 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) continue } + webhookUrl, err := url.Parse(webhook) + if err != nil { + s.logger.Error(nil, "Error parsing webhook url", + "reconciler kind", v1beta1.ProviderKind, + "name", providerName.Name, + "namespace", providerName.Namespace) + continue + } + + if !s.supportHttpScheme && webhookUrl.Scheme == "http" { + s.logger.Error(nil, "http scheme is blocked", + "reconciler kind", v1beta1.ProviderKind, + "name", providerName.Name, + "namespace", providerName.Namespace) + continue + } factory := notifier.NewFactory(webhook, proxy, username, provider.Spec.Channel, token, headers, certPool, password) sender, err := factory.Notifier(provider.Spec.Type) if err != nil { diff --git a/internal/server/event_server.go b/internal/server/event_server.go index 96f0ea54b..bfeae7df5 100644 --- a/internal/server/event_server.go +++ b/internal/server/event_server.go @@ -44,15 +44,17 @@ type EventServer struct { logger logr.Logger kubeClient client.Client noCrossNamespaceRefs bool + supportHttpScheme bool } // NewEventServer returns an HTTP server that handles events -func NewEventServer(port string, logger logr.Logger, kubeClient client.Client, noCrossNamespaceRefs bool) *EventServer { +func NewEventServer(port string, logger logr.Logger, kubeClient client.Client, noCrossNamespaceRefs bool, supportHttpScheme bool) *EventServer { return &EventServer{ port: port, logger: logger.WithName("event-server"), kubeClient: kubeClient, noCrossNamespaceRefs: noCrossNamespaceRefs, + supportHttpScheme: supportHttpScheme, } } diff --git a/main.go b/main.go index 666309416..7db5c5f56 100644 --- a/main.go +++ b/main.go @@ -72,6 +72,7 @@ func main() { leaderElectionOptions leaderelection.Options aclOptions acl.Options rateLimiterOptions helper.RateLimiterOptions + insecureAllowHTTP bool ) flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") @@ -82,6 +83,7 @@ func main() { flag.BoolVar(&watchAllNamespaces, "watch-all-namespaces", true, "Watch for custom resources in all namespaces, if set to false it will only watch the runtime namespace.") flag.DurationVar(&rateLimitInterval, "rate-limit-interval", 5*time.Minute, "Interval in which rate limit has effect.") + flag.BoolVar(&insecureAllowHTTP, "insecure-allow-http", true, "Enable the use of HTTP Scheme (no HTTPS) across all controller level objects. This is not recommended for production environments") clientOptions.BindFlags(flag.CommandLine) logOptions.BindFlags(flag.CommandLine) leaderElectionOptions.BindFlags(flag.CommandLine) @@ -169,7 +171,7 @@ func main() { Registry: crtlmetrics.Registry, }), }) - eventServer := server.NewEventServer(eventsAddr, log, mgr.GetClient(), aclOptions.NoCrossNamespaceRefs) + eventServer := server.NewEventServer(eventsAddr, log, mgr.GetClient(), aclOptions.NoCrossNamespaceRefs, insecureAllowHTTP) go eventServer.ListenAndServe(ctx.Done(), eventMdlw, store) setupLog.Info("starting webhook receiver server", "addr", receiverAddr) From 5a06288bfc38b04638913e6b677d81853adb0cac Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Mon, 14 Nov 2022 17:11:59 +0000 Subject: [PATCH 2/2] Refactor tests for blocking HTTP connections - Use noopstore to disable throttling behaviour. - Fake k8s client to remove need of interacting with an envtest apiserver. - Replace HTTP Status Code magic numbers, with their respective constants. Signed-off-by: Paulo Gomes --- controllers/event_handling_test.go | 84 ++-------------- go.mod | 2 +- internal/server/event_handlers.go | 1 - internal/server/event_server.go | 2 +- internal/server/event_server_test.go | 144 +++++++++++++++++++++++++-- 5 files changed, 145 insertions(+), 88 deletions(-) diff --git a/controllers/event_handling_test.go b/controllers/event_handling_test.go index 70826aa96..26d9d1a30 100644 --- a/controllers/event_handling_test.go +++ b/controllers/event_handling_test.go @@ -7,13 +7,12 @@ import ( "fmt" "net/http" "net/http/httptest" - "net/url" "testing" "time" "github.com/fluxcd/pkg/ssa" . "github.com/onsi/gomega" - "github.com/sethvargo/go-limiter/memorystore" + "github.com/sethvargo/go-limiter/noopstore" prommetrics "github.com/slok/go-http-metrics/metrics/prometheus" "github.com/slok/go-http-metrics/middleware" corev1 "k8s.io/api/core/v1" @@ -46,82 +45,17 @@ func TestEventHandler(t *testing.T) { }), }) - store, err := memorystore.New(&memorystore.Config{ - Interval: 5 * time.Minute, - }) - if err != nil { - t.Fatalf("failed to create memory storage") - } - - httpScheme := "http" - - eventServerTests := []struct { - name string - isHttpEnabled bool - url string - }{ - { - name: "http scheme is enabled", - isHttpEnabled: true, - }, { - name: "http scheme is disabled", - isHttpEnabled: false, - }, - } - for _, eventServerTest := range eventServerTests { - t.Run(eventServerTest.name, func(t *testing.T) { - - eventServer := server.NewEventServer("127.0.0.1:56789", logf.Log, k8sClient, true, eventServerTest.isHttpEnabled) - - stopCh := make(chan struct{}) - go eventServer.ListenAndServe(stopCh, eventMdlw, store) - requestsReceived := 0 - rcvServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - requestsReceived = requestsReceived + 1 - req = r - w.WriteHeader(200) - })) - defer rcvServer.Close() - defer close(stopCh) - - providerKey := types.NamespacedName{ - Name: fmt.Sprintf("provider-%s", randStringRunes(5)), - Namespace: namespace, - } - provider = ¬ifyv1.Provider{ - ObjectMeta: metav1.ObjectMeta{ - Name: providerKey.Name, - Namespace: providerKey.Namespace, - }, - Spec: notifyv1.ProviderSpec{ - Type: "generic", - Address: rcvServer.URL, - }, - } - - webhook_url, err := url.Parse(provider.Spec.Address) - - g.Expect(err).ToNot(HaveOccurred()) - - if eventServerTest.isHttpEnabled { - g.Expect(webhook_url.Scheme).To(Equal(httpScheme)) - g.Expect(requestsReceived).To(Equal(1)) - } else { - g.Expect(webhook_url.Scheme).ToNot(Equal(httpScheme)) - g.Expect(requestsReceived).To(Equal(0)) - } - - }) - } - - eventServer := server.NewEventServer("127.0.0.1:56789", logf.Log, k8sClient, true, true) + store, err := noopstore.New() + g.Expect(err).ToNot(HaveOccurred()) + serverEndpoint := "127.0.0.1:56789" + eventServer := server.NewEventServer(serverEndpoint, logf.Log, k8sClient, true, true) stopCh := make(chan struct{}) go eventServer.ListenAndServe(stopCh, eventMdlw, store) rcvServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { req = r - w.WriteHeader(200) + w.WriteHeader(http.StatusOK) })) defer rcvServer.Close() defer close(stopCh) @@ -236,10 +170,9 @@ func TestEventHandler(t *testing.T) { testSent := func() { buf := &bytes.Buffer{} g.Expect(json.NewEncoder(buf).Encode(&event)).To(Succeed()) - res, err := http.Post("http://localhost:56789/", "application/json", buf) + res, err := http.Post("http://"+serverEndpoint, "application/json", buf) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(res.StatusCode).To(Equal(202)) // event_server responds with 202 Accepted - + g.Expect(res.StatusCode).To(Equal(http.StatusAccepted)) } testForwarded := func() { @@ -361,5 +294,4 @@ func TestEventHandler(t *testing.T) { req = nil }) } - } diff --git a/go.mod b/go.mod index 0df601797..319de5d9a 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( k8s.io/api v0.25.3 k8s.io/apimachinery v0.25.3 k8s.io/client-go v0.25.3 + k8s.io/kubectl v0.24.0 sigs.k8s.io/cli-utils v0.33.0 sigs.k8s.io/controller-runtime v0.13.0 sigs.k8s.io/yaml v1.3.0 @@ -151,7 +152,6 @@ require ( k8s.io/component-base v0.25.2 // indirect k8s.io/klog/v2 v2.80.1 // indirect k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect - k8s.io/kubectl v0.24.0 // indirect k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/kustomize/api v0.12.1 // indirect diff --git a/internal/server/event_handlers.go b/internal/server/event_handlers.go index cbf0ea768..dd9228bcf 100644 --- a/internal/server/event_handlers.go +++ b/internal/server/event_handlers.go @@ -45,7 +45,6 @@ import ( func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { - r.Context() body, err := io.ReadAll(r.Body) if err != nil { s.logger.Error(err, "reading the request body failed") diff --git a/internal/server/event_server.go b/internal/server/event_server.go index bfeae7df5..b865bb2ca 100644 --- a/internal/server/event_server.go +++ b/internal/server/event_server.go @@ -106,7 +106,7 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { recorder := &statusRecorder{ ResponseWriter: w, - Status: 200, + Status: http.StatusOK, } h.ServeHTTP(recorder, r) diff --git a/internal/server/event_server_test.go b/internal/server/event_server_test.go index dad0c0e14..22395e2bd 100644 --- a/internal/server/event_server_test.go +++ b/internal/server/event_server_test.go @@ -20,29 +20,39 @@ import ( "bytes" "encoding/json" "fmt" + "io" "net/http" "net/http/httptest" "testing" "time" - "github.com/onsi/gomega" + . "github.com/onsi/gomega" "github.com/sethvargo/go-limiter/httplimit" "github.com/sethvargo/go-limiter/memorystore" + "github.com/sethvargo/go-limiter/noopstore" + "github.com/slok/go-http-metrics/middleware" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/kubectl/pkg/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + logf "sigs.k8s.io/controller-runtime/pkg/log" + notifyv1 "github.com/fluxcd/notification-controller/api/v1beta1" + "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/events" ) func TestEventKeyFunc(t *testing.T) { - g := gomega.NewGomegaWithT(t) + g := NewWithT(t) // Setup middleware store, err := memorystore.New(&memorystore.Config{ Interval: 10 * time.Minute, }) - g.Expect(err).ShouldNot(gomega.HaveOccurred()) + g.Expect(err).ShouldNot(HaveOccurred()) middleware, err := httplimit.NewMiddleware(store, eventKeyFunc) - g.Expect(err).ShouldNot(gomega.HaveOccurred()) + g.Expect(err).ShouldNot(HaveOccurred()) handler := middleware.Handle(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })) @@ -129,19 +139,135 @@ func TestEventKeyFunc(t *testing.T) { Message: tt.message, } eventData, err := json.Marshal(event) - g.Expect(err).ShouldNot(gomega.HaveOccurred()) + g.Expect(err).ShouldNot(HaveOccurred()) req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData)) - g.Expect(err).ShouldNot(gomega.HaveOccurred()) + g.Expect(err).ShouldNot(HaveOccurred()) res := httptest.NewRecorder() handler.ServeHTTP(res, req) if tt.rateLimit { - g.Expect(res.Code).Should(gomega.Equal(429)) - g.Expect(res.Header().Get("X-Ratelimit-Remaining")).Should(gomega.Equal("0")) + g.Expect(res.Code).Should(Equal(http.StatusTooManyRequests)) + g.Expect(res.Header().Get("X-Ratelimit-Remaining")).Should(Equal("0")) } else { - g.Expect(res.Code).Should(gomega.Equal(200)) + g.Expect(res.Code).Should(Equal(http.StatusOK)) } }) } } + +func TestBlockInsecureHTTP(t *testing.T) { + g := NewWithT(t) + + var requestsReceived int + rcvServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestsReceived++ + io.Copy(io.Discard, r.Body) + w.WriteHeader(http.StatusOK) + })) + defer rcvServer.Close() + + utilruntime.Must(notifyv1.AddToScheme(scheme.Scheme)) + + testNamespace := "test-ns" + providerKey := "provider" + client := fake.NewFakeClientWithScheme(scheme.Scheme, + ¬ifyv1.Provider{ + ObjectMeta: metav1.ObjectMeta{ + Name: providerKey, + Namespace: testNamespace, + }, + Spec: notifyv1.ProviderSpec{ + Type: "generic", + Address: rcvServer.URL, + }, + }, + ¬ifyv1.Alert{ + ObjectMeta: metav1.ObjectMeta{ + Name: "some-alert-name", + Namespace: testNamespace, + }, + Spec: notifyv1.AlertSpec{ + ProviderRef: meta.LocalObjectReference{ + Name: providerKey, + }, + EventSeverity: "info", + EventSources: []notifyv1.CrossNamespaceObjectReference{ + { + Kind: "Bucket", + Name: "hyacinth", + Namespace: testNamespace, + }, + }, + }, + Status: notifyv1.AlertStatus{ + Conditions: []metav1.Condition{ + {Type: meta.ReadyCondition, Status: metav1.ConditionTrue}, + }, + }, + }, + ) + + eventMdlw := middleware.New(middleware.Config{}) + + store, err := noopstore.New() + g.Expect(err).ToNot(HaveOccurred()) + + serverEndpoint := "127.0.0.1:56789" + eventServer := NewEventServer(serverEndpoint, logf.Log, client, true, true) + stopCh := make(chan struct{}) + go eventServer.ListenAndServe(stopCh, eventMdlw, store) + defer close(stopCh) + + event := events.Event{ + InvolvedObject: corev1.ObjectReference{ + Kind: "Bucket", + Name: "hyacinth", + Namespace: testNamespace, + }, + Severity: "info", + Timestamp: metav1.Now(), + Message: "well that happened", + Reason: "event-happened", + ReportingController: "source-controller", + } + + eventServerTests := []struct { + name string + isHttpEnabled bool + url string + wantRequest int + }{ + { + name: "http scheme is disabled", + isHttpEnabled: false, + wantRequest: 0, + }, + { + name: "http scheme is enabled", + isHttpEnabled: true, + wantRequest: 1, + }, + } + for _, tt := range eventServerTests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + requestsReceived = 0 // reset counter + + // Change the internal state instead of creating a new server. + eventServer.supportHttpScheme = tt.isHttpEnabled + + buf := &bytes.Buffer{} + g.Expect(json.NewEncoder(buf).Encode(&event)).To(Succeed()) + res, err := http.Post("http://"+serverEndpoint, "application/json", buf) + + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(res.StatusCode).To(Equal(http.StatusAccepted)) + + // Requests happens async, so should the assertion. + g.Eventually(func() bool { + return requestsReceived == tt.wantRequest + }, 5*time.Second).Should(BeTrue()) + }) + } +}