diff --git a/internal/constants/constants.go b/internal/constants/constants.go index fc5172f08..b94629eab 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -85,6 +85,8 @@ const ( var ExtraNoProxyList = []string{ElasticsearchFQDN} +var ProxyEnvVars = []string{"HTTP_PROXY", "https_proxy", "HTTP_PROXY", "http_proxy", "NO_PROXY", "no_proxy"} + func DefaultTolerations() []v1.Toleration { return []v1.Toleration{ { diff --git a/internal/network/network_policy.go b/internal/network/network_policy.go index 66e4a51e4..8a2e8d52e 100644 --- a/internal/network/network_policy.go +++ b/internal/network/network_policy.go @@ -23,14 +23,26 @@ func ReconcileClusterLogForwarderNetworkPolicy(k8Client client.Client, namespace // For RestrictIngressEgress, determine the ports to use based on URLs in outputs and defaults if policyRuleSet == obsv1.NetworkPolicyRuleSetTypeRestrictIngressEgress { - // Parse ports with protocols from outputs - if len(outputs) > 0 { - egressPorts = GetOutputPortsWithProtocols(outputs) - } // Parse ports from inputs (receiver inputs use TCP) if len(inputs) > 0 { ingressPorts = GetInputPorts(inputs) } + + // Parse ports for egress from outputs and proxy configuration if any + egressPortMap := map[factory.PortProtocol]bool{} + // Parse ports with protocols from outputs + if len(outputs) > 0 { + GetOutputPortsWithProtocols(outputs, egressPortMap) + + } + // Add proxy ports if any for cluster-wide proxy configuration + GetProxyPorts(egressPortMap) + + // Convert map to slice + egressPorts = make([]factory.PortProtocol, 0, len(egressPortMap)) + for pp := range egressPortMap { + egressPorts = append(egressPorts, pp) + } } desired := factory.NewNetworkPolicyWithProtocolPorts(namespace, policyName, instanceName, component, string(policyRuleSet), egressPorts, ingressPorts, visitor) diff --git a/internal/network/ports.go b/internal/network/ports.go index 13c36ea1a..79d481913 100644 --- a/internal/network/ports.go +++ b/internal/network/ports.go @@ -5,17 +5,17 @@ import ( "net/url" "strconv" + log "github.com/ViaQ/logerr/v2/log/static" obs "github.com/openshift/cluster-logging-operator/api/observability/v1" "github.com/openshift/cluster-logging-operator/internal/factory" + "github.com/openshift/cluster-logging-operator/internal/utils" corev1 "k8s.io/api/core/v1" - "k8s.io/utils/set" + "k8s.io/apimachinery/pkg/util/sets" ) // GetOutputPortsWithProtocols extracts all unique ports with their protocols from the given outputs. // It parses URLs to extract ports and protocols, or uses default values based on the output type. -func GetOutputPortsWithProtocols(outputs []obs.OutputSpec) []factory.PortProtocol { - portProtocolMap := map[factory.PortProtocol]bool{} - +func GetOutputPortsWithProtocols(outputs []obs.OutputSpec, portProtocolMap map[factory.PortProtocol]bool) { for _, output := range outputs { portProtocols := getPortProtocolFromOutputURL(output) for _, pp := range portProtocols { @@ -24,19 +24,12 @@ func GetOutputPortsWithProtocols(outputs []obs.OutputSpec) []factory.PortProtoco } } } - - result := make([]factory.PortProtocol, 0, len(portProtocolMap)) - for pp := range portProtocolMap { - result = append(result, pp) - } - - return result } // GetInputPorts extracts all unique ports from the given input receiver specs. // It returns the ports that input receivers are configured to listen on. func GetInputPorts(inputs []obs.InputSpec) []int32 { - portSet := set.New[int32]() + portSet := sets.NewInt32() for _, input := range inputs { if input.Type == obs.InputTypeReceiver && input.Receiver != nil { @@ -106,7 +99,7 @@ func getPortProtocolFromOutputURL(output obs.OutputSpec) []factory.PortProtocol } // Fall back to default port with TCP protocol - if defaultPort := getDefaultPort(output.Type, urlStr); defaultPort > 0 { + if defaultPort := getDefaultOutputPort(output.Type, urlStr); defaultPort > 0 { return []factory.PortProtocol{{Port: defaultPort, Protocol: corev1.ProtocolTCP}} } @@ -123,7 +116,7 @@ func getKafkaBrokerPortProtocols(brokers []obs.BrokerURL) []factory.PortProtocol portProtocolSlice = append(portProtocolSlice, factory.PortProtocol{Port: port.Port, Protocol: port.Protocol}) } else { // If no port in broker URL, use default port based on scheme - if defaultPort := getDefaultPort(obs.OutputTypeKafka, string(broker)); defaultPort > 0 { + if defaultPort := getDefaultOutputPort(obs.OutputTypeKafka, string(broker)); defaultPort > 0 { portProtocolSlice = append(portProtocolSlice, factory.PortProtocol{Port: defaultPort, Protocol: corev1.ProtocolTCP}) } } @@ -132,6 +125,13 @@ func getKafkaBrokerPortProtocols(brokers []obs.BrokerURL) []factory.PortProtocol return portProtocolSlice } +func parsePortString(portStr string) (int32, bool) { + if port, err := strconv.ParseInt(portStr, 10, 32); err == nil && port > 0 { + return int32(port), true + } + return 0, false +} + // parsePortProtocolFromURL extracts the port from a URL string. // Returns nil if the port cannot be determined. func parsePortProtocolFromURL(urlStr string) *factory.PortProtocol { @@ -150,19 +150,15 @@ func parsePortProtocolFromURL(urlStr string) *factory.PortProtocol { protocol = corev1.ProtocolTCP } - portStr := parsedURL.Port() - if portStr != "" { - port, err := strconv.ParseInt(portStr, 10, 32) - if err == nil && port > 0 { - return &factory.PortProtocol{Port: int32(port), Protocol: protocol} - } + if port, ok := parsePortString(parsedURL.Port()); ok { + return &factory.PortProtocol{Port: port, Protocol: protocol} } return nil } -// getDefaultPort returns the default port for a given output type based on the URL scheme or the default port for the output type. -func getDefaultPort(outputType obs.OutputType, urlStr string) int32 { +// getDefaultOutputPort returns the default port for a given output type based on the URL scheme or the default port for the output type. +func getDefaultOutputPort(outputType obs.OutputType, urlStr string) int32 { // Parse URL to determine scheme for kafka and http/https to return appropriate default port var scheme string if urlStr != "" { @@ -200,3 +196,46 @@ func getDefaultPort(outputType obs.OutputType, urlStr string) int32 { } panic(fmt.Sprintf("unknown output type: %s", outputType)) } + +// getDefaultProxyPort returns the default port for a given proxy environment variable based on the URL scheme. +func getDefaultProxyPort(scheme string) (int32, bool) { + switch scheme { + case "http": + return 80, true + case "https": + return 443, true + } + return 0, false +} + +// GetProxyPorts extracts unique ports from cluster-wide proxy environment variables. +// It parses HTTP_PROXY and HTTPS_PROXY URLs to determine explicit proxy ports, +// or adds default ports (80 for HTTP, 443 for HTTPS) when no port is specified. +func GetProxyPorts(portProtocolMap map[factory.PortProtocol]bool) { + // Get proxy environment variables and parse them for additional explicit ports + proxyEnvVars := utils.GetProxyEnvVars() + + for _, envVar := range proxyEnvVars { + // Skip non-proxy environment variables or empty values + if (envVar.Name != "http_proxy" && envVar.Name != "https_proxy") || envVar.Value == "" { + continue + } + + // Parse URL for port extraction or default port determination + parsedURL, err := url.Parse(envVar.Value) + if err != nil { + log.V(0).Error(err, "Failed to parse proxy URL", "url", envVar.Value) + continue + } + + var port int32 + var ok bool + // Extract port from URL or use default port based on URL scheme + if port, ok = parsePortString(parsedURL.Port()); !ok { + port, ok = getDefaultProxyPort(parsedURL.Scheme) + } + if ok { + portProtocolMap[factory.PortProtocol{Port: port, Protocol: corev1.ProtocolTCP}] = true + } + } +} diff --git a/internal/network/ports_test.go b/internal/network/ports_test.go index 1d3069acb..9284da046 100644 --- a/internal/network/ports_test.go +++ b/internal/network/ports_test.go @@ -1,10 +1,13 @@ package network import ( + "os" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" obs "github.com/openshift/cluster-logging-operator/api/observability/v1" + "github.com/openshift/cluster-logging-operator/internal/constants" "github.com/openshift/cluster-logging-operator/internal/factory" corev1 "k8s.io/api/core/v1" ) @@ -35,7 +38,7 @@ var _ = Describe("Network Ports", func() { DescribeTable("getDefaultPort", func(outputType obs.OutputType, urlStr string, expected int32) { - port := getDefaultPort(outputType, urlStr) + port := getDefaultOutputPort(outputType, urlStr) Expect(port).To(Equal(expected)) }, // Different output types @@ -63,12 +66,12 @@ var _ = Describe("Network Ports", func() { It("should not panic for all supported output types", func() { for _, outputType := range obs.OutputTypes { - Expect(func() { getDefaultPort(outputType, "") }).ToNot(Panic()) + Expect(func() { getDefaultOutputPort(outputType, "") }).ToNot(Panic()) } }) It("should panic for unknown output type", func() { - Expect(func() { getDefaultPort(obs.OutputType("unknown"), "") }).To(Panic()) + Expect(func() { getDefaultOutputPort(obs.OutputType("unknown"), "") }).To(Panic()) }) DescribeTable("getKafkaBrokerPortProtocols", @@ -323,12 +326,13 @@ var _ = Describe("Network Ports", func() { }, }, } - ports := GetOutputPortsWithProtocols(outputs) - Expect(ports).To(ConsistOf( - factory.PortProtocol{Port: 9200, Protocol: corev1.ProtocolTCP}, - factory.PortProtocol{Port: 8088, Protocol: corev1.ProtocolTCP}, - factory.PortProtocol{Port: 3100, Protocol: corev1.ProtocolTCP}, - )) + portMap := map[factory.PortProtocol]bool{} + GetOutputPortsWithProtocols(outputs, portMap) + Expect(portMap).To(Equal(map[factory.PortProtocol]bool{ + {Port: 9200, Protocol: corev1.ProtocolTCP}: true, + {Port: 8088, Protocol: corev1.ProtocolTCP}: true, + {Port: 3100, Protocol: corev1.ProtocolTCP}: true, + })) }) It("should deduplicate ports from multiple outputs", func() { @@ -352,11 +356,12 @@ var _ = Describe("Network Ports", func() { }, }, } - ports := GetOutputPortsWithProtocols(outputs) - Expect(ports).To(ConsistOf( - factory.PortProtocol{Port: 9200, Protocol: corev1.ProtocolTCP}, - factory.PortProtocol{Port: 8088, Protocol: corev1.ProtocolTCP}, - )) + portMap := map[factory.PortProtocol]bool{} + GetOutputPortsWithProtocols(outputs, portMap) + Expect(portMap).To(Equal(map[factory.PortProtocol]bool{ + {Port: 9200, Protocol: corev1.ProtocolTCP}: true, + {Port: 8088, Protocol: corev1.ProtocolTCP}: true, + })) }) It("should handle outputs with default ports", func() { @@ -374,11 +379,12 @@ var _ = Describe("Network Ports", func() { }, }, } - ports := GetOutputPortsWithProtocols(outputs) - Expect(ports).To(ConsistOf( - factory.PortProtocol{Port: 9200, Protocol: corev1.ProtocolTCP}, - factory.PortProtocol{Port: 80, Protocol: corev1.ProtocolTCP}, - )) + portMap := map[factory.PortProtocol]bool{} + GetOutputPortsWithProtocols(outputs, portMap) + Expect(portMap).To(Equal(map[factory.PortProtocol]bool{ + {Port: 9200, Protocol: corev1.ProtocolTCP}: true, + {Port: 80, Protocol: corev1.ProtocolTCP}: true, + })) }) It("should handle complex Kafka output with multiple brokers", func() { @@ -401,12 +407,13 @@ var _ = Describe("Network Ports", func() { }, }, } - ports := GetOutputPortsWithProtocols(outputs) - Expect(ports).To(ConsistOf( - factory.PortProtocol{Port: 9092, Protocol: corev1.ProtocolTCP}, - factory.PortProtocol{Port: 9093, Protocol: corev1.ProtocolTCP}, - factory.PortProtocol{Port: 9200, Protocol: corev1.ProtocolTCP}, - )) + portMap := map[factory.PortProtocol]bool{} + GetOutputPortsWithProtocols(outputs, portMap) + Expect(portMap).To(Equal(map[factory.PortProtocol]bool{ + {Port: 9092, Protocol: corev1.ProtocolTCP}: true, + {Port: 9093, Protocol: corev1.ProtocolTCP}: true, + {Port: 9200, Protocol: corev1.ProtocolTCP}: true, + })) }) }) }) @@ -557,4 +564,116 @@ var _ = Describe("Network Ports", func() { }) }) }) + + Describe("GetProxyPorts", func() { + var originalEnvVars map[string]string + + BeforeEach(func() { + // Save original environment variables + originalEnvVars = make(map[string]string) + for _, envVar := range constants.ProxyEnvVars { + originalEnvVars[envVar] = os.Getenv(envVar) + os.Unsetenv(envVar) // Clear all proxy env vars for clean test state + } + }) + + AfterEach(func() { + // Restore original environment variables + for envVar, value := range originalEnvVars { + if value != "" { + os.Setenv(envVar, value) + } else { + os.Unsetenv(envVar) + } + } + }) + + setProxies := func(httpProxy, httpsProxy string) map[string]string { + return map[string]string{ + "http_proxy": httpProxy, + "https_proxy": httpsProxy, + } + } + + expectedProxyPorts := func(ports ...int32) []factory.PortProtocol { + expectedPortProtocols := make([]factory.PortProtocol, 0, len(ports)) + for _, port := range ports { + expectedPortProtocols = append(expectedPortProtocols, factory.PortProtocol{Port: port, Protocol: corev1.ProtocolTCP}) + } + return expectedPortProtocols + } + + DescribeTable("when proxy environment variables are set", + func(envVars map[string]string, expectedPorts []factory.PortProtocol) { + for key, value := range envVars { + os.Setenv(key, value) + } + + portMap := map[factory.PortProtocol]bool{} + GetProxyPorts(portMap) + ports := make([]factory.PortProtocol, 0, len(portMap)) + for pp := range portMap { + ports = append(ports, pp) + } + if len(expectedPorts) == 0 { + Expect(ports).To(BeEmpty()) + } else { + Expect(ports).To(ConsistOf(expectedPorts)) + } + }, + Entry("should extract ports from HTTP proxy URLs with explicit ports", + setProxies("http://proxy.example.com:8080", "https://proxy.example.com:8443"), + expectedProxyPorts(8080, 8443), + ), + Entry("should use default ports when proxy URLs don't specify ports", + setProxies("http://proxy.example.com", "https://proxy.example.com"), + expectedProxyPorts(80, 443), + ), + Entry("should return empty when proxy URLs have unknown schemes without ports", + setProxies("proxy://proxy.example.com", "invalid://proxy.example.com"), + expectedProxyPorts(), + ), + Entry("should deduplicate identical proxy ports", + setProxies("http://proxy.example.com:8080", "https://proxy.example.com:8080"), + expectedProxyPorts(8080), + ), + Entry("should handle malformed proxy URLs gracefully", + setProxies("not-a-valid-url", "http://proxy.example.com:8080"), + expectedProxyPorts(8080), + ), + Entry("should extract explicitly specified standard ports from proxy URLs", + setProxies("http://proxy.example.com:80", "https://proxy.example.com:443"), + expectedProxyPorts(80, 443), + ), + Entry("should handle proxy URLs with authentication", + setProxies("http://user:password@proxy.example.com:8080", "https://user:password@proxy.example.com:8443"), + expectedProxyPorts(8080, 8443), + ), + Entry("should handle IPv6 proxy URLs", + setProxies("http://[::1]:8080", ""), + expectedProxyPorts(8080), + ), + Entry("should handle empty string proxy URLs gracefully", + setProxies("", "https://proxy.example.com:8443"), + expectedProxyPorts(8443), + ), + Entry("should handle invalid ports on urls gracefully", + setProxies("http://proxy.example.com:invalid", "https://proxy.example.com:no-a-port"), + expectedProxyPorts(), + ), + Entry("should handle mix of uppercase and lowercase proxy environment variables", + map[string]string{ + "HTTP_PROXY": "http://proxy.example.com:8080", + "https_proxy": "https://proxy.example.com:8443", + }, + expectedProxyPorts(8080, 8443), + ), + ) + + It("should be empty portMap when no proxy environment variables are set", func() { + portMap := map[factory.PortProtocol]bool{} + GetProxyPorts(portMap) + Expect(portMap).To(BeEmpty()) + }) + }) }) diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 16ef5426f..16c6dd28a 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -322,7 +322,7 @@ func EnvVarResourceFieldSelectorEqual(resource1, resource2 v1.ResourceFieldSelec func GetProxyEnvVars() []v1.EnvVar { envVars := []v1.EnvVar{} - for _, envvar := range []string{"HTTPS_PROXY", "https_proxy", "HTTP_PROXY", "http_proxy", "NO_PROXY", "no_proxy"} { + for _, envvar := range constants.ProxyEnvVars { if value := os.Getenv(envvar); value != "" { if envvar == "NO_PROXY" || envvar == "no_proxy" { if len(constants.ExtraNoProxyList) > 0 {