Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 69 additions & 14 deletions profiler/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,29 @@ func getZstdLevelOrDefault(level int) zstd.EncoderLevel {
return zstd.SpeedDefault
}

// newCompressionPipeline returns a compressor that converts the data written to
// it from the expected input compression to the given output compression.
func newCompressionPipeline(in compression, out compression) (compressor, error) {
// compressionPipelineBuilder builds compression pipelines and memoizes the
// shared zstd encoders they use, so that every pipeline requesting the same
// encoder level reuses a single underlying encoder.
type compressionPipelineBuilder struct {
	// zstdEncoders caches one shared encoder per zstd level. It is
	// populated lazily by getZstdEncoder; the zero value (nil map) is ready
	// to use.
	zstdEncoders map[zstd.EncoderLevel]*sharedZstdEncoder
}

// getZstdEncoder returns the shared zstd encoder for the given level,
// creating and caching a new one on first use so subsequent calls for the
// same level reuse it.
func (b *compressionPipelineBuilder) getZstdEncoder(level zstd.EncoderLevel) (*sharedZstdEncoder, error) {
	if enc, ok := b.zstdEncoders[level]; ok {
		return enc, nil
	}
	enc, err := newSharedZstdEncoder(level)
	if err != nil {
		return nil, err
	}
	// Lazily allocate the cache so the zero-value builder is usable.
	if b.zstdEncoders == nil {
		b.zstdEncoders = make(map[zstd.EncoderLevel]*sharedZstdEncoder)
	}
	b.zstdEncoders[level] = enc
	return enc, nil
}

// Build returns a compressor that converts the data written to it from the
// expected input compression to the given output compression.
func (b *compressionPipelineBuilder) Build(in compression, out compression) (compressor, error) {
if in == out {
return newPassthroughCompressor(), nil
}
Expand All @@ -149,11 +169,15 @@ func newCompressionPipeline(in compression, out compression) (compressor, error)
}

if in == noCompression && out.algorithm == compressionAlgorithmZstd {
return zstd.NewWriter(nil, zstd.WithEncoderLevel(getZstdLevelOrDefault(out.level)))
return b.getZstdEncoder(getZstdLevelOrDefault(out.level))
}

if in.algorithm == compressionAlgorithmGzip && out.algorithm == compressionAlgorithmZstd {
return newZstdRecompressor(getZstdLevelOrDefault(out.level))
encoder, err := b.getZstdEncoder(getZstdLevelOrDefault(out.level))
if err != nil {
return nil, err
}
return newZstdRecompressor(encoder), nil
}

return nil, fmt.Errorf("unsupported recompression: %s -> %s", in, out)
Expand All @@ -164,8 +188,11 @@ func newCompressionPipeline(in compression, out compression) (compressor, error)
// the data from one format and then re-compresses it into another format.
type compressor interface {
io.Writer
io.Closer
// Reset resets the compressor to the given writer. It may also acquire a
// shared underlying resource, so callers must always call Close().
Reset(w io.Writer)
// Close closes the compressor and releases any shared underlying resource.
Close() error
}

// newPassthroughCompressor returns a compressor that simply passes all data
Expand All @@ -186,21 +213,16 @@ func (r *passthroughCompressor) Close() error {
return nil
}

func newZstdRecompressor(level zstd.EncoderLevel) (*zstdRecompressor, error) {
zstdOut, err := zstd.NewWriter(io.Discard, zstd.WithEncoderLevel(level))
if err != nil {
return nil, err
}
return &zstdRecompressor{zstdOut: zstdOut, err: make(chan error)}, nil
func newZstdRecompressor(encoder *sharedZstdEncoder) *zstdRecompressor {
return &zstdRecompressor{zstdOut: encoder, err: make(chan error)}
}

type zstdRecompressor struct {
// err synchronizes finishing writes after closing pw and reports any
// error during recompression
err chan error
pw io.WriteCloser
zstdOut *zstd.Encoder
level zstd.EncoderLevel
zstdOut *sharedZstdEncoder
}

func (r *zstdRecompressor) Reset(w io.Writer) {
Expand All @@ -227,3 +249,36 @@ func (r *zstdRecompressor) Close() error {
err := <-r.err
return cmp.Or(err, r.zstdOut.Close())
}

// newSharedZstdEncoder returns a zstd encoder protected by a one-slot
// semaphore. Reset acquires the slot and Close releases it, so callers must
// pair every Reset with a Close to hand the encoder to the next user.
func newSharedZstdEncoder(level zstd.EncoderLevel) (*sharedZstdEncoder, error) {
	w, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(level))
	if err != nil {
		return nil, err
	}
	s := &sharedZstdEncoder{
		encoder: w,
		sema:    make(chan struct{}, 1),
	}
	return s, nil
}

// sharedZstdEncoder wraps a zstd encoder with a capacity-1 semaphore so the
// encoder can be shared by at most one writer at a time.
type sharedZstdEncoder struct {
	// encoder is the underlying zstd writer; its output destination is set
	// through Reset.
	encoder *zstd.Encoder
	// sema is a capacity-1 channel acting as a mutex: Reset sends to
	// acquire, Close receives to release.
	sema chan struct{}
}

// Reset acquires exclusive use of the shared encoder, blocking until any
// previous user has called Close, and then points the encoder's output at w.
func (s *sharedZstdEncoder) Reset(w io.Writer) {
	// Acquire the one-slot semaphore; the matching release happens in Close.
	s.sema <- struct{}{}
	s.encoder.Reset(w)
}

// Write compresses p through the underlying encoder. Callers must hold the
// encoder via a prior Reset.
func (s *sharedZstdEncoder) Write(p []byte) (int, error) {
	n, err := s.encoder.Write(p)
	return n, err
}

// Close flushes and finishes the current compression stream, then releases
// the semaphore taken in Reset so the next user may acquire the encoder.
func (s *sharedZstdEncoder) Close() error {
	// Release only after the stream is fully closed, preserving the
	// close-then-release ordering.
	defer func() { <-s.sema }()
	return s.encoder.Close()
}
10 changes: 8 additions & 2 deletions profiler/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func TestNewCompressionPipeline(t *testing.T) {

for _, test := range tests {
t.Run(fmt.Sprintf("%s->%s", test.in, test.out), func(t *testing.T) {
pipeline, err := newCompressionPipeline(test.in, test.out)
var pipelineBuilder compressionPipelineBuilder
pipeline, err := pipelineBuilder.Build(test.in, test.out)
require.NoError(t, err)
buf := &bytes.Buffer{}
pipeline.Reset(buf)
Expand Down Expand Up @@ -172,8 +173,13 @@ func BenchmarkRecompression(b *testing.B) {
b.Run(fmt.Sprintf("%s-%s", in.inAlg.String(), in.outLevel), func(b *testing.B) {
data := compressData(b, inputdata, in.inAlg)
b.ResetTimer()
var pipelineBuilder compressionPipelineBuilder
for i := 0; i < b.N; i++ {
z := &zstdRecompressor{level: in.outLevel}
encoder, err := pipelineBuilder.getZstdEncoder(in.outLevel)
if err != nil {
b.Fatal(err)
}
z := newZstdRecompressor(encoder)
z.Reset(io.Discard)
if _, err := z.Write(data); err != nil {
b.Fatal(err)
Expand Down
70 changes: 37 additions & 33 deletions profiler/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ var profileTypes = map[ProfileType]profileType{
Filename: "cpu.pprof",
Collect: func(p *profiler) ([]byte, error) {
var buf bytes.Buffer
var outBuf bytes.Buffer
// Start the CPU profiler at the end of the profiling
// period so that we're sure to capture the CPU usage of
// this library, which mostly happens at the end
Expand All @@ -101,9 +102,7 @@ var profileTypes = map[ProfileType]profileType{
runtime.SetCPUProfileRate(p.cfg.cpuProfileRate)
}

compressor := p.compressors[CPUProfile]
compressor.Reset(&buf)
if err := p.startCPUProfile(compressor); err != nil {
if err := p.startCPUProfile(&outBuf); err != nil {
return nil, err
}
p.interruptibleSleep(p.cfg.cpuDuration)
Expand All @@ -113,10 +112,12 @@ var profileTypes = map[ProfileType]profileType{
// the other profile types
p.pendingProfiles.Wait()
p.stopCPUProfile()
if err := compressor.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil

c := p.compressors[CPUProfile]
c.Reset(&buf)
_, writeErr := outBuf.WriteTo(c)
closeErr := c.Close()
return buf.Bytes(), cmp.Or(writeErr, closeErr)
},
},
// HeapProfile is complex due to how the Go runtime exposes it. It contains 4
Expand Down Expand Up @@ -175,10 +176,10 @@ var profileTypes = map[ProfileType]profileType{
return nil, err
}

compressor := p.compressors[expGoroutineWaitProfile]
compressor.Reset(pprof)
err := goroutineDebug2ToPprof(text, compressor, now)
err = cmp.Or(err, compressor.Close())
c := p.compressors[expGoroutineWaitProfile]
c.Reset(pprof)
err := goroutineDebug2ToPprof(text, c, now)
err = cmp.Or(err, c.Close())
return pprof.Bytes(), err
},
},
Expand All @@ -187,11 +188,11 @@ var profileTypes = map[ProfileType]profileType{
Filename: "metrics.json",
Collect: func(p *profiler) ([]byte, error) {
var buf bytes.Buffer
compressor := p.compressors[MetricsProfile]
compressor.Reset(&buf)
c := p.compressors[MetricsProfile]
c.Reset(&buf)
interrupted := p.interruptibleSleep(p.cfg.period)
err := p.met.report(now(), compressor)
err = cmp.Or(err, compressor.Close())
err := p.met.report(now(), c)
err = cmp.Or(err, c.Close())
if err != nil && interrupted {
err = errProfilerStopped
}
Expand All @@ -204,9 +205,8 @@ var profileTypes = map[ProfileType]profileType{
Collect: func(p *profiler) ([]byte, error) {
p.lastTrace = time.Now()
buf := new(bytes.Buffer)
compressor := p.compressors[executionTrace]
compressor.Reset(buf)
lt := newLimitedTraceCollector(compressor, int64(p.cfg.traceConfig.Limit))
outBuf := new(bytes.Buffer)
lt := newLimitedTraceCollector(outBuf, int64(p.cfg.traceConfig.Limit))
if err := trace.Start(lt); err != nil {
return nil, err
}
Expand All @@ -217,10 +217,12 @@ var profileTypes = map[ProfileType]profileType{
case <-lt.done: // The trace size limit was exceeded
}
trace.Stop()
if err := compressor.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil

c := p.compressors[executionTrace]
c.Reset(buf)
_, writeErr := outBuf.WriteTo(c)
closeErr := c.Close()
return buf.Bytes(), cmp.Or(writeErr, closeErr)
},
},
}
Expand Down Expand Up @@ -284,10 +286,10 @@ func collectGenericProfile(name string, pt ProfileType) func(p *profiler) ([]byt
var buf bytes.Buffer
dp, ok := p.deltas[pt]
if !ok || !p.cfg.deltaProfiles {
compressor := p.compressors[pt]
compressor.Reset(&buf)
err := p.lookupProfile(name, compressor, 0)
err = cmp.Or(err, compressor.Close())
c := p.compressors[pt]
c.Reset(&buf)
err := p.lookupProfile(name, c, 0)
err = cmp.Or(err, c.Close())
return buf.Bytes(), err
}

Expand Down Expand Up @@ -435,13 +437,15 @@ func (fdp *fastDeltaProfiler) Delta(data []byte) (b []byte, err error) {
}

fdp.buf.Reset()
fdp.compressor.Reset(&fdp.buf)

if err = fdp.dc.Delta(data, fdp.compressor); err != nil {
return nil, fmt.Errorf("error computing delta: %s", err.Error())
}
if err = fdp.compressor.Close(); err != nil {
return nil, fmt.Errorf("error flushing gzip writer: %s", err.Error())
c := fdp.compressor
c.Reset(&fdp.buf)

deltaErr := fdp.dc.Delta(data, c)
closeErr := c.Close()
if deltaErr != nil {
return nil, fmt.Errorf("error computing delta: %w", deltaErr)
} else if closeErr != nil {
return nil, fmt.Errorf("error flushing compressor: %w", closeErr)
}
// The returned slice will be retained in case the profile upload fails,
// so we need to return a copy of the buffer's bytes to avoid a data
Expand Down
3 changes: 2 additions & 1 deletion profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,11 @@ func newProfiler(opts ...Option) (*profiler, error) {
if p.cfg.traceConfig.Enabled {
types = append(types, executionTrace)
}
var pipelineBuilder compressionPipelineBuilder
for _, pt := range types {
isDelta := p.cfg.deltaProfiles && len(profileTypes[pt].DeltaValues) > 0
in, out := compressionStrategy(pt, isDelta, p.cfg.compressionConfig)
compressor, err := newCompressionPipeline(in, out)
compressor, err := pipelineBuilder.Build(in, out)
if err != nil {
return nil, err
}
Expand Down
Loading