Skip to content

Commit 42eb8de

Browse files
authored
Fix Analyze Org Data Race (#198)
* Reworked OPA initialization: the compiler is now initialized only once, after the instance is created and the config is loaded, instead of on every Eval call.
* Fixed tests.
* Reworked AnalyzeOrg to fix a data race when accumulating scanned packages in the inventory before proceeding to the finalization of the analysis.
1 parent b4927ee commit 42eb8de

File tree

7 files changed

+104
-41
lines changed

7 files changed

+104
-41
lines changed

analyze/analyze.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,23 @@ func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutine
9292
log.Debug().Msgf("Starting repository analysis for organization: %s on %s", org, provider)
9393
bar := a.progressBar(0, "Analyzing repositories")
9494

95-
var wg sync.WaitGroup
95+
var reposWg sync.WaitGroup
9696
errChan := make(chan error, 1)
9797
maxGoroutines := 2
9898
if numberOfGoroutines != nil {
9999
maxGoroutines = *numberOfGoroutines
100100
}
101-
sem := semaphore.NewWeighted(int64(maxGoroutines))
101+
goRoutineLimitSem := semaphore.NewWeighted(int64(maxGoroutines))
102+
103+
pkgChan := make(chan *models.PackageInsights)
104+
pkgWg := sync.WaitGroup{}
105+
pkgWg.Add(1)
106+
go func() {
107+
defer pkgWg.Done()
108+
for pkg := range pkgChan {
109+
inventory.Packages = append(inventory.Packages, pkg)
110+
}
111+
}()
102112

103113
for repoBatch := range orgReposBatches {
104114
if repoBatch.Err != nil {
@@ -113,15 +123,15 @@ func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutine
113123
bar.ChangeMax(repoBatch.TotalCount - 1)
114124
continue
115125
}
116-
if err := sem.Acquire(ctx, 1); err != nil {
126+
if err := goRoutineLimitSem.Acquire(ctx, 1); err != nil {
117127
close(errChan)
118128
return fmt.Errorf("failed to acquire semaphore: %w", err)
119129
}
120130

121-
wg.Add(1)
131+
reposWg.Add(1)
122132
go func(repo Repository) {
123-
defer sem.Release(1)
124-
defer wg.Done()
133+
defer goRoutineLimitSem.Release(1)
134+
defer reposWg.Done()
125135
repoNameWithOwner := repo.GetRepoIdentifier()
126136
tempDir, err := a.cloneRepoToTemp(ctx, repo.BuildGitURL(a.ScmClient.GetProviderBaseURL()), a.ScmClient.GetToken(), "HEAD")
127137
if err != nil {
@@ -136,9 +146,16 @@ func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutine
136146
return
137147
}
138148

139-
err = inventory.AddPackage(ctx, pkg, tempDir)
149+
scannedPkg, err := inventory.ScanPackage(ctx, pkg, tempDir)
140150
if err != nil {
141-
log.Error().Err(err).Str("repo", repoNameWithOwner).Msg("failed to add package to inventory")
151+
log.Error().Err(err).Str("repo", repoNameWithOwner).Msg("failed to scan package")
152+
return
153+
}
154+
155+
select {
156+
case pkgChan <- scannedPkg:
157+
case <-ctx.Done():
158+
log.Error().Msg("Context canceled while sending package to channel")
142159
return
143160
}
144161
_ = bar.Add(1)
@@ -147,10 +164,13 @@ func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutine
147164
}
148165

149166
go func() {
150-
wg.Wait()
167+
reposWg.Wait()
168+
close(pkgChan)
151169
close(errChan)
152170
}()
153171

172+
pkgWg.Wait()
173+
154174
for err := range errChan {
155175
if err != nil {
156176
return err

cmd/root.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,11 @@ func GetAnalyzer(ctx context.Context, command string) (*analyze.Analyzer, error)
186186
}
187187

188188
func newOpa(ctx context.Context) (*opa.Opa, error) {
189-
opaClient, err := opa.NewOpa()
189+
opaClient, err := opa.NewOpa(ctx, config)
190190
if err != nil {
191191
log.Error().Err(err).Msg("Failed to create OPA client")
192192
return nil, err
193193
}
194-
_ = opaClient.WithConfig(ctx, config)
195194

196195
return opaClient, nil
197196
}

opa/opa.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,27 @@ type Opa struct {
3030
LoadPaths []string
3131
}
3232

33-
func NewOpa() (*Opa, error) {
33+
func NewOpa(ctx context.Context, config *models.Config) (*Opa, error) {
3434
registerBuiltinFunctions()
3535

36-
return &Opa{
36+
newOpa := &Opa{
3737
Store: inmem.NewFromObject(map[string]interface {
3838
}{
3939
"config": models.DefaultConfig(),
4040
}),
41-
}, nil
41+
}
42+
43+
err := newOpa.WithConfig(ctx, config)
44+
if err != nil {
45+
return nil, fmt.Errorf("failed to set opa with config: %w", err)
46+
}
47+
48+
err = newOpa.Compile(ctx)
49+
if err != nil {
50+
return nil, fmt.Errorf("failed to initialize opa compiler: %w", err)
51+
}
52+
53+
return newOpa, nil
4254
}
4355

4456
func (o *Opa) Print(ctx print.Context, s string) error {
@@ -117,14 +129,7 @@ func (o *Opa) Compile(ctx context.Context) error {
117129
}
118130

119131
func (o *Opa) Eval(ctx context.Context, query string, input map[string]interface{}, result interface{}) error {
120-
if o.Compiler == nil {
121-
if err := o.Compile(ctx); err != nil {
122-
log.Debug().Msg(err.Error())
123-
return err
124-
}
125-
}
126-
127-
rego := rego.New(
132+
regoInstance := rego.New(
128133
rego.Query(query),
129134
rego.Compiler(o.Compiler),
130135
rego.PrintHook(o),
@@ -133,7 +138,7 @@ func (o *Opa) Eval(ctx context.Context, query string, input map[string]interface
133138
rego.Store(o.Store),
134139
)
135140

136-
rs, err := rego.Eval(ctx)
141+
rs, err := regoInstance.Eval(ctx)
137142
if err != nil {
138143
return err
139144
}

opa/opa_test.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ func TestOpaBuiltins(t *testing.T) {
4242
},
4343
}
4444

45-
opa, err := NewOpa()
45+
opa, err := NewOpa(context.TODO(), &models.Config{
46+
Include: []models.ConfigInclude{},
47+
})
4648
noOpaErrors(t, err)
4749

4850
for _, c := range cases {
@@ -77,7 +79,9 @@ func TestSemverConstraintCheck(t *testing.T) {
7779
},
7880
}
7981

80-
opa, err := NewOpa()
82+
opa, err := NewOpa(context.TODO(), &models.Config{
83+
Include: []models.ConfigInclude{},
84+
})
8185
noOpaErrors(t, err)
8286

8387
for _, c := range cases {
@@ -114,7 +118,9 @@ func TestJobUsesSelfHostedRunner(t *testing.T) {
114118
"random-name": true,
115119
}
116120

117-
opa, err := NewOpa()
121+
opa, err := NewOpa(context.TODO(), &models.Config{
122+
Include: []models.ConfigInclude{},
123+
})
118124
noOpaErrors(t, err)
119125

120126
for runner, expected := range cases {
@@ -136,7 +142,9 @@ func TestJobUsesSelfHostedRunner(t *testing.T) {
136142
}
137143

138144
func TestWithConfig(t *testing.T) {
139-
o, err := NewOpa()
145+
o, err := NewOpa(context.TODO(), &models.Config{
146+
Include: []models.ConfigInclude{},
147+
})
140148
noOpaErrors(t, err)
141149
ctx := context.TODO()
142150

@@ -181,7 +189,9 @@ func TestCapabilities(t *testing.T) {
181189
}
182190

183191
func TestRulesMetadataLevel(t *testing.T) {
184-
opa, err := NewOpa()
192+
opa, err := NewOpa(context.TODO(), &models.Config{
193+
Include: []models.ConfigInclude{},
194+
})
185195
noOpaErrors(t, err)
186196

187197
query := `{rule_id: rule.level |
@@ -202,7 +212,9 @@ func TestRulesMetadataLevel(t *testing.T) {
202212
}
203213

204214
func TestWithRulesConfig(t *testing.T) {
205-
o, err := NewOpa()
215+
o, err := NewOpa(context.TODO(), &models.Config{
216+
Include: []models.ConfigInclude{},
217+
})
206218
noOpaErrors(t, err)
207219
ctx := context.TODO()
208220

scanner/inventory.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,25 @@ func NewInventory(opa *opa.Opa, pkgsupplyClient ReputationClient, provider strin
3232
}
3333

3434
func (i *Inventory) AddPackage(ctx context.Context, pkg *models.PackageInsights, workdir string) error {
35+
scannedPackage, err := i.ScanPackage(ctx, pkg, workdir)
36+
if err != nil {
37+
return err
38+
}
39+
40+
i.Packages = append(i.Packages, scannedPackage)
41+
return nil
42+
}
43+
44+
func (i *Inventory) ScanPackage(ctx context.Context, pkg *models.PackageInsights, workdir string) (*models.PackageInsights, error) {
3545
s := NewScanner(workdir)
3646
s.Package = pkg
3747

3848
err := s.Run(ctx, i.opa)
3949
if err != nil {
40-
return err
50+
return nil, err
4151
}
4252

43-
i.Packages = append(i.Packages, s.Package)
44-
return nil
53+
return s.Package, nil
4554
}
4655

4756
func (i *Inventory) Purls() []string {

scanner/inventory_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
)
1111

1212
func TestPurls(t *testing.T) {
13-
o, _ := opa.NewOpa()
13+
o, _ := opa.NewOpa(context.TODO(), &models.Config{
14+
Include: []models.ConfigInclude{},
15+
})
1416
i := NewInventory(o, nil, "", "")
1517
pkg := &models.PackageInsights{
1618
Purl: "pkg:github/org/owner",
@@ -53,7 +55,9 @@ func TestPurls(t *testing.T) {
5355
}
5456

5557
func TestFindings(t *testing.T) {
56-
o, _ := opa.NewOpa()
58+
o, _ := opa.NewOpa(context.TODO(), &models.Config{
59+
Include: []models.ConfigInclude{},
60+
})
5761
i := NewInventory(o, nil, "gitlab", "")
5862
purl := "pkg:github/org/owner"
5963
pkg := &models.PackageInsights{
@@ -426,7 +430,9 @@ func TestFindings(t *testing.T) {
426430
}
427431

428432
func TestSkipRule(t *testing.T) {
429-
o, _ := opa.NewOpa()
433+
o, _ := opa.NewOpa(context.TODO(), &models.Config{
434+
Include: []models.ConfigInclude{},
435+
})
430436
i := NewInventory(o, nil, "", "")
431437
ctx := context.TODO()
432438
purl := "pkg:github/org/owner"
@@ -470,7 +476,9 @@ func TestSkipRule(t *testing.T) {
470476
}
471477

472478
func TestRulesConfig(t *testing.T) {
473-
o, _ := opa.NewOpa()
479+
o, _ := opa.NewOpa(context.TODO(), &models.Config{
480+
Include: []models.ConfigInclude{},
481+
})
474482
i := NewInventory(o, nil, "", "")
475483
ctx := context.TODO()
476484
purl := "pkg:github/org/owner"

scanner/scanner_test.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010

1111
func TestGithubWorkflows(t *testing.T) {
1212
s := NewScanner("testdata")
13-
o, _ := opa.NewOpa()
13+
o, _ := opa.NewOpa(context.TODO(), &models.Config{
14+
Include: []models.ConfigInclude{},
15+
})
1416
err := s.Run(context.TODO(), o)
1517
workflows := s.Package.GithubActionsWorkflows
1618

@@ -33,7 +35,9 @@ func TestGithubWorkflows(t *testing.T) {
3335

3436
func TestGithubWorkflowsNotFound(t *testing.T) {
3537
s := NewScanner("testdata/.github")
36-
o, _ := opa.NewOpa()
38+
o, _ := opa.NewOpa(context.TODO(), &models.Config{
39+
Include: []models.ConfigInclude{},
40+
})
3741
err := s.Run(context.TODO(), o)
3842
workflows := s.Package.GithubActionsWorkflows
3943

@@ -43,7 +47,9 @@ func TestGithubWorkflowsNotFound(t *testing.T) {
4347

4448
func TestGithubActionsMetadata(t *testing.T) {
4549
s := NewScanner("testdata")
46-
o, _ := opa.NewOpa()
50+
o, _ := opa.NewOpa(context.TODO(), &models.Config{
51+
Include: []models.ConfigInclude{},
52+
})
4753
err := s.Run(context.TODO(), o)
4854

4955
metadata := s.Package.GithubActionsMetadata
@@ -58,7 +64,9 @@ func TestGithubActionsMetadata(t *testing.T) {
5864

5965
func TestRun(t *testing.T) {
6066
s := NewScanner("testdata")
61-
o, _ := opa.NewOpa()
67+
o, _ := opa.NewOpa(context.TODO(), &models.Config{
68+
Include: []models.ConfigInclude{},
69+
})
6270
s.Package.Purl = "pkg:github/org/owner"
6371

6472
err := s.Run(context.TODO(), o)
@@ -73,7 +81,9 @@ func TestRun(t *testing.T) {
7381

7482
func TestPipelineAsCodeTekton(t *testing.T) {
7583
s := NewScanner("testdata")
76-
o, _ := opa.NewOpa()
84+
o, _ := opa.NewOpa(context.TODO(), &models.Config{
85+
Include: []models.ConfigInclude{},
86+
})
7787
err := s.Run(context.TODO(), o)
7888
assert.NoError(t, err)
7989

0 commit comments

Comments
 (0)