diff --git a/db/changes_rate_limit.go b/db/changes_rate_limit.go new file mode 100644 index 000000000..15c41faf2 --- /dev/null +++ b/db/changes_rate_limit.go @@ -0,0 +1,162 @@ +package db + +import ( + "sync" + "time" + + "github.com/flanksource/commons/collections" + sw "github.com/flanksource/slidingwindow" + + "github.com/flanksource/config-db/api" + "github.com/flanksource/config-db/db/models" +) + +const ( + rateLimitWindow = time.Hour * 4 + maxChangesInWindow = 100 + + ChangeTypeTooManyChanges = "TooManyChanges" +) + +var ( + scraperLocks = sync.Map{} + configRateLimiters = map[string]*sw.Limiter{} + + // List of configs that are currently in being rate limited. + // It's used to avoid inserting multiple "TooManyChanges" changes for the same config. + currentlyRateLimitedConfigsPerScraper = sync.Map{} +) + +func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, []string, error) { + if len(newChanges) == 0 || ctx.ScrapeConfig().GetPersistedID() == nil { + return newChanges, nil, nil + } + + window := ctx.Properties().Duration("changes.max.window", rateLimitWindow) + max := ctx.Properties().Int("changes.max.count", maxChangesInWindow) + scraperID := ctx.ScrapeConfig().GetPersistedID().String() + + lock, loaded := scraperLocks.LoadOrStore(scraperID, &sync.Mutex{}) + lock.(*sync.Mutex).Lock() + defer lock.(*sync.Mutex).Unlock() + + _rateLimitedConfigs, _ := currentlyRateLimitedConfigsPerScraper.LoadOrStore(scraperID, map[string]struct{}{}) + rateLimitedConfigs := _rateLimitedConfigs.(map[string]struct{}) + + if !loaded { + // This is the initial sync of the rate limiter with the database. + // Happens only once for each scrape config. + if err := syncWindow(ctx, max, window); err != nil { + return nil, nil, err + } + + if rlConfigs, err := syncCurrentlyRateLimitedConfigs(ctx, window); err != nil { + return nil, nil, err + } else { + rateLimitedConfigs = rlConfigs + } + } + + var rateLimitedConfigsThisRun = make(map[string]struct{}) + var passingNewChanges = make([]*models.ConfigChange, 0, len(newChanges)) + for _, change := range newChanges { + if _, ok := rateLimitedConfigs[change.ConfigID]; ok { + rateLimitedConfigsThisRun[change.ConfigID] = struct{}{} + } else { + passingNewChanges = append(passingNewChanges, change) + } + } + + // Find those changes that were rate limited only this run but + // weren't previously in the rate limited state. + var newlyRateLimited []string + for configID := range rateLimitedConfigsThisRun { + if _, ok := rateLimitedConfigs[configID]; !ok { + newlyRateLimited = append(newlyRateLimited, configID) + } + } + + rateLimitedConfigs = collections.MergeMap(rateLimitedConfigs, rateLimitedConfigsThisRun) + currentlyRateLimitedConfigsPerScraper.Store(scraperID, rateLimitedConfigs) + + return passingNewChanges, newlyRateLimited, nil +} + +func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration) (map[string]struct{}, error) { + query := `WITH latest_changes AS ( + SELECT + DISTINCT ON (config_id) config_id, + change_type + FROM + config_changes + LEFT JOIN config_items ON config_items.id = config_changes.config_id + WHERE + scraper_id = ? + AND NOW() - config_changes.created_at <= ? + ORDER BY + config_id, + config_changes.created_at DESC + ) SELECT config_id FROM latest_changes WHERE change_type = ?` + rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), window, ChangeTypeTooManyChanges).Rows() + if err != nil { + return nil, err + } + defer rows.Close() + + output := make(map[string]struct{}) + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + + ctx.Logger.V(3).Infof("config %s is currently found to be rate limited", id) + output[id] = struct{}{} + } + + return output, rows.Err() +} + +// syncWindow syncs the rate limit window for the scraper with the changes in the db. +func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error { + query := `SELECT + config_id, + COUNT(*), + MIN(config_changes.created_at) AS min_created_at + FROM + config_changes + LEFT JOIN config_items ON config_items.id = config_changes.config_id + WHERE + scraper_id = ? + AND change_type != ? + AND NOW() - config_changes.created_at <= ? + GROUP BY + config_id` + rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), ChangeTypeTooManyChanges, window).Rows() + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var configID string + var count int + var earliest time.Time + if err := rows.Scan(&configID, &count, &earliest); err != nil { + return err + } + ctx.Logger.V(3).Infof("config %s has %d changes in the last %s", configID, count, window) + + rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { + win, stopper := sw.NewLocalWindow() + if count > 0 { + win.SetStart(earliest) + win.AddCount(int64(count)) + } + return win, stopper + }) + configRateLimiters[configID] = rateLimiter + } + + return rows.Err() +} diff --git a/db/models/config_change.go b/db/models/config_change.go index 87aae48bb..8545db14f 100644 --- a/db/models/config_change.go +++ b/db/models/config_change.go @@ -5,7 +5,6 @@ import ( "time" v1 "github.com/flanksource/config-db/api/v1" - "github.com/google/uuid" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -15,8 +14,8 @@ type ConfigChange struct { ExternalID string `gorm:"-"` ConfigType string `gorm:"-"` ExternalChangeId string `gorm:"column:external_change_id" json:"external_change_id"` - ID string `gorm:"primaryKey;unique_index;not null;column:id" json:"id"` - ConfigID string `gorm:"column:config_id;default:''" json:"config_id"` + ID string `gorm:"primaryKey;unique_index;not null;column:id;default:generate_ulid()" json:"id"` + ConfigID string `gorm:"column:config_id" json:"config_id"` ChangeType string `gorm:"column:change_type" json:"change_type"` Diff *string `gorm:"column:diff" json:"diff,omitempty"` Severity string `gorm:"column:severity" json:"severity"` @@ -62,10 +61,6 @@ func NewConfigChangeFromV1(result v1.ScrapeResult, change v1.ChangeResult) *Conf } func (c *ConfigChange) BeforeCreate(tx *gorm.DB) (err error) { - if c.ID == "" { - c.ID = uuid.New().String() - } - tx.Statement.AddClause(clause.OnConflict{DoNothing: true}) return } diff --git a/db/update.go b/db/update.go index ee8ca64ea..e07fb9c7b 100644 --- a/db/update.go +++ b/db/update.go @@ -6,6 +6,7 @@ import ( "slices" "strconv" "strings" + "sync" "time" "github.com/aws/smithy-go/ptr" @@ -21,6 +22,7 @@ import ( dutyContext "github.com/flanksource/duty/context" dutyModels "github.com/flanksource/duty/models" "github.com/flanksource/gomplate/v3" + sw "github.com/flanksource/slidingwindow" "github.com/google/uuid" "github.com/hexops/gotextdiff" "github.com/hexops/gotextdiff/myers" @@ -31,7 +33,11 @@ import ( "gorm.io/gorm/clause" ) -const configItemsBulkInsertSize = 200 +const ( + configItemsBulkInsertSize = 200 + + configChangesBulkInsertSize = 200 +) func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error { var deletedAt interface{} @@ -356,8 +362,67 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { return fmt.Errorf("failed to update last scraped time: %w", err) } - if err := ctx.DB().CreateInBatches(newChanges, configItemsBulkInsertSize).Error; err != nil { - return fmt.Errorf("failed to create config changes: %w", err) + newChanges, rateLimitedThisRun, err := rateLimitChanges(ctx, newChanges) + if err != nil { + return fmt.Errorf("failed to rate limit changes: %w", err) + } + + // NOTE: Returning just the needed columns didn't work in gorm for some reason. + // So, returning * for now. + // returnClause := clause.Returning{Columns: []clause.Column{{Name: "id"}, {Name: "config_id"}, {Name: "external_change_id"}}} + returnClause := clause.Returning{} + + // NOTE: Ran into a weird gorm behavior where CreateInBatches combined with a return clause + // panics. So, handling batching manually. + var rateLimitedAfterInsertion = map[string]struct{}{} + for _, batch := range lo.Chunk(newChanges, configChangesBulkInsertSize) { + if err := ctx.DB().Clauses(returnClause).Create(&batch).Error; err != nil { + return fmt.Errorf("failed to create config changes: %w", err) + } + + if len(batch) != 0 { + // the `batch` slice now only contains those changes that were actually inserted. + // we increase the usage count on the rate limiter for those changes. + _l, _ := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), &sync.Mutex{}) + lock := _l.(*sync.Mutex) + lock.Lock() + for _, b := range batch { + rateLimiter, ok := configRateLimiters[b.ConfigID] + if !ok { + window := ctx.Properties().Duration("changes.max.window", rateLimitWindow) + max := ctx.Properties().Int("changes.max.count", maxChangesInWindow) + rateLimiter, _ = sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { + return sw.NewLocalWindow() + }) + configRateLimiters[b.ConfigID] = rateLimiter + } + + if rateLimiter.Allow() { + rateLimitedAfterInsertion[b.ConfigID] = struct{}{} + } + } + lock.Unlock() + } + } + + syncCurrentlyRatelimitedConfigMap(ctx, newChanges, rateLimitedAfterInsertion) + + // For all the rate limited configs, we add a new "TooManyChanges" change. + // This is intentionally inserted in a different batch from the new changes + // as "ChangeTypeTooManyChanges" will have the same created_at timestamp. + // We want these changes to be newer than the actual changes. + var rateLimitedChanges []*models.ConfigChange + for _, configID := range rateLimitedThisRun { + rateLimitedChanges = append(rateLimitedChanges, &models.ConfigChange{ + ConfigID: configID, + Summary: "Changes on this config has been rate limited", + ChangeType: ChangeTypeTooManyChanges, + ExternalChangeId: uuid.New().String(), + }) + } + + if err := ctx.DB().CreateInBatches(rateLimitedChanges, configChangesBulkInsertSize).Error; err != nil { + return fmt.Errorf("failed to create rate limited config changes: %w", err) } if len(changesToUpdate) != 0 { @@ -411,6 +476,27 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { return nil } +func syncCurrentlyRatelimitedConfigMap(ctx api.ScrapeContext, insertedChanged []*models.ConfigChange, rateLimitedAfterInsertion map[string]struct{}) { + _l, _ := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), &sync.Mutex{}) + lock := _l.(*sync.Mutex) + lock.Lock() + + m, _ := currentlyRateLimitedConfigsPerScraper.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), map[string]struct{}{}) + mm := m.(map[string]struct{}) + for _, c := range insertedChanged { + if _, ok := rateLimitedAfterInsertion[c.ConfigID]; !ok { + // All the configs that weren't rate limited, will need to be removed from the + // "currently rate limited" map + delete(mm, c.ConfigID) + } else { + mm[c.ConfigID] = struct{}{} + } + } + + currentlyRateLimitedConfigsPerScraper.Store(ctx.ScrapeConfig().GetPersistedID().String(), mm) + lock.Unlock() +} + func updateLastScrapedTime(ctx api.ScrapeContext, ids []string) error { if len(ids) == 0 { return nil diff --git a/go.mod b/go.mod index 0471ced33..d79179cce 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/flanksource/is-healthy v1.0.7 github.com/flanksource/ketall v1.1.6 github.com/flanksource/mapstructure v1.6.0 + github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b github.com/go-logr/zapr v1.2.4 github.com/gobwas/glob v0.2.3 github.com/gomarkdown/markdown v0.0.0-20230322041520-c84983bdbf2a diff --git a/go.sum b/go.sum index c8ae9bc94..f080de42b 100644 --- a/go.sum +++ b/go.sum @@ -872,6 +872,8 @@ github.com/flanksource/mapstructure v1.6.0 h1:+1kJ+QsO1SxjAgktfLlpZXetsVSJ0uCLhG github.com/flanksource/mapstructure v1.6.0/go.mod h1:dttg5+FFE2sp4D/CrcPCVqufNDrBggDaM+08nk5S8Ps= github.com/flanksource/postq v0.1.3 h1:eTslG04hwxAvntZv8gIUsXKQPLGeLiRPNkZC+kQdL7c= github.com/flanksource/postq v0.1.3/go.mod h1:AAuaPRhpqxvyF7JPs8X1NMsJVenh80ldpJPDVgWvFf8= +github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b h1:zB2nVrRAUgrZQb2eutgKzWd6ld7syPacrY/XQmz7Wks= +github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b/go.mod h1:+hHPT8Yx+8I6i4SF6WwvwRso532uHlPJ1T029dtHFak= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=