Skip to content

Commit c55b0b9

Browse files
committed
LOG-8068: Vector can't access log stores outside the cluster when restrict network policy is enabled and the cluster has cluster-wide proxy.
1 parent 28c8b02 commit c55b0b9

File tree

5 files changed

+224
-52
lines changed

5 files changed

+224
-52
lines changed

internal/constants/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ const (
8585

8686
var ExtraNoProxyList = []string{ElasticsearchFQDN}
8787

88+
var ProxyEnvVars = []string{"HTTP_PROXY", "https_proxy", "HTTP_PROXY", "http_proxy", "NO_PROXY", "no_proxy"}
89+
8890
func DefaultTolerations() []v1.Toleration {
8991
return []v1.Toleration{
9092
{

internal/network/network_policy.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,26 @@ func ReconcileClusterLogForwarderNetworkPolicy(k8Client client.Client, namespace
2323

2424
// For RestrictIngressEgress, determine the ports to use based on URLs in outputs and defaults
2525
if policyRuleSet == obsv1.NetworkPolicyRuleSetTypeRestrictIngressEgress {
26-
// Parse ports with protocols from outputs
27-
if len(outputs) > 0 {
28-
egressPorts = GetOutputPortsWithProtocols(outputs)
29-
}
3026
// Parse ports from inputs (receiver inputs use TCP)
3127
if len(inputs) > 0 {
3228
ingressPorts = GetInputPorts(inputs)
3329
}
30+
31+
// Parse ports for egress from outputs and proxy configuration if any
32+
egressPortMap := map[factory.PortProtocol]bool{}
33+
// Parse ports with protocols from outputs
34+
if len(outputs) > 0 {
35+
GetOutputPortsWithProtocols(outputs, egressPortMap)
36+
37+
}
38+
// Add proxy ports if any for cluster-wide proxy configuration
39+
GetProxyPorts(egressPortMap)
40+
41+
// Convert map to slice
42+
egressPorts = make([]factory.PortProtocol, 0, len(egressPortMap))
43+
for pp := range egressPortMap {
44+
egressPorts = append(egressPorts, pp)
45+
}
3446
}
3547

3648
desired := factory.NewNetworkPolicyWithProtocolPorts(namespace, policyName, instanceName, component, string(policyRuleSet), egressPorts, ingressPorts, visitor)

internal/network/ports.go

Lines changed: 61 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,17 @@ import (
55
"net/url"
66
"strconv"
77

8+
log "github.com/ViaQ/logerr/v2/log/static"
89
obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
910
"github.com/openshift/cluster-logging-operator/internal/factory"
11+
"github.com/openshift/cluster-logging-operator/internal/utils"
1012
corev1 "k8s.io/api/core/v1"
11-
"k8s.io/utils/set"
13+
"k8s.io/apimachinery/pkg/util/sets"
1214
)
1315

1416
// GetOutputPortsWithProtocols extracts all unique ports with their protocols from the given outputs.
1517
// It parses URLs to extract ports and protocols, or uses default values based on the output type.
16-
func GetOutputPortsWithProtocols(outputs []obs.OutputSpec) []factory.PortProtocol {
17-
portProtocolMap := map[factory.PortProtocol]bool{}
18-
18+
func GetOutputPortsWithProtocols(outputs []obs.OutputSpec, portProtocolMap map[factory.PortProtocol]bool) {
1919
for _, output := range outputs {
2020
portProtocols := getPortProtocolFromOutputURL(output)
2121
for _, pp := range portProtocols {
@@ -24,19 +24,12 @@ func GetOutputPortsWithProtocols(outputs []obs.OutputSpec) []factory.PortProtoco
2424
}
2525
}
2626
}
27-
28-
result := make([]factory.PortProtocol, 0, len(portProtocolMap))
29-
for pp := range portProtocolMap {
30-
result = append(result, pp)
31-
}
32-
33-
return result
3427
}
3528

3629
// GetInputPorts extracts all unique ports from the given input receiver specs.
3730
// It returns the ports that input receivers are configured to listen on.
3831
func GetInputPorts(inputs []obs.InputSpec) []int32 {
39-
portSet := set.New[int32]()
32+
portSet := sets.NewInt32()
4033

4134
for _, input := range inputs {
4235
if input.Type == obs.InputTypeReceiver && input.Receiver != nil {
@@ -106,7 +99,7 @@ func getPortProtocolFromOutputURL(output obs.OutputSpec) []factory.PortProtocol
10699
}
107100

108101
// Fall back to default port with TCP protocol
109-
if defaultPort := getDefaultPort(output.Type, urlStr); defaultPort > 0 {
102+
if defaultPort := getDefaultOutputPort(output.Type, urlStr); defaultPort > 0 {
110103
return []factory.PortProtocol{{Port: defaultPort, Protocol: corev1.ProtocolTCP}}
111104
}
112105

@@ -123,7 +116,7 @@ func getKafkaBrokerPortProtocols(brokers []obs.BrokerURL) []factory.PortProtocol
123116
portProtocolSlice = append(portProtocolSlice, factory.PortProtocol{Port: port.Port, Protocol: port.Protocol})
124117
} else {
125118
// If no port in broker URL, use default port based on scheme
126-
if defaultPort := getDefaultPort(obs.OutputTypeKafka, string(broker)); defaultPort > 0 {
119+
if defaultPort := getDefaultOutputPort(obs.OutputTypeKafka, string(broker)); defaultPort > 0 {
127120
portProtocolSlice = append(portProtocolSlice, factory.PortProtocol{Port: defaultPort, Protocol: corev1.ProtocolTCP})
128121
}
129122
}
@@ -132,6 +125,13 @@ func getKafkaBrokerPortProtocols(brokers []obs.BrokerURL) []factory.PortProtocol
132125
return portProtocolSlice
133126
}
134127

128+
func parsePortString(portStr string) (int32, bool) {
129+
if port, err := strconv.ParseInt(portStr, 10, 32); err == nil && port > 0 {
130+
return int32(port), true
131+
}
132+
return 0, false
133+
}
134+
135135
// parsePortProtocolFromURL extracts the port from a URL string.
136136
// Returns nil if the port cannot be determined.
137137
func parsePortProtocolFromURL(urlStr string) *factory.PortProtocol {
@@ -150,19 +150,15 @@ func parsePortProtocolFromURL(urlStr string) *factory.PortProtocol {
150150
protocol = corev1.ProtocolTCP
151151
}
152152

153-
portStr := parsedURL.Port()
154-
if portStr != "" {
155-
port, err := strconv.ParseInt(portStr, 10, 32)
156-
if err == nil && port > 0 {
157-
return &factory.PortProtocol{Port: int32(port), Protocol: protocol}
158-
}
153+
if port, ok := parsePortString(parsedURL.Port()); ok {
154+
return &factory.PortProtocol{Port: port, Protocol: protocol}
159155
}
160156

161157
return nil
162158
}
163159

164-
// getDefaultPort returns the default port for a given output type based on the URL scheme or the default port for the output type.
165-
func getDefaultPort(outputType obs.OutputType, urlStr string) int32 {
160+
// getDefaultOutputPort returns the default port for a given output type based on the URL scheme or the default port for the output type.
161+
func getDefaultOutputPort(outputType obs.OutputType, urlStr string) int32 {
166162
// Parse URL to determine scheme for kafka and http/https to return appropriate default port
167163
var scheme string
168164
if urlStr != "" {
@@ -200,3 +196,46 @@ func getDefaultPort(outputType obs.OutputType, urlStr string) int32 {
200196
}
201197
panic(fmt.Sprintf("unknown output type: %s", outputType))
202198
}
199+
200+
// getDefaultProxyPort returns the default port for a given proxy environment variable based on the URL scheme.
201+
func getDefaultProxyPort(scheme string) (int32, bool) {
202+
switch scheme {
203+
case "http":
204+
return 80, true
205+
case "https":
206+
return 443, true
207+
}
208+
return 0, false
209+
}
210+
211+
// GetProxyPorts extracts unique ports from cluster-wide proxy environment variables.
212+
// It parses HTTP_PROXY and HTTPS_PROXY URLs to determine explicit proxy ports,
213+
// or adds default ports (80 for HTTP, 443 for HTTPS) when no port is specified.
214+
func GetProxyPorts(portProtocolMap map[factory.PortProtocol]bool) {
215+
// Get proxy environment variables and parse them for additional explicit ports
216+
proxyEnvVars := utils.GetProxyEnvVars()
217+
218+
for _, envVar := range proxyEnvVars {
219+
// Skip non-proxy environment variables or empty values
220+
if (envVar.Name != "http_proxy" && envVar.Name != "https_proxy") || envVar.Value == "" {
221+
continue
222+
}
223+
224+
// Parse URL for port extraction or default port determination
225+
parsedURL, err := url.Parse(envVar.Value)
226+
if err != nil {
227+
log.V(0).Error(err, "Failed to parse proxy URL", "url", envVar.Value)
228+
continue
229+
}
230+
231+
var port int32
232+
var ok bool
233+
// Extract port from URL or use default port based on URL scheme
234+
if port, ok = parsePortString(parsedURL.Port()); !ok {
235+
port, ok = getDefaultProxyPort(parsedURL.Scheme)
236+
}
237+
if ok {
238+
portProtocolMap[factory.PortProtocol{Port: port, Protocol: corev1.ProtocolTCP}] = true
239+
}
240+
}
241+
}

0 commit comments

Comments
 (0)