152 changes: 80 additions & 72 deletions cmds/contest/server/server.go
@@ -42,47 +42,55 @@ import (
"github.com/linuxboot/contest/plugins/listeners/httplistener"
)

var (
flagSet *flag.FlagSet
flagDBURI *string
flagListenAddr *string
flagServerID *string
flagProcessTimeout *time.Duration
flagTargetLocker *string
flagInstanceTag *string
flagPauseTimeout *time.Duration
flagResumeJobs *bool
flagTargetLockDuration *time.Duration
type flags struct {
DBURI string
ListenAddr string
ServerID string
ProcessTimeout time.Duration
TargetLocker string
InstanceTag string
PauseTimeout time.Duration
ResumeJobs bool
TargetLockDuration time.Duration
// http logger parameters
flagAdminServerAddr *string
flagHttpLoggerBufferSize *int
flagHttpLoggerMaxBatchSize *int
flagHttpLoggerMaxBatchCount *int
flagHttpLoggerBatchSendFreq *time.Duration
flagHttpLoggerTimeout *time.Duration
logLevel = logger.LevelDebug
)
AdminServerAddr string
HttpLoggerBufferSize int
HttpLoggerMaxBatchSize int
HttpLoggerMaxBatchCount int
HttpLoggerBatchSendFreq time.Duration
HttpLoggerTimeout time.Duration
LogLevel logger.Level
}

func initFlags(cmd string) {
flagSet = flag.NewFlagSet(cmd, flag.ContinueOnError)
flagDBURI = flagSet.String("dbURI", config.DefaultDBURI, "Database URI")
flagListenAddr = flagSet.String("listenAddr", ":8080", "Listen address and port")
flagAdminServerAddr = flagSet.String("adminServerAddr", "", "Addr of the admin server to connect to")
flagHttpLoggerBufferSize = flagSet.Int("loggerBufferSize", loggerhook.DefaultBufferSize, "buffer size for the http logger hook")
flagHttpLoggerMaxBatchSize = flagSet.Int("loggerMaxBatchSize", loggerhook.DefaultMaxBatchSize, "max size (in bytes) of a logs batch to be sent if it reaches/exceeds it")
flagHttpLoggerMaxBatchCount = flagSet.Int("loggerMaxBatchCount", loggerhook.DefaultMaxBatchCount, "max count of logs in a batch")
flagHttpLoggerBatchSendFreq = flagSet.Duration("loggerBatchSendFreq", loggerhook.DefaultBatchSendFreq, "duration that defines the batch sending freq")
flagHttpLoggerTimeout = flagSet.Duration("loggerTimeout", loggerhook.DefaultLogTimeout, "logs send timeout")
flagServerID = flagSet.String("serverID", "", "Set a static server ID, e.g. the host name or another unique identifier. If unset, will use the listener's default")
flagProcessTimeout = flagSet.Duration("processTimeout", api.DefaultEventTimeout, "API request processing timeout")
flagTargetLocker = flagSet.String("targetLocker", "auto", "Target locker implementation to use, \"auto\" follows DBURI setting")
flagInstanceTag = flagSet.String("instanceTag", "", "A tag for this instance. Server will only operate on jobs with this tag and will add this tag to the jobs it creates.")
flagSet.Var(&logLevel, "logLevel", "A log level, possible values: debug, info, warning, error, panic, fatal")
flagPauseTimeout = flagSet.Duration("pauseTimeout", 0, "SIGINT/SIGTERM shutdown timeout (seconds), after which pause will be escalated to cancellation; -1 - no escalation, 0 - do not pause, cancel immediately")
flagResumeJobs = flagSet.Bool("resumeJobs", false, "Attempt to resume paused jobs")
flagTargetLockDuration = flagSet.Duration("targetLockDuration", config.DefaultTargetLockDuration,
func parseFlags(cmd string, args ...string) (*flags, error) {
f := &flags{
LogLevel: logger.LevelDebug,
}
flagSet := flag.NewFlagSet(cmd, flag.ContinueOnError)
flagSet.StringVar(&f.DBURI, "dbURI", config.DefaultDBURI, "Database URI")
flagSet.StringVar(&f.ListenAddr, "listenAddr", ":8080", "Listen address and port")
flagSet.StringVar(&f.AdminServerAddr, "adminServerAddr", "", "Addr of the admin server to connect to")
flagSet.IntVar(&f.HttpLoggerBufferSize, "loggerBufferSize", loggerhook.DefaultBufferSize, "buffer size for the http logger hook")
flagSet.IntVar(&f.HttpLoggerMaxBatchSize, "loggerMaxBatchSize", loggerhook.DefaultMaxBatchSize, "max size (in bytes) of a logs batch to be sent if it reaches/exceeds it")
flagSet.IntVar(&f.HttpLoggerMaxBatchCount, "loggerMaxBatchCount", loggerhook.DefaultMaxBatchCount, "max count of logs in a batch")
flagSet.DurationVar(&f.HttpLoggerBatchSendFreq, "loggerBatchSendFreq", loggerhook.DefaultBatchSendFreq, "duration that defines the batch sending freq")
flagSet.DurationVar(&f.HttpLoggerTimeout, "loggerTimeout", loggerhook.DefaultLogTimeout, "logs send timeout")
flagSet.StringVar(&f.ServerID, "serverID", "", "Set a static server ID, e.g. the host name or another unique identifier. If unset, will use the listener's default")
flagSet.DurationVar(&f.ProcessTimeout, "processTimeout", api.DefaultEventTimeout, "API request processing timeout")
flagSet.StringVar(&f.TargetLocker, "targetLocker", "auto", "Target locker implementation to use, \"auto\" follows DBURI setting")
flagSet.StringVar(&f.InstanceTag, "instanceTag", "", "A tag for this instance. Server will only operate on jobs with this tag and will add this tag to the jobs it creates.")
flagSet.Var(&f.LogLevel, "logLevel", "A log level, possible values: debug, info, warning, error, panic, fatal")
flagSet.DurationVar(&f.PauseTimeout, "pauseTimeout", 0, "SIGINT/SIGTERM shutdown timeout (seconds), after which pause will be escalated to cancellation; -1 - no escalation, 0 - do not pause, cancel immediately")
flagSet.BoolVar(&f.ResumeJobs, "resumeJobs", false, "Attempt to resume paused jobs")
flagSet.DurationVar(&f.TargetLockDuration, "targetLockDuration", config.DefaultTargetLockDuration,
"The amount of time target lock is extended by while the job is running. "+
"This is the maximum amount of time a job can stay paused safely.")

if err := flagSet.Parse(args); err != nil {
return nil, err
}

return f, nil
}

var userFunctions = []map[string]interface{}{
@@ -153,24 +161,24 @@ func registerPlugins(pluginRegistry *pluginregistry.PluginRegistry, pluginConfig

// Main is the main function that executes the ConTest server.
func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.Signal) error {
initFlags(cmd)
if err := flagSet.Parse(args); err != nil {
return err
flags, err := parseFlags(cmd, args...)
if err != nil {
return fmt.Errorf("unable to parse the flags: %w", err)
}

clk := clock.New()

ctx, cancel := context.WithCancel(context.Background())
ctx = logging.WithBelt(ctx, logLevel)
ctx = logging.WithBelt(ctx, flags.LogLevel)

if *flagAdminServerAddr != "" {
if flags.AdminServerAddr != "" {
httpHook, err := loggerhook.NewHttpHook(loggerhook.Config{
Addr: *flagAdminServerAddr,
BufferSize: *flagHttpLoggerBufferSize,
MaxBatchSize: *flagHttpLoggerMaxBatchSize,
MaxBatchCount: *flagHttpLoggerMaxBatchCount,
BatchSendFreq: *flagHttpLoggerBatchSendFreq,
LogTimeout: *flagHttpLoggerTimeout,
Addr: flags.AdminServerAddr,
BufferSize: flags.HttpLoggerBufferSize,
MaxBatchSize: flags.HttpLoggerMaxBatchSize,
MaxBatchCount: flags.HttpLoggerMaxBatchCount,
BatchSendFreq: flags.HttpLoggerBatchSendFreq,
LogTimeout: flags.HttpLoggerTimeout,
})
errmon.ObserveErrorCtx(ctx, err)
if httpHook != nil {
@@ -200,8 +208,8 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
}()

// primary storage initialization
if *flagDBURI != "" {
primaryDBURI := *flagDBURI
if flags.DBURI != "" {
primaryDBURI := flags.DBURI
log.Infof("Using database URI for primary storage: %s", primaryDBURI)
s, err := rdbms.New(primaryDBURI)
if err != nil {
@@ -221,7 +229,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.

// replica storage initialization
// pointing to main database for now but can be used to point to replica
replicaDBURI := *flagDBURI
replicaDBURI := flags.DBURI
log.Infof("Using database URI for replica storage: %s", replicaDBURI)
r, err := rdbms.New(replicaDBURI)
if err != nil {
@@ -258,54 +266,54 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
}

// set Locker engine
if *flagTargetLocker == "auto" {
if *flagDBURI != "" {
*flagTargetLocker = dblocker.Name
if flags.TargetLocker == "auto" {
if flags.DBURI != "" {
flags.TargetLocker = dblocker.Name
} else {
*flagTargetLocker = inmemory.Name
flags.TargetLocker = inmemory.Name
}
log.Infof("Locker engine set to auto, using %s", *flagTargetLocker)
log.Infof("Locker engine set to auto, using %s", flags.TargetLocker)
}
switch *flagTargetLocker {
switch flags.TargetLocker {
case inmemory.Name:
target.SetLocker(inmemory.New(clk))
case dblocker.Name:
if l, err := dblocker.New(*flagDBURI, dblocker.WithClock(clk)); err == nil {
if l, err := dblocker.New(flags.DBURI, dblocker.WithClock(clk)); err == nil {
target.SetLocker(l)
} else {
log.Fatalf("Failed to create locker %q: %v", *flagTargetLocker, err)
log.Fatalf("Failed to create locker %q: %v", flags.TargetLocker, err)
}
default:
log.Fatalf("Invalid target locker name %q", *flagTargetLocker)
log.Fatalf("Invalid target locker name %q", flags.TargetLocker)
}

// spawn JobManager
listener := httplistener.New(*flagListenAddr)
listener := httplistener.New(flags.ListenAddr)

opts := []jobmanager.Option{
jobmanager.APIOption(api.OptionEventTimeout(*flagProcessTimeout)),
jobmanager.APIOption(api.OptionEventTimeout(flags.ProcessTimeout)),
}
if *flagServerID != "" {
opts = append(opts, jobmanager.APIOption(api.OptionServerID(*flagServerID)))
if flags.ServerID != "" {
opts = append(opts, jobmanager.APIOption(api.OptionServerID(flags.ServerID)))
}
if *flagInstanceTag != "" {
opts = append(opts, jobmanager.OptionInstanceTag(*flagInstanceTag))
if flags.InstanceTag != "" {
opts = append(opts, jobmanager.OptionInstanceTag(flags.InstanceTag))
}
if *flagTargetLockDuration != 0 {
opts = append(opts, jobmanager.OptionTargetLockDuration(*flagTargetLockDuration))
if flags.TargetLockDuration != 0 {
opts = append(opts, jobmanager.OptionTargetLockDuration(flags.TargetLockDuration))
}

jm, err := jobmanager.New(listener, pluginRegistry, storageEngineVault, opts...)
if err != nil {
log.Fatalf("%v", err)
}

pauseTimeout := *flagPauseTimeout
pauseTimeout := flags.PauseTimeout

go func() {
intLevel := 0
// cancel immediately if pauseTimeout is zero
if *flagPauseTimeout == 0 {
if flags.PauseTimeout == 0 {
intLevel = 1
}
for {
@@ -325,7 +333,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
if intLevel == 0 {
log.Infof("Signal %q, pausing jobs", sig)
pause()
if *flagPauseTimeout > 0 {
if flags.PauseTimeout > 0 {
go func() {
select {
case <-ctx.Done():
@@ -344,7 +352,7 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.
}
}()

err = jm.Run(ctx, *flagResumeJobs)
err = jm.Run(ctx, flags.ResumeJobs)

target.SetLocker(nil)

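Note: moving from package-level flag pointers to a `flags` struct returned by `parseFlags` makes flag parsing testable in isolation. The sketch below is not part of this PR; it assumes the `parseFlags` signature shown in the diff, that the file's package is named `server`, and only asserts defaults that appear literally above (`:8080`, zero `pauseTimeout`). The test names are hypothetical.

```go
package server

import (
	"testing"
	"time"
)

// TestParseFlagsDefaults checks that parsing with no arguments
// yields the literal defaults visible in this diff.
func TestParseFlagsDefaults(t *testing.T) {
	f, err := parseFlags("contest")
	if err != nil {
		t.Fatalf("parseFlags: %v", err)
	}
	if f.ListenAddr != ":8080" {
		t.Errorf("ListenAddr = %q, want %q", f.ListenAddr, ":8080")
	}
	if f.PauseTimeout != 0 {
		t.Errorf("PauseTimeout = %v, want 0", f.PauseTimeout)
	}
}

// TestParseFlagsOverrides checks that explicit arguments override the defaults.
func TestParseFlagsOverrides(t *testing.T) {
	f, err := parseFlags("contest",
		"-listenAddr", ":9090",
		"-pauseTimeout", "30s",
		"-resumeJobs",
	)
	if err != nil {
		t.Fatalf("parseFlags: %v", err)
	}
	if f.ListenAddr != ":9090" {
		t.Errorf("ListenAddr = %q, want %q", f.ListenAddr, ":9090")
	}
	if f.PauseTimeout != 30*time.Second {
		t.Errorf("PauseTimeout = %v, want 30s", f.PauseTimeout)
	}
	if !f.ResumeJobs {
		t.Error("ResumeJobs = false, want true")
	}
}
```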