Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions bytespool/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ func AcquireWriterSize(out io.Writer, size int) *Writer {
}
}

func FlushReleaseWriter(w *Writer) error {
err := w.Flush()
if err != nil {
return err
}
func ReleaseWriter(w *Writer) {
Release(w.Buf)
w.Buf = nil
w.out = nil
writerPool.Put(w)
}

func FlushReleaseWriter(w *Writer) error {
if err := w.Flush(); err != nil {
return err
}
ReleaseWriter(w)
return nil
}

Expand Down
14 changes: 9 additions & 5 deletions cmd/distribution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/fracmanager"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/seq"
Expand Down Expand Up @@ -58,8 +59,10 @@ func readBlock(reader storage.IndexReader, blockIndex uint32) ([]byte, error) {
return data, nil
}

func loadInfo(path string) *frac.Info {
func loadInfo(path string) *common.Info {
indexReader, f := getReader(path)
defer f.Close()

result, err := readBlock(indexReader, 0)
if err != nil {
logger.Fatal("error reading block", zap.String("file", path), zap.Error(err))
Expand All @@ -69,7 +72,7 @@ func loadInfo(path string) *frac.Info {
logger.Fatal("seq-db index file header corrupted", zap.String("file", path))
}

b := frac.BlockInfo{}
b := sealed.BlockInfo{}
err = b.Unpack(result)
if err != nil {
logger.Fatal("can't unpack info bloc of index file", zap.String("file", path), zap.Error(err))
Expand All @@ -84,8 +87,9 @@ func loadInfo(path string) *frac.Info {
return b.Info
}

func buildDist(dist *seq.MIDsDistribution, path string, _ *frac.Info) {
blocksReader, _ := getReader(path)
func buildDist(dist *seq.MIDsDistribution, path string, _ *common.Info) {
blocksReader, f := getReader(path)
defer f.Close()

// skip tokens
blockIndex := uint32(1)
Expand Down
9 changes: 6 additions & 3 deletions cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/alecthomas/units"
"go.uber.org/zap"

"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/frac/sealed/lids"
"github.com/ozontech/seq-db/frac/sealed/token"
"github.com/ozontech/seq-db/fracmanager"
Expand Down Expand Up @@ -91,8 +91,11 @@ func analyzeIndex(
}

// load info
b := frac.BlockInfo{}
_ = b.Unpack(readBlock())
var b sealed.BlockInfo
if err := b.Unpack(readBlock()); err != nil {
logger.Fatal("error unpacking block info", zap.Error(err))
}

docsCount := int(b.Info.DocsTotal)

// load tokens
Expand Down
3 changes: 2 additions & 1 deletion cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/fracmanager"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/mappingprovider"
Expand Down Expand Up @@ -259,7 +260,7 @@ func startStore(
MaintenanceDelay: 0,
CacheGCDelay: 0,
CacheCleanupDelay: 0,
SealParams: frac.SealParams{
SealParams: common.SealParams{
IDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
LIDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
TokenListZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
Expand Down
3 changes: 1 addition & 2 deletions consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ const (
// DummyMID is used in aggregations when we do not need to build time series.
DummyMID = 0

IDsBlockSize = int(4 * units.KiB)
RegularBlockSize = int(16 * units.KiB)
IDsPerBlock = int(4 * units.KiB)
LIDBlockCap = int(64 * units.KiB)
RegularBlockSize = int(16 * units.KiB)

DefaultMaintenanceDelay = time.Second
DefaultCacheGCDelay = 1 * time.Second
Expand Down
7 changes: 4 additions & 3 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/metric"
"github.com/ozontech/seq-db/metric/stopwatch"
Expand All @@ -38,7 +39,7 @@ type Active struct {
released bool

infoMu sync.RWMutex
info *Info
info *common.Info

MIDs *UInt64s
RIDs *UInt64s
Expand Down Expand Up @@ -103,7 +104,7 @@ func NewActive(
writer: NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync),

BaseFileName: baseFileName,
info: NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())),
info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())),
Config: cfg,
}

Expand Down Expand Up @@ -300,7 +301,7 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider {
}
}

func (f *Active) Info() *Info {
func (f *Active) Info() *common.Info {
f.infoMu.RLock()
defer f.infoMu.RUnlock()

Expand Down
33 changes: 23 additions & 10 deletions frac/active_docs_positions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@ import (
)

type DocsPositions struct {
mu sync.RWMutex
positions map[seq.ID]seq.DocPos
mu sync.RWMutex
idToPos map[seq.ID]seq.DocPos
lidToPos []seq.DocPos
}

func NewSyncDocsPositions() *DocsPositions {
return &DocsPositions{
positions: make(map[seq.ID]seq.DocPos),
dp := DocsPositions{
lidToPos: make([]seq.DocPos, 0),
idToPos: make(map[seq.ID]seq.DocPos),
}
dp.lidToPos = append(dp.lidToPos, 0) // systemID
return &dp
}

func (dp *DocsPositions) Get(id seq.ID) seq.DocPos {
if val, ok := dp.positions[id]; ok {
if val, ok := dp.idToPos[id]; ok {
return val
}
return seq.DocPosNotFound
Expand All @@ -36,13 +40,22 @@ func (dp *DocsPositions) SetMultiple(ids []seq.ID, pos []seq.DocPos) []seq.ID {
dp.mu.Lock()
defer dp.mu.Unlock()

appended := make([]seq.ID, 0)
appended := make([]seq.ID, 0, len(ids))
for i, id := range ids {
// Positions may be equal in case of nested index.
if savedPos, ok := dp.positions[id]; !ok || savedPos == pos[i] {
dp.positions[id] = pos[i]
appended = append(appended, id)
p, ok := dp.idToPos[id]

if ok {
if p != pos[i] {
// same ID but different position
// this is a duplicate ID, we can't append it
continue
}
} else {
dp.idToPos[id] = pos[i]
}

dp.lidToPos = append(dp.lidToPos, pos[i])
appended = append(appended, id)
}
return appended
}
3 changes: 2 additions & 1 deletion frac/active_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package frac
import (
"context"

"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/frac/sealed/lids"
"github.com/ozontech/seq-db/metric/stopwatch"
Expand All @@ -15,7 +16,7 @@ import (
type activeDataProvider struct {
ctx context.Context
config *Config
info *Info
info *common.Info

mids *UInt64s
rids *UInt64s
Expand Down
Loading
Loading