55	"time" 
66
77	sw "github.com/RussellLuo/slidingwindow" 
8- 	"github.com/google/uuid" 
98
9+ 	"github.com/flanksource/commons/collections" 
1010	"github.com/flanksource/config-db/api" 
1111	"github.com/flanksource/config-db/db/models" 
1212	"github.com/flanksource/config-db/pkg/ratelimit" 
@@ -15,6 +15,8 @@ import (
1515const  (
1616	rateLimitWindow     =  time .Hour  *  4 
1717	maxChangesInWindow  =  100 
18+ 
19+ 	ChangeTypeTooManyChanges  =  "TooManyChanges" 
1820)
1921
2022func  GetWorkflowRunCount (ctx  api.ScrapeContext , workflowID  string ) (int64 , error ) {
@@ -29,52 +31,44 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error
2931var  (
3032	scraperLocks        =  sync.Map {}
3133	configRateLimiters  =  map [string ]* sw.Limiter {}
34+ 
35+ 	// List of configs that have been rate limited. 
36+ 	// It's used to avoid inserting multiple "TooManyChanges" changes for the same config. 
37+ 	rateLimitedConfigsPerScraper  =  sync.Map {}
3238)
3339
34- func  rateLimitChanges (ctx  api.ScrapeContext , newChanges  []* models.ConfigChange ) ([]* models.ConfigChange , error ) {
35- 	if  len (newChanges ) ==  0  {
36- 		return  nil , nil 
40+ func  rateLimitChanges (ctx  api.ScrapeContext , newChanges  []* models.ConfigChange ) ([]* models.ConfigChange , [] string ,  error ) {
41+ 	if  len (newChanges ) ==  0  ||   ctx . ScrapeConfig (). GetPersistedID ()  ==   nil   {
42+ 		return  newChanges ,  nil , nil 
3743	}
3844
39- 	lock , loaded  :=  scraperLocks .LoadOrStore (ctx .ScrapeConfig ().GetPersistedID (), & sync.Mutex {})
45+ 	window  :=  ctx .Properties ().Duration ("changes.max.window" , rateLimitWindow )
46+ 	max  :=  ctx .Properties ().Int ("changes.max.count" , maxChangesInWindow )
47+ 	scraperID  :=  ctx .ScrapeConfig ().GetPersistedID ().String ()
48+ 
49+ 	lock , loaded  :=  scraperLocks .LoadOrStore (scraperID , & sync.Mutex {})
4050	lock .(* sync.Mutex ).Lock ()
4151	defer  lock .(* sync.Mutex ).Unlock ()
4252
43- 	window   :=  ctx . Properties (). Duration ( "changes.max.window" ,  rateLimitWindow )
44- 	max  :=  ctx . Properties (). Int ( "changes.max.count" ,  maxChangesInWindow )
53+ 	_rateLimitedConfigs ,  _   :=  rateLimitedConfigsPerScraper . LoadOrStore ( scraperID ,  map [ string ] struct {}{} )
54+ 	rateLimitedConfigs  :=  _rateLimitedConfigs .( map [ string ] struct {} )
4555
4656	if  ! loaded  {
47- 		// populate the rate limit window for the scraper 
48- 		query  :=  `SELECT config_id, COUNT(*), min(created_at) FROM config_changes  
49- 		WHERE change_type != 'TooManyChanges' 
50- 		AND NOW() - created_at <= ? GROUP BY config_id` 
51- 		rows , err  :=  ctx .DB ().Raw (query , window ).Rows ()
52- 		if  err  !=  nil  {
53- 			return  nil , err 
57+ 		// This is the initial sync of the rate limiter with the database. 
58+ 		// Happens only once for each scrape config. 
59+ 		if  err  :=  syncWindow (ctx , max , window ); err  !=  nil  {
60+ 			return  nil , nil , err 
5461		}
5562
56- 		for  rows .Next () {
57- 			var  configID  string 
58- 			var  count  int 
59- 			var  earliest  time.Time 
60- 			if  err  :=  rows .Scan (& configID , & count , & earliest ); err  !=  nil  {
61- 				return  nil , err 
62- 			}
63- 
64- 			rateLimiter , _  :=  sw .NewLimiter (window , int64 (max ), func () (sw.Window , sw.StopFunc ) {
65- 				win , stopper  :=  ratelimit .NewLocalWindow ()
66- 				if  count  >  0  {
67- 					win .SetStart (earliest )
68- 					win .AddCount (int64 (count ))
69- 				}
70- 				return  win , stopper 
71- 			})
72- 			configRateLimiters [configID ] =  rateLimiter 
63+ 		if  rlConfigs , err  :=  syncCurrentlyRateLimitedConfigs (ctx , window ); err  !=  nil  {
64+ 			return  nil , nil , err 
65+ 		} else  {
66+ 			rateLimitedConfigs  =  rlConfigs 
7367		}
7468	}
7569
70+ 	rateLimitedThisRun  :=  map [string ]struct {}{}
7671	passingNewChanges  :=  make ([]* models.ConfigChange , 0 , len (newChanges ))
77- 	rateLimited  :=  map [string ]struct {}{}
7872	for  _ , change  :=  range  newChanges  {
7973		rateLimiter , ok  :=  configRateLimiters [change .ConfigID ]
8074		if  ! ok  {
@@ -86,23 +80,102 @@ func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange)
8680		}
8781
8882		if  ! rateLimiter .Allow () {
89- 			ctx .Logger .V (2 ).Infof ("change rate limited (config=%s)" , change .ConfigID )
90- 			rateLimited [change .ConfigID ] =  struct {}{}
83+ 			ctx .Logger .V (1 ).Infof ("change rate limited (config=%s, external_id=%s, type=%s )" , change .ConfigID ,  change . ExternalChangeId ,  change . ChangeType )
84+ 			rateLimitedThisRun [change .ConfigID ] =  struct {}{}
9185			continue 
86+ 		} else  {
87+ 			delete (rateLimitedConfigs , change .ConfigID )
9288		}
9389
9490		passingNewChanges  =  append (passingNewChanges , change )
9591	}
9692
97- 	// For all the rate limited configs, we add a new "TooManyChanges" change 
98- 	for  configID  :=  range  rateLimited  {
99- 		passingNewChanges  =  append (passingNewChanges , & models.ConfigChange {
100- 			ConfigID :         configID ,
101- 			Summary :          "Changes on this config has been rate limited" ,
102- 			ChangeType :       "TooManyChanges" ,
103- 			ExternalChangeId : uuid .New ().String (),
93+ 	var  newlyRateLimited  []string 
94+ 	for  configID  :=  range  rateLimitedThisRun  {
95+ 		if  _ , ok  :=  rateLimitedConfigs [configID ]; ! ok  {
96+ 			newlyRateLimited  =  append (newlyRateLimited , configID )
97+ 		}
98+ 	}
99+ 
100+ 	rateLimitedConfigs  =  collections .MergeMap (rateLimitedConfigs , rateLimitedThisRun )
101+ 	rateLimitedConfigsPerScraper .Store (scraperID , rateLimitedConfigs )
102+ 
103+ 	return  passingNewChanges , newlyRateLimited , nil 
104+ }
105+ 
106+ func  syncCurrentlyRateLimitedConfigs (ctx  api.ScrapeContext , window  time.Duration ) (map [string ]struct {}, error ) {
107+ 	query  :=  `WITH latest_changes AS ( 
108+ 		SELECT 
109+ 			DISTINCT ON (config_id) config_id, 
110+ 			change_type 
111+ 		FROM 
112+ 			config_changes 
113+ 		LEFT JOIN config_items ON config_items.id = config_changes.config_id 
114+ 			WHERE 
115+ 				scraper_id = ? 
116+ 				AND NOW() - config_changes.created_at <= ? 
117+ 		ORDER BY 
118+ 			config_id, 
119+ 			config_changes.created_at DESC 
120+ 		) SELECT config_id FROM latest_changes WHERE change_type = ?` 
121+ 	rows , err  :=  ctx .DB ().Raw (query , ctx .ScrapeConfig ().GetPersistedID (), window , ChangeTypeTooManyChanges ).Rows ()
122+ 	if  err  !=  nil  {
123+ 		return  nil , err 
124+ 	}
125+ 
126+ 	output  :=  make (map [string ]struct {})
127+ 	for  rows .Next () {
128+ 		var  id  string 
129+ 		if  err  :=  rows .Scan (& id ); err  !=  nil  {
130+ 			return  nil , err 
131+ 		}
132+ 
133+ 		ctx .Logger .V (1 ).Infof ("config %s is currently found to be rate limited" , id )
134+ 		output [id ] =  struct {}{}
135+ 	}
136+ 
137+ 	return  output , rows .Err ()
138+ }
139+ 
140+ // syncWindow syncs the rate limit window for the scraper with the changes in the db. 
141+ func  syncWindow (ctx  api.ScrapeContext , max  int , window  time.Duration ) error  {
142+ 	query  :=  `SELECT 
143+ 			config_id, 
144+ 			COUNT(*), 
145+ 			MIN(config_changes.created_at) AS min_created_at 
146+ 		FROM 
147+ 			config_changes 
148+ 		LEFT JOIN config_items ON config_items.id = config_changes.config_id 
149+ 		WHERE 
150+ 			scraper_id = ? 
151+ 			AND change_type != ? 
152+ 			AND NOW() - config_changes.created_at <= ? 
153+ 		GROUP BY 
154+ 			config_id` 
155+ 	rows , err  :=  ctx .DB ().Raw (query , ctx .ScrapeConfig ().GetPersistedID (), ChangeTypeTooManyChanges , window ).Rows ()
156+ 	if  err  !=  nil  {
157+ 		return  err 
158+ 	}
159+ 
160+ 	for  rows .Next () {
161+ 		var  configID  string 
162+ 		var  count  int 
163+ 		var  earliest  time.Time 
164+ 		if  err  :=  rows .Scan (& configID , & count , & earliest ); err  !=  nil  {
165+ 			return  err 
166+ 		}
167+ 		ctx .Logger .V (3 ).Infof ("config %s has %d changes in the last %s" , configID , count , window )
168+ 
169+ 		rateLimiter , _  :=  sw .NewLimiter (window , int64 (max ), func () (sw.Window , sw.StopFunc ) {
170+ 			win , stopper  :=  ratelimit .NewLocalWindow ()
171+ 			if  count  >  0  {
172+ 				win .SetStart (earliest )
173+ 				win .AddCount (int64 (count ))
174+ 			}
175+ 			return  win , stopper 
104176		})
177+ 		configRateLimiters [configID ] =  rateLimiter 
105178	}
106179
107- 	return  passingNewChanges ,  nil 
180+ 	return  rows . Err () 
108181}
0 commit comments