From c9f634213506388e16047b36424bf1ad196b0b95 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Fri, 26 Sep 2025 13:22:10 +0000 Subject: [PATCH 1/5] Detailed gRPC timeout settings --- cmd/main.go | 4 +- cmd/router/main.go | 2 +- .../charts/jumpstarter-controller/model.py | 23 +++++++ deploy/helm/jumpstarter/values.yaml | 13 ++++ internal/config/config.go | 14 ++-- internal/config/grpc.go | 69 ++++++++++++++++++- internal/config/types.go | 8 +++ internal/service/controller_service.go | 24 ++++--- internal/service/router_service.go | 12 ++-- 9 files changed, 141 insertions(+), 28 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index f49836c1..a099ba63 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -214,8 +214,8 @@ func main() { ResourceKey: "jumpstarter-kind", NameKey: "jumpstarter-name", }), - Router: router, - ServerOption: option, + Router: router, + ServerOptions: option, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create service", "service", "Controller") os.Exit(1) diff --git a/cmd/router/main.go b/cmd/router/main.go index beb1570b..730bc641 100644 --- a/cmd/router/main.go +++ b/cmd/router/main.go @@ -61,7 +61,7 @@ func main() { } svc := service.RouterService{ - ServerOption: serverOption, + ServerOptions: serverOption, } err = svc.Start(ctx) diff --git a/deploy/helm/jumpstarter/charts/jumpstarter-controller/model.py b/deploy/helm/jumpstarter/charts/jumpstarter-controller/model.py index dee201f4..f8e8566d 100755 --- a/deploy/helm/jumpstarter/charts/jumpstarter-controller/model.py +++ b/deploy/helm/jumpstarter/charts/jumpstarter-controller/model.py @@ -29,6 +29,7 @@ class Internal(BaseModel): class Keepalive(BaseModel): model_config = ConfigDict(extra="forbid") + # EnforcementPolicy parameters minTime: Optional[str] = Field( None, description="The minimum amount of time a client should wait before sending a keepalive ping", @@ -38,6 +39,28 @@ class Keepalive(BaseModel): description="Whether to allow keepalive pings even when there are no active streams(RPCs)", ) + # ServerParameters for connection timeout control + timeout: Optional[str] = Field( + None, + description="How long the server waits for a ping response before closing the connection", + ) + maxConnectionIdle: Optional[str] = Field( + None, + description="Maximum time a connection can be idle before being closed", + ) + maxConnectionAge: Optional[str] = Field( + None, + description="Maximum lifetime of a connection before it's closed", + ) + maxConnectionAgeGrace: Optional[str] = Field( + None, + description="Grace period after max connection age before forcible closure", + ) + time: Optional[str] = Field( + None, + description="How often the server sends keepalive pings to clients", + ) + class Grpc(BaseModel): model_config = ConfigDict(extra="forbid") diff --git a/deploy/helm/jumpstarter/values.yaml b/deploy/helm/jumpstarter/values.yaml index ad5d3e5b..b0a98a95 100644 --- a/deploy/helm/jumpstarter/values.yaml +++ b/deploy/helm/jumpstarter/values.yaml @@ -33,6 +33,11 @@ global: ## @param jumpstarter-controller.config.grpc.keepalive.minTime. The minimum amount of time a client should wait before sending a keepalive ping. ## @param jumpstarter-controller.config.grpc.keepalive.permitWithoutStream. Whether to allow keepalive pings even when there are no active streams(RPCs). +## @param jumpstarter-controller.config.grpc.keepalive.timeout. How long the server waits for a ping response before closing the connection. +## @param jumpstarter-controller.config.grpc.keepalive.maxConnectionIdle. Maximum time a connection can be idle before being closed. +## @param jumpstarter-controller.config.grpc.keepalive.maxConnectionAge. Maximum lifetime of a connection before it's closed. +## @param jumpstarter-controller.config.grpc.keepalive.maxConnectionAgeGrace. Grace period after max connection age before forcible closure. +## @param jumpstarter-controller.config.grpc.keepalive.time. How often the server sends keepalive pings to clients. ## @param jumpstarter-controller.config.authentication.internal.prefix. Prefix to add to the subject claim of the tokens issued by the builtin authenticator. ## @param jumpstarter-controller.config.authentication.jwt. External OIDC authentication, see https://kubernetes.io/docs/reference/access-authn-authz/authentication/#using-authentication-configuration for documentation @@ -74,10 +79,18 @@ jumpstarter-controller: config: grpc: keepalive: + # EnforcementPolicy parameters # Safety: potentially makes server vulnerable to DDoS # https://grpc.io/docs/guides/keepalive/#how-configuring-keepalive-affects-a-call minTime: 3s permitWithoutStream: true + + # ServerParameters for connection timeout control + timeout: 120s # How long to wait for ping response before closing (default: 20s) + # maxConnectionIdle: 30m # Max idle time before closing (default: infinity) + # maxConnectionAge: 2h # Max connection lifetime (default: infinity) + # maxConnectionAgeGrace: 30s # Grace period after max age (default: infinity) + # time: 2h # How often server sends pings (default: 2h) authentication: internal: prefix: "internal:" diff --git a/internal/config/config.go b/internal/config/config.go index 2fbb5874..0b091391 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -19,7 +19,7 @@ func LoadRouterConfiguration( ctx context.Context, client client.Reader, key client.ObjectKey, -) (grpc.ServerOption, error) { +) ([]grpc.ServerOption, error) { var configmap corev1.ConfigMap if err := client.Get(ctx, key, &configmap); err != nil { return nil, err @@ -51,7 +51,7 @@ func LoadConfiguration( key client.ObjectKey, signer *oidc.Signer, certificateAuthority string, -) (authenticator.Token, string, Router, grpc.ServerOption, *Provisioning, error) { +) (authenticator.Token, string, Router, []grpc.ServerOption, *Provisioning, error) { var configmap corev1.ConfigMap if err := client.Get(ctx, key, &configmap); err != nil { return nil, "", nil, nil, nil, err @@ -82,10 +82,12 @@ func LoadConfiguration( return nil, "", nil, nil, nil, err } - return authenticator, prefix, router, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: 1 * time.Second, - PermitWithoutStream: true, - }), &Provisioning{Enabled: false}, nil + return authenticator, prefix, router, []grpc.ServerOption{ + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: 1 * time.Second, + PermitWithoutStream: true, + }), + }, &Provisioning{Enabled: false}, nil } rawConfig, ok := configmap.Data["config"] diff --git a/internal/config/grpc.go b/internal/config/grpc.go index dcb40097..e44f8473 100644 --- a/internal/config/grpc.go +++ b/internal/config/grpc.go @@ -7,14 +7,77 @@ import ( "google.golang.org/grpc/keepalive" ) -func LoadGrpcConfiguration(config Grpc) (grpc.ServerOption, error) { +// the grpc lib default is 20 seconds +const defaultGrpcTimeout = 120 * time.Second + +func LoadGrpcConfiguration(config Grpc) ([]grpc.ServerOption, error) { + var serverOptions []grpc.ServerOption + + // Parse EnforcementPolicy parameters minTime, err := time.ParseDuration(config.Keepalive.MinTime) if err != nil { return nil, err } - return grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + serverOptions = append(serverOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: minTime, PermitWithoutStream: config.Keepalive.PermitWithoutStream, - }), nil + })) + + // Parse ServerParameters for connection timeout control + serverParams := keepalive.ServerParameters{} + + // Timeout: How long to wait for ping response before closing connection + if config.Keepalive.Timeout != "" { + timeout, err := time.ParseDuration(config.Keepalive.Timeout) + if err != nil { + return nil, err + } + serverParams.Timeout = timeout + } else { + serverParams.Timeout = defaultGrpcTimeout + } + + // MaxConnectionIdle: Max idle time before closing connection + if config.Keepalive.MaxConnectionIdle != "" { + maxIdle, err := time.ParseDuration(config.Keepalive.MaxConnectionIdle) + if err != nil { + return nil, err + } + serverParams.MaxConnectionIdle = maxIdle + } + + // MaxConnectionAge: Max connection lifetime + if config.Keepalive.MaxConnectionAge != "" { + maxAge, err := time.ParseDuration(config.Keepalive.MaxConnectionAge) + if err != nil { + return nil, err + } + serverParams.MaxConnectionAge = maxAge + } + + // MaxConnectionAgeGrace: Grace period after max age + if config.Keepalive.MaxConnectionAgeGrace != "" { + maxAgeGrace, err := time.ParseDuration(config.Keepalive.MaxConnectionAgeGrace) + if err != nil { + return nil, err + } + serverParams.MaxConnectionAgeGrace = maxAgeGrace + } + + // Time: How often server sends pings + if config.Keepalive.Time != "" { + time, err := time.ParseDuration(config.Keepalive.Time) + if err != nil { + return nil, err + } + serverParams.Time = time + } + + // Only add ServerParameters if at least one parameter is set + if serverParams != (keepalive.ServerParameters{}) { + serverOptions = append(serverOptions, grpc.KeepaliveParams(serverParams)) + } + + return serverOptions, nil } diff --git a/internal/config/types.go b/internal/config/types.go index d3665b0e..4810e32f 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -28,8 +28,16 @@ type Grpc struct { } type Keepalive struct { + // EnforcementPolicy parameters MinTime string `json:"minTime"` PermitWithoutStream bool `json:"permitWithoutStream"` + + // ServerParameters for connection timeout control + Timeout string `json:"timeout,omitempty"` // How long to wait for ping response before closing + MaxConnectionIdle string `json:"maxConnectionIdle,omitempty"` // Max idle time before closing + MaxConnectionAge string `json:"maxConnectionAge,omitempty"` // Max connection lifetime + MaxConnectionAgeGrace string `json:"maxConnectionAgeGrace,omitempty"` // Grace period after max age + Time string `json:"time,omitempty"` // How often server sends pings } type Router map[string]RouterEntry diff --git a/internal/service/controller_service.go b/internal/service/controller_service.go index b85f4478..8fe3a017 100644 --- a/internal/service/controller_service.go +++ b/internal/service/controller_service.go @@ -71,14 +71,14 @@ import ( // ControllerService exposes a gRPC service type ControllerService struct { pb.UnimplementedControllerServiceServer - Client client.WithWatch - Scheme *runtime.Scheme - Authn authentication.ContextAuthenticator - Authz authorizer.Authorizer - Attr authorization.ContextAttributesGetter - ServerOption grpc.ServerOption - Router config.Router - listenQueues sync.Map + Client client.WithWatch + Scheme *runtime.Scheme + Authn authentication.ContextAuthenticator + Authz authorizer.Authorizer + Attr authorization.ContextAttributesGetter + ServerOptions []grpc.ServerOption + Router config.Router + listenQueues sync.Map } type wrappedStream struct { @@ -700,8 +700,7 @@ func (s *ControllerService) Start(ctx context.Context) error { return err } - server := grpc.NewServer( - s.ServerOption, + serverOptions := []grpc.ServerOption{ grpc.ChainUnaryInterceptor(func( gctx context.Context, req any, @@ -718,7 +717,10 @@ func (s *ControllerService) Start(ctx context.Context) error { ) error { return handler(srv, &wrappedStream{ServerStream: ss}) }, recovery.StreamServerInterceptor()), - ) + } + serverOptions = append(serverOptions, s.ServerOptions...) + + server := grpc.NewServer(serverOptions...) pb.RegisterControllerServiceServer(server, s) cpb.RegisterClientServiceServer( diff --git a/internal/service/router_service.go b/internal/service/router_service.go index e1167c8b..4db48e21 100644 --- a/internal/service/router_service.go +++ b/internal/service/router_service.go @@ -38,8 +38,8 @@ import ( // RouterService exposes a gRPC service type RouterService struct { pb.UnimplementedRouterServiceServer - ServerOption grpc.ServerOption - pending sync.Map + ServerOptions []grpc.ServerOption + pending sync.Map } type streamContext struct { @@ -120,12 +120,14 @@ func (s *RouterService) Start(ctx context.Context) error { return err } - server := grpc.NewServer( + serverOptions := []grpc.ServerOption{ grpc.Creds(credentials.NewServerTLSFromCert(cert)), grpc.ChainUnaryInterceptor(recovery.UnaryServerInterceptor()), grpc.ChainStreamInterceptor(recovery.StreamServerInterceptor()), - s.ServerOption, - ) + } + serverOptions = append(serverOptions, s.ServerOptions...) + + server := grpc.NewServer(serverOptions...) pb.RegisterRouterServiceServer(server, s) From 1b355cbbab85c01c3893bffc607260914d57ae1f Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Fri, 26 Sep 2025 13:22:51 +0000 Subject: [PATCH 2/5] Remove backwards compatibility setting from 0.6.0 --- internal/config/config.go | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 0b091391..80ab749a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,11 +3,9 @@ package config import ( "context" "fmt" - "time" "github.com/jumpstarter-dev/jumpstarter-controller/internal/oidc" "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/yaml" @@ -67,29 +65,6 @@ func LoadConfiguration( return nil, "", nil, nil, nil, err } - rawAuthenticationConfiguration, ok := configmap.Data["authentication"] - if ok { - // backwards compatibility - // TODO: remove in 0.7.0 - authenticator, prefix, err := oidc.LoadAuthenticationConfiguration( - ctx, - scheme, - []byte(rawAuthenticationConfiguration), - signer, - certificateAuthority, - ) - if err != nil { - return nil, "", nil, nil, nil, err - } - - return authenticator, prefix, router, []grpc.ServerOption{ - grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: 1 * time.Second, - PermitWithoutStream: true, - }), - }, &Provisioning{Enabled: false}, nil - } - rawConfig, ok := configmap.Data["config"] if !ok { return nil, "", nil, nil, nil, fmt.Errorf("LoadConfiguration: missing config section") From c6800178285e644fbb3be5b02352685be59db260 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Fri, 26 Sep 2025 13:28:09 +0000 Subject: [PATCH 3/5] Bump golangci-lint version to 2.5.0 The version we were currently usign has a missing dependency which isn't available anymore. --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 72470535..5048e60f 100644 --- a/Makefile +++ b/Makefile @@ -185,7 +185,7 @@ GRPCURL = $(LOCALBIN)/grpcurl KUSTOMIZE_VERSION ?= v5.4.1 CONTROLLER_TOOLS_VERSION ?= v0.16.3 ENVTEST_VERSION ?= release-0.18 -GOLANGCI_LINT_VERSION ?= v2.1.2 +GOLANGCI_LINT_VERSION ?= v2.5.0 KIND_VERSION ?= v0.27.0 GRPCURL_VERSION ?= v1.9.2 From 9af2275d585c6607ced1dd73840ce25d81d2ed06 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Fri, 26 Sep 2025 14:05:40 +0000 Subject: [PATCH 4/5] Make exporter lastseen timeout configurable --- .github/workflows/e2e.yaml | 8 +- cmd/main.go | 9 +- .../charts/jumpstarter-controller/model.py | 10 ++ .../jumpstarter-controller/values.schema.json | 96 +++++++++++++++++++ deploy/helm/jumpstarter/values.yaml | 7 +- internal/config/config.go | 23 +++-- internal/config/grpc.go | 6 +- internal/config/types.go | 40 +++++++- internal/controller/exporter_controller.go | 34 +++++-- internal/controller/lease_controller_test.go | 11 +++ 10 files changed, 214 insertions(+), 30 deletions(-) diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index 495c94cd..150de8cd 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -19,10 +19,12 @@ jobs: controller-ref: ${{ github.ref }} # use the matching branch on the jumpstarter repo jumpstarter-ref: ${{ github.event.pull_request.base.ref }} - e2e-tests-28d6b1cc3b49ab9ae176918ab9709a2e2522c97e: + # test the current controller with the previous version of python and E2E tests + # to ensure backwards compatibility + e2e-tests-release-0-7: runs-on: ubuntu-latest steps: - - uses: jumpstarter-dev/jumpstarter-e2e@11a5ce6734be9f089ec3ea6ebf55284616f67fe8 + - uses: jumpstarter-dev/jumpstarter-e2e@release-0.7 with: controller-ref: ${{ github.ref }} - jumpstarter-ref: 28d6b1cc3b49ab9ae176918ab9709a2e2522c97e + jumpstarter-ref: release-0.7 diff --git a/cmd/main.go b/cmd/main.go index a099ba63..3de48cfc 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -154,7 +154,7 @@ func main() { os.Exit(1) } - authenticator, prefix, router, option, provisioning, err := config.LoadConfiguration( + authenticator, prefix, router, option, provisioning, exporterOptions, err := config.LoadConfiguration( context.Background(), mgr.GetAPIReader(), mgr.GetScheme(), @@ -174,9 +174,10 @@ func main() { } if err = (&controller.ExporterReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Signer: oidcSigner, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Signer: oidcSigner, + ExporterOptions: *exporterOptions, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Exporter") os.Exit(1) diff --git a/deploy/helm/jumpstarter/charts/jumpstarter-controller/model.py b/deploy/helm/jumpstarter/charts/jumpstarter-controller/model.py index f8e8566d..708b3a66 100755 --- a/deploy/helm/jumpstarter/charts/jumpstarter-controller/model.py +++ b/deploy/helm/jumpstarter/charts/jumpstarter-controller/model.py @@ -203,6 +203,15 @@ class JWTAuthenticator(BaseModel): userValidationRules: Optional[List[UserValidationRule]] = None +class ExporterOptions(BaseModel): + model_config = ConfigDict(extra="forbid") + + offlineTimeout: Optional[str] = Field( + None, + description="How long to wait before marking the exporter as offline", + ) + + class Authentication(BaseModel): model_config = ConfigDict(extra="forbid") @@ -219,6 +228,7 @@ class JumpstarterConfig(BaseModel): provisioning: Optional[Provisioning] = None authentication: Optional[Authentication] = None grpc: Optional[Grpc] = None + exporterOptions: Optional[ExporterOptions] = None class Nodeport(BaseModel): diff --git a/deploy/helm/jumpstarter/charts/jumpstarter-controller/values.schema.json b/deploy/helm/jumpstarter/charts/jumpstarter-controller/values.schema.json index 2cc3fcfc..926d5ebc 100644 --- a/deploy/helm/jumpstarter/charts/jumpstarter-controller/values.schema.json +++ b/deploy/helm/jumpstarter/charts/jumpstarter-controller/values.schema.json @@ -214,6 +214,26 @@ "title": "ClaimValidationRule", "type": "object" }, + "ExporterOptions": { + "additionalProperties": false, + "properties": { + "offlineTimeout": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "How long to wait before marking the exporter as offline", + "title": "Offlinetimeout" + } + }, + "title": "ExporterOptions", + "type": "object" + }, "ExtraItem": { "additionalProperties": false, "properties": { @@ -638,6 +658,17 @@ } ], "default": null + }, + "exporterOptions": { + "anyOf": [ + { + "$ref": "#/$defs/ExporterOptions" + }, + { + "type": "null" + } + ], + "default": null } }, "title": "JumpstarterConfig", @@ -671,6 +702,71 @@ "default": null, "description": "Whether to allow keepalive pings even when there are no active streams(RPCs)", "title": "Permitwithoutstream" + }, + "timeout": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "How long the server waits for a ping response before closing the connection", + "title": "Timeout" + }, + "maxConnectionIdle": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Maximum time a connection can be idle before being closed", + "title": "Maxconnectionidle" + }, + "maxConnectionAge": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Maximum lifetime of a connection before it's closed", + "title": "Maxconnectionage" + }, + "maxConnectionAgeGrace": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Grace period after max connection age before forcible closure", + "title": "Maxconnectionagegrace" + }, + "time": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "How often the server sends keepalive pings to clients", + "title": "Time" } }, "title": "Keepalive", diff --git a/deploy/helm/jumpstarter/values.yaml b/deploy/helm/jumpstarter/values.yaml index b0a98a95..0584c5d8 100644 --- a/deploy/helm/jumpstarter/values.yaml +++ b/deploy/helm/jumpstarter/values.yaml @@ -42,6 +42,8 @@ global: ## @param jumpstarter-controller.config.authentication.internal.prefix. Prefix to add to the subject claim of the tokens issued by the builtin authenticator. ## @param jumpstarter-controller.config.authentication.jwt. External OIDC authentication, see https://kubernetes.io/docs/reference/access-authn-authz/authentication/#using-authentication-configuration for documentation +## @param jumpstarter-controller.config.exporterOptions.offlineTimeout. How long to wait before marking the exporter as offline. + ## @section Ingress And Route parameters ## @descriptionStart This section contains parameters for the Ingress and Route configurations. ## You can enable either the gRPC ingress or the OpenShift route but not both. @@ -77,6 +79,9 @@ jumpstarter-controller: namespace: "" config: + exporterOptions: + offlineTimeout: 180s # how long to wait before marking the exporter as offline + grpc: keepalive: # EnforcementPolicy parameters @@ -86,7 +91,7 @@ jumpstarter-controller: permitWithoutStream: true # ServerParameters for connection timeout control - timeout: 120s # How long to wait for ping response before closing (default: 20s) + # timeout: 180s # How long to wait for ping response before closing (default: 180s) # maxConnectionIdle: 30m # Max idle time before closing (default: infinity) # maxConnectionAge: 2h # Max connection lifetime (default: infinity) # maxConnectionAgeGrace: 30s # Grace period after max age (default: infinity) diff --git a/internal/config/config.go b/internal/config/config.go index 80ab749a..acf6be00 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -49,30 +49,30 @@ func LoadConfiguration( key client.ObjectKey, signer *oidc.Signer, certificateAuthority string, -) (authenticator.Token, string, Router, []grpc.ServerOption, *Provisioning, error) { +) (authenticator.Token, string, Router, []grpc.ServerOption, *Provisioning, *ExporterOptions, error) { var configmap corev1.ConfigMap if err := client.Get(ctx, key, &configmap); err != nil { - return nil, "", nil, nil, nil, err + return nil, "", nil, nil, nil, nil, err } rawRouter, ok := configmap.Data["router"] if !ok { - return nil, "", nil, nil, nil, fmt.Errorf("LoadConfiguration: missing router section") + return nil, "", nil, nil, nil, nil, fmt.Errorf("LoadConfiguration: missing router section") } var router Router if err := yaml.Unmarshal([]byte(rawRouter), &router); err != nil { - return nil, "", nil, nil, nil, err + return nil, "", nil, nil, nil, nil, err } rawConfig, ok := configmap.Data["config"] if !ok { - return nil, "", nil, nil, nil, fmt.Errorf("LoadConfiguration: missing config section") + return nil, "", nil, nil, nil, nil, fmt.Errorf("LoadConfiguration: missing config section") } var config Config if err := yaml.UnmarshalStrict([]byte(rawConfig), &config); err != nil { - return nil, "", nil, nil, nil, err + return nil, "", nil, nil, nil, nil, err } authenticator, prefix, err := LoadAuthenticationConfiguration( @@ -83,13 +83,18 @@ func LoadConfiguration( certificateAuthority, ) if err != nil { - return nil, "", nil, nil, nil, err + return nil, "", nil, nil, nil, nil, err } serverOptions, err := LoadGrpcConfiguration(config.Grpc) if err != nil { - return nil, "", nil, nil, nil, err + return nil, "", nil, nil, nil, nil, err } - return authenticator, prefix, router, serverOptions, &config.Provisioning, nil + // Preprocess configuration values (parse durations, cache expensive operations, etc.) + if err := config.ExporterOptions.PreprocessConfig(); err != nil { + return nil, "", nil, nil, nil, nil, err + } + + return authenticator, prefix, router, serverOptions, &config.Provisioning, &config.ExporterOptions, nil } diff --git a/internal/config/grpc.go b/internal/config/grpc.go index e44f8473..a6f53fd8 100644 --- a/internal/config/grpc.go +++ b/internal/config/grpc.go @@ -1,14 +1,14 @@ package config import ( + "fmt" "time" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" ) -// the grpc lib default is 20 seconds -const defaultGrpcTimeout = 120 * time.Second +const defaultGrpcTimeout = 180 * time.Second func LoadGrpcConfiguration(config Grpc) ([]grpc.ServerOption, error) { var serverOptions []grpc.ServerOption @@ -31,7 +31,7 @@ func LoadGrpcConfiguration(config Grpc) ([]grpc.ServerOption, error) { if config.Keepalive.Timeout != "" { timeout, err := time.ParseDuration(config.Keepalive.Timeout) if err != nil { - return nil, err + return nil, fmt.Errorf("LoadGrpcConfiguration: failed to parse Keepalive Timeout: %w", err) } serverParams.Timeout = timeout } else { diff --git a/internal/config/types.go b/internal/config/types.go index 4810e32f..31dbd117 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -1,13 +1,17 @@ package config import ( + "fmt" + "time" + apiserverv1beta1 "k8s.io/apiserver/pkg/apis/apiserver/v1beta1" ) type Config struct { - Authentication Authentication `json:"authentication"` - Provisioning Provisioning `json:"provisioning"` - Grpc Grpc `json:"grpc"` + Authentication Authentication `json:"authentication"` + Provisioning Provisioning `json:"provisioning"` + Grpc Grpc `json:"grpc"` + ExporterOptions ExporterOptions `json:"exporterOptions"` } type Authentication struct { @@ -40,6 +44,36 @@ type Keepalive struct { Time string `json:"time,omitempty"` // How often server sends pings } +type ExporterOptions struct { + OfflineTimeout string `json:"offlineTimeout,omitempty"` // How long to wait before marking the exporter as offline + offlineTimeoutDur time.Duration // Pre-calculated duration, set during LoadConfiguration +} + +// PreprocessConfig parses and caches configuration values that require processing +// This method should be called once during configuration loading to pre-calculate +// expensive operations and cache the results for efficient retrieval +func (e *ExporterOptions) PreprocessConfig() error { + // Parse and cache the offline timeout duration + if e.OfflineTimeout == "" { + e.offlineTimeoutDur = 3 * time.Minute // Default fallback + } else { + duration, err := time.ParseDuration(e.OfflineTimeout) + if err != nil { + return fmt.Errorf("PreprocessConfig: failed to parse exporter offline timeout: %w", err) + } else { + e.offlineTimeoutDur = duration + } + } + + // Future configuration parsing can be added here + return nil +} + +// GetOfflineTimeout returns the pre-calculated offline timeout duration +func (e *ExporterOptions) GetOfflineTimeout() time.Duration { + return e.offlineTimeoutDur +} + type Router map[string]RouterEntry type RouterEntry struct { diff --git a/internal/controller/exporter_controller.go b/internal/controller/exporter_controller.go index 0862f77c..e52fc918 100644 --- a/internal/controller/exporter_controller.go +++ b/internal/controller/exporter_controller.go @@ -30,14 +30,16 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" jumpstarterdevv1alpha1 "github.com/jumpstarter-dev/jumpstarter-controller/api/v1alpha1" + "github.com/jumpstarter-dev/jumpstarter-controller/internal/config" "github.com/jumpstarter-dev/jumpstarter-controller/internal/oidc" ) // ExporterReconciler reconciles a Exporter object type ExporterReconciler struct { client.Client - Scheme *runtime.Scheme - Signer *oidc.Signer + Scheme *runtime.Scheme + Signer *oidc.Signer + ExporterOptions config.ExporterOptions } // +kubebuilder:rbac:groups=jumpstarter.dev,resources=exporters,verbs=get;list;watch;create;update;patch;delete @@ -158,6 +160,7 @@ func (r *ExporterReconciler) reconcileStatusConditionsOnline( exporter *jumpstarterdevv1alpha1.Exporter, ) (ctrl.Result, error) { var requeueAfter time.Duration = 0 + offlineTimeout := r.ExporterOptions.GetOfflineTimeout() if exporter.Status.LastSeen.IsZero() { meta.SetStatusCondition(&exporter.Status.Conditions, metav1.Condition{ @@ -168,13 +171,13 @@ func (r *ExporterReconciler) reconcileStatusConditionsOnline( Message: "Never seen", }) // marking the exporter offline, no need to requeue - } else if time.Since(exporter.Status.LastSeen.Time) > time.Minute { + } else if time.Since(exporter.Status.LastSeen.Time) > offlineTimeout { meta.SetStatusCondition(&exporter.Status.Conditions, metav1.Condition{ Type: string(jumpstarterdevv1alpha1.ExporterConditionTypeOnline), Status: metav1.ConditionFalse, ObservedGeneration: exporter.Generation, Reason: "Seen", - Message: "Last seen more than 1 minute ago", + Message: fmt.Sprintf("Last seen more than %v ago", offlineTimeout), }) // marking the exporter offline, no need to requeue } else { @@ -183,10 +186,27 @@ func (r *ExporterReconciler) reconcileStatusConditionsOnline( Status: metav1.ConditionTrue, ObservedGeneration: exporter.Generation, Reason: "Seen", - Message: "Last seen less than 1 minute ago", + Message: fmt.Sprintf("Last seen less than %v ago", offlineTimeout), }) - // marking the exporter online, requeue after 30 seconds - requeueAfter = time.Second * 30 + + // Calculate when the exporter will go offline + expirationTime := exporter.Status.LastSeen.Add(offlineTimeout) + timeUntilExpiration := time.Until(expirationTime) + + // Set requeue time to be just after expiration (with a small safety margin) + // This way we can definitively determine if the exporter expired or was updated + safetyMargin := 10 * time.Second + if timeUntilExpiration > 0 { + // Exporter hasn't expired yet, requeue after expiration + safety margin + requeueAfter = timeUntilExpiration + safetyMargin + // Cap the requeue time to avoid very long waits + if requeueAfter > 5*time.Minute { + requeueAfter = 5 * time.Minute + } + } else { + // Exporter should have already expired, requeue in safety margin time + requeueAfter = safetyMargin + } } if exporter.Status.Devices == nil { diff --git a/internal/controller/lease_controller_test.go b/internal/controller/lease_controller_test.go index 9421fcbe..bd603605 100644 --- a/internal/controller/lease_controller_test.go +++ b/internal/controller/lease_controller_test.go @@ -21,6 +21,7 @@ import ( "time" jumpstarterdevv1alpha1 "github.com/jumpstarter-dev/jumpstarter-controller/api/v1alpha1" + "github.com/jumpstarter-dev/jumpstarter-controller/internal/config" "github.com/jumpstarter-dev/jumpstarter-controller/internal/oidc" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -199,6 +200,9 @@ var _ = Describe("Lease Controller", func() { lease.Spec.Selector.MatchLabels["dut"] = "b" ctx := context.Background() + + setExporterOnlineConditions(ctx, testExporter3DutB.Name, metav1.ConditionTrue) + Expect(k8sClient.Create(ctx, lease)).To(Succeed()) _ = reconcileLease(ctx, lease) @@ -233,6 +237,7 @@ var _ = Describe("Lease Controller", func() { lease.Spec.Duration.Duration = 500 * time.Millisecond ctx := context.Background() + setExporterOnlineConditions(ctx, testExporter3DutB.Name, metav1.ConditionTrue) Expect(k8sClient.Create(ctx, lease)).To(Succeed()) _ = reconcileLease(ctx, lease) @@ -369,6 +374,12 @@ func reconcileLease(ctx context.Context, lease *jumpstarterdevv1alpha1.Lease) re Client: k8sClient, Scheme: k8sClient.Scheme(), Signer: signer, + ExporterOptions: config.ExporterOptions{ + OfflineTimeout: "1m", + }, + } + if err := exporterReconciler.ExporterOptions.PreprocessConfig(); err != nil { + Expect(err).NotTo(HaveOccurred()) } res, err := leaseReconciler.Reconcile(ctx, reconcile.Request{ From 31165c857c48f84c82b5c316d842c1534ed3834d Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Thu, 2 Oct 2025 10:00:17 +0000 Subject: [PATCH 5/5] Refactor LoadConfiguration results into a struct --- cmd/main.go | 15 +++++++++------ internal/config/config.go | 29 +++++++++++++++++------------ internal/config/oidc.go | 9 ++++++--- internal/config/types.go | 17 +++++++++++++++++ 4 files changed, 49 insertions(+), 21 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 3de48cfc..ee6031a3 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -154,7 +154,7 @@ func main() { os.Exit(1) } - authenticator, prefix, router, option, provisioning, exporterOptions, err := config.LoadConfiguration( + configResult, err := config.LoadConfiguration( context.Background(), mgr.GetAPIReader(), mgr.GetScheme(), @@ -177,7 +177,7 @@ func main() { Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Signer: oidcSigner, - ExporterOptions: *exporterOptions, + ExporterOptions: *configResult.ExporterOptions, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Exporter") os.Exit(1) @@ -208,15 +208,18 @@ func main() { if err = (&service.ControllerService{ Client: watchClient, Scheme: mgr.GetScheme(), - Authn: authentication.NewBearerTokenAuthenticator(authenticator), - Authz: authorization.NewBasicAuthorizer(watchClient, prefix, provisioning.Enabled), + Authn: authentication.NewBearerTokenAuthenticator(configResult.Authenticator), + Authz: authorization.NewBasicAuthorizer( + watchClient, + configResult.InternalAuthenticatorPrefix, + configResult.Provisioning.Enabled), Attr: authorization.NewMetadataAttributesGetter(authorization.MetadataAttributesGetterConfig{ NamespaceKey: "jumpstarter-namespace", ResourceKey: "jumpstarter-kind", NameKey: "jumpstarter-name", }), - Router: router, - ServerOptions: option, + Router: configResult.Router, + ServerOptions: configResult.ServerOptions, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create service", "service", "Controller") os.Exit(1) diff --git a/internal/config/config.go b/internal/config/config.go index acf6be00..5a159b6b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,7 +9,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/yaml" - "k8s.io/apiserver/pkg/authentication/authenticator" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -49,33 +48,33 @@ func LoadConfiguration( key client.ObjectKey, signer *oidc.Signer, certificateAuthority string, -) (authenticator.Token, string, Router, []grpc.ServerOption, *Provisioning, *ExporterOptions, error) { +) (*ConfigurationResult, error) { var configmap corev1.ConfigMap if err := client.Get(ctx, key, &configmap); err != nil { - return nil, "", nil, nil, nil, nil, err + return nil, err } rawRouter, ok := configmap.Data["router"] if !ok { - return nil, "", nil, nil, nil, nil, fmt.Errorf("LoadConfiguration: missing router section") + return nil, fmt.Errorf("LoadConfiguration: missing router section") } var router Router if err := yaml.Unmarshal([]byte(rawRouter), &router); err != nil { - return nil, "", nil, nil, nil, nil, err + return nil, err } rawConfig, ok := configmap.Data["config"] if !ok { - return nil, "", nil, nil, nil, nil, fmt.Errorf("LoadConfiguration: missing config section") + return nil, fmt.Errorf("LoadConfiguration: missing config section") } var config Config if err := yaml.UnmarshalStrict([]byte(rawConfig), &config); err != nil { - return nil, "", nil, nil, nil, nil, err + return nil, err } - authenticator, prefix, err := LoadAuthenticationConfiguration( + authResult, err := LoadAuthenticationConfiguration( ctx, scheme, config.Authentication, @@ -83,18 +82,24 @@ func LoadConfiguration( certificateAuthority, ) if err != nil { - return nil, "", nil, nil, nil, nil, err + return nil, err } serverOptions, err := LoadGrpcConfiguration(config.Grpc) if err != nil { - return nil, "", nil, nil, nil, nil, err + return nil, err } // Preprocess configuration values (parse durations, cache expensive operations, etc.) if err := config.ExporterOptions.PreprocessConfig(); err != nil { - return nil, "", nil, nil, nil, nil, err + return nil, err } - return authenticator, prefix, router, serverOptions, &config.Provisioning, &config.ExporterOptions, nil + return &ConfigurationResult{ + AuthenticationConfigResult: *authResult, + Router: router, + ServerOptions: serverOptions, + Provisioning: &config.Provisioning, + ExporterOptions: &config.ExporterOptions, + }, nil } diff --git a/internal/config/oidc.go b/internal/config/oidc.go index 3a79e907..be0804a4 100644 --- a/internal/config/oidc.go +++ b/internal/config/oidc.go @@ -20,7 +20,7 @@ func LoadAuthenticationConfiguration( config Authentication, signer *oidc.Signer, certificateAuthority string, -) (authenticator.Token, string, error) { +) (*AuthenticationConfigResult, error) { if config.Internal.Prefix == "" { config.Internal.Prefix = "internal:" } @@ -45,10 +45,13 @@ func LoadAuthenticationConfiguration( config, ) if err != nil { - return nil, "", err + return nil, err } - return authn, config.Internal.Prefix, nil + return &AuthenticationConfigResult{ + Authenticator: authn, + InternalAuthenticatorPrefix: config.Internal.Prefix, + }, nil } // Reference: https://github.com/kubernetes/kubernetes/blob/v1.32.1/pkg/kubeapiserver/authenticator/config.go#L244 diff --git a/internal/config/types.go b/internal/config/types.go index 31dbd117..b687a926 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -4,7 +4,9 @@ import ( "fmt" "time" + "google.golang.org/grpc" apiserverv1beta1 "k8s.io/apiserver/pkg/apis/apiserver/v1beta1" + "k8s.io/apiserver/pkg/authentication/authenticator" ) type Config struct { @@ -80,3 +82,18 @@ type RouterEntry struct { Endpoint string `json:"endpoint"` Labels map[string]string `json:"labels"` } + +// AuthenticationConfigResult contains the authentication components loaded by LoadAuthenticationConfiguration +type AuthenticationConfigResult struct { + Authenticator authenticator.Token `json:"-"` + InternalAuthenticatorPrefix string `json:"-"` +} + +// ConfigurationResult contains all the configuration components loaded by LoadConfiguration +type ConfigurationResult struct { + AuthenticationConfigResult + Router Router `json:"router"` + ServerOptions []grpc.ServerOption `json:"-"` + Provisioning *Provisioning `json:"provisioning"` // if we want authenticated users to be automatically created + ExporterOptions *ExporterOptions `json:"exporterOptions"` +}