Skip to content

Commit defccec

Browse files
committed
improve config_changes lookup for existing ones
1 parent fe21c62 commit defccec

File tree

3 files changed

+68
-41
lines changed

3 files changed

+68
-41
lines changed

api/cache.go

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -78,38 +78,6 @@ func (t TempCache) Insert(item models.ConfigItem) {
7878
t.items[strings.ToLower(item.ID)] = item
7979
}
8080

81-
func (t TempCache) IsChangePersisted(configID, externalChangeID string) (bool, error) {
82-
if configID == "" || externalChangeID == "" {
83-
return false, nil
84-
}
85-
86-
configID = strings.ToLower(configID)
87-
externalChangeID = strings.ToLower(externalChangeID)
88-
89-
if t.changes == nil {
90-
t.changes = make(map[string]struct{})
91-
}
92-
93-
if _, ok := t.changes[configID+externalChangeID]; ok {
94-
return true, nil
95-
}
96-
97-
var result models.ConfigChange
98-
if err := t.ctx.DB().Select("id").Where("config_id = ?", configID).
99-
Where("external_change_id = ?", externalChangeID).
100-
Limit(1).
101-
Find(&result).Error; err != nil {
102-
return false, err
103-
}
104-
105-
if result.ID != "" {
106-
t.changes[configID+externalChangeID] = struct{}{}
107-
return true, nil
108-
}
109-
110-
return false, nil
111-
}
112-
11381
func (t TempCache) Get(id string) (*models.ConfigItem, error) {
11482
id = strings.ToLower(id)
11583
if id == "" {

db/changes.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"time"
66

77
sw "github.com/RussellLuo/slidingwindow"
8+
"github.com/patrickmn/go-cache"
9+
"github.com/samber/lo"
810

911
"github.com/flanksource/commons/collections"
1012
"github.com/flanksource/config-db/api"
@@ -19,6 +21,8 @@ const (
1921
ChangeTypeTooManyChanges = "TooManyChanges"
2022
)
2123

24+
var configChangesCache = cache.New(time.Hour*24, time.Hour*24)
25+
2226
func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) {
2327
var count int64
2428
err := ctx.DB().Table("config_changes").
@@ -122,6 +126,7 @@ func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration
122126
if err != nil {
123127
return nil, err
124128
}
129+
defer rows.Close()
125130

126131
output := make(map[string]struct{})
127132
for rows.Next() {
@@ -156,6 +161,7 @@ func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {
156161
if err != nil {
157162
return err
158163
}
164+
defer rows.Close()
159165

160166
for rows.Next() {
161167
var configID string
@@ -179,3 +185,54 @@ func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {
179185

180186
return rows.Err()
181187
}
188+
189+
// filterOutPersistedChanges returns only those changes that weren't seen in the db.
190+
func filterOutPersistedChanges(ctx api.ScrapeContext, changes []*models.ConfigChange) ([]*models.ConfigChange, error) {
191+
// use cache to filter out ones that we've already seen before
192+
changes = lo.Filter(changes, func(c *models.ConfigChange, _ int) bool {
193+
_, found := configChangesCache.Get(c.ConfigID + c.ExternalChangeId)
194+
if found {
195+
_ = found
196+
}
197+
return !found
198+
})
199+
200+
if len(changes) == 0 {
201+
return nil, nil
202+
}
203+
204+
query := `SELECT config_id, external_change_id
205+
FROM config_changes
206+
WHERE (config_id, external_change_id) IN ?`
207+
args := lo.Map(changes, func(c *models.ConfigChange, _ int) []string {
208+
return []string{c.ConfigID, c.ExternalChangeId}
209+
})
210+
211+
rows, err := ctx.DB().Raw(query, args).Rows()
212+
if err != nil {
213+
return nil, err
214+
}
215+
defer rows.Close()
216+
217+
existing := make(map[string]struct{})
218+
for rows.Next() {
219+
var configID, externalChangeID string
220+
if err := rows.Scan(&configID, &externalChangeID); err != nil {
221+
return nil, err
222+
}
223+
224+
configChangesCache.SetDefault(configID+externalChangeID, struct{}{})
225+
existing[configID+externalChangeID] = struct{}{}
226+
}
227+
228+
newOnes := lo.Filter(changes, func(c *models.ConfigChange, _ int) bool {
229+
_, found := existing[c.ConfigID+c.ExternalChangeId]
230+
return !found
231+
})
232+
233+
if len(newOnes) > 0 {
234+
_ = query
235+
}
236+
237+
return newOnes, nil
238+
}

db/update.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,8 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult)
216216

217217
func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) {
218218
var (
219-
newOnes = []*models.ConfigChange{}
220-
updates = []*models.ConfigChange{}
219+
toInsert = []*models.ConfigChange{}
220+
toUpdate = []*models.ConfigChange{}
221221
)
222222

223223
changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...)
@@ -274,17 +274,19 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C
274274
}
275275

276276
if changeResult.UpdateExisting {
277-
updates = append(updates, change)
277+
toUpdate = append(toUpdate, change)
278278
} else {
279-
if ok, err := ctx.TempCache().IsChangePersisted(change.ConfigID, change.ExternalChangeId); err != nil {
280-
return nil, nil, fmt.Errorf("failed to check if change is persisted: %w", err)
281-
} else if !ok {
282-
newOnes = append(newOnes, change)
283-
}
279+
toInsert = append(toInsert, change)
284280
}
285281
}
286282

287-
return newOnes, updates, nil
283+
// Remove the changes that have already been inserted.
284+
newOnes, err := filterOutPersistedChanges(ctx, toInsert)
285+
if err != nil {
286+
return nil, nil, err
287+
}
288+
289+
return newOnes, toUpdate, nil
288290
}
289291

290292
func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error {

0 commit comments

Comments
 (0)