diff --git a/cmds/contest/server/server.go b/cmds/contest/server/server.go index 3196c57f..1ed8dea1 100644 --- a/cmds/contest/server/server.go +++ b/cmds/contest/server/server.go @@ -42,47 +42,55 @@ import ( "github.com/linuxboot/contest/plugins/listeners/httplistener" ) -var ( - flagSet *flag.FlagSet - flagDBURI *string - flagListenAddr *string - flagServerID *string - flagProcessTimeout *time.Duration - flagTargetLocker *string - flagInstanceTag *string - flagPauseTimeout *time.Duration - flagResumeJobs *bool - flagTargetLockDuration *time.Duration +type flags struct { + DBURI string + ListenAddr string + ServerID string + ProcessTimeout time.Duration + TargetLocker string + InstanceTag string + PauseTimeout time.Duration + ResumeJobs bool + TargetLockDuration time.Duration // http logger parameters - flagAdminServerAddr *string - flagHttpLoggerBufferSize *int - flagHttpLoggerMaxBatchSize *int - flagHttpLoggerMaxBatchCount *int - flagHttpLoggerBatchSendFreq *time.Duration - flagHttpLoggerTimeout *time.Duration - logLevel = logger.LevelDebug -) + AdminServerAddr string + HttpLoggerBufferSize int + HttpLoggerMaxBatchSize int + HttpLoggerMaxBatchCount int + HttpLoggerBatchSendFreq time.Duration + HttpLoggerTimeout time.Duration + LogLevel logger.Level +} -func initFlags(cmd string) { - flagSet = flag.NewFlagSet(cmd, flag.ContinueOnError) - flagDBURI = flagSet.String("dbURI", config.DefaultDBURI, "Database URI") - flagListenAddr = flagSet.String("listenAddr", ":8080", "Listen address and port") - flagAdminServerAddr = flagSet.String("adminServerAddr", "", "Addr of the admin server to connect to") - flagHttpLoggerBufferSize = flagSet.Int("loggerBufferSize", loggerhook.DefaultBufferSize, "buffer size for the http logger hook") - flagHttpLoggerMaxBatchSize = flagSet.Int("loggerMaxBatchSize", loggerhook.DefaultMaxBatchSize, "max size (in bytes) of a logs batch to be sent if it reaches/exceeds it") - flagHttpLoggerMaxBatchCount = flagSet.Int("loggerMaxBatchCount", loggerhook.DefaultMaxBatchCount, "max count of logs in a batch") - flagHttpLoggerBatchSendFreq = flagSet.Duration("loggerBatchSendFreq", loggerhook.DefaultBatchSendFreq, "duration that defines the batch sending freq") - flagHttpLoggerTimeout = flagSet.Duration("loggerTimeout", loggerhook.DefaultLogTimeout, "logs send timeout") - flagServerID = flagSet.String("serverID", "", "Set a static server ID, e.g. the host name or another unique identifier. If unset, will use the listener's default") - flagProcessTimeout = flagSet.Duration("processTimeout", api.DefaultEventTimeout, "API request processing timeout") - flagTargetLocker = flagSet.String("targetLocker", "auto", "Target locker implementation to use, \"auto\" follows DBURI setting") - flagInstanceTag = flagSet.String("instanceTag", "", "A tag for this instance. Server will only operate on jobs with this tag and will add this tag to the jobs it creates.") - flagSet.Var(&logLevel, "logLevel", "A log level, possible values: debug, info, warning, error, panic, fatal") - flagPauseTimeout = flagSet.Duration("pauseTimeout", 0, "SIGINT/SIGTERM shutdown timeout (seconds), after which pause will be escalated to cancellaton; -1 - no escalation, 0 - do not pause, cancel immediately") - flagResumeJobs = flagSet.Bool("resumeJobs", false, "Attempt to resume paused jobs") - flagTargetLockDuration = flagSet.Duration("targetLockDuration", config.DefaultTargetLockDuration, +func parseFlags(cmd string, args ...string) (*flags, error) { + f := &flags{ + LogLevel: logger.LevelDebug, + } + flagSet := flag.NewFlagSet(cmd, flag.ContinueOnError) + flagSet.StringVar(&f.DBURI, "dbURI", config.DefaultDBURI, "Database URI") + flagSet.StringVar(&f.ListenAddr, "listenAddr", ":8080", "Listen address and port") + flagSet.StringVar(&f.AdminServerAddr, "adminServerAddr", "", "Addr of the admin server to connect to") + flagSet.IntVar(&f.HttpLoggerBufferSize, "loggerBufferSize", loggerhook.DefaultBufferSize, "buffer size for the http logger hook") + flagSet.IntVar(&f.HttpLoggerMaxBatchSize, "loggerMaxBatchSize", loggerhook.DefaultMaxBatchSize, "max size (in bytes) of a logs batch to be sent if it reaches/exceeds it") + flagSet.IntVar(&f.HttpLoggerMaxBatchCount, "loggerMaxBatchCount", loggerhook.DefaultMaxBatchCount, "max count of logs in a batch") + flagSet.DurationVar(&f.HttpLoggerBatchSendFreq, "loggerBatchSendFreq", loggerhook.DefaultBatchSendFreq, "duration that defines the batch sending freq") + flagSet.DurationVar(&f.HttpLoggerTimeout, "loggerTimeout", loggerhook.DefaultLogTimeout, "logs send timeout") + flagSet.StringVar(&f.ServerID, "serverID", "", "Set a static server ID, e.g. the host name or another unique identifier. If unset, will use the listener's default") + flagSet.DurationVar(&f.ProcessTimeout, "processTimeout", api.DefaultEventTimeout, "API request processing timeout") + flagSet.StringVar(&f.TargetLocker, "targetLocker", "auto", "Target locker implementation to use, \"auto\" follows DBURI setting") + flagSet.StringVar(&f.InstanceTag, "instanceTag", "", "A tag for this instance. Server will only operate on jobs with this tag and will add this tag to the jobs it creates.") + flagSet.Var(&f.LogLevel, "logLevel", "A log level, possible values: debug, info, warning, error, panic, fatal") + flagSet.DurationVar(&f.PauseTimeout, "pauseTimeout", 0, "SIGINT/SIGTERM shutdown timeout (seconds), after which pause will be escalated to cancellaton; -1 - no escalation, 0 - do not pause, cancel immediately") + flagSet.BoolVar(&f.ResumeJobs, "resumeJobs", false, "Attempt to resume paused jobs") + flagSet.DurationVar(&f.TargetLockDuration, "targetLockDuration", config.DefaultTargetLockDuration, "The amount of time target lock is extended by while the job is running. "+ "This is the maximum amount of time a job can stay paused safely.") + + if err := flagSet.Parse(args); err != nil { + return nil, err + } + + return f, nil } var userFunctions = []map[string]interface{}{ @@ -153,24 +161,24 @@ func registerPlugins(pluginRegistry *pluginregistry.PluginRegistry, pluginConfig // Main is the main function that executes the ConTest server. func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.Signal) error { - initFlags(cmd) - if err := flagSet.Parse(args); err != nil { - return err + flags, err := parseFlags(cmd, args...) + if err != nil { + return fmt.Errorf("unable to parse the flags: %w", err) } clk := clock.New() ctx, cancel := context.WithCancel(context.Background()) - ctx = logging.WithBelt(ctx, logLevel) + ctx = logging.WithBelt(ctx, flags.LogLevel) - if *flagAdminServerAddr != "" { + if flags.AdminServerAddr != "" { httpHook, err := loggerhook.NewHttpHook(loggerhook.Config{ - Addr: *flagAdminServerAddr, - BufferSize: *flagHttpLoggerBufferSize, - MaxBatchSize: *flagHttpLoggerMaxBatchSize, - MaxBatchCount: *flagHttpLoggerMaxBatchCount, - BatchSendFreq: *flagHttpLoggerBatchSendFreq, - LogTimeout: *flagHttpLoggerTimeout, + Addr: flags.AdminServerAddr, + BufferSize: flags.HttpLoggerBufferSize, + MaxBatchSize: flags.HttpLoggerMaxBatchSize, + MaxBatchCount: flags.HttpLoggerMaxBatchCount, + BatchSendFreq: flags.HttpLoggerBatchSendFreq, + LogTimeout: flags.HttpLoggerTimeout, }) errmon.ObserveErrorCtx(ctx, err) if httpHook != nil { @@ -200,8 +208,8 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os. }() // primary storage initialization - if *flagDBURI != "" { - primaryDBURI := *flagDBURI + if flags.DBURI != "" { + primaryDBURI := flags.DBURI log.Infof("Using database URI for primary storage: %s", primaryDBURI) s, err := rdbms.New(primaryDBURI) if err != nil { @@ -221,7 +229,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os. // replica storage initialization // pointing to main database for now but can be used to point to replica - replicaDBURI := *flagDBURI + replicaDBURI := flags.DBURI log.Infof("Using database URI for replica storage: %s", replicaDBURI) r, err := rdbms.New(replicaDBURI) if err != nil { @@ -258,41 +266,41 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os. } // set Locker engine - if *flagTargetLocker == "auto" { - if *flagDBURI != "" { - *flagTargetLocker = dblocker.Name + if flags.TargetLocker == "auto" { + if flags.DBURI != "" { + flags.TargetLocker = dblocker.Name } else { - *flagTargetLocker = inmemory.Name + flags.TargetLocker = inmemory.Name } - log.Infof("Locker engine set to auto, using %s", *flagTargetLocker) + log.Infof("Locker engine set to auto, using %s", flags.TargetLocker) } - switch *flagTargetLocker { + switch flags.TargetLocker { case inmemory.Name: target.SetLocker(inmemory.New(clk)) case dblocker.Name: - if l, err := dblocker.New(*flagDBURI, dblocker.WithClock(clk)); err == nil { + if l, err := dblocker.New(flags.DBURI, dblocker.WithClock(clk)); err == nil { target.SetLocker(l) } else { - log.Fatalf("Failed to create locker %q: %v", *flagTargetLocker, err) + log.Fatalf("Failed to create locker %q: %v", flags.TargetLocker, err) } default: - log.Fatalf("Invalid target locker name %q", *flagTargetLocker) + log.Fatalf("Invalid target locker name %q", flags.TargetLocker) } // spawn JobManager - listener := httplistener.New(*flagListenAddr) + listener := httplistener.New(flags.ListenAddr) opts := []jobmanager.Option{ - jobmanager.APIOption(api.OptionEventTimeout(*flagProcessTimeout)), + jobmanager.APIOption(api.OptionEventTimeout(flags.ProcessTimeout)), } - if *flagServerID != "" { - opts = append(opts, jobmanager.APIOption(api.OptionServerID(*flagServerID))) + if flags.ServerID != "" { + opts = append(opts, jobmanager.APIOption(api.OptionServerID(flags.ServerID))) } - if *flagInstanceTag != "" { - opts = append(opts, jobmanager.OptionInstanceTag(*flagInstanceTag)) + if flags.InstanceTag != "" { + opts = append(opts, jobmanager.OptionInstanceTag(flags.InstanceTag)) } - if *flagTargetLockDuration != 0 { - opts = append(opts, jobmanager.OptionTargetLockDuration(*flagTargetLockDuration)) + if flags.TargetLockDuration != 0 { + opts = append(opts, jobmanager.OptionTargetLockDuration(flags.TargetLockDuration)) } jm, err := jobmanager.New(listener, pluginRegistry, storageEngineVault, opts...) @@ -300,12 +308,12 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os. log.Fatalf("%v", err) } - pauseTimeout := *flagPauseTimeout + pauseTimeout := flags.PauseTimeout go func() { intLevel := 0 // cancel immediately if pauseTimeout is zero - if *flagPauseTimeout == 0 { + if flags.PauseTimeout == 0 { intLevel = 1 } for { @@ -325,7 +333,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os. if intLevel == 0 { log.Infof("Signal %q, pausing jobs", sig) pause() - if *flagPauseTimeout > 0 { + if flags.PauseTimeout > 0 { go func() { select { case <-ctx.Done(): @@ -344,7 +352,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os. } }() - err = jm.Run(ctx, *flagResumeJobs) + err = jm.Run(ctx, flags.ResumeJobs) target.SetLocker(nil)