Skip to content

Commit 5f43ea7

Browse files
committed
feat: post insertion rate limitation
1 parent c9c63f5 commit 5f43ea7

File tree

7 files changed

+242
-303
lines changed

7 files changed

+242
-303
lines changed

db/changes.go

Lines changed: 1 addition & 227 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,6 @@
11
package db
22

3-
import (
4-
"sync"
5-
"time"
6-
7-
sw "github.com/RussellLuo/slidingwindow"
8-
"github.com/patrickmn/go-cache"
9-
"github.com/samber/lo"
10-
11-
"github.com/flanksource/commons/collections"
12-
"github.com/flanksource/config-db/api"
13-
"github.com/flanksource/config-db/db/models"
14-
"github.com/flanksource/config-db/pkg/ratelimit"
15-
)
16-
17-
const (
18-
rateLimitWindow = time.Hour * 4
19-
maxChangesInWindow = 100
20-
21-
ChangeTypeTooManyChanges = "TooManyChanges"
22-
)
23-
24-
var configChangesCache = cache.New(time.Hour*24, time.Hour*24)
3+
import "github.com/flanksource/config-db/api"
254

265
func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) {
276
var count int64
@@ -31,208 +10,3 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error
3110
Error
3211
return count, err
3312
}
34-
35-
var (
36-
scraperLocks = sync.Map{}
37-
configRateLimiters = map[string]*sw.Limiter{}
38-
39-
// List of configs that have been rate limited.
40-
// It's used to avoid inserting mutliple "TooManyChanges" changes for the same config.
41-
rateLimitedConfigsPerScraper = sync.Map{}
42-
)
43-
44-
func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, []string, error) {
45-
if len(newChanges) == 0 || ctx.ScrapeConfig().GetPersistedID() == nil {
46-
return newChanges, nil, nil
47-
}
48-
49-
window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
50-
max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)
51-
scraperID := ctx.ScrapeConfig().GetPersistedID().String()
52-
53-
lock, loaded := scraperLocks.LoadOrStore(scraperID, &sync.Mutex{})
54-
lock.(*sync.Mutex).Lock()
55-
defer lock.(*sync.Mutex).Unlock()
56-
57-
_rateLimitedConfigs, _ := rateLimitedConfigsPerScraper.LoadOrStore(scraperID, map[string]struct{}{})
58-
rateLimitedConfigs := _rateLimitedConfigs.(map[string]struct{})
59-
60-
if !loaded {
61-
// This is the initial sync of the rate limiter with the database.
62-
// Happens only once for each scrape config.
63-
if err := syncWindow(ctx, max, window); err != nil {
64-
return nil, nil, err
65-
}
66-
67-
if rlConfigs, err := syncCurrentlyRateLimitedConfigs(ctx, window); err != nil {
68-
return nil, nil, err
69-
} else {
70-
rateLimitedConfigs = rlConfigs
71-
}
72-
}
73-
74-
rateLimitedThisRun := map[string]struct{}{}
75-
passingNewChanges := make([]*models.ConfigChange, 0, len(newChanges))
76-
for _, change := range newChanges {
77-
rateLimiter, ok := configRateLimiters[change.ConfigID]
78-
if !ok {
79-
rl, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
80-
return sw.NewLocalWindow()
81-
})
82-
configRateLimiters[change.ConfigID] = rl
83-
rateLimiter = rl
84-
}
85-
86-
if !rateLimiter.Allow() {
87-
ctx.Logger.V(1).Infof("change rate limited (config=%s, external_id=%s, type=%s)", change.ConfigID, change.ExternalChangeId, change.ChangeType)
88-
rateLimitedThisRun[change.ConfigID] = struct{}{}
89-
continue
90-
} else {
91-
delete(rateLimitedConfigs, change.ConfigID)
92-
}
93-
94-
passingNewChanges = append(passingNewChanges, change)
95-
}
96-
97-
var newlyRateLimited []string
98-
for configID := range rateLimitedThisRun {
99-
if _, ok := rateLimitedConfigs[configID]; !ok {
100-
newlyRateLimited = append(newlyRateLimited, configID)
101-
}
102-
}
103-
104-
rateLimitedConfigs = collections.MergeMap(rateLimitedConfigs, rateLimitedThisRun)
105-
rateLimitedConfigsPerScraper.Store(scraperID, rateLimitedConfigs)
106-
107-
return passingNewChanges, newlyRateLimited, nil
108-
}
109-
110-
func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration) (map[string]struct{}, error) {
111-
query := `WITH latest_changes AS (
112-
SELECT
113-
DISTINCT ON (config_id) config_id,
114-
change_type
115-
FROM
116-
config_changes
117-
LEFT JOIN config_items ON config_items.id = config_changes.config_id
118-
WHERE
119-
scraper_id = ?
120-
AND NOW() - config_changes.created_at <= ?
121-
ORDER BY
122-
config_id,
123-
config_changes.created_at DESC
124-
) SELECT config_id FROM latest_changes WHERE change_type = ?`
125-
rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), window, ChangeTypeTooManyChanges).Rows()
126-
if err != nil {
127-
return nil, err
128-
}
129-
defer rows.Close()
130-
131-
output := make(map[string]struct{})
132-
for rows.Next() {
133-
var id string
134-
if err := rows.Scan(&id); err != nil {
135-
return nil, err
136-
}
137-
138-
ctx.Logger.V(1).Infof("config %s is currently found to be rate limited", id)
139-
output[id] = struct{}{}
140-
}
141-
142-
return output, rows.Err()
143-
}
144-
145-
// syncWindow syncs the rate limit window for the scraper with the changes in the db.
146-
func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {
147-
query := `SELECT
148-
config_id,
149-
COUNT(*),
150-
MIN(config_changes.created_at) AS min_created_at
151-
FROM
152-
config_changes
153-
LEFT JOIN config_items ON config_items.id = config_changes.config_id
154-
WHERE
155-
scraper_id = ?
156-
AND change_type != ?
157-
AND NOW() - config_changes.created_at <= ?
158-
GROUP BY
159-
config_id`
160-
rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), ChangeTypeTooManyChanges, window).Rows()
161-
if err != nil {
162-
return err
163-
}
164-
defer rows.Close()
165-
166-
for rows.Next() {
167-
var configID string
168-
var count int
169-
var earliest time.Time
170-
if err := rows.Scan(&configID, &count, &earliest); err != nil {
171-
return err
172-
}
173-
ctx.Logger.V(3).Infof("config %s has %d changes in the last %s", configID, count, window)
174-
175-
rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
176-
win, stopper := ratelimit.NewLocalWindow()
177-
if count > 0 {
178-
win.SetStart(earliest)
179-
win.AddCount(int64(count))
180-
}
181-
return win, stopper
182-
})
183-
configRateLimiters[configID] = rateLimiter
184-
}
185-
186-
return rows.Err()
187-
}
188-
189-
// filterOutPersistedChanges returns only those changes that weren't seen in the db.
190-
func filterOutPersistedChanges(ctx api.ScrapeContext, changes []*models.ConfigChange) ([]*models.ConfigChange, error) {
191-
// use cache to filter out ones that we've already seen before
192-
changes = lo.Filter(changes, func(c *models.ConfigChange, _ int) bool {
193-
_, found := configChangesCache.Get(c.ConfigID + c.ExternalChangeId)
194-
if found {
195-
_ = found
196-
}
197-
return !found
198-
})
199-
200-
if len(changes) == 0 {
201-
return nil, nil
202-
}
203-
204-
query := `SELECT config_id, external_change_id
205-
FROM config_changes
206-
WHERE (config_id, external_change_id) IN ?`
207-
args := lo.Map(changes, func(c *models.ConfigChange, _ int) []string {
208-
return []string{c.ConfigID, c.ExternalChangeId}
209-
})
210-
211-
rows, err := ctx.DB().Raw(query, args).Rows()
212-
if err != nil {
213-
return nil, err
214-
}
215-
defer rows.Close()
216-
217-
existing := make(map[string]struct{})
218-
for rows.Next() {
219-
var configID, externalChangeID string
220-
if err := rows.Scan(&configID, &externalChangeID); err != nil {
221-
return nil, err
222-
}
223-
224-
configChangesCache.SetDefault(configID+externalChangeID, struct{}{})
225-
existing[configID+externalChangeID] = struct{}{}
226-
}
227-
228-
newOnes := lo.Filter(changes, func(c *models.ConfigChange, _ int) bool {
229-
_, found := existing[c.ConfigID+c.ExternalChangeId]
230-
return !found
231-
})
232-
233-
if len(newOnes) > 0 {
234-
_ = query
235-
}
236-
237-
return newOnes, nil
238-
}

0 commit comments

Comments
 (0)