diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index e96e96d08..f7d43c12e 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -15,6 +15,7 @@ import ( v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/notifications" "github.com/okzk/sdnotify" "github.com/pkg/errors" "go.uber.org/zap" @@ -168,13 +169,32 @@ func run() int { sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) - go func() { - logger.Info("Starting history sync") + { + var extraStages map[string]history.StageFunc + if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" { + logger.Info("Starting Icinga Notifications source") + + notificationsSource, err := notifications.NewNotificationsClient( + ctx, + db, + rc, + logs.GetChildLogger("notifications-source"), + cfg) + if err != nil { + logger.Fatalw("Can't create Icinga Notifications client from config", zap.Error(err)) + } - if err := hs.Sync(ctx); err != nil && !utils.IsContextCanceled(err) { - logger.Fatalf("%+v", err) + extraStages = notificationsSource.SyncExtraStages() } - }() + + go func() { + logger.Info("Starting history sync") + + if err := hs.Sync(ctx, extraStages); err != nil && !utils.IsContextCanceled(err) { + logger.Fatalf("%+v", err) + } + }() + } // Main loop for { diff --git a/config.example.yml b/config.example.yml index a8e66b9a8..a110be19f 100644 --- a/config.example.yml +++ b/config.example.yml @@ -139,3 +139,15 @@ redis: # flapping: # notification: # state: + +# Icinga DB can act as an event source for Icinga Notifications. If the following block is not empty, Icinga DB will +# submit events to the Icinga Notifications API. +#notifications-source: + # URL to the API root. +# api-base-url: http://localhost:5680 + + # Username to authenticate against the Icinga Notifications API. +# username: icingadb + + # Password for the defined user. +# password: insecureinsecure diff --git a/doc/01-About.md b/doc/01-About.md index c880ac00c..fea23d846 100644 --- a/doc/01-About.md +++ b/doc/01-About.md @@ -34,6 +34,10 @@ Icinga DB Web also connects to the Icinga 2 API with its Command Transport to ac These are the components of Icinga DB embedded into an Icinga setup with Icinga 2 and Icinga Web 2. +Since the Icinga DB daemon always receives the latest information from RedisĀ®, it is an ideal candidate to distribute information further. +In addition to inserting data into a relational database, Icinga DB can also forward events to [Icinga Notifications](https://icinga.com/docs/icinga-notifications/), +as described in the [configuration section](03-Configuration.md#notifications-source-configuration). + ## Installation To install Icinga DB see [Installation](02-Installation.md). diff --git a/doc/03-Configuration.md b/doc/03-Configuration.md index cccfd2233..a1a3a0693 100644 --- a/doc/03-Configuration.md +++ b/doc/03-Configuration.md @@ -146,7 +146,7 @@ ICINGADB_LOGGING_OPTIONS=database:error,high-availability:debug | runtime-updates | Runtime updates of config objects after the initial config synchronization. | | telemetry | Reporting of Icinga DB status to Icinga 2 via RedisĀ® (for monitoring purposes). | -## Retention +## Retention Configuration By default, no historical data is deleted, which means that the longer the data is retained, the more disk space is required to store it. 
History retention is an optional feature that allows to @@ -174,6 +174,20 @@ ICINGADB_RETENTION_OPTIONS=comment:356 | count | **Optional.** Number of old historical data a single query can delete in a `"DELETE FROM ... LIMIT count"` manner. Defaults to `5000`. | | options | **Optional.** Map of history category to number of days to retain its data. Available categories are `acknowledgement`, `comment`, `downtime`, `flapping`, `notification` and `state`. | +## Notifications Source Configuration + +Icinga DB can act as an event source for [Icinga Notifications](https://icinga.com/docs/icinga-notifications/). +If configured, Icinga DB will submit events to the Icinga Notifications API. + +For YAML configuration, the options are part of the `notifications-source` dictionary. +For environment variables, each option is prefixed with `ICINGADB_NOTIFICATIONS_SOURCE_`. + +| Option | Description | +|--------------|-----------------------------------------------------------------------------------| +| api-base-url | **Optional.** Icinga Notifications API base URL, such as `http://localhost:5680`. | +| username | **Optional.** Icinga Notifications API user for this source. | +| password | **Optional.** Icinga Notifications API user password. | + ## Appendix ### Duration String diff --git a/go.mod b/go.mod index cb6a0ac41..b3675c116 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/goccy/go-yaml v1.13.0 github.com/google/go-cmp v0.7.0 github.com/google/uuid v1.6.0 - github.com/icinga/icinga-go-library v0.7.2 + github.com/icinga/icinga-go-library v0.7.3-0.20251029100725-d59f989509ea github.com/jessevdk/go-flags v1.6.1 github.com/jmoiron/sqlx v1.4.0 github.com/mattn/go-sqlite3 v1.14.32 @@ -34,7 +34,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.12 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/redis/go-redis/v9 v9.10.0 // indirect + github.com/redis/go-redis/v9 v9.16.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/ssgreg/journald v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index c849198af..190bc7471 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/icinga/icinga-go-library v0.7.2 h1:6ilUeE9F9OqxxJXNR9URWDf6zOqsdhjjR9w1MUXY9Kg= -github.com/icinga/icinga-go-library v0.7.2/go.mod h1:HZTiYD+N+9FZIVpPdUEJWJnc6sLvrIRO03jvkdkmUEU= +github.com/icinga/icinga-go-library v0.7.3-0.20251029100725-d59f989509ea h1:aCp3iiDJnfK0E1deHUhR2aRDgeLENoYArliP7jgo5HY= +github.com/icinga/icinga-go-library v0.7.3-0.20251029100725-d59f989509ea/go.mod h1:2QwCwu10qdiPhYu2lbAmcCmPzMbWHLWW/vcvRf/MXnI= github.com/jessevdk/go-flags v1.6.1 h1:Cvu5U8UGrLay1rZfv/zP7iLpSHGUZ/Ou68T0iX1bBK4= github.com/jessevdk/go-flags v1.6.1/go.mod h1:Mk8T1hIAWpOiJiHa9rJASDK2UGWji0EuPGBnNLMooyc= github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= @@ -63,8 +63,8 @@ github.com/pkg/errors v0.9.1 
h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs= -github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4= +github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= diff --git a/internal/config/config.go b/internal/config/config.go index 503e6dbf6..359cf052e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -4,6 +4,7 @@ import ( "github.com/creasty/defaults" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/notifications/source" "github.com/icinga/icinga-go-library/redis" "github.com/icinga/icingadb/pkg/icingadb/history" "github.com/pkg/errors" @@ -15,10 +16,11 @@ const DefaultConfigPath = "/etc/icingadb/config.yml" // Config defines Icinga DB config. type Config struct { - Database database.Config `yaml:"database" envPrefix:"DATABASE_"` - Redis redis.Config `yaml:"redis" envPrefix:"REDIS_"` - Logging logging.Config `yaml:"logging" envPrefix:"LOGGING_"` - Retention RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"` + Database database.Config `yaml:"database" envPrefix:"DATABASE_"` + Redis redis.Config `yaml:"redis" envPrefix:"REDIS_"` + Logging logging.Config `yaml:"logging" envPrefix:"LOGGING_"` + Retention RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"` + NotificationsSource source.Config `yaml:"notifications-source" envPrefix:"NOTIFICATIONS_SOURCE_"` } func (c *Config) SetDefaults() { diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index 51976f1a1..6c547815c 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -17,6 +17,7 @@ import ( "github.com/pkg/errors" "golang.org/x/sync/errgroup" "reflect" + "slices" "sync" ) @@ -37,7 +38,10 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync } // Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success. -func (s Sync) Sync(ctx context.Context) error { +// +// The optional extraStages parameter allows specifying an additional extra stage for each pipeline, identified by their +// key. This stage is executed after every other stage, but before the entry gets deleted from Redis. +func (s Sync) Sync(ctx context.Context, extraStages map[string]StageFunc) error { g, ctx := errgroup.WithContext(ctx) for key, pipeline := range syncPipelines { @@ -62,6 +66,18 @@ func (s Sync) Sync(ctx context.Context) error { // it has processed it, even if the stage itself does not do anything with this specific entry. 
It should only // forward the entry after it has completed its own sync so that later stages can rely on previous stages being // executed successfully. + // + // If an extra stage exists for this key, it will be appended to the pipeline. Thus, it is executed after every + // other pipeline action, but before deleteFromRedis. + + // Shadowed variable to allow appending custom callbacks. + pipeline := pipeline + if extraStages != nil { + extraStage, ok := extraStages[key] + if ok { + pipeline = append(slices.Clip(pipeline), extraStage) + } + } ch := make([]chan redis.XMessage, len(pipeline)+1) for i := range ch { @@ -152,26 +168,25 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi } counter.Add(uint64(len(ids))) - telemetry.Stats.History.Add(uint64(len(ids))) case <-ctx.Done(): return ctx.Err() } } } -// stageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop +// StageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop // once that context is canceled), the Sync instance (for access to Redis, SQL database, logging), the key (information // about which pipeline this function is running in, i.e. "notification"), an in channel for the stage to read history // events from and an out channel to forward history entries to after processing them successfully. A stage function // is supposed to forward each message from in to out, even if the event is not relevant for the current stage. On // error conditions, the message must not be forwarded to the next stage so that the event is not deleted from Redis // and can be processed at a later time. -type stageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error +type StageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error -// writeOneEntityStage creates a stageFunc from a pointer to a struct implementing the v1.UpserterEntity interface. +// writeOneEntityStage creates a StageFunc from a pointer to a struct implementing the v1.UpserterEntity interface. // For each history event it receives, it parses that event into a new instance of that entity type and writes it to // the database. It writes exactly one entity to the database for each history event. -func writeOneEntityStage(structPtr any) stageFunc { +func writeOneEntityStage(structPtr any) StageFunc { structifier := structify.MakeMapStructifier( reflect.TypeOf(structPtr).Elem(), "json", @@ -190,9 +205,9 @@ func writeOneEntityStage(structPtr any) stageFunc { }) } -// writeMultiEntityStage creates a stageFunc from a function that takes a history event as an input and returns a +// writeMultiEntityStage creates a StageFunc from a function that takes a history event as an input and returns a // (potentially empty) slice of v1.UpserterEntity instances that it then inserts into the database. -func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.UpserterEntity, error)) stageFunc { +func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.UpserterEntity, error)) StageFunc { return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error { type State struct { Message redis.XMessage // Original event from Redis. 
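For illustration, a minimal extra stage that satisfies this contract could look like the following sketch. It is not part of the patch; the package and function names are hypothetical.

```go
package example // hypothetical package, illustration only

import (
	"context"

	"github.com/icinga/icinga-go-library/redis"
	"github.com/icinga/icingadb/pkg/icingadb/history"
)

// passThroughStage is a hypothetical history.StageFunc that could be passed to
// Sync.Sync through the new extraStages parameter. It follows the contract from
// sync.go: forward each message from in to out once its own work has succeeded,
// close out when in is closed, and stop when the context is canceled. On error,
// a message must not be forwarded so that it stays in Redis and is retried later.
func passThroughStage(
	ctx context.Context,
	_ history.Sync,
	key string,
	in <-chan redis.XMessage,
	out chan<- redis.XMessage,
) error {
	defer close(out)

	for {
		select {
		case msg, ok := <-in:
			if !ok {
				return nil
			}

			// The per-entry side effect would go here, e.g. handing the entry
			// to an external consumer identified by the pipeline key.

			out <- msg

		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```

Wired up as `hs.Sync(ctx, map[string]history.StageFunc{history.SyncPipelineState: passThroughStage})`, such a stage runs after all built-in stages of the `state` pipeline but before `deleteFromRedis`, which is what the notifications package added below relies on.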
@@ -304,7 +319,7 @@ func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.Upse } } -// userNotificationStage is a specialized stageFunc that populates the user_notification_history table. It is executed +// userNotificationStage is a specialized StageFunc that populates the user_notification_history table. It is executed // on the notification history stream and uses the users_notified_ids attribute to create an entry in the // user_notification_history relation table for each user ID. func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error { @@ -361,32 +376,70 @@ func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan re })(ctx, s, key, in, out) } -var syncPipelines = map[string][]stageFunc{ - "notification": { - writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history - userNotificationStage, // user_notification_history (depends on notification_history) - writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history) +// countElementStage increments the [Stats.History] counter. +// +// This StageFunc should be called last in each syncPipeline. Thus, it is executed before the final +// Sync.deleteFromRedis call in Sync.Sync, but before optional extra stages, potentially blocking. +func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XMessage, out chan<- redis.XMessage) error { + defer close(out) + + for { + select { + case msg, ok := <-in: + if !ok { + return nil + } + + telemetry.Stats.History.Add(1) + out <- msg + + case <-ctx.Done(): + return ctx.Err() + } + } +} + +const ( + SyncPipelineAcknowledgement = "acknowledgement" + SyncPipelineComment = "comment" + SyncPipelineDowntime = "downtime" + SyncPipelineFlapping = "flapping" + SyncPipelineNotification = "notification" + SyncPipelineState = "state" +) + +var syncPipelines = map[string][]StageFunc{ + SyncPipelineAcknowledgement: { + writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history + writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history) + countElementStage, }, - "state": { - writeOneEntityStage((*v1.StateHistory)(nil)), // state_history - writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history) - writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state + SyncPipelineComment: { + writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history + writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history) + countElementStage, }, - "downtime": { + SyncPipelineDowntime: { writeOneEntityStage((*v1.DowntimeHistory)(nil)), // downtime_history writeOneEntityStage((*v1.HistoryDowntime)(nil)), // history (depends on downtime_history) writeOneEntityStage((*v1.SlaHistoryDowntime)(nil)), // sla_history_downtime + countElementStage, }, - "comment": { - writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history - writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history) - }, - "flapping": { + SyncPipelineFlapping: { writeOneEntityStage((*v1.FlappingHistory)(nil)), // flapping_history writeOneEntityStage((*v1.HistoryFlapping)(nil)), // history (depends on flapping_history) + countElementStage, }, - "acknowledgement": { - writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history - writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on 
acknowledgement_history) + SyncPipelineNotification: { + writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history + userNotificationStage, // user_notification_history (depends on notification_history) + writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history) + countElementStage, + }, + SyncPipelineState: { + writeOneEntityStage((*v1.StateHistory)(nil)), // state_history + writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history) + writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state + countElementStage, }, } diff --git a/pkg/icingadb/v1/history/downtime.go b/pkg/icingadb/v1/history/downtime.go index 969cd4728..e43f918d7 100644 --- a/pkg/icingadb/v1/history/downtime.go +++ b/pkg/icingadb/v1/history/downtime.go @@ -88,6 +88,15 @@ func (*HistoryDowntime) TableName() string { return "history" } +// DowntimeHistoryMeta is a combined HistoryMeta struct of DowntimeHistoryEntity and DowntimeHistory. +// +// It is used in the notifications package and became necessary as values of both structs were required. +type DowntimeHistoryMeta struct { + DowntimeHistoryEntity `json:",inline"` + DowntimeHistory `json:",inline"` + HistoryMeta `json:",inline"` +} + type SlaHistoryDowntime struct { DowntimeHistoryEntity `json:",inline"` HistoryTableMeta `json:",inline"` diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go index 78f5c5c67..3a5a4fdb4 100644 --- a/pkg/icingaredis/telemetry/stats.go +++ b/pkg/icingaredis/telemetry/stats.go @@ -14,17 +14,23 @@ import ( var Stats struct { // Config & co. are to be increased by the T sync once for every T object synced. - Config, State, History, Overdue, HistoryCleanup com.Counter + Config com.Counter + State com.Counter + History com.Counter + Overdue com.Counter + HistoryCleanup com.Counter + NotificationSync com.Counter } // WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2. func WriteStats(ctx context.Context, client *redis.Client, logger *logging.Logger) { counters := map[string]*com.Counter{ - "config_sync": &Stats.Config, - "state_sync": &Stats.State, - "history_sync": &Stats.History, - "overdue_sync": &Stats.Overdue, - "history_cleanup": &Stats.HistoryCleanup, + "config_sync": &Stats.Config, + "state_sync": &Stats.State, + "history_sync": &Stats.History, + "overdue_sync": &Stats.Overdue, + "history_cleanup": &Stats.HistoryCleanup, + "notification_sync": &Stats.NotificationSync, } periodic.Start(ctx, time.Second, func(_ periodic.Tick) { diff --git a/pkg/notifications/fetch.go b/pkg/notifications/fetch.go new file mode 100644 index 000000000..58b1a0d7a --- /dev/null +++ b/pkg/notifications/fetch.go @@ -0,0 +1,171 @@ +package notifications + +import ( + "context" + "encoding/json" + "github.com/icinga/icinga-go-library/backoff" + "github.com/icinga/icinga-go-library/retry" + "github.com/icinga/icinga-go-library/types" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "time" +) + +// fetchHostServiceFromRedis retrieves the host and service names from Redis. +// +// If serviceId is nil, only the host name is fetched. Otherwise, both host and service name is fetched. 
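+//
+// Both names are looked up in the icinga:host and icinga:service Redis hashes by the
+// object ID, retrying retryable Redis errors with backoff, and the "name" field of the
+// JSON-encoded hash value is returned.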
+func (client *Client) fetchHostServiceFromRedis( + ctx context.Context, + hostId, serviceId types.Binary, +) (hostName string, serviceName string, err error) { + getNameFromRedis := func(ctx context.Context, typ, id string) (string, error) { + key := "icinga:" + typ + + var data string + err := retry.WithBackoff( + ctx, + func(ctx context.Context) (err error) { + data, err = client.redisClient.HGet(ctx, key, id).Result() + return + }, + retry.Retryable, + backoff.DefaultBackoff, + retry.Settings{}, + ) + if err != nil { + return "", errors.Wrapf(err, "redis HGET %q, %q failed", key, id) + } + + var result struct { + Name string `json:"name"` + } + if err := json.Unmarshal([]byte(data), &result); err != nil { + return "", errors.Wrap(err, "failed to unmarshal redis result") + } + + return result.Name, nil + } + + hostName, err = getNameFromRedis(ctx, "host", hostId.String()) + if err != nil { + return + } + + if serviceId != nil { + serviceName, err = getNameFromRedis(ctx, "service", serviceId.String()) + if err != nil { + return + } + } + + return +} + +// customVar is used as an internal representation in Client.fetchCustomVarFromSql. +type customVar struct { + Name string `db:"name"` + Value types.String `db:"value"` +} + +// getValue returns this customvar's value as a string, transforming SQL NULLs to empty strings. +func (cv customVar) getValue() string { + if cv.Value.Valid { + return cv.Value.String + } + return "" +} + +// fetchCustomVarFromSql retrieves custom variables for the host and service from SQL. +// +// If serviceId is nil, only the host custom vars are fetched. Otherwise, both host and service custom vars are fetched. +func (client *Client) fetchCustomVarFromSql( + ctx context.Context, + hostId, serviceId types.Binary, +) (map[string]string, error) { + getCustomVarsFromSql := func(ctx context.Context, typ string, id types.Binary) ([]customVar, error) { + stmt, err := client.db.Preparex(client.db.Rebind( + `SELECT + customvar_flat.flatname AS name, + customvar_flat.flatvalue AS value + FROM ` + typ + `_customvar + JOIN customvar_flat + ON ` + typ + `_customvar.customvar_id = customvar_flat.customvar_id + WHERE ` + typ + `_customvar.` + typ + `_id = ?`)) + if err != nil { + return nil, err + } + + var customVars []customVar + if err := stmt.SelectContext(ctx, &customVars, id); err != nil { + return nil, err + } + + return customVars, nil + } + + customVars := make(map[string]string) + + hostVars, err := getCustomVarsFromSql(ctx, "host", hostId) + if err != nil { + return nil, err + } + + for _, hostVar := range hostVars { + customVars["host.vars."+hostVar.Name] = hostVar.getValue() + } + + if serviceId != nil { + serviceVars, err := getCustomVarsFromSql(ctx, "service", serviceId) + if err != nil { + return nil, err + } + + for _, serviceVar := range serviceVars { + customVars["service.vars."+serviceVar.Name] = serviceVar.getValue() + } + } + + return customVars, nil +} + +// hostServiceInformation contains the host name, an optional service name, and all custom variables. +// +// Returned from Client.fetchHostServiceData. +type hostServiceInformation struct { + hostName string + serviceName string + customVars map[string]string +} + +// fetchHostServiceData resolves the object names and fetches the associated custom variables. +// +// If serviceId is not nil, both host and service data will be queried. Otherwise, only host information is fetched. 
To +// acquire the information, the fetchHostServiceFromRedis and fetchCustomVarFromSql methods are used concurrently with +// a timeout of three seconds. +func (client *Client) fetchHostServiceData( + ctx context.Context, + hostId, serviceId types.Binary, +) (*hostServiceInformation, error) { + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + ret := &hostServiceInformation{} + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + var err error + ret.hostName, ret.serviceName, err = client.fetchHostServiceFromRedis(ctx, hostId, serviceId) + return err + }) + g.Go(func() error { + var err error + ret.customVars, err = client.fetchCustomVarFromSql(ctx, hostId, serviceId) + return err + }) + + if err := g.Wait(); err != nil { + return nil, err + } + + return ret, nil +} diff --git a/pkg/notifications/notifications.go b/pkg/notifications/notifications.go new file mode 100644 index 000000000..d79ee8bce --- /dev/null +++ b/pkg/notifications/notifications.go @@ -0,0 +1,534 @@ +package notifications + +import ( + "context" + "encoding/json" + "fmt" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/notifications/event" + "github.com/icinga/icinga-go-library/notifications/source" + "github.com/icinga/icinga-go-library/redis" + "github.com/icinga/icinga-go-library/structify" + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-go-library/utils" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb/history" + v1history "github.com/icinga/icingadb/pkg/icingadb/v1/history" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/pkg/errors" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "reflect" + "sync" +) + +// Client is an Icinga Notifications compatible client implementation to push events to Icinga Notifications. +// +// A new Client should be created by the NewNotificationsClient function. New history entries can be submitted by +// calling the Client.Submit method. +type Client struct { + source.Config + + db *database.DB + logger *logging.Logger + + rulesInfo *source.RulesInfo // rulesInfo holds the latest rulesInfo fetched from Icinga Notifications. + + ctx context.Context + + notificationsClient *source.Client // The Icinga Notifications client used to interact with the API. + redisClient *redis.Client // redisClient is the Redis client used to fetch host and service names for events. + + submissionMutex sync.Mutex // submissionMutex protects not concurrent safe struct fields in Client.Submit, i.e., rulesInfo. +} + +// NewNotificationsClient creates a new Client connected to an existing database and logger. 
+func NewNotificationsClient( + ctx context.Context, + db *database.DB, + rc *redis.Client, + logger *logging.Logger, + cfg source.Config, +) (*Client, error) { + notificationsClient, err := source.NewClient(cfg, "Icinga DB "+internal.Version.Version) + if err != nil { + return nil, err + } + + return &Client{ + Config: cfg, + + db: db, + logger: logger, + + ctx: ctx, + + rulesInfo: &source.RulesInfo{}, + + notificationsClient: notificationsClient, + redisClient: rc, + }, nil +} + +// evaluateRulesForObject checks each rule against the Icinga DB SQL database and returns matching rule IDs. +// +// Within the Icinga Notifications relation database, the rules are stored in rule.object_filter as a JSON object +// created by Icinga DB Web. This object contains SQL queries with bindvars for the Icinga DB relational database, to be +// executed with the given host, service and environment IDs. If this query returns at least one row, the rule is +// considered as matching. +// +// Icinga DB Web's JSON structure is described in: +// - https://github.com/Icinga/icingadb-web/pull/1289 +// - https://github.com/Icinga/icingadb/pull/998#issuecomment-3442298348 +func (client *Client) evaluateRulesForObject(ctx context.Context, hostId, serviceId, environmentId types.Binary) ([]string, error) { + const icingaDbWebRuleVersion = 1 + + type IcingaDbWebQuery struct { + Query string `json:"query"` + Parameters []any `json:"parameters"` + } + + type IcingaDbWebRule struct { + Version int `json:"version"` // expect icingaDbWebRuleVersion + Queries struct { + Host *IcingaDbWebQuery `json:"host,omitempty"` + Service *IcingaDbWebQuery `json:"service,omitempty"` + } `json:"queries"` + } + + outRuleIds := make([]string, 0, len(client.rulesInfo.Rules)) + + for id, filterExpr := range client.rulesInfo.Rules { + if filterExpr == "" { + outRuleIds = append(outRuleIds, id) + continue + } + + var webRule IcingaDbWebRule + if err := json.Unmarshal([]byte(filterExpr), &webRule); err != nil { + return nil, errors.Wrap(err, "cannot decode rule filter expression as JSON into struct") + } + if version := webRule.Version; version != icingaDbWebRuleVersion { + return nil, errors.Errorf("decoded rule filter expression .Version is %d, %d expected", version, icingaDbWebRuleVersion) + } + + var webQuery IcingaDbWebQuery + if !serviceId.Valid() { + // Evaluate rule for a host object + if webRule.Queries.Host == nil { + continue + } + webQuery = *webRule.Queries.Host + } else { + // Evaluate rule for a service object + if webRule.Queries.Service == nil { + continue + } + webQuery = *webRule.Queries.Service + } + + queryArgs := make([]any, 0, len(webQuery.Parameters)) + for _, param := range webQuery.Parameters { + switch param { + case ":host_id": + queryArgs = append(queryArgs, hostId) + case ":service_id": + if !serviceId.Valid() { + return nil, errors.New("host rule filter expression contains :service_id for replacement") + } + queryArgs = append(queryArgs, serviceId) + case ":environment_id": + queryArgs = append(queryArgs, environmentId) + default: + queryArgs = append(queryArgs, param) + } + } + + matches, err := func() (bool, error) { + rows, err := client.db.QueryContext(ctx, client.db.Rebind(webQuery.Query), queryArgs...) 
+ if err != nil { + return false, err + } + defer func() { _ = rows.Close() }() + + return rows.Next(), nil + }() + if err != nil { + return nil, errors.Wrapf(err, "cannot fetch rule %q from %q", id, filterExpr) + } else if !matches { + continue + } + outRuleIds = append(outRuleIds, id) + } + + return outRuleIds, nil +} + +// buildCommonEvent creates an event.Event based on Host and (optional) Service IDs. +// +// This function is used by all event builders to create a common event structure that includes the host and service +// names, an Icinga DB Web reference, and the tags for the event. +// Any event type-specific information (like severity, message, etc.) is added by the specific event builders. +func (client *Client) buildCommonEvent( + ctx context.Context, + hostId, serviceId types.Binary, +) (*event.Event, *hostServiceInformation, error) { + info, err := client.fetchHostServiceData(ctx, hostId, serviceId) + if err != nil { + return nil, nil, err + } + + var ( + objectName string + objectUrl string + objectTags map[string]string + ) + + if info.serviceName != "" { + objectName = info.hostName + "!" + info.serviceName + objectUrl = "/icingadb/service?name=" + utils.RawUrlEncode(info.serviceName) + "&host.name=" + utils.RawUrlEncode(info.hostName) + objectTags = map[string]string{ + "host": info.hostName, + "service": info.serviceName, + } + } else { + objectName = info.hostName + objectUrl = "/icingadb/host?name=" + utils.RawUrlEncode(info.hostName) + objectTags = map[string]string{ + "host": info.hostName, + } + } + + return &event.Event{ + Name: objectName, + URL: objectUrl, + Tags: objectTags, + ExtraTags: info.customVars, + }, info, nil +} + +// buildStateHistoryEvent builds a fully initialized event.Event from a state history entry. +// +// The resulted event will have all the necessary information for a state change event, and must +// not be further modified by the caller. +func (client *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) { + ev, info, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) + if err != nil { + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) + } + + ev.Type = event.TypeState + + if info.serviceName != "" { + switch h.HardState { + case 0: + ev.Severity = event.SeverityOK + case 1: + ev.Severity = event.SeverityWarning + case 2: + ev.Severity = event.SeverityCrit + case 3: + ev.Severity = event.SeverityErr + default: + return nil, fmt.Errorf("unexpected service state %d", h.HardState) + } + } else { + switch h.HardState { + case 0: + ev.Severity = event.SeverityOK + case 1: + ev.Severity = event.SeverityCrit + default: + return nil, fmt.Errorf("unexpected host state %d", h.HardState) + } + } + + if h.Output.Valid { + ev.Message = h.Output.String + } + if h.LongOutput.Valid { + ev.Message += "\n" + h.LongOutput.String + } + + return ev, nil +} + +// buildDowntimeHistoryMetaEvent from a downtime history entry. 
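+//
+// Depending on the entry's event_type, either a muting downtime_start event or an
+// unmuting downtime_end/downtime_removed event is built.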
+func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) { + ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) + if err != nil { + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) + } + + switch h.EventType { + case "downtime_start": + ev.Type = event.TypeDowntimeStart + ev.Username = h.Author + ev.Message = h.Comment + ev.Mute = types.MakeBool(true) + ev.MuteReason = "Checkable is in downtime" + + case "downtime_end": + ev.Mute = types.MakeBool(false) + if h.HasBeenCancelled.Valid && h.HasBeenCancelled.Bool { + ev.Type = event.TypeDowntimeRemoved + ev.Message = "Downtime was cancelled" + + if h.CancelledBy.Valid { + ev.Username = h.CancelledBy.String + } + } else { + ev.Type = event.TypeDowntimeEnd + ev.Message = "Downtime expired" + } + + default: + return nil, fmt.Errorf("unexpected event type %q", h.EventType) + } + + return ev, nil +} + +// buildFlappingHistoryEvent from a flapping history entry. +func (client *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) { + ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) + if err != nil { + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) + } + + if h.PercentStateChangeEnd.Valid { + ev.Type = event.TypeFlappingEnd + ev.Message = fmt.Sprintf( + "Checkable stopped flapping (Current flapping value %.2f%% < low threshold %.2f%%)", + h.PercentStateChangeEnd.Float64, h.FlappingThresholdLow) + ev.Mute = types.MakeBool(false) + } else if h.PercentStateChangeStart.Valid { + ev.Type = event.TypeFlappingStart + ev.Message = fmt.Sprintf( + "Checkable started flapping (Current flapping value %.2f%% > high threshold %.2f%%)", + h.PercentStateChangeStart.Float64, h.FlappingThresholdHigh) + ev.Mute = types.MakeBool(true) + ev.MuteReason = "Checkable is flapping" + } else { + return nil, errors.New("flapping history entry has neither percent_state_change_start nor percent_state_change_end") + } + + return ev, nil +} + +// buildAcknowledgementHistoryEvent from an acknowledgment history entry. +func (client *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) { + ev, _, err := client.buildCommonEvent(ctx, h.HostId, h.ServiceId) + if err != nil { + return nil, errors.Wrapf(err, "cannot build event for %q,%q", h.HostId, h.ServiceId) + } + + if !h.ClearTime.Time().IsZero() { + ev.Type = event.TypeAcknowledgementCleared + ev.Message = "Acknowledgement was cleared" + ev.Mute = types.MakeBool(false) + + if h.ClearedBy.Valid { + ev.Username = h.ClearedBy.String + } + } else if !h.SetTime.Time().IsZero() { + ev.Type = event.TypeAcknowledgementSet + ev.Mute = types.MakeBool(true) + ev.MuteReason = "Checkable was acknowledged" + + if h.Comment.Valid { + ev.Message = h.Comment.String + } else { + ev.Message = "Checkable was acknowledged" + } + + if h.Author.Valid { + ev.Username = h.Author.String + } + } else { + return nil, errors.New("acknowledgment history entry has neither a set_time nor a clear_time") + } + + return ev, nil +} + +// Submit this [database.Entity] to the Icinga Notifications API. +// +// Based on the entity's type, a different kind of event will be constructed. The event will be sent to the API in a +// blocking fashion. +// +// Returns true if this entity was processed or cannot be processed any further. 
Returns false if this entity should be +// retried later. +// +// This method usees the Client's logger. +func (client *Client) Submit(entity database.Entity) bool { + if client.ctx.Err() != nil { + client.logger.Errorw("Cannot process submitted entity as client context is done", zap.Error(client.ctx.Err())) + return true + } + + var ( + ev *event.Event + eventErr error + metaHistory v1history.HistoryTableMeta + ) + + // Keep the type switch in sync with the values of SyncKeyStructPtrs below. + switch h := entity.(type) { + case *v1history.AcknowledgementHistory: + ev, eventErr = client.buildAcknowledgementHistoryEvent(client.ctx, h) + metaHistory = h.HistoryTableMeta + + case *v1history.DowntimeHistoryMeta: + ev, eventErr = client.buildDowntimeHistoryMetaEvent(client.ctx, h) + metaHistory = h.HistoryTableMeta + + case *v1history.FlappingHistory: + ev, eventErr = client.buildFlappingHistoryEvent(client.ctx, h) + metaHistory = h.HistoryTableMeta + + case *v1history.StateHistory: + if h.StateType != common.HardState { + return true + } + ev, eventErr = client.buildStateHistoryEvent(client.ctx, h) + metaHistory = h.HistoryTableMeta + + default: + client.logger.Error("Cannot process unsupported type", zap.String("type", fmt.Sprintf("%T", h))) + return true + } + + if eventErr != nil { + client.logger.Errorw("Cannot build event from history entry", + zap.String("type", fmt.Sprintf("%T", entity)), + zap.Error(eventErr)) + return true + } else if ev == nil { + // This really should not happen. + client.logger.Errorw("No event was built, but no error was reported", + zap.String("type", fmt.Sprintf("%T", entity))) + return true + } + + eventLogger := client.logger.With(zap.Object( + "event", + zapcore.ObjectMarshalerFunc(func(encoder zapcore.ObjectEncoder) error { + encoder.AddString("name", ev.Name) + encoder.AddString("type", ev.Type.String()) + return nil + }), + )) + + // The following code accesses Client.rulesInfo. + client.submissionMutex.Lock() + defer client.submissionMutex.Unlock() + + // This loop allows resubmitting an event if the rules have changed. The first try would be the rule update, the + // second try would be the resubmit, and the third try would be for bad luck, e.g., when a second rule update just + // crept in between. If there are three subsequent rule updates, something is wrong. + for try := 0; try < 3; try++ { + eventRuleIds, err := client.evaluateRulesForObject( + client.ctx, + metaHistory.HostId, + metaHistory.ServiceId, + metaHistory.EnvironmentId) + if err != nil { + // While returning false would be more correct, this would result in never being able to refetch new rule + // versions. Consider an invalid object filter expression, which is now impossible to get rid of. 
+ eventLogger.Errorw("Cannot evaluate rules for event, assuming no rule matched", zap.Error(err)) + eventRuleIds = []string{} + } + + ev.RulesVersion = client.rulesInfo.Version + ev.RuleIds = eventRuleIds + + newEventRules, err := client.notificationsClient.ProcessEvent(client.ctx, ev) + if errors.Is(err, source.ErrRulesOutdated) { + eventLogger.Infow("Received a rule update from Icinga Notification, resubmitting event", + zap.String("old_rules_version", client.rulesInfo.Version), + zap.String("new_rules_version", newEventRules.Version)) + + client.rulesInfo = newEventRules + + continue + } else if err != nil { + eventLogger.Errorw("Cannot submit event to Icinga Notifications, will be retried", + zap.String("rules_version", client.rulesInfo.Version), + zap.Any("rules", eventRuleIds), + zap.Error(err)) + return false + } + + eventLogger.Debugw("Successfully submitted event to Icinga Notifications", zap.Any("rules", eventRuleIds)) + return true + } + + eventLogger.Error("Received three rule updates from Icinga Notifications in a row, event will be retried") + return false +} + +// SyncExtraStages returns a map of history sync keys to [history.StageFunc] to be used for [history.Sync]. +// +// Passing the return value of this method as the extraStages parameter to [history.Sync] results in forwarding events +// from the Icinga DB history stream to Icinga Notifications after being resorted via the StreamSorter. +func (client *Client) SyncExtraStages() map[string]history.StageFunc { + var syncKeyStructPtrs = map[string]any{ + history.SyncPipelineAcknowledgement: (*v1history.AcknowledgementHistory)(nil), + history.SyncPipelineDowntime: (*v1history.DowntimeHistoryMeta)(nil), + history.SyncPipelineFlapping: (*v1history.FlappingHistory)(nil), + history.SyncPipelineState: (*v1history.StateHistory)(nil), + } + + sorterCallbackFn := func(msg redis.XMessage, key string) bool { + makeEntity := func(key string, values map[string]interface{}) (database.Entity, error) { + structPtr, ok := syncKeyStructPtrs[key] + if !ok { + return nil, fmt.Errorf("key is not part of keyStructPtrs") + } + + structifier := structify.MakeMapStructifier( + reflect.TypeOf(structPtr).Elem(), + "json", + contracts.SafeInit) + val, err := structifier(values) + if err != nil { + return nil, errors.Wrapf(err, "can't structify values %#v for %q", values, key) + } + + entity, ok := val.(database.Entity) + if !ok { + return nil, fmt.Errorf("structifier returned %T which does not implement database.Entity", val) + } + + return entity, nil + } + + entity, err := makeEntity(key, msg.Values) + if err != nil { + client.logger.Errorw("Failed to create database.Entity out of Redis stream message", + zap.Error(err), + zap.String("key", key), + zap.String("id", msg.ID)) + return false + } + + success := client.Submit(entity) + if success { + telemetry.Stats.NotificationSync.Add(1) + } + return success + } + + pipelineFn := NewStreamSorter(client.ctx, client.logger, sorterCallbackFn).PipelineFunc + + extraStages := make(map[string]history.StageFunc) + for k := range syncKeyStructPtrs { + extraStages[k] = pipelineFn + } + + return extraStages +} diff --git a/pkg/notifications/sorter.go b/pkg/notifications/sorter.go new file mode 100644 index 000000000..4fb6aa54c --- /dev/null +++ b/pkg/notifications/sorter.go @@ -0,0 +1,417 @@ +package notifications + +import ( + "container/heap" + "context" + "fmt" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/redis" + 
"github.com/icinga/icingadb/pkg/icingadb/history" + "github.com/pkg/errors" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "strconv" + "strings" + "time" +) + +// parseRedisStreamId parses a Redis Stream ID and returns the timestamp in ms and the sequence number, or an error. +func parseRedisStreamId(redisStreamId string) (int64, int64, error) { + dashPos := strings.IndexRune(redisStreamId, '-') + if dashPos <= 0 { + return 0, 0, errors.Errorf("value %q does not satisfy Redis Stream ID pattern", redisStreamId) + } + + ms, err := strconv.ParseInt(redisStreamId[:dashPos], 10, 64) + if err != nil { + return 0, 0, errors.Wrapf( + err, + "timestamp part of the Redis Stream ID %q cannot be parsed to int", redisStreamId) + } + + seq, err := strconv.ParseInt(redisStreamId[dashPos+1:], 10, 64) + if err != nil { + return 0, 0, errors.Wrapf( + err, + "sequence number of the Redis Stream ID %q cannot be parsed to int", redisStreamId) + } + + return ms, seq, nil +} + +// streamSorterSubmission is one submission to a StreamSorter, allowing to be sorted by the Redis Stream ID - both via +// timestamp and the sequence number as a fallback - as well as the submission timestamp for duplicates if milliseconds +// are not precise enough. +type streamSorterSubmission struct { + // msg is the Redis message to be forwarded to out after this submission was sorted. + msg redis.XMessage + key string + out chan<- redis.XMessage + + // Required for sorting. + streamIdMs int64 // streamIdMs is the Redis Stream ID timestamp part (milliseconds) + streamIdSeq int64 // streamIdSeq is the Redis Stream ID sequence number + submitTime time.Time // submitTime is the timestamp when the element was submitted +} + +// MarshalLogObject implements [zapcore.ObjectMarshaler]. +func (sub *streamSorterSubmission) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("redis-id-ms", sub.streamIdMs) + encoder.AddInt64("redis-id-seq", sub.streamIdSeq) + encoder.AddTime("submit-time", sub.submitTime) + encoder.AddString("out", fmt.Sprint(sub.out)) + + return nil +} + +// streamSorterSubmissions implements [heap.Interface] for []streamSorterSubmission. +type streamSorterSubmissions []*streamSorterSubmission + +// Len implements [sort.Interface] required by [heap.Interface]. +func (subs streamSorterSubmissions) Len() int { return len(subs) } + +// Swap implements [sort.Interface] required by [heap.Interface]. +func (subs streamSorterSubmissions) Swap(i, j int) { subs[i], subs[j] = subs[j], subs[i] } + +// Less implements [sort.Interface] required by [heap.Interface]. +func (subs streamSorterSubmissions) Less(i, j int) bool { + a, b := subs[i], subs[j] + if a.streamIdMs != b.streamIdMs { + return a.streamIdMs < b.streamIdMs + } + if a.streamIdSeq != b.streamIdSeq { + return a.streamIdSeq < b.streamIdSeq + } + return a.submitTime.Before(b.submitTime) +} + +// Push implements [heap.Interface]. +func (subs *streamSorterSubmissions) Push(x any) { + sub, ok := x.(*streamSorterSubmission) + if !ok { + panic(fmt.Sprintf("streamSorterSubmissions.Push received x of %T", x)) + } + + *subs = append(*subs, sub) +} + +// Pop implements [heap.Interface]. +func (subs *streamSorterSubmissions) Pop() any { + old := *subs + n := len(old) + x := old[n-1] + *subs = old[0 : n-1] + return x +} + +// Peek returns the smallest element from the heap without removing it, or nil if the heap is empty. 
+func (subs streamSorterSubmissions) Peek() *streamSorterSubmission { + if len(subs) > 0 { + return subs[0] + } else { + return nil + } +} + +// StreamSorter is a helper that can used to intercept messages from different history sync pipelines and passes them +// to a callback in the order given by their Redis Stream ID (sorted across all involved streams). +// +// After a message is received, it is kept in a priority queue for three seconds to wait for possible messages from +// another stream with a smaller ID. Thus, if a message is received delayed for more than three seconds, it will be +// relayed out of order. The StreamSorter is only able to ensure order to a certain degree of chaos. +// +// The callback function receives the [redis.XMessage] together with the Redis stream name (key) for additional +// context. The callback function is supposed to return true on success. Otherwise, the callback will be retried until +// it succeeds. +type StreamSorter struct { + ctx context.Context + logger *logging.Logger + callbackFn func(redis.XMessage, string) bool + submissionCh chan *streamSorterSubmission + + // registerOutCh is used by PipelineFunc() to register output channels with worker() + registerOutCh chan chan<- redis.XMessage + + // closeOutCh is used by PipelineFunc() to signal to worker() that there will be no more submissions destined for + // that output channel and it can be closed by the worker after it processed all pending submissions for it. + closeOutCh chan chan<- redis.XMessage + + // The following fields should only be changed for the tests. + + // callbackMaxDelay is the maximum delay for continuously failing callbacks. Defaults to 10s. + callbackMaxDelay time.Duration + // submissionMinAge is the minimum age for a submission before being forwarded. Defaults to 3s. + submissionMinAge time.Duration + + // isVerbose implies a isVerbose debug logging. Don't think one want to have this outside the tests. + isVerbose bool +} + +// NewStreamSorter creates a StreamSorter honoring the given context and returning elements to the callback function. +func NewStreamSorter( + ctx context.Context, + logger *logging.Logger, + callbackFn func(msg redis.XMessage, key string) bool, +) *StreamSorter { + sorter := &StreamSorter{ + ctx: ctx, + logger: logger, + callbackFn: callbackFn, + submissionCh: make(chan *streamSorterSubmission), + registerOutCh: make(chan chan<- redis.XMessage), + closeOutCh: make(chan chan<- redis.XMessage), + callbackMaxDelay: 10 * time.Second, + submissionMinAge: 3 * time.Second, + } + + go sorter.worker() + + return sorter +} + +// verbose produces a debug log messages if StreamSorter.isVerbose is set. +func (sorter *StreamSorter) verbose(msg string, keysAndValues ...any) { + // When used in tests and the test context is done, using the logger results in a data race. Since there are a few + // log messages which might occur after the test has finished, better not log at all. + // https://github.com/uber-go/zap/issues/687#issuecomment-473382859 + if sorter.ctx.Err() != nil { + return + } + + if !sorter.isVerbose { + return + } + + sorter.logger.Debugw(msg, keysAndValues...) +} + +// startCallback initiates the callback in a background goroutine and returns a channel that is closed once the callback +// has succeeded. It retries the callback with a backoff until it signal success by returning true. 
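+//
+// The retry delay starts at one millisecond, doubles after each failed attempt, and is
+// capped at callbackMaxDelay.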
+func (sorter *StreamSorter) startCallback(msg redis.XMessage, key string) <-chan struct{} { + callbackCh := make(chan struct{}) + + go func() { + defer close(callbackCh) + + callbackDelay := time.Duration(0) + + for try := 0; ; try++ { + select { + case <-sorter.ctx.Done(): + return + case <-time.After(callbackDelay): + } + + start := time.Now() + success := sorter.callbackFn(msg, key) + + sorter.verbose("startCallback: finished executing callbackFn", + zap.String("id", msg.ID), + zap.Bool("success", success), + zap.Int("try", try), + zap.Duration("duration", time.Since(start)), + zap.Duration("next-delay", callbackDelay)) + + if success { + return + } else { + callbackDelay = min(max(time.Millisecond, 2*callbackDelay), sorter.callbackMaxDelay) + } + } + }() + + return callbackCh +} + +// worker is the background worker, started in a goroutine from NewStreamSorter, reacts upon messages from the channels, +// and runs until the StreamSorter.ctx is done. +func (sorter *StreamSorter) worker() { + // When a streamSorterSubmission is created in the submit method, the current time.Time is added to the struct. + // Only if the submission was at least three seconds (submissionMinAge) ago, a popped submission from the heap will + // be passed to startCallback in its own goroutine to execute the callback function. + var submissionHeap streamSorterSubmissions + + // Each registered output is stored in the registeredOutputs map, mapping output channels to the following struct. + // It counts pending submissions in the heap for each received submission from submissionCh and can be marked as + // closed to be cleaned up after its work is done. + type OutputState struct { + pending int + close bool + } + + registeredOutputs := make(map[chan<- redis.XMessage]*OutputState) + + // Close all registered outputs when we exit. + defer func() { + for out := range registeredOutputs { + close(out) + } + }() + + // If a submission is currently given to the callback via startCallback, these two variables are not nil. After the + // callback has finished, the channel will be closed. + var runningSubmission *streamSorterSubmission + var runningCallbackCh <-chan struct{} + + for { + if (runningSubmission == nil) != (runningCallbackCh == nil) { + panic(fmt.Sprintf("inconsistent state: runningSubmission=%#v and runningCallbackCh=%#v", + runningSubmission, runningCallbackCh)) + } + + var nextSubmissionDue <-chan time.Time + if runningCallbackCh == nil { + if next := submissionHeap.Peek(); next != nil { + if submissionAge := time.Since(next.submitTime); submissionAge >= sorter.submissionMinAge { + runningCallbackCh = sorter.startCallback(next.msg, next.key) + runningSubmission = next + heap.Pop(&submissionHeap) + } else { + nextSubmissionDue = time.After(sorter.submissionMinAge - submissionAge) + } + } + } + + select { + case out := <-sorter.registerOutCh: + sorter.verbose("worker: register output", zap.String("out", fmt.Sprint(out))) + if _, ok := registeredOutputs[out]; ok { + panic("attempting to register the same output channel twice") + } + registeredOutputs[out] = &OutputState{} + // This function is now responsible for closing out. + + case out := <-sorter.closeOutCh: + if state := registeredOutputs[out]; state == nil { + panic("requested to close unknown output channel") + } else if state.pending > 0 { + // Still pending work, mark the output and wait for it to complete. 
+ state.close = true + } else { + // Output can be closed and unregistered immediately + close(out) + delete(registeredOutputs, out) + } + + case sub := <-sorter.submissionCh: + sorter.verbose("worker: push submission to heap", zap.Object("submission", sub)) + + if state := registeredOutputs[sub.out]; state == nil { + panic("submission for an unknown output channel") + } else { + state.pending++ + heap.Push(&submissionHeap, sub) + } + + case <-nextSubmissionDue: + // Loop start processing of the next submission. + continue + + case <-runningCallbackCh: + out := runningSubmission.out + out <- runningSubmission.msg + state := registeredOutputs[out] + state.pending-- + if state.close && state.pending == 0 { + close(out) + delete(registeredOutputs, out) + } + + runningCallbackCh = nil + runningSubmission = nil + + case <-sorter.ctx.Done(): + return + } + } +} + +// submit a [redis.XMessage] to the StreamSorter. +func (sorter *StreamSorter) submit(msg redis.XMessage, key string, out chan<- redis.XMessage) error { + ms, seq, err := parseRedisStreamId(msg.ID) + if err != nil { + return errors.Wrap(err, "cannot parse Redis Stream ID") + } + + submission := &streamSorterSubmission{ + msg: msg, + key: key, + out: out, + streamIdMs: ms, + streamIdSeq: seq, + submitTime: time.Now(), + } + + select { + case sorter.submissionCh <- submission: + return nil + + case <-time.After(time.Second): + return errors.New("submission timed out") + + case <-sorter.ctx.Done(): + return sorter.ctx.Err() + } +} + +// PipelineFunc implements the [history.StageFunc] type expected for a history sync pipeline stage. +// +// This method of a single StreamSorter can be inserted into multiple history sync pipelines and will forward all +// messages from in to out as expected from a pipeline stage. In between, all messages are processed by the +// StreamSorter, which correlates the messages from different pipelines and additionally passes them to a callback +// according to its specification (see the comment on the StreamSorter type). +func (sorter *StreamSorter) PipelineFunc( + ctx context.Context, + _ history.Sync, + key string, + in <-chan redis.XMessage, + out chan<- redis.XMessage, +) error { + + // Register output channel with worker. + select { + case sorter.registerOutCh <- out: + // Success, worker is now responsible for closing the channel. + + case <-ctx.Done(): + close(out) + return ctx.Err() + + case <-sorter.ctx.Done(): + close(out) + return sorter.ctx.Err() + } + + // If we exit, signal to the worker that no more work for this channel will be submitted. + defer func() { + select { + case sorter.closeOutCh <- out: + // Success, worker will close the output channel eventually. + + case <-sorter.ctx.Done(): + // Worker will quit entirely, closing all output channels. 
+ } + }() + + for { + select { + case msg, ok := <-in: + if !ok { + return nil + } + + err := sorter.submit(msg, key, out) + if err != nil { + sorter.logger.Errorw("Failed to submit Redis stream event to stream sorter", + zap.String("key", key), + zap.Error(err)) + } + + case <-ctx.Done(): + return ctx.Err() + + case <-sorter.ctx.Done(): + return sorter.ctx.Err() + } + } +} diff --git a/pkg/notifications/sorter_test.go b/pkg/notifications/sorter_test.go new file mode 100644 index 000000000..21f490796 --- /dev/null +++ b/pkg/notifications/sorter_test.go @@ -0,0 +1,369 @@ +// #nosec G404 -- Allow math/rand for the tests +package notifications + +import ( + "cmp" + "context" + "fmt" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/redis" + "github.com/icinga/icingadb/pkg/icingadb/history" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "math/rand" + "sort" + "strconv" + "strings" + "sync" + "testing" + "time" +) + +func Test_redisStreamIdToMs(t *testing.T) { + tests := []struct { + name string + input string + wantMs int64 + wantSeq int64 + wantErr bool + }{ + { + name: "epoch", + input: "0-0", + }, + { + name: "valid", + input: "1761658169701-0", + wantMs: 1761658169701, + }, + { + name: "valid sequence", + input: "1761658169701-23", + wantMs: 1761658169701, + wantSeq: 23, + }, + { + name: "invalid format", + input: "23-42-23", + wantErr: true, + }, + { + name: "missing first part", + input: "-23", + wantErr: true, + }, + { + name: "missing second part", + input: "23-", + wantErr: true, + }, + { + name: "only dash", + input: "-", + wantErr: true, + }, + { + name: "just invalid", + input: "oops", + wantErr: true, + }, + { + name: "invalid field types", + input: "0x23-0x42", + wantErr: true, + }, + { + name: "number too big", + input: "22222222222222222222222222222222222222222222222222222222222222222222222222222222222222222-0", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotMs, gotSeq, err := parseRedisStreamId(tt.input) + require.Equal(t, tt.wantErr, err != nil, "error differs %v", err) + require.Equal(t, tt.wantMs, gotMs, "ms from Redis Stream ID differs") + require.Equal(t, tt.wantSeq, gotSeq, "seq from Redis Stream ID differs") + }) + } +} + +func Test_streamSorterSubmissions(t *testing.T) { + mkSubmitTime := func(offset int) time.Time { + return time.Date(2009, 11, 10, 23, 0, 0, offset, time.UTC) + } + submissions := streamSorterSubmissions{ + {streamIdMs: 0, streamIdSeq: 0, submitTime: mkSubmitTime(0)}, + {streamIdMs: 1, streamIdSeq: 0, submitTime: mkSubmitTime(0)}, + {streamIdMs: 1, streamIdSeq: 1, submitTime: mkSubmitTime(0)}, + {streamIdMs: 2, streamIdSeq: 0, submitTime: mkSubmitTime(0)}, + {streamIdMs: 2, streamIdSeq: 0, submitTime: mkSubmitTime(1)}, + {streamIdMs: 3, streamIdSeq: 0, submitTime: mkSubmitTime(0)}, + {streamIdMs: 3, streamIdSeq: 1, submitTime: mkSubmitTime(0)}, + {streamIdMs: 3, streamIdSeq: 1, submitTime: mkSubmitTime(1)}, + {streamIdMs: 3, streamIdSeq: 1, submitTime: mkSubmitTime(2)}, + } + + submissionsRand := make(streamSorterSubmissions, 0, len(submissions)) + for _, i := range rand.Perm(len(submissions)) { + submissionsRand = append(submissionsRand, submissions[i]) + } + + sort.Sort(submissionsRand) + require.Equal(t, submissions, submissionsRand) +} + +func TestStreamSorter(t *testing.T) { + tests := []struct { + name string + messages int + producers int + producersEarlyClose int + callbackMaxDelayMs int + 
callbackSuccessPercent int + expectTimeout bool + outMaxDelayMs int + }{ + { + name: "baseline", + messages: 10, + producers: 1, + callbackSuccessPercent: 100, + }, + { + name: "simple", + messages: 100, + producers: 10, + callbackSuccessPercent: 100, + }, + { + name: "many producers", + messages: 100, + producers: 100, + callbackSuccessPercent: 100, + }, + { + name: "many messages", + messages: 1000, + producers: 10, + callbackSuccessPercent: 100, + }, + { + name: "callback a bit unreliable", + messages: 50, + producers: 10, + callbackSuccessPercent: 70, + }, + { + name: "callback coin flip", + messages: 50, + producers: 10, + callbackSuccessPercent: 50, + }, + { + name: "callback unreliable", + messages: 25, + producers: 5, + callbackSuccessPercent: 30, + }, + { + name: "callback total rejection", + messages: 10, + producers: 1, + callbackSuccessPercent: 0, + expectTimeout: true, + }, + { + name: "callback slow", + messages: 100, + producers: 10, + callbackMaxDelayMs: 3000, + callbackSuccessPercent: 100, + }, + { + name: "out slow", + messages: 100, + producers: 10, + callbackSuccessPercent: 100, + outMaxDelayMs: 1000, + }, + { + name: "producer out early close", + messages: 100, + producers: 10, + producersEarlyClose: 5, + callbackMaxDelayMs: 1000, + callbackSuccessPercent: 100, + }, + { + name: "pure chaos", + messages: 50, + producers: 10, + callbackMaxDelayMs: 3000, + callbackSuccessPercent: 50, + outMaxDelayMs: 1000, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + // Callback functions after reordering + var ( + callbackCollection []string + callbackCollectionMutex sync.Mutex + callbackFn = func(msg redis.XMessage, _ string) bool { + if tt.callbackMaxDelayMs > 0 { + time.Sleep(time.Duration(rand.Int63n(int64(tt.callbackMaxDelayMs))) * time.Microsecond) + } + + if rand.Int63n(100)+1 > int64(tt.callbackSuccessPercent) { + return false + } + + callbackCollectionMutex.Lock() + defer callbackCollectionMutex.Unlock() + callbackCollection = append(callbackCollection, msg.ID) + return true + } + ) + + // Out channel after reordering and callback + var ( + outCounterCh = make(chan struct{}) + outConsumer = func(out chan redis.XMessage) { + for { + if tt.outMaxDelayMs > 0 { + time.Sleep(time.Duration(rand.Int63n(int64(tt.outMaxDelayMs))) * time.Microsecond) + } + + _, ok := <-out + if !ok { + return + } + outCounterCh <- struct{}{} + } + } + ) + + // Decreasing counter for messages to be sent + var ( + inCounter = tt.messages + inCounterMutex sync.Mutex + ) + + sorter := NewStreamSorter( + t.Context(), + logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Second), + callbackFn) + sorter.callbackMaxDelay = 100 * time.Millisecond + sorter.submissionMinAge = 30 * time.Millisecond + sorter.isVerbose = true + + for i := range tt.producers { + earlyClose := i < tt.producersEarlyClose + + in := make(chan redis.XMessage) + out := make(chan redis.XMessage) + go func() { + require.NoError(t, sorter.PipelineFunc(context.Background(), history.Sync{}, "", in, out)) + }() + + if !earlyClose { + defer close(in) // no leakage, general cleanup + } + + go func() { + for { + time.Sleep(time.Duration(rand.Int63n(250)) * time.Microsecond) + + inCounterMutex.Lock() + isFin := inCounter <= 0 + if !isFin { + inCounter-- + } + inCounterMutex.Unlock() + + if isFin { + return + } + + ms := time.Now().UnixMilli() + rand.Int63n(20) - 10 + seq := rand.Int63n(1_000) + + // Add 10% time travelers + if rand.Int63n(10) == 9 { + distanceMs := int64(5) + if rand.Int63n(2) > 0 { + 
// Don't go back too far. Otherwise, elements would be out of order - submissionMinAge. + ms -= distanceMs + } else { + ms += distanceMs + } + } + + msg := redis.XMessage{ID: fmt.Sprintf("%d-%d", ms, seq)} + in <- msg + + // 25% chance of closing for early closing producers + if earlyClose && rand.Int63n(4) == 3 { + close(in) + t.Log("closed producer early") + return + } + } + }() + + go outConsumer(out) + } + + var outCounter int + breakFor: + for { + select { + case <-outCounterCh: + outCounter++ + if outCounter == tt.messages { + break breakFor + } + + case <-time.After(3 * time.Second): + if tt.expectTimeout { + return + } + t.Fatalf("Collecting messages timed out after receiving %d out of %d messages", + outCounter, tt.messages) + } + } + if tt.expectTimeout { + t.Fatal("Timeout was expected") + } + + callbackCollectionMutex.Lock() + for i := 0; i < len(callbackCollection)-1; i++ { + parse := func(id string) (int64, int64) { + parts := strings.Split(id, "-") + ms, err1 := strconv.ParseInt(parts[0], 10, 64) + seq, err2 := strconv.ParseInt(parts[1], 10, 64) + require.NoError(t, cmp.Or(err1, err2)) + return ms, seq + } + + a, b := callbackCollection[i], callbackCollection[i+1] + aMs, aSeq := parse(a) + bMs, bSeq := parse(b) + + switch { + case aMs < bMs: + case aMs == bMs: + if aSeq > bSeq { + t.Errorf("collection in wrong order: %q before %q", a, b) + } + case aMs > bMs: + t.Errorf("collection in wrong order: %q before %q", a, b) + } + } + callbackCollectionMutex.Unlock() + }) + } +}
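
For orientation, here is a minimal standalone sketch of how a single StreamSorter can be driven outside of the test suite. It assumes only what the code above shows (NewStreamSorter, PipelineFunc, the callback signature and redis.XMessage); the `package main` wiring, the stream key, the sample message ID and the closing sleep are illustrative and not part of the patch.

```go
// Usage sketch for StreamSorter.PipelineFunc; constructor and method
// signatures are taken from the code above, everything else is illustrative.
package main

import (
	"context"
	"time"

	"github.com/icinga/icinga-go-library/logging"
	"github.com/icinga/icinga-go-library/redis"
	"github.com/icinga/icingadb/pkg/icingadb/history"
	"github.com/icinga/icingadb/pkg/notifications"
	"go.uber.org/zap"
)

func main() {
	ctx := context.Background()
	logger := logging.NewLogger(zap.NewExample().Sugar(), time.Second)

	// The callback receives messages in Redis Stream ID order across all
	// pipelines sharing this sorter; returning false asks for a retry.
	sorter := notifications.NewStreamSorter(ctx, logger, func(msg redis.XMessage, key string) bool {
		return true
	})

	in := make(chan redis.XMessage)
	out := make(chan redis.XMessage)

	// One PipelineFunc call per history pipeline; it blocks until in is closed
	// or a context is cancelled, so it runs in its own goroutine.
	go func() {
		_ = sorter.PipelineFunc(ctx, history.Sync{}, "icinga:history:stream:state", in, out)
	}()

	// Downstream stage: every submitted message is forwarded to out unchanged.
	go func() {
		for range out {
		}
	}()

	// Upstream stage: message IDs must be valid Redis Stream IDs ("<ms>-<seq>").
	in <- redis.XMessage{ID: "1761658169701-0"}
	close(in)

	// Give the worker time to flush; real code coordinates shutdown via contexts.
	time.Sleep(time.Second)
}
```

In the daemon, the same PipelineFunc is registered once per history pipeline, so a single sorter correlates and orders the messages of all of them, which is what TestStreamSorter exercises with its concurrent producers.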