Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions indexer/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Controller struct {
*indexer.Opts
// the manifest this controller is working on. populated on Scan() call
manifest *claircore.Manifest
layers []*claircore.Layer
files []claircore.ReadAtCloser
// the result of this scan. each stateFunc manipulates this field.
report *claircore.IndexReport
// a fatal error halting the scanning process
Expand Down
8 changes: 5 additions & 3 deletions indexer/controller/fetchlayers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ import (
func fetchLayers(ctx context.Context, s *Controller) (State, error) {
zlog.Info(ctx).Msg("layers fetch start")
defer zlog.Info(ctx).Msg("layers fetch done")
toFetch, err := reduce(ctx, s.Store, s.Vscnrs, s.manifest.Layers)
var err error
s.layers, err = reduce(ctx, s.Store, s.Vscnrs, s.manifest.Layers)
if err != nil {
return Terminal, fmt.Errorf("failed to determine layers to fetch: %w", err)
}
zlog.Debug(ctx).
Int("count", len(toFetch)).
Int("count", len(s.layers)).
Msg("fetching layers")
if err := s.Realizer.Realize(ctx, toFetch); err != nil {
s.files, err = s.Realizer.Realize(ctx, s.layers)
if err != nil {
zlog.Warn(ctx).
Err(err).
Msg("layers fetch failure")
Expand Down
151 changes: 151 additions & 0 deletions indexer/controller/layerindexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package controller

import (
"context"
"errors"
"fmt"
"strings"

"github.com/quay/zlog"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"

"github.com/quay/claircore"
"github.com/quay/claircore/indexer"
"github.com/quay/claircore/pkg/tarfs"
)

type LayerIndexer struct {
sem *semaphore.Weighted
perIndex int64

ps []indexer.PackageScanner
ds []indexer.DistributionScanner
rs []indexer.RepositoryScanner
}

func NewLayerIndexer() (*LayerIndexer, error) {
return nil, nil
}

type IndexRequest struct {
Manifest claircore.Digest
Layer []*claircore.Layer
Content []claircore.ReadAtCloser
}

type IndexOption uint

const (
_ IndexOption = iota
OptionSkipCache
OptionSkipStore
)

func (l *LayerIndexer) Index(ctx context.Context, req *IndexRequest, opts ...IndexOption) error {
ctx = zlog.ContextWithValues(ctx,
"component", "indexer/controller/LayerIndexer.Index",
"manifest", req.Manifest.String())
if err := l.sem.Acquire(ctx, l.perIndex); err != nil {
return fmt.Errorf("layerindexer: %w", err)
}
defer l.sem.Release(l.perIndex)
wsem := semaphore.NewWeighted(l.perIndex)

fses := make([]*tarfs.FS, len(req.Content))
fsErr := make([]error, len(req.Content))
for i := range req.Content {
go func(i int) {
if err := wsem.Acquire(ctx, 1); err != nil {
fsErr[i] = err
return
}
fses[i], fsErr[i] = tarfs.New(req.Content[i])
wsem.Release(1)
}(i)
}
var es errSlice
for _, e := range fsErr {
es = append(es, e)
}
if es != nil {
return es
}

eg, ctx := errgroup.WithContext(ctx)

// ...

var ie indexError
ie.Inner = eg.Wait()
for i, c := range req.Content {
if err := c.Close(); err != nil {
ie.Close = append(ie.Close, closeError{
Err: err,
Which: req.Layer[i].Hash.String(),
})
}
}
if !ie.empty() {
return &ie
}
return nil
}

type errSlice []error

func (es errSlice) Error() string {
return ""
}

func (es errSlice) Unwrap() error {
if len(es) > 1 {
return es[1:]
}
return nil
}
func (es errSlice) Is(tgt error) bool {
return len(es) > 0 && errors.Is(es[0], tgt)
}

type indexError struct {
Inner error
Close []closeError
}

type closeError struct {
Which string
Err error
}

func (e *indexError) empty() bool {
return errors.Is(e.Inner, nil) && len(e.Close) == 0
}

func (e *indexError) Error() string {
var b strings.Builder
orig, close := !errors.Is(e.Inner, nil), len(e.Close) != 0
if orig {
b.WriteString(e.Inner.Error())
}
if orig && close {
b.WriteString(" (while closing layer contents:")
}
for i, cl := range e.Close {
if i != 0 {
b.WriteByte(';')
}
b.WriteByte(' ')
b.WriteString(cl.Which)
b.WriteString(": ")
b.WriteString(cl.Err.Error())
}
if orig && close {
b.WriteByte(')')
}
return b.String()
}

func (e *indexError) Unwrap() error {
return e.Inner
}
2 changes: 1 addition & 1 deletion indexer/controller/scanlayers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func scanLayers(ctx context.Context, c *Controller) (State, error) {
zlog.Info(ctx).Msg("layers scan start")
defer zlog.Info(ctx).Msg("layers scan done")
err := c.LayerScanner.Scan(ctx, c.manifest.Hash, c.manifest.Layers)
err := c.LayerScanner.Scan(ctx, c.manifest.Hash, c.layers, c.files)
if err != nil {
return Terminal, fmt.Errorf("failed to scan all layer contents: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion indexer/controller/scanlayers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestScanLayers(t *testing.T) {
ls := indexer.NewMockLayerScanner(ctrl)
s := indexer.NewMockStore(ctrl)

ls.EXPECT().Scan(gomock.Any(), gomock.Any(), gomock.Any()).MaxTimes(1).MinTimes(1).Return(nil)
ls.EXPECT().Scan(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).MaxTimes(1).MinTimes(1).Return(nil)
s.EXPECT().LayerScanned(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(true, nil)
return ls, s
},
Expand Down
2 changes: 1 addition & 1 deletion indexer/layerscanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ import (
// discovered items into the persistence layer. scanning mechanics (concurrency, ordering, etc...)
// will be defined by implementations.
type LayerScanner interface {
Scan(ctx context.Context, manifest claircore.Digest, layers []*claircore.Layer) error
Scan(ctx context.Context, manifest claircore.Digest, layers []*claircore.Layer, contents []claircore.ReadAtCloser) error
}
81 changes: 70 additions & 11 deletions indexer/layerscanner/layerscanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package layerscanner

import (
"context"
"errors"
"fmt"
"io"
"runtime"
"strings"

"github.com/quay/zlog"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -143,7 +146,7 @@ func configAndFilter(ctx context.Context, opts *indexer.Opts, s indexer.Versione
//
// The provided Context controls cancellation for all scanners. The first error
// reported halts all work and is returned from Scan.
func (ls *layerScanner) Scan(ctx context.Context, manifest claircore.Digest, layers []*claircore.Layer) error {
func (ls *layerScanner) Scan(ctx context.Context, manifest claircore.Digest, layers []*claircore.Layer, contents []claircore.ReadAtCloser) error {
ctx = zlog.ContextWithValues(ctx,
"component", "datastore/layerscannner/layerScanner.Scan",
"manifest", manifest.String())
Expand All @@ -152,38 +155,94 @@ func (ls *layerScanner) Scan(ctx context.Context, manifest claircore.Digest, lay
g, ctx := errgroup.WithContext(ctx)
// Launch is a closure to capture the loop variables and then call the
// scanLayer method.
launch := func(l *claircore.Layer, s indexer.VersionedScanner) func() error {
launch := func(l *claircore.Layer, s indexer.VersionedScanner, c io.ReaderAt) func() error {
return func() error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
return ls.scanLayer(ctx, l, s)
return ls.scanLayer(ctx, l, c, s)
}
}
dedupe := make(map[string]struct{})
for _, l := range layers {
for i, l := range layers {
if _, ok := dedupe[l.Hash.String()]; ok {
continue
}
dedupe[l.Hash.String()] = struct{}{}
c := contents[i]
for _, s := range ls.ps {
g.Go(launch(l, s))
g.Go(launch(l, s, c))
}
for _, s := range ls.ds {
g.Go(launch(l, s))
g.Go(launch(l, s, c))
}
for _, s := range ls.rs {
g.Go(launch(l, s))
g.Go(launch(l, s, c))
}
}

return g.Wait()
var ie indexError
ie.Inner = g.Wait()
for i, c := range contents {
if err := c.Close(); err != nil {
ie.Close = append(ie.Close, closeError{
Err: err,
Which: layers[i].Hash.String(),
})
}
}
if !ie.empty() {
return &ie
}
return nil
}

type indexError struct {
Inner error
Close []closeError
}

type closeError struct {
Which string
Err error
}

func (e *indexError) empty() bool {
return errors.Is(e.Inner, nil) && len(e.Close) == 0
}

func (e *indexError) Error() string {
var b strings.Builder
orig, close := !errors.Is(e.Inner, nil), len(e.Close) != 0
if orig {
b.WriteString(e.Inner.Error())
}
if orig && close {
b.WriteString(" (while closing layer contents:")
}
for i, cl := range e.Close {
if i != 0 {
b.WriteByte(';')
}
b.WriteByte(' ')
b.WriteString(cl.Which)
b.WriteString(": ")
b.WriteString(cl.Err.Error())
}
if orig && close {
b.WriteByte(')')
}
return b.String()
}

func (e *indexError) Unwrap() error {
return e.Inner
}

// ScanLayer (along with the result type) handles an individual (scanner, layer)
// pair.
func (ls *layerScanner) scanLayer(ctx context.Context, l *claircore.Layer, s indexer.VersionedScanner) error {
func (ls *layerScanner) scanLayer(ctx context.Context, l *claircore.Layer, c io.ReaderAt, s indexer.VersionedScanner) error {
ctx = zlog.ContextWithValues(ctx,
"component", "indexer/layerscannner/layerScanner.scan",
"scanner", s.Name(),
Expand All @@ -202,7 +261,7 @@ func (ls *layerScanner) scanLayer(ctx context.Context, l *claircore.Layer, s ind
}

var result result
if err := result.Do(ctx, s, l); err != nil {
if err := result.Do(ctx, s, l, c); err != nil {
return err
}

Expand All @@ -223,7 +282,7 @@ type result struct {
// Do asserts the Scanner back to having a Scan method, and then calls it.
//
// The success value is captured and the error value is returned by Do.
func (r *result) Do(ctx context.Context, s indexer.VersionedScanner, l *claircore.Layer) error {
func (r *result) Do(ctx context.Context, s indexer.VersionedScanner, l *claircore.Layer, c io.ReaderAt) error {
var err error
switch s := s.(type) {
case indexer.PackageScanner:
Expand Down
13 changes: 12 additions & 1 deletion indexer/layerscanner/layerscanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package layerscanner
import (
"context"
"crypto/sha256"
"io"
"testing"
"time"

Expand Down Expand Up @@ -94,7 +95,17 @@ func TestScanNoErrors(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := layerscanner.Scan(ctx, d, layers); err != nil {
rc := make([]claircore.ReadAtCloser, len(layers))
for i := range rc {
rc[i] = fakeReader{}
}
if err := layerscanner.Scan(ctx, d, layers, rc); err != nil {
t.Fatalf("failed to scan test layers: %v", err)
}
}

type fakeReader struct{}

func (fakeReader) Close() error { return nil }
func (fakeReader) Read(_ []byte) (int, error) { return 0, io.EOF }
func (fakeReader) ReadAt(_ []byte, _ int64) (int, error) { return 0, io.EOF }
2 changes: 1 addition & 1 deletion indexer/realizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ import (
// if necessary, and making the uncompressed tar contents available for
// reading.
type Realizer interface {
Realize(context.Context, []*claircore.Layer) error
Realize(context.Context, []*claircore.Layer) ([]claircore.ReadAtCloser, error)
Close() error
}
Loading