diff --git a/bytespool/writer.go b/bytespool/writer.go index 411275e4..48abf4a4 100644 --- a/bytespool/writer.go +++ b/bytespool/writer.go @@ -29,15 +29,18 @@ func AcquireWriterSize(out io.Writer, size int) *Writer { } } -func FlushReleaseWriter(w *Writer) error { - err := w.Flush() - if err != nil { - return err - } +func ReleaseWriter(w *Writer) { Release(w.Buf) w.Buf = nil w.out = nil writerPool.Put(w) +} + +func FlushReleaseWriter(w *Writer) error { + if err := w.Flush(); err != nil { + return err + } + ReleaseWriter(w) return nil } diff --git a/cmd/distribution/main.go b/cmd/distribution/main.go index 25ac571d..9b27e8f2 100644 --- a/cmd/distribution/main.go +++ b/cmd/distribution/main.go @@ -11,7 +11,8 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/consts" - "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/seq" @@ -58,8 +59,10 @@ func readBlock(reader storage.IndexReader, blockIndex uint32) ([]byte, error) { return data, nil } -func loadInfo(path string) *frac.Info { +func loadInfo(path string) *common.Info { indexReader, f := getReader(path) + defer f.Close() + result, err := readBlock(indexReader, 0) if err != nil { logger.Fatal("error reading block", zap.String("file", path), zap.Error(err)) @@ -69,7 +72,7 @@ func loadInfo(path string) *frac.Info { logger.Fatal("seq-db index file header corrupted", zap.String("file", path)) } - b := frac.BlockInfo{} + b := sealed.BlockInfo{} err = b.Unpack(result) if err != nil { logger.Fatal("can't unpack info bloc of index file", zap.String("file", path), zap.Error(err)) @@ -84,8 +87,9 @@ func loadInfo(path string) *frac.Info { return b.Info } -func buildDist(dist *seq.MIDsDistribution, path string, _ *frac.Info) { - blocksReader, _ := getReader(path) +func buildDist(dist *seq.MIDsDistribution, path string, _ *common.Info) { + blocksReader, f := getReader(path) + defer f.Close() // skip tokens blockIndex := uint32(1) diff --git a/cmd/index_analyzer/main.go b/cmd/index_analyzer/main.go index c0740777..3e3eb76d 100644 --- a/cmd/index_analyzer/main.go +++ b/cmd/index_analyzer/main.go @@ -10,7 +10,7 @@ import ( "github.com/alecthomas/units" "go.uber.org/zap" - "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/fracmanager" @@ -91,8 +91,11 @@ func analyzeIndex( } // load info - b := frac.BlockInfo{} - _ = b.Unpack(readBlock()) + var b sealed.BlockInfo + if err := b.Unpack(readBlock()); err != nil { + logger.Fatal("error unpacking block info", zap.Error(err)) + } + docsCount := int(b.Info.DocsTotal) // load tokens diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index 1bebc5aa..4150d75e 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -22,6 +22,7 @@ import ( "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/mappingprovider" @@ -259,7 +260,7 @@ func startStore( MaintenanceDelay: 0, CacheGCDelay: 0, 
CacheCleanupDelay: 0, - SealParams: frac.SealParams{ + SealParams: common.SealParams{ IDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel, LIDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel, TokenListZstdLevel: cfg.Compression.SealedZstdCompressionLevel, diff --git a/consts/consts.go b/consts/consts.go index b6817b55..db0d0aa8 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -11,10 +11,9 @@ const ( // DummyMID is used in aggregations when we do not need to build time series. DummyMID = 0 - IDsBlockSize = int(4 * units.KiB) - RegularBlockSize = int(16 * units.KiB) IDsPerBlock = int(4 * units.KiB) LIDBlockCap = int(64 * units.KiB) + RegularBlockSize = int(16 * units.KiB) DefaultMaintenanceDelay = time.Second DefaultCacheGCDelay = 1 * time.Second diff --git a/frac/active.go b/frac/active.go index 09e56331..25f7c28e 100644 --- a/frac/active.go +++ b/frac/active.go @@ -16,6 +16,7 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric" "github.com/ozontech/seq-db/metric/stopwatch" @@ -38,7 +39,7 @@ type Active struct { released bool infoMu sync.RWMutex - info *Info + info *common.Info MIDs *UInt64s RIDs *UInt64s @@ -103,7 +104,7 @@ func NewActive( writer: NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync), BaseFileName: baseFileName, - info: NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())), + info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())), Config: cfg, } @@ -300,7 +301,7 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { } } -func (f *Active) Info() *Info { +func (f *Active) Info() *common.Info { f.infoMu.RLock() defer f.infoMu.RUnlock() diff --git a/frac/active_docs_positions.go b/frac/active_docs_positions.go index 1949890c..0b4c596b 100644 --- a/frac/active_docs_positions.go +++ b/frac/active_docs_positions.go @@ -7,18 +7,22 @@ import ( ) type DocsPositions struct { - mu sync.RWMutex - positions map[seq.ID]seq.DocPos + mu sync.RWMutex + idToPos map[seq.ID]seq.DocPos + lidToPos []seq.DocPos } func NewSyncDocsPositions() *DocsPositions { - return &DocsPositions{ - positions: make(map[seq.ID]seq.DocPos), + dp := DocsPositions{ + lidToPos: make([]seq.DocPos, 0), + idToPos: make(map[seq.ID]seq.DocPos), } + dp.lidToPos = append(dp.lidToPos, 0) // systemID + return &dp } func (dp *DocsPositions) Get(id seq.ID) seq.DocPos { - if val, ok := dp.positions[id]; ok { + if val, ok := dp.idToPos[id]; ok { return val } return seq.DocPosNotFound @@ -36,13 +40,22 @@ func (dp *DocsPositions) SetMultiple(ids []seq.ID, pos []seq.DocPos) []seq.ID { dp.mu.Lock() defer dp.mu.Unlock() - appended := make([]seq.ID, 0) + appended := make([]seq.ID, 0, len(ids)) for i, id := range ids { - // Positions may be equal in case of nested index. 
- if savedPos, ok := dp.positions[id]; !ok || savedPos == pos[i] { - dp.positions[id] = pos[i] - appended = append(appended, id) + p, ok := dp.idToPos[id] + + if ok { + if p != pos[i] { + // Same ID but a different position: a conflicting duplicate + // that cannot be appended (equal positions are expected for nested indexes) + continue + } + } else { + dp.idToPos[id] = pos[i] } + + dp.lidToPos = append(dp.lidToPos, pos[i]) + appended = append(appended, id) } return appended } diff --git a/frac/active_index.go b/frac/active_index.go index 14ba2810..c4c2f5d7 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -3,6 +3,7 @@ package frac import ( "context" + "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/metric/stopwatch" @@ -15,7 +16,7 @@ import ( type activeDataProvider struct { ctx context.Context config *Config - info *Info + info *common.Info mids *UInt64s rids *UInt64s diff --git a/frac/active_sealer.go b/frac/active_sealer.go deleted file mode 100644 index b8d16abd..00000000 --- a/frac/active_sealer.go +++ /dev/null @@ -1,380 +0,0 @@ -package frac - -import ( - "encoding/binary" - "fmt" - "io" - "maps" - "os" - "path/filepath" - "slices" - "sync" - "time" - - "go.uber.org/zap" - - "github.com/alecthomas/units" - - "github.com/ozontech/seq-db/bytespool" - "github.com/ozontech/seq-db/consts" - "github.com/ozontech/seq-db/frac/sealed/lids" - "github.com/ozontech/seq-db/frac/sealed/seqids" - "github.com/ozontech/seq-db/frac/sealed/token" - "github.com/ozontech/seq-db/logger" - "github.com/ozontech/seq-db/seq" - "github.com/ozontech/seq-db/storage" - "github.com/ozontech/seq-db/util" -) - -type SealParams struct { - IDsZstdLevel int - LIDsZstdLevel int - TokenListZstdLevel int - DocsPositionsZstdLevel int - TokenTableZstdLevel int - - DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block. - DocBlockSize int // DocBlockSize is decompressed payload size of document block.
-} - -func Seal(f *Active, params SealParams) (*PreloadedData, error) { - logger.Info("sealing fraction", zap.String("fraction", f.BaseFileName)) - - start := time.Now() - info := *f.info // copy - if info.To == 0 { - logger.Panic("sealing of an empty active fraction is not supported") - } - info.SealingTime = uint64(start.UnixMilli()) - - indexFile, err := os.Create(f.BaseFileName + consts.IndexTmpFileSuffix) - if err != nil { - return nil, err - } - - if _, err = indexFile.Seek(16, io.SeekStart); err != nil { // skip 16 bytes for pos and length of registry - return nil, err - } - - preloaded, err := writeSealedFraction(f, &info, indexFile, params) - if err != nil { - return nil, err - } - - if indexFile, err = syncRename(indexFile, f.BaseFileName+consts.IndexFileSuffix); err != nil { - return nil, err - } - - parentDirPath := filepath.Dir(f.BaseFileName) - util.MustSyncPath(parentDirPath) - - stat, err := indexFile.Stat() - if err != nil { - return nil, err - } - info.IndexOnDisk = uint64(stat.Size()) - - preloaded.info = &info - preloaded.indexFile = indexFile - - logger.Info( - "fraction sealed", - zap.String("fraction", f.BaseFileName), - zap.Float64("time_spent_s", util.DurationToUnit(time.Since(start), "s")), - ) - - return preloaded, nil -} - -func syncRename(f *os.File, newName string) (*os.File, error) { - if err := f.Sync(); err != nil { - return nil, err - } - if err := os.Rename(f.Name(), newName); err != nil { - return nil, err - } - if err := f.Close(); err != nil { - return nil, err - } - return os.OpenFile(newName, os.O_RDONLY, 0o776) // reopen with new name -} - -func writeSortedDocs(f *Active, params SealParams, sortedIDs []seq.ID) (*os.File, []uint64, map[seq.ID]seq.DocPos, error) { - sdocsFile, err := os.Create(f.BaseFileName + consts.SdocsTmpFileSuffix) - if err != nil { - return nil, nil, nil, err - } - - logger.Info("sorting docs...") - bw := getDocBlocksWriter(sdocsFile, params.DocBlockSize, params.DocBlocksZstdLevel) - defer putDocBlocksWriter(bw) - - if err := writeDocsInOrder(f.DocsPositions, f.DocBlocks.GetVals(), f.sortReader, sortedIDs, bw); err != nil { - return nil, nil, nil, err - } - - if sdocsFile, err = syncRename(sdocsFile, f.BaseFileName+consts.SdocsFileSuffix); err != nil { - return nil, nil, nil, err - } - - return sdocsFile, slices.Clone(bw.BlockOffsets), maps.Clone(bw.Positions), nil -} - -func writeSealedFraction(f *Active, info *Info, indexFile io.WriteSeeker, params SealParams) (*PreloadedData, error) { - var err error - - docsFile := f.docsFile - blocksOffsets := f.DocBlocks.GetVals() - positions := f.DocsPositions.positions - - sortedIDs, oldToNewLIDsIndex := sortSeqIDs(f, f.MIDs.GetVals(), f.RIDs.GetVals()) - - if !f.Config.SkipSortDocs { - if docsFile, blocksOffsets, positions, err = writeSortedDocs(f, params, sortedIDs); err != nil { - return nil, err - } - stat, err := docsFile.Stat() - if err != nil { - return nil, err - } - info.DocsOnDisk = uint64(stat.Size()) - } - - producer := NewDiskBlocksProducer() - writer := NewSealedBlockWriter(indexFile) - - logger.Info("sealing frac stats...") - info.BuildDistribution(sortedIDs) - if err := writer.writeInfoBlock(producer.getInfoBlock(info)); err != nil { - return nil, fmt.Errorf("seal info error: %w", err) - } - - var tokenTable token.Table - { - logger.Info("sealing tokens...") - generator := producer.getTokensBlocksGenerator(f.TokenList) - tokenTable, err = writer.writeTokensBlocks(params.TokenListZstdLevel, generator) - if err != nil { - return nil, fmt.Errorf("sealing tokens error: %w", 
err) - } - } - - { - logger.Info("sealing tokens table...") - generator := producer.getTokenTableBlocksGenerator(f.TokenList, tokenTable) - if err := writer.writeTokenTableBlocks(params.TokenTableZstdLevel, generator); err != nil { - return nil, fmt.Errorf("sealing tokens table error: %w", err) - } - } - - { - logger.Info("writing document positions block...") - idsLen := f.MIDs.Len() - generator := producer.getPositionBlock(idsLen, blocksOffsets) - if err := writer.writePositionsBlock(params.DocsPositionsZstdLevel, generator); err != nil { - return nil, fmt.Errorf("document positions block error: %w", err) - } - } - - var minBlockIDs []seq.ID - { - logger.Info("sealing ids...") - ds := DocsPositions{positions: positions} - generator := producer.getIDsBlocksGenerator(sortedIDs, &ds, consts.IDsBlockSize) - minBlockIDs, err = writer.writeIDsBlocks(params.IDsZstdLevel, generator) - if err != nil { - return nil, fmt.Errorf("seal ids error: %w", err) - } - } - - var lidsTable *lids.Table - { - logger.Info("sealing lids...") - generator := producer.getLIDsBlockGenerator(f.TokenList, oldToNewLIDsIndex, f.MIDs, f.RIDs, int(consts.LIDBlockCap)) - lidsTable, err = writer.writeLIDsBlocks(params.LIDsZstdLevel, generator) - if err != nil { - return nil, fmt.Errorf("seal lids error: %w", err) - } - } - - logger.Info("write registry...") - if err = writer.WriteRegistryBlock(); err != nil { - return nil, fmt.Errorf("write registry error: %w", err) - } - - writer.stats.WriteLogs() - - return &PreloadedData{ - docsFile: docsFile, - lidsTable: lidsTable, - tokenTable: tokenTable, - blocksOffsets: blocksOffsets, - idsTable: seqids.Table{ - MinBlockIDs: minBlockIDs, - IDsTotal: f.MIDs.Len(), - IDBlocksTotal: f.DocBlocks.Len(), - StartBlockIndex: writer.startOfIDsBlockIndex, - }, - }, nil -} - -func writeDocsInOrder(pos *DocsPositions, blocks []uint64, docsReader storage.DocsReader, ids []seq.ID, bw *docBlocksWriter) error { - // Skip system seq.ID. - if len(ids) == 0 { - panic(fmt.Errorf("BUG: ids is empty")) - } - if ids[0] != systemSeqID { - panic(fmt.Errorf("BUG: system ID expected")) - } - ids = ids[1:] - - if err := writeDocBlocksInOrder(pos, blocks, docsReader, ids, bw); err != nil { - return err - } - return nil -} - -func writeDocBlocksInOrder(pos *DocsPositions, blocks []uint64, docsReader storage.DocsReader, ids []seq.ID, bw *docBlocksWriter) error { - var prevID seq.ID - for _, id := range ids { - if id == prevID { - // IDs have duplicates in case of nested index. - // In this case we need to store the original document once. 
- continue - } - prevID = id - - oldPos := pos.Get(id) - if oldPos == seq.DocPosNotFound { - panic(fmt.Errorf("BUG: can't find doc position")) - } - - blockOffsetIndex, docOffset := oldPos.Unpack() - blockOffset := blocks[blockOffsetIndex] - err := docsReader.ReadDocsFunc(blockOffset, []uint64{docOffset}, func(doc []byte) error { - return bw.WriteDoc(id, doc) - }) - if err != nil { - return fmt.Errorf("writing document to block: %s", err) - } - } - if err := bw.Flush(); err != nil { - return err - } - return nil -} - -type docBlocksWriter struct { - w *bytespool.Writer - compressLevel int - minBlockSize int - - curBlockIndex int - currentBlockOffset uint64 - - docs []byte - blockBuf []byte - - BlockOffsets []uint64 - Positions map[seq.ID]seq.DocPos -} - -var docBlocksWriterPool = sync.Pool{ - New: func() any { - return &docBlocksWriter{Positions: make(map[seq.ID]seq.DocPos)} - }, -} - -func getDocBlocksWriter(w io.Writer, blockSize, compressLevel int) *docBlocksWriter { - bw := docBlocksWriterPool.Get().(*docBlocksWriter) - - if blockSize <= 0 { - blockSize = int(units.MiB) * 4 - } - - bufSize := int(units.MiB) * 32 - if bufSize < blockSize { - bufSize = blockSize - } - - *bw = docBlocksWriter{ - w: bytespool.AcquireWriterSize(w, bufSize), - compressLevel: compressLevel, - minBlockSize: blockSize, - - curBlockIndex: 0, - currentBlockOffset: 0, - - docs: bw.docs[:0], - blockBuf: bw.blockBuf[:0], - - BlockOffsets: bw.BlockOffsets[:0], - Positions: bw.Positions, - } - clear(bw.Positions) - - return bw -} - -func putDocBlocksWriter(bw *docBlocksWriter) { - err := bytespool.FlushReleaseWriter(bw.w) - if err != nil { - panic(fmt.Errorf("BUG: writer must be flushed before releasing blocks writer: %s", err)) - } - bw.w = nil - docBlocksWriterPool.Put(bw) -} - -func (w *docBlocksWriter) WriteDoc(id seq.ID, doc []byte) error { - pos := seq.PackDocPos(uint32(w.curBlockIndex), uint64(len(w.docs))) - w.Positions[id] = pos - - w.docs = binary.LittleEndian.AppendUint32(w.docs, uint32(len(doc))) - w.docs = append(w.docs, doc...) 
- - if len(w.docs) > w.minBlockSize { - if err := w.flushBlock(); err != nil { - return err - } - } - - return nil -} - -func (w *docBlocksWriter) flushBlock() error { - blockLen, err := w.compressWriteBlock() - if err != nil { - return err - } - - w.docs = w.docs[:0] - w.BlockOffsets = append(w.BlockOffsets, w.currentBlockOffset) - w.curBlockIndex++ - w.currentBlockOffset += uint64(blockLen) - - return nil -} - -func (w *docBlocksWriter) compressWriteBlock() (int, error) { - w.blockBuf = w.blockBuf[:0] - w.blockBuf = storage.CompressDocBlock(w.docs, w.blockBuf, w.compressLevel) - - if _, err := w.w.Write(w.blockBuf); err != nil { - return 0, err - } - - blockLen := len(w.blockBuf) - return blockLen, nil -} - -func (w *docBlocksWriter) Flush() error { - if len(w.docs) > 0 { - if err := w.flushBlock(); err != nil { - return err - } - } - if err := w.w.Flush(); err != nil { - return err - } - return nil -} diff --git a/frac/active_sealer_test.go b/frac/active_sealer_test.go deleted file mode 100644 index 8527d4d2..00000000 --- a/frac/active_sealer_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package frac - -import ( - "bytes" - "encoding/binary" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/ozontech/seq-db/seq" - "github.com/ozontech/seq-db/storage" -) - -func TestDocBlocksWriter(t *testing.T) { - r := require.New(t) - - payload := []byte(`"hello world"`) - test := func(blockSize int, docsCount int, expectedBlocks int) { - t.Helper() - - buf := bytes.NewBuffer(nil) - - bw := getDocBlocksWriter(buf, blockSize, 1) - - ids := make([]seq.ID, 0, docsCount) - for range docsCount { - id := nextSeqID() - r.NoError(bw.WriteDoc(id, payload)) - ids = append(ids, id) - } - r.NoError(bw.Flush()) - - r.Equal(expectedBlocks, len(bw.BlockOffsets)) - r.Equal(len(bw.Positions), len(ids)) - - docBlocks := buf.Bytes() - assertBlocks(t, docBlocks, expectedBlocks) - - for _, id := range ids { - pos := bw.Positions[id] - blockIndex, docOffset := pos.Unpack() - blockOffset := bw.BlockOffsets[blockIndex] - - blockHeader := storage.DocBlock(docBlocks[blockOffset:]) - block := storage.DocBlock(docBlocks[blockOffset : blockOffset+blockHeader.FullLen()]) - - binDocs, err := block.DecompressTo(nil) - r.NoError(err) - - binDoc := binDocs[docOffset:] - docLen := binary.LittleEndian.Uint32(binDoc) - doc := binDoc[4 : 4+docLen] - r.Equal(string(payload), string(doc)) - } - } - - test(0, 4, 1) - test(len(payload), 4, 4) - test(len(payload)+2, 4, 4) - test(len(payload)*2, 4, 2) - test(len(payload)*4, 4, 1) - test(len(payload)*4, 8, 2) -} - -func assertBlocks(t *testing.T, docBlocks []byte, numBlocks int) { - t.Helper() - var n int - for ; len(docBlocks) > 0; n++ { - header := storage.DocBlock(docBlocks[:storage.DocBlockHeaderLen]) - docBlock := storage.DocBlock(docBlocks[:header.FullLen()]) - _, err := docBlock.DecompressTo(nil) - require.NoError(t, err) - docBlocks = docBlocks[docBlock.FullLen():] - } - require.Equal(t, numBlocks, n) -} - -var ( - now = time.Now() - nextRid = 0 -) - -func nextSeqID() seq.ID { - nextRid++ - return seq.ID{ - MID: seq.MID(now.Nanosecond()), - RID: seq.RID(nextRid), - } -} diff --git a/frac/active_sealing_source.go b/frac/active_sealing_source.go new file mode 100644 index 00000000..58d43ef0 --- /dev/null +++ b/frac/active_sealing_source.go @@ -0,0 +1,460 @@ +package frac + +import ( + "bytes" + "encoding/binary" + "errors" + "io" + "iter" + "os" + "path/filepath" + "slices" + "time" + "unsafe" + + "github.com/alecthomas/units" + 
"go.uber.org/zap" + + "github.com/ozontech/seq-db/bytespool" + "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/storage" + "github.com/ozontech/seq-db/util" +) + +// ActiveSealingSource transforms data from in-memory (frac.Active) storage +// into a format suitable for disk writing during index creation. +// +// The main purpose of this type is to provide access to sorted data +// through a set of iterators that allow sequential processing of +// data in sized blocks for disk writing: +// +// - TokenBlocks() - iterator for token blocks, sorted by fields and values +// - Fields() - iterator for sorted fields with maximum TIDs +// - IDsBlocks() - iterator for document ID blocks and their positions +// - TokenLIDs() - iterator for LID lists for each token +// - Docs() - iterator for documents themselves with duplicate handling +// +// All iterators work with pre-sorted data and return information +// in an order optimal for creating disk index structures. +type ActiveSealingSource struct { + params common.SealParams // Sealing parameters + info *common.Info // fraction Info + created time.Time // Creation time of the source + sortedLIDs []uint32 // Sorted LIDs (Local ID) + oldToNewLIDs []uint32 // Mapping from old LIDs to new ones (after sorting) + mids *UInt64s // MIDs + rids *UInt64s // RIDs + fields []string // Sorted field names + fieldsMaxTIDs []uint32 // Maximum TIDs for each field + tids []uint32 // Sorted TIDs (Token ID) + tokens [][]byte // Tokens (values) by TID + lids []*TokenLIDs // LID lists for each token + docPosOrig []seq.DocPos // Original document positions + docPosSorted []seq.DocPos // Document positions after sorting + blocksOffsets []uint64 // Document block offsets + docsReader *storage.DocsReader // Document storage reader + lastErr error // Last error +} + +// NewActiveSealingSource creates a new data source for sealing +// based on an active in-memory index. +func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSealingSource, error) { + info := *active.info // copy + sortedLIDs := active.GetAllDocuments() + + // Sort fields and get maximum TIDs for each field + sortedFields, fieldsMaxTIDs := sortFields(active.TokenList) + + // Sort tokens within each field + sortedTIDs := sortTokens(sortedFields, active.TokenList) + + src := ActiveSealingSource{ + params: params, + info: &info, + created: time.Now(), + sortedLIDs: sortedLIDs, + oldToNewLIDs: makeInverser(sortedLIDs), // Create LID mapping + mids: active.MIDs, + rids: active.RIDs, + fields: sortedFields, + tids: sortedTIDs, + fieldsMaxTIDs: fieldsMaxTIDs, + tokens: active.TokenList.tidToVal, + lids: active.TokenList.tidToLIDs, + docPosOrig: active.DocsPositions.lidToPos, + blocksOffsets: active.DocBlocks.vals, + docsReader: &active.sortReader, + } + + src.prepareInfo() + + // Sort documents if not skipped in configuration + if !active.Config.SkipSortDocs { + if err := src.SortDocs(); err != nil { + return nil, err + } + } + + return &src, nil +} + +// sortFields sorts field names and calculates maximum TIDs for each field. +// Returns sorted field list and array of maximum TIDs. 
+func sortFields(tl *TokenList) ([]string, []uint32) { + fields := make([]string, 0, len(tl.FieldTIDs)) + for field := range tl.FieldTIDs { + fields = append(fields, field) + } + slices.Sort(fields) + + pos := 0 + maxTIDs := make([]uint32, 0, len(fields)) + for _, field := range fields { + pos += len(tl.FieldTIDs[field]) + maxTIDs = append(maxTIDs, uint32(pos)) + } + + return fields, maxTIDs +} + +// sortTokens sorts tokens lexicographically within each field. +// Returns sorted list of TIDs. +func sortTokens(sortedFields []string, tl *TokenList) []uint32 { + pos := 0 + tids := make([]uint32, 0, len(tl.tidToVal)) + for _, field := range sortedFields { + tids = append(tids, tl.FieldTIDs[field]...) + chunk := tids[pos:] + slices.SortFunc(chunk, func(i, j uint32) int { + a := tl.tidToVal[i] + b := tl.tidToVal[j] + return bytes.Compare(a, b) // Sort by token value + }) + pos = len(tids) + } + return tids +} + +// LastError returns the last error that occurred during processing. +func (src *ActiveSealingSource) LastError() error { + return src.lastErr +} + +// prepareInfo prepares metadata for disk writing. +func (src *ActiveSealingSource) prepareInfo() { + src.info.MetaOnDisk = 0 + src.info.SealingTime = uint64(src.created.UnixMilli()) + src.info.BuildDistribution(src.mids.vals) +} + +// Info returns index metadata information. +func (src *ActiveSealingSource) Info() *common.Info { + return src.info +} + +// TokenBlocks returns an iterator for token blocks for disk writing. +// Tokens are pre-sorted: first by fields, then lexicographically within each field. +// Each block accumulates roughly blockSize bytes of data (it may overshoot by the size of the last appended token). +func (src *ActiveSealingSource) TokenBlocks(blockSize int) iter.Seq[[][]byte] { + const tokenLengthSize = int(unsafe.Sizeof(uint32(0))) + return func(yield func([][]byte) bool) { + if len(src.tids) == 0 { + return + } + if blockSize <= 0 { + src.lastErr = errors.New("sealing: token block size must be > 0") + return + } + + actualSize := 0 + block := make([][]byte, 0, blockSize) + + // Iterate through all sorted TIDs + for _, tid := range src.tids { + if actualSize >= blockSize { + if !yield(block) { + return + } + actualSize = 0 + block = block[:0] + } + token := src.tokens[tid] + actualSize += tokenLengthSize // Add the size of the token length field + actualSize += len(token) // Add the size of the token itself + block = append(block, token) + } + yield(block) + } +} + +// Fields returns an iterator for sorted fields and their maximum TIDs. +// Fields are sorted lexicographically, ensuring predictable order +// when building disk index structures. +func (src *ActiveSealingSource) Fields() iter.Seq2[string, uint32] { + return func(yield func(string, uint32) bool) { + for i, field := range src.fields { + if !yield(field, src.fieldsMaxTIDs[i]) { + return + } + } + } +} + +// IDsBlocks returns an iterator for document ID blocks and corresponding positions. +// IDs are sorted. Block size is controlled by blockSize parameter for balance between +// performance and memory usage. +func (src *ActiveSealingSource) IDsBlocks(blockSize int) iter.Seq2[[]seq.ID, []seq.DocPos] { + return func(yield func([]seq.ID, []seq.DocPos) bool) { + mids := src.mids.vals + rids := src.rids.vals + + ids := make([]seq.ID, 0, blockSize) + pos := make([]seq.DocPos, 0, blockSize) + + // First reserved ID (system). This position is not used because Local IDs (LIDs) use 1-based indexing.
+ ids = append(ids, seq.ID{MID: seq.MID(mids[0]), RID: seq.RID(rids[0])}) + pos = append(pos, 0) + + // Iterate through sorted LIDs + for i, lid := range src.sortedLIDs { + if len(ids) == blockSize { + if !yield(ids, pos) { + return + } + ids = ids[:0] + pos = pos[:0] + } + id := seq.ID{MID: seq.MID(mids[lid]), RID: seq.RID(rids[lid])} + ids = append(ids, id) + + // Use sorted or original positions + if len(src.docPosSorted) == 0 { + pos = append(pos, src.docPosOrig[lid]) + } else { + pos = append(pos, src.docPosSorted[i+1]) // +1 for system document + } + } + yield(ids, pos) + } +} + +// BlocksOffsets returns document block offsets. +func (src *ActiveSealingSource) BlocksOffsets() []uint64 { + return src.blocksOffsets +} + +// TokenLIDs returns an iterator for LID lists for each token. +// LIDs are converted to new numbering after document sorting. +// Each iterator call returns a list of documents containing a specific token, +// in sorted order. +func (src *ActiveSealingSource) TokenLIDs() iter.Seq[[]uint32] { + return func(yield func([]uint32) bool) { + newLIDs := []uint32{} + + // For each sorted TID + for _, tid := range src.tids { + // Get original LIDs for this token + oldLIDs := src.lids[tid].GetLIDs(src.mids, src.rids) + newLIDs = slices.Grow(newLIDs[:0], len(oldLIDs)) + + // Convert old LIDs to new through mapping + for _, lid := range oldLIDs { + newLIDs = append(newLIDs, src.oldToNewLIDs[lid]) + } + + if !yield(newLIDs) { + return + } + } + } +} + +// makeInverser creates an array for converting old LIDs to new ones. +// sortedLIDs[i] = oldLID -> inverser[oldLID] = i+1 +func makeInverser(sortedLIDs []uint32) []uint32 { + inverser := make([]uint32, len(sortedLIDs)+1) + for i, lid := range sortedLIDs { + inverser[lid] = uint32(i + 1) // +1 because 0 position is reserved and unused + } + return inverser +} + +// Docs returns an iterator for documents with their IDs. +// Handles duplicate IDs (for nested indexes). +func (src *ActiveSealingSource) Docs() iter.Seq2[seq.ID, []byte] { + src.lastErr = nil + return func(yield func(seq.ID, []byte) bool) { + var ( + prev seq.ID + curDoc []byte + ) + + // Iterate through ID and position blocks + for ids, pos := range src.IDsBlocks(consts.IDsPerBlock) { + for i, id := range ids { + if id == systemSeqID { + curDoc = nil // reserved system document (no payload) + } else if id != prev { + // If ID changed, read new document + if curDoc, src.lastErr = src.doc(pos[i]); src.lastErr != nil { + return + } + } + prev = id + if !yield(id, curDoc) { + return + } + } + } + } +} + +// doc reads a document from storage by its position. +func (src *ActiveSealingSource) doc(pos seq.DocPos) ([]byte, error) { + blockIndex, docOffset := pos.Unpack() + blockOffset := src.blocksOffsets[blockIndex] + + var doc []byte + err := src.docsReader.ReadDocsFunc(blockOffset, []uint64{docOffset}, func(b []byte) error { + doc = b + return nil + }) + if err != nil { + return nil, err + } + return doc, nil +} + +// SortDocs sorts documents and writes them in compressed form to disk. +// Creates a temporary file that is then renamed to the final one. 
+func (src *ActiveSealingSource) SortDocs() error { + start := time.Now() + logger.Info("sorting docs...") + + // Create temporary file for sorted documents + sdocsFile, err := os.Create(src.info.Path + consts.SdocsTmpFileSuffix) + if err != nil { + return err + } + + bw := bytespool.AcquireWriterSize(sdocsFile, int(units.MiB)) + defer bytespool.ReleaseWriter(bw) + + // Group documents into blocks + blocks := docBlocks(src.Docs(), src.params.DocBlockSize) + + // Write blocks and get new offsets and positions + blocksOffsets, positions, err := src.writeDocs(blocks, bw) + + if err := util.CollapseErrors([]error{src.lastErr, err}); err != nil { + return err + } + if err := bw.Flush(); err != nil { + return err + } + + src.docPosSorted = positions + src.blocksOffsets = blocksOffsets + + // Get file statistics + stat, err := sdocsFile.Stat() + if err != nil { + return err + } + src.info.DocsOnDisk = uint64(stat.Size()) + + // Synchronize and rename file + if err := sdocsFile.Sync(); err != nil { + return err + } + if err := sdocsFile.Close(); err != nil { + return err + } + if err := os.Rename(sdocsFile.Name(), src.info.Path+consts.SdocsFileSuffix); err != nil { + return err + } + if err := util.SyncPath(filepath.Dir(src.info.Path)); err != nil { + return err + } + + // Log compression statistics + ratio := float64(src.info.DocsRaw) / float64(src.info.DocsOnDisk) + logger.Info("docs sorting stat", + util.ZapUint64AsSizeStr("raw", src.info.DocsRaw), + util.ZapUint64AsSizeStr("compressed", src.info.DocsOnDisk), + util.ZapFloat64WithPrec("ratio", ratio, 2), + zap.Int("blocks_count", len(blocksOffsets)), + zap.Int("docs_total", len(positions)), + util.ZapDurationWithPrec("write_duration_ms", time.Since(start), "ms", 0), + ) + + return nil +} + +// writeDocs compresses and writes document blocks, calculating new offsets +// and collecting document positions. +func (src *ActiveSealingSource) writeDocs(blocks iter.Seq2[[]byte, []seq.DocPos], w io.Writer) ([]uint64, []seq.DocPos, error) { + offset := 0 + buf := make([]byte, 0) + blocksOffsets := make([]uint64, 0) + allPositions := make([]seq.DocPos, 0, len(src.mids.vals)) + + // Process each document block + for block, positions := range blocks { + allPositions = append(allPositions, positions...) + blocksOffsets = append(blocksOffsets, uint64(offset)) + + // Compress document block + buf = storage.CompressDocBlock(block, buf[:0], src.params.DocBlocksZstdLevel) + if _, err := w.Write(buf); err != nil { + return nil, nil, err + } + offset += len(buf) + } + return blocksOffsets, allPositions, nil +} + +// docBlocks groups documents into fixed-size blocks. +// Returns an iterator for blocks and corresponding document positions. 
+func docBlocks(docs iter.Seq2[seq.ID, []byte], blockSize int) iter.Seq2[[]byte, []seq.DocPos] { + return func(yield func([]byte, []seq.DocPos) bool) { + var ( + prev seq.ID + last seq.DocPos // Position of the most recently stored document + index uint32 // Current block index + ) + pos := make([]seq.DocPos, 0) + buf := make([]byte, 0, blockSize) + + // Iterate through documents + for id, doc := range docs { + if id == prev { + // Duplicate IDs (for nested indexes): the document is stored once, + // and every duplicate LID reuses the position of the stored document + pos = append(pos, last) + continue + } + prev = id + + // If block is full, yield it + if len(buf) >= blockSize { + if !yield(buf, pos) { + return + } + index++ + buf = buf[:0] + pos = pos[:0] + } + + // Add the document position and remember it for possible duplicates + last = seq.PackDocPos(index, uint64(len(buf))) + pos = append(pos, last) + + // Write document size and the document itself + buf = binary.LittleEndian.AppendUint32(buf, uint32(len(doc))) + buf = append(buf, doc...) + } + yield(buf, pos) + } +} diff --git a/frac/info.go b/frac/common/info.go similarity index 96% rename from frac/info.go rename to frac/common/info.go index 7d1d18c4..ad3c3a57 100644 --- a/frac/info.go +++ b/frac/common/info.go @@ -1,4 +1,4 @@ -package frac +package common import ( "fmt" @@ -70,12 +70,12 @@ func (s *Info) Name() string { return path.Base(s.Path) } -func (s *Info) BuildDistribution(ids []seq.ID) { +func (s *Info) BuildDistribution(mids []uint64) { if !s.InitEmptyDistribution() { return } - for _, id := range ids { - s.Distribution.Add(id.MID) + for _, mid := range mids { + s.Distribution.Add(seq.MID(mid)) } } diff --git a/frac/common/seal_params.go b/frac/common/seal_params.go new file mode 100644 index 00000000..c19365f9 --- /dev/null +++ b/frac/common/seal_params.go @@ -0,0 +1,12 @@ +package common + +type SealParams struct { + IDsZstdLevel int + LIDsZstdLevel int + TokenListZstdLevel int + DocsPositionsZstdLevel int + TokenTableZstdLevel int + + DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block. + DocBlockSize int // DocBlockSize is decompressed payload size of document block.
+} diff --git a/frac/disk_blocks.go b/frac/disk_blocks.go deleted file mode 100644 index ac0974bd..00000000 --- a/frac/disk_blocks.go +++ /dev/null @@ -1,62 +0,0 @@ -package frac - -import ( - "github.com/ozontech/seq-db/frac/sealed/lids" - "github.com/ozontech/seq-db/frac/sealed/seqids" - "github.com/ozontech/seq-db/frac/sealed/token" - "github.com/ozontech/seq-db/seq" -) - -type idsBlock struct { - mids seqids.BlockMIDs - rids seqids.BlockRIDs - params seqids.BlockParams -} - -func (b idsBlock) GetMinID() seq.ID { - last := len(b.mids.Values) - 1 - return seq.ID{ - MID: seq.MID(b.mids.Values[last]), - RID: seq.RID(b.rids.Values[last]), - } -} - -func (b idsBlock) GetExtForRegistry() (uint64, uint64) { - last := b.GetMinID() - return uint64(last.MID), uint64(last.RID) -} - -type lidsBlock struct { - payload lids.Block - minTID uint32 - maxTID uint32 - isContinued bool -} - -func (e lidsBlock) getExtForRegistry() (uint64, uint64) { - var ext1, ext2 uint64 - if e.isContinued { - ext1 = 1 - } - ext2 = uint64(e.maxTID)<<32 | uint64(e.minTID) - return ext1, ext2 -} - -type tokensBlock struct { - field string - isStartOfField bool - totalSizeOfField int - startTID uint32 - payload token.Block -} - -func (t *tokensBlock) createTokenTableEntry(startIndex, blockIndex uint32) *token.TableEntry { - size := len(t.payload.Offsets) - return &token.TableEntry{ - StartIndex: startIndex, - StartTID: t.startTID, - ValCount: uint32(size), - BlockIndex: blockIndex, - MaxVal: string(t.payload.GetToken(size - 1)), - } -} diff --git a/frac/disk_blocks_producer.go b/frac/disk_blocks_producer.go deleted file mode 100644 index 3c8b0036..00000000 --- a/frac/disk_blocks_producer.go +++ /dev/null @@ -1,265 +0,0 @@ -package frac - -import ( - "bytes" - "encoding/binary" - "sort" - - "github.com/ozontech/seq-db/consts" - "github.com/ozontech/seq-db/frac/sealed/lids" - "github.com/ozontech/seq-db/frac/sealed/token" - "github.com/ozontech/seq-db/seq" -) - -type DiskBlocksProducer struct { - sortedTids map[string][]uint32 - fields []string -} - -func NewDiskBlocksProducer() *DiskBlocksProducer { - return &DiskBlocksProducer{ - sortedTids: make(map[string][]uint32), - } -} - -func (g *DiskBlocksProducer) getInfoBlock(info *Info) *BlockInfo { - return &BlockInfo{Info: info} -} - -func (g *DiskBlocksProducer) getPositionBlock(idsLen uint32, blocks []uint64) *BlockOffsets { - return &BlockOffsets{ - IDsTotal: idsLen, - Offsets: blocks, - } -} - -func (g *DiskBlocksProducer) getTokenTableBlocksGenerator(tokenList *TokenList, tokenTable token.Table) func(func(token.TableBlock) error) error { - return func(push func(token.TableBlock) error) error { - for _, field := range g.getFracSortedFields(tokenList) { - if fieldData, ok := tokenTable[field]; ok { - block := token.TableBlock{ - FieldsTables: []token.FieldTable{{ - Field: field, - Entries: fieldData.Entries, - }}, - } - if err := push(block); err != nil { - return err - } - } - } - return nil - } -} - -func (g *DiskBlocksProducer) getIDsBlocksGenerator(sortedSeqIDs []seq.ID, docsPositions *DocsPositions, size int) func(func(*idsBlock) error) error { - var block idsBlock - return func(push func(*idsBlock) error) error { - for len(sortedSeqIDs) > 0 { - right := min(size, len(sortedSeqIDs)) - g.fillIDsBlock(sortedSeqIDs[:right], docsPositions, &block) - if err := push(&block); err != nil { - return nil - } - sortedSeqIDs = sortedSeqIDs[right:] - } - - return nil - } -} - -func (g *DiskBlocksProducer) 
fillIDsBlock(ids []seq.ID, positions *DocsPositions, block *idsBlock) { - block.mids.Values = block.mids.Values[:0] - block.rids.Values = block.rids.Values[:0] - block.params.Values = block.params.Values[:0] - for _, id := range ids { - block.mids.Values = append(block.mids.Values, uint64(id.MID)) - block.rids.Values = append(block.rids.Values, uint64(id.RID)) - block.params.Values = append(block.params.Values, uint64(positions.Get(id))) - } -} - -func (g *DiskBlocksProducer) getFracSortedFields(tokenList *TokenList) []string { - if g.fields == nil { - g.fields = make([]string, 0, len(tokenList.FieldTIDs)) - for field := range tokenList.FieldTIDs { - g.fields = append(g.fields, field) - } - sort.Strings(g.fields) - } - return g.fields -} - -type valSort struct { - val []uint32 - lessFn func(i, j int) bool -} - -func (p *valSort) Len() int { return len(p.val) } -func (p *valSort) Less(i, j int) bool { return p.lessFn(i, j) } -func (p *valSort) Swap(i, j int) { p.val[i], p.val[j] = p.val[j], p.val[i] } - -func (g *DiskBlocksProducer) getTIDsSortedByToken(tokenList *TokenList, field string) []uint32 { - if tids, ok := g.sortedTids[field]; ok { - return tids - } - - srcTIDs := tokenList.FieldTIDs[field] - tids := append(make([]uint32, 0, len(srcTIDs)), srcTIDs...) - - sort.Sort( - &valSort{ - val: tids, - lessFn: func(i int, j int) bool { - a := tokenList.tidToVal[tids[i]] - b := tokenList.tidToVal[tids[j]] - return bytes.Compare(a, b) < 0 - }, - }, - ) - - g.sortedTids[field] = tids - return tids -} - -func (g *DiskBlocksProducer) getTokensBlocksGenerator(tokenList *TokenList) func(func(*tokensBlock) error) error { - return func(push func(*tokensBlock) error) error { - var cur uint32 = 1 - var payload token.Block - - fieldSizes := tokenList.GetFieldSizes() - - for _, field := range g.getFracSortedFields(tokenList) { - first := true - fieldSize := int(fieldSizes[field]) - blocksCount := fieldSize/consts.RegularBlockSize + 1 - - tids := g.getTIDsSortedByToken(tokenList, field) - blockSize := len(tids) / blocksCount - - for len(tids) > 0 { - right := min(blockSize, len(tids)) - g.fillTokens(tokenList, tids[:right], &payload) - tids = tids[right:] - - block := tokensBlock{ - field: field, - isStartOfField: first, - totalSizeOfField: fieldSize, - startTID: cur, - payload: payload, - } - - if err := push(&block); err != nil { - return err - } - - first = false - cur += uint32(right) - } - } - return nil - } -} - -func (g *DiskBlocksProducer) fillTokens(tokenList *TokenList, tids []uint32, block *token.Block) { - block.Payload = block.Payload[:0] - block.Offsets = block.Offsets[:0] - for _, tid := range tids { - val := tokenList.tidToVal[tid] - block.Offsets = append(block.Offsets, uint32(len(block.Payload))) - block.Payload = binary.LittleEndian.AppendUint32(block.Payload, uint32(len(val))) - block.Payload = append(block.Payload, val...) 
- } -} - -func (g *DiskBlocksProducer) getLIDsBlockGenerator(tokenList *TokenList, oldToNewLIDsIndex []uint32, mids, rids *UInt64s, maxBlockSize int) func(func(*lidsBlock) error) error { - var maxTID, lastMaxTID uint32 - - isContinued := false - offsets := []uint32{0} // first offset is always zero - blockLIDs := make([]uint32, 0, maxBlockSize) - - newBlockFn := func(isLastLID bool) *lidsBlock { - block := &lidsBlock{ - // for continued block we will have minTID > maxTID - // this is not a bug, everything is according to plan for now - // TODO: But in future we want to get rid of this - minTID: lastMaxTID + 1, - maxTID: maxTID, - isContinued: isContinued, - payload: lids.Block{ - LIDs: reassignLIDs(blockLIDs, oldToNewLIDsIndex), - Offsets: offsets, - IsLastLID: isLastLID, - }, - } - lastMaxTID = maxTID - isContinued = !isLastLID - - // reset for reuse - offsets = offsets[:1] // keep the first offset, which is always zero - blockLIDs = blockLIDs[:0] - - return block - } - - return func(push func(*lidsBlock) error) error { - for _, field := range g.getFracSortedFields(tokenList) { - for _, tid := range g.getTIDsSortedByToken(tokenList, field) { - maxTID++ - tokenLIDs := tokenList.Provide(tid).GetLIDs(mids, rids) - - for len(tokenLIDs) > 0 { - right := min(maxBlockSize-len(blockLIDs), len(tokenLIDs)) - blockLIDs = append(blockLIDs, tokenLIDs[:right]...) - offsets = append(offsets, uint32(len(blockLIDs))) - tokenLIDs = tokenLIDs[right:] - - if len(blockLIDs) == maxBlockSize { - if err := push(newBlockFn(len(tokenLIDs) == 0)); err != nil { - return nil - } - } - } - } - - if len(blockLIDs) > 0 { - if err := push(newBlockFn(true)); err != nil { - return nil - } - } - } - return nil - } -} - -func reassignLIDs(lIDs, oldToNewLIDsIndex []uint32) []uint32 { - for i, lid := range lIDs { - lIDs[i] = oldToNewLIDsIndex[lid] - } - return lIDs -} - -func sortSeqIDs(f *Active, mids, rids []uint64) ([]seq.ID, []uint32) { - seqIDs := make([]seq.ID, len(mids)) - index := make([]uint32, len(mids)) - - // some stub value in zero position - seqIDs[0] = seq.ID{ - MID: seq.MID(mids[0]), - RID: seq.RID(rids[0]), - } - - subSeqIDs := seqIDs[1:] - - for i, lid := range f.GetAllDocuments() { - subSeqIDs[i] = seq.ID{ - MID: seq.MID(mids[lid]), - RID: seq.RID(rids[lid]), - } - index[lid] = uint32(i + 1) - } - - return seqIDs, index -} diff --git a/frac/disk_blocks_writer.go b/frac/disk_blocks_writer.go deleted file mode 100644 index f1eb9c3f..00000000 --- a/frac/disk_blocks_writer.go +++ /dev/null @@ -1,232 +0,0 @@ -package frac - -import ( - "io" - "time" - - "github.com/ozontech/seq-db/consts" - "github.com/ozontech/seq-db/frac/sealed/lids" - "github.com/ozontech/seq-db/frac/sealed/token" - "github.com/ozontech/seq-db/seq" - "github.com/ozontech/seq-db/storage" - "github.com/ozontech/seq-db/util" -) - -type DiskBlocksWriter struct { - buf []byte - writer *storage.BlocksWriter - stats storage.SealingStats - - startOfIDsBlockIndex uint32 -} - -func NewSealedBlockWriter(ws io.WriteSeeker) *DiskBlocksWriter { - return &DiskBlocksWriter{ - writer: storage.NewBlocksWriter(ws), - stats: make(storage.SealingStats, 0), - } -} - -func (w *DiskBlocksWriter) resetBuf(size int) []byte { - w.buf = util.EnsureSliceSize(w.buf, size)[:0] - return w.buf -} - -func (w *DiskBlocksWriter) NewBlockFormer(name string, size int) *storage.BlockFormer { - return storage.NewBlockFormer(name, w.writer, size, w.resetBuf(size)) -} - -func (w *DiskBlocksWriter) writeInfoBlock(block *BlockInfo) 
error { - now := time.Now() - w.buf = block.Pack(w.resetBuf(consts.RegularBlockSize)) - n, err := w.writer.WriteBlock("info", w.buf, false, 0, 0, 0) - if err != nil { - return err - } - - w.stats = append(w.stats, &storage.BlockStats{ - Name: "info", - Raw: uint64(len(w.buf)), - Comp: uint64(n), - Blocks: 1, - Duration: time.Since(now), - }) - - return nil -} - -func (w *DiskBlocksWriter) writePositionsBlock(zstdCompressLevel int, block *BlockOffsets) error { - now := time.Now() - w.buf = block.Pack(w.resetBuf(consts.IDsBlockSize)) - n, err := w.writer.WriteBlock("positions", w.buf, true, zstdCompressLevel, 0, 0) - if err != nil { - return err - } - - w.stats = append(w.stats, &storage.BlockStats{ - Name: "positions", - Raw: uint64(len(w.buf)), - Comp: uint64(n), - Blocks: 1, - Duration: time.Since(now), - }) - - return nil -} - -func (w *DiskBlocksWriter) writeIDsBlocks(zstdLevel int, generateBlocks func(func(*idsBlock) error) error) ([]seq.ID, error) { - w.startOfIDsBlockIndex = w.writer.GetBlockIndex() - - levelOpt := storage.WithZstdCompressLevel(zstdLevel) - - former := w.NewBlockFormer("ids", consts.IDsBlockSize) - - minBlockIDs := make([]seq.ID, 0) - - push := func(block *idsBlock) error { - former.Buf = block.mids.Pack(former.Buf) - if err := former.FlushForced(storage.WithExt(block.GetExtForRegistry()), levelOpt); err != nil { - return err - } - - former.Buf = block.rids.Pack(former.Buf) - if err := former.FlushForced(levelOpt); err != nil { - return err - } - - former.Buf = block.params.Pack(former.Buf) - if err := former.FlushForced(levelOpt); err != nil { - return err - } - - minBlockIDs = append(minBlockIDs, block.GetMinID()) - return nil - } - - if err := generateBlocks(push); err != nil { - return nil, err - } - - w.writer.WriteEmptyBlock() - - w.stats = append(w.stats, former.GetStats()) - - return minBlockIDs, nil -} - -func (w *DiskBlocksWriter) writeTokensBlocks(zstdCompressLevel int, generateBlocks func(func(*tokensBlock) error) error) (token.Table, error) { - var startIndex uint32 - tokenTable := make(token.Table) - - opts := []storage.FlushOption{storage.WithZstdCompressLevel(zstdCompressLevel)} - - former := w.NewBlockFormer("tokens", consts.RegularBlockSize) - - push := func(block *tokensBlock) error { - if block.isStartOfField && block.totalSizeOfField > consts.RegularBlockSize { - if err := former.FlushForced(opts...); err != nil { - return err - } - startIndex = 0 - } - - tokenTableEntry := block.createTokenTableEntry(startIndex, w.writer.GetBlockIndex()) - fieldData, ok := tokenTable[block.field] - if !ok { - minVal := string(block.payload.GetToken(0)) - fieldData = &token.FieldData{ - MinVal: minVal, - } - tokenTable[block.field] = fieldData - tokenTableEntry.MinVal = minVal - } - fieldData.Entries = append(fieldData.Entries, tokenTableEntry) - - former.Buf = block.payload.Pack(former.Buf) - startIndex += uint32(block.payload.Len()) - - if flushed, err := former.FlushIfNeeded(opts...); err != nil { - return err - } else if flushed { - startIndex = 0 - } - return nil - } - - if err := generateBlocks(push); err != nil { - return nil, err - } - - if err := former.FlushForced(opts...); err != nil { - return nil, err - } - - w.writer.WriteEmptyBlock() - - w.stats = append(w.stats, former.GetStats()) - - return tokenTable, nil -} - -func (w *DiskBlocksWriter) writeTokenTableBlocks(zstdCompressLevel int, generateBlocks func(func(token.TableBlock) error) error) error { - former := w.NewBlockFormer("token_table", consts.RegularBlockSize) - - opts := 
[]storage.FlushOption{storage.WithZstdCompressLevel(zstdCompressLevel)} - - push := func(block token.TableBlock) error { - former.Buf = block.Pack(former.Buf) - if _, err := former.FlushIfNeeded(opts...); err != nil { - return err - } - return nil - } - - if err := generateBlocks(push); err != nil { - return err - } - - if err := former.FlushForced(opts...); err != nil { - return err - } - - w.writer.WriteEmptyBlock() - - w.stats = append(w.stats, former.GetStats()) - - return nil -} - -func (w *DiskBlocksWriter) writeLIDsBlocks(zstdCompressLevel int, generateBlocks func(func(*lidsBlock) error) error) (*lids.Table, error) { - lidsTable := lids.NewTable(w.writer.GetBlockIndex(), nil, nil, nil) - - former := w.NewBlockFormer("lids", consts.RegularBlockSize) - - levelOpt := storage.WithZstdCompressLevel(zstdCompressLevel) - - push := func(block *lidsBlock) error { - former.Buf = block.payload.Pack(former.Buf) - if err := former.FlushForced(storage.WithExt(block.getExtForRegistry()), levelOpt); err != nil { - return err - } - - lidsTable.MinTIDs = append(lidsTable.MinTIDs, block.minTID) - lidsTable.MaxTIDs = append(lidsTable.MaxTIDs, block.maxTID) - lidsTable.IsContinued = append(lidsTable.IsContinued, block.isContinued) - - return nil - } - - if err := generateBlocks(push); err != nil { - return nil, err - } - - w.writer.WriteEmptyBlock() - - w.stats = append(w.stats, former.GetStats()) - - return lidsTable, nil -} - -func (w *DiskBlocksWriter) WriteRegistryBlock() error { - return w.writer.WriteBlocksRegistry() -} diff --git a/frac/fraction.go b/frac/fraction.go index 8c5ce662..128c9f05 100644 --- a/frac/fraction.go +++ b/frac/fraction.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" "github.com/ozontech/seq-db/metric" "github.com/ozontech/seq-db/seq" @@ -21,7 +22,7 @@ type DataProvider interface { } type Fraction interface { - Info() *Info + Info() *common.Info IsIntersecting(from seq.MID, to seq.MID) bool Contains(mid seq.MID) bool DataProvider(context.Context) (DataProvider, func()) diff --git a/frac/remote.go b/frac/remote.go index 85714c68..700bb34e 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -10,6 +10,8 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/seqids" "github.com/ozontech/seq-db/frac/sealed/token" @@ -37,7 +39,7 @@ type Remote struct { BaseFileName string - info *Info + info *common.Info useMu sync.RWMutex suicided bool @@ -50,9 +52,9 @@ type Remote struct { indexCache *IndexCache indexReader storage.IndexReader - loadMu *sync.RWMutex - isLoaded bool - state sealedState + loadMu *sync.RWMutex + isLoaded bool + blocksData sealed.BlocksData s3cli *s3.Client readLimiter *storage.ReadLimiter @@ -64,7 +66,7 @@ func NewRemote( readLimiter *storage.ReadLimiter, indexCache *IndexCache, docsCache *cache.Cache[[]byte], - info *Info, + info *common.Info, config *Config, s3cli *s3.Client, ) *Remote { @@ -145,7 +147,7 @@ func (f *Remote) DataProvider(ctx context.Context) (DataProvider, func()) { } } -func (f *Remote) Info() *Info { +func (f *Remote) Info() *common.Info { return f.info } @@ -195,19 +197,19 @@ 
func (f *Remote) createDataProvider(ctx context.Context) *sealedDataProvider { info: f.info, config: f.Config, docsReader: &f.docsReader, - blocksOffsets: f.state.BlocksOffsets, - lidsTable: f.state.lidsTable, + blocksOffsets: f.blocksData.BlocksOffsets, + lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens), tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable), - idsTable: &f.state.idsTable, + idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( &f.indexReader, f.indexCache.MIDs, f.indexCache.RIDs, f.indexCache.Params, - &f.state.idsTable, + &f.blocksData.IDsTable, f.info.BinaryDataVer, ), } @@ -229,7 +231,7 @@ func (f *Remote) load() error { return err } - (&Loader{}).Load(&f.state, f.info, &f.indexReader) + (&Loader{}).Load(&f.blocksData, f.info, &f.indexReader) f.isLoaded = true return nil diff --git a/frac/seal_stats.go b/frac/seal_stats.go deleted file mode 100644 index 6b8bdea7..00000000 --- a/frac/seal_stats.go +++ /dev/null @@ -1 +0,0 @@ -package frac diff --git a/frac/sealed.go b/frac/sealed.go index dc838dee..b2fe3b89 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -12,6 +12,8 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/seqids" "github.com/ozontech/seq-db/frac/sealed/token" @@ -31,7 +33,7 @@ type Sealed struct { BaseFileName string - info *Info + info *common.Info useMu sync.RWMutex suicided bool @@ -44,9 +46,9 @@ type Sealed struct { indexCache *IndexCache indexReader storage.IndexReader - loadMu *sync.RWMutex - isLoaded bool - state sealedState + loadMu *sync.RWMutex + isLoaded bool + blocksData sealed.BlocksData readLimiter *storage.ReadLimiter @@ -54,12 +56,6 @@ type Sealed struct { PartialSuicideMode PSD } -type sealedState struct { - idsTable seqids.Table - lidsTable *lids.Table - BlocksOffsets []uint64 -} - type PSD int // emulates hard shutdown on different stages of fraction deletion, used for tests const ( @@ -73,7 +69,7 @@ func NewSealed( readLimiter *storage.ReadLimiter, indexCache *IndexCache, docsCache *cache.Cache[[]byte], - info *Info, + info *common.Info, config *Config, ) *Sealed { f := &Sealed{ @@ -116,68 +112,51 @@ func (f *Sealed) openIndex() { func (f *Sealed) openDocs() { if f.docsFile == nil { var err error - f.docsFile, err = os.Open(f.BaseFileName + consts.DocsFileSuffix) + f.docsFile, err = os.Open(f.BaseFileName + consts.SdocsFileSuffix) // try the *.sdocs file first if err != nil { if !errors.Is(err, os.ErrNotExist) { - logger.Fatal("can't open docs file", zap.String("frac", f.BaseFileName), zap.Error(err)) + logger.Fatal("can't open sdocs file", zap.String("frac", f.BaseFileName), zap.Error(err)) } - f.docsFile, err = os.Open(f.BaseFileName + consts.SdocsFileSuffix) + f.docsFile, err = os.Open(f.BaseFileName + consts.DocsFileSuffix) // fall back to the *.docs file if err != nil { - logger.Fatal("can't open sdocs file", zap.String("frac", f.BaseFileName), zap.Error(err)) + logger.Fatal("can't open docs file", zap.String("frac", f.BaseFileName), zap.Error(err)) } } f.docsReader = storage.NewDocsReader(f.readLimiter, f.docsFile, f.docsCache) } } -type PreloadedData struct { - info *Info -
idsTable seqids.Table - lidsTable *lids.Table - tokenTable token.Table - blocksOffsets []uint64 - indexFile *os.File - docsFile *os.File -} - func NewSealedPreloaded( baseFile string, - preloaded *PreloadedData, + preloaded *sealed.PreloadedData, rl *storage.ReadLimiter, indexCache *IndexCache, docsCache *cache.Cache[[]byte], config *Config, ) *Sealed { f := &Sealed{ - state: sealedState{ - idsTable: preloaded.idsTable, - lidsTable: preloaded.lidsTable, - BlocksOffsets: preloaded.blocksOffsets, - }, - - docsFile: preloaded.docsFile, + blocksData: preloaded.BlocksData, docsCache: docsCache, - docsReader: storage.NewDocsReader(rl, preloaded.docsFile, docsCache), - - indexFile: preloaded.indexFile, - indexCache: indexCache, - indexReader: storage.NewIndexReader(rl, preloaded.indexFile.Name(), preloaded.indexFile, indexCache.Registry), + indexCache: indexCache, loadMu: &sync.RWMutex{}, isLoaded: true, readLimiter: rl, - info: preloaded.info, + info: preloaded.Info, BaseFileName: baseFile, Config: config, } // put the token table built during sealing into the cache of the sealed fraction indexCache.TokenTable.Get(token.CacheKeyTable, func() (token.Table, int) { - return preloaded.tokenTable, preloaded.tokenTable.Size() + return preloaded.TokenTable, preloaded.TokenTable.Size() }) + f.openDocs() + f.openIndex() + docsCountK := float64(f.info.DocsTotal) / 1000 logger.Info("sealed fraction created from active", zap.String("frac", f.info.Name()), @@ -201,7 +180,7 @@ func (f *Sealed) load() { f.openDocs() f.openIndex() - (&Loader{}).Load(&f.state, f.info, &f.indexReader) + (&Loader{}).Load(&f.blocksData, f.info, &f.indexReader) f.isLoaded = true } } @@ -379,25 +358,25 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { info: f.info, config: f.Config, docsReader: &f.docsReader, - blocksOffsets: f.state.BlocksOffsets, - lidsTable: f.state.lidsTable, + blocksOffsets: f.blocksData.BlocksOffsets, + lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens), tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable), - idsTable: &f.state.idsTable, + idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( &f.indexReader, f.indexCache.MIDs, f.indexCache.RIDs, f.indexCache.Params, - &f.state.idsTable, + &f.blocksData.IDsTable, f.info.BinaryDataVer, ), } } -func (f *Sealed) Info() *Info { +func (f *Sealed) Info() *common.Info { return f.info } @@ -412,7 +391,7 @@ func (f *Sealed) IsIntersecting(from, to seq.MID) bool { func loadHeader( indexFile storage.ImmutableFile, indexReader storage.IndexReader, -) *Info { +) *common.Info { block, _, err := indexReader.ReadIndexBlock(0, nil) if err != nil { logger.Fatal( @@ -422,7 +401,7 @@ func loadHeader( ) } - var bi BlockInfo + var bi sealed.BlockInfo if err := bi.Unpack(block); err != nil { logger.Fatal( "error unpacking info block", diff --git a/frac/block_info.go b/frac/sealed/block_info.go similarity index 88% rename from frac/block_info.go rename to frac/sealed/block_info.go index dee65101..8436f91e 100644 --- a/frac/block_info.go +++ b/frac/sealed/block_info.go @@ -1,4 +1,4 @@ -package frac +package sealed import ( "encoding/json" @@ -6,13 +6,14 @@ import ( "go.uber.org/zap" + "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/logger" ) const seqDBMagic = "SEQM" type BlockInfo struct { - Info *Info + Info *common.Info } 
func (b *BlockInfo) Pack(buf []byte) []byte { @@ -32,7 +33,7 @@ func (b *BlockInfo) Unpack(data []byte) error { return errors.New("seq-db index file header corrupted") } - b.Info = &Info{} + b.Info = &common.Info{} if err := json.Unmarshal(data[4:], b.Info); err != nil { return errors.New("stats unmarshaling error") } diff --git a/frac/block_offsets.go b/frac/sealed/block_offsets.go similarity index 98% rename from frac/block_offsets.go rename to frac/sealed/block_offsets.go index c1737906..2be59942 100644 --- a/frac/block_offsets.go +++ b/frac/sealed/block_offsets.go @@ -1,4 +1,4 @@ -package frac +package sealed import ( "encoding/binary" diff --git a/frac/sealed/preloaded_data.go b/frac/sealed/preloaded_data.go new file mode 100644 index 00000000..1b43b865 --- /dev/null +++ b/frac/sealed/preloaded_data.go @@ -0,0 +1,20 @@ +package sealed + +import ( + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed/lids" + "github.com/ozontech/seq-db/frac/sealed/seqids" + "github.com/ozontech/seq-db/frac/sealed/token" +) + +type PreloadedData struct { + Info *common.Info + BlocksData BlocksData + TokenTable token.Table +} + +type BlocksData struct { + IDsTable seqids.Table + LIDsTable *lids.Table + BlocksOffsets []uint64 +} diff --git a/frac/sealed/sealing/blocks_builder.go b/frac/sealed/sealing/blocks_builder.go new file mode 100644 index 00000000..14a5cac7 --- /dev/null +++ b/frac/sealed/sealing/blocks_builder.go @@ -0,0 +1,306 @@ +package sealing + +import ( + "encoding/binary" + "errors" + "iter" + + "github.com/ozontech/seq-db/frac/sealed/lids" + "github.com/ozontech/seq-db/frac/sealed/seqids" + "github.com/ozontech/seq-db/frac/sealed/token" + "github.com/ozontech/seq-db/seq" +) + +// tokensExt represents the token ID range contained in a block. +type tokensExt struct { + minTID uint32 // First token ID in the block + maxTID uint32 // Last token ID in the block +} + +// tokensSealBlock represents a sealed block containing token data with metadata. +type tokensSealBlock struct { + ext tokensExt // Tokens block metadata for registry marking + payload token.Block // Actual token data payload +} + +// lidsExt represents the range and continuation status of LID blocks. +type lidsExt struct { + minTID uint32 // First token ID in the LID block + maxTID uint32 // Last token ID in the LID block + isContinued bool // Whether LID sequence continues in next block +} + +// lidsSealBlock represents a sealed block containing LID (Local ID) data. +type lidsSealBlock struct { + ext lidsExt // LIDs block metadata for registry marking + payload lids.Block // LID data payload +} + +// idsSealBlock represents a sealed block containing various identifier types. +type idsSealBlock struct { + mids seqids.BlockMIDs + rids seqids.BlockRIDs + params seqids.BlockParams +} + +// blocksBuilder constructs sealed blocks from various data sources. +// Provides error tracking and consistency validation during block construction. +type blocksBuilder struct { + lastErr error // Last error encountered during processing +} + +// LastError returns the last error encountered during block processing. +func (bb *blocksBuilder) LastError() error { + return bb.lastErr +} + +// BuildTokenBlocks converts token batches into token blocks with field tables. The function creates an iterator +// that returns token blocks and corresponding field tables describing which fields are covered by which tokens +// in the block. 
+//
+// Visualization of relationships between fields, tokens, and table entries:
+//
+// Field Ranges: <-------f1----------><------f2-------><------------f3------------><----------f4---------->
+// Token Blocks: [.t1.t2.t3.t4.][.t5.t6.t7.t8.][.t9....etc...][.............][.............][.............]
+// Field Entries: {-----f1------}{-f1-}{---f2--}{--f2--}{-f3--}{------f3-----}{-f3-}{----f4-}{-----f4------}
+//
+// So we split field ranges into field entries - sub-ranges of fields aligned to block boundaries.
+// Each field table (token.FieldTable) links a field to blocks and the token ranges inside those blocks.
+//
+// Parameters:
+// - tokenBatches: Iterator of token batches, where each batch becomes a separate block
+// - fields: Iterator of [fieldName, maxTID] pairs for all fields in ascending TID order
+//
+// Returns: Iterator of [token block, field table for block] pairs, where the field table describes
+// which fields, and which of their token ranges, are represented in this block.
+func (bb *blocksBuilder) BuildTokenBlocks(
+ tokenBatches iter.Seq[[][]byte],
+ fields iter.Seq2[string, uint32],
+) iter.Seq2[tokensSealBlock, []token.FieldTable] {
+ return func(yield func(tokensSealBlock, []token.FieldTable) bool) {
+ // Create pull iterator for fields - convert Seq2 to a function that can be called on demand
+ getNextField, stop := iter.Pull2(fields)
+ defer stop()
+
+ var (
+ hasMore bool
+ currentTID uint32 = 1 // Current TID to process
+ fieldMaxTID uint32 = 0 // Maximum TID of current field (0 = field not yet selected)
+ fieldName string // Current field name
+ )
+
+ // Iterate through all token blocks created from batches
+ for idx, block := range createTokensSealBlocks(tokenBatches) {
+ table := []token.FieldTable{}
+ // Process all TIDs in current block (from currentTID to block.ext.maxTID)
+ for currentTID <= block.ext.maxTID {
+ // If current field doesn't cover currentTID, get next field
+ // This happens when: 1) field not yet selected, 2) current field has ended
+ if fieldMaxTID < currentTID {
+ if fieldName, fieldMaxTID, hasMore = getNextField(); !hasMore {
+ bb.lastErr = errors.New("not enough fields to cover all TIDs")
+ return
+ }
+ }
+ // Entry covers TIDs from currentTID to min(fieldMaxTID, block.ext.maxTID)
+ entry := createTokenTableEntry(currentTID, fieldMaxTID, idx, block)
+ table = append(table, token.FieldTable{Field: fieldName, Entries: []*token.TableEntry{entry}})
+ currentTID += entry.ValCount
+ }
+
+ if !yield(block, table) {
+ return // Consumer requested stop
+ }
+ }
+
+ // Verify consistency
+ if currentTID-1 != fieldMaxTID {
+ bb.lastErr = errors.New("fields and tokens not consistent")
+ } else if _, _, hasMore = getNextField(); hasMore {
+ bb.lastErr = errors.New("excess field after processing all blocks")
+ }
+ }
+}
+
+// createTokenTableEntry creates a token table entry for a field-block span.
+// Calculates the range of tokens belonging to a field within a specific block.
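+// For example, with block.ext.minTID = 4, an entry starting at TID 7 begins at
+// block-local index 3 (firstIndex = entryStartTID - block.ext.minTID).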
+// Parameters: +// - entryStartTID: Starting token ID for this entry +// - fieldMaxTID: Maximum token ID for the field +// - blockIndex: Index of the current token block +// - block: Current token block data +func createTokenTableEntry(entryStartTID, fieldMaxTID, blockIndex uint32, block tokensSealBlock) *token.TableEntry { + // Convert global TIDs to block-local indices + firstIndex := entryStartTID - block.ext.minTID + lastIndex := min(fieldMaxTID, block.ext.maxTID) - block.ext.minTID + + // Extract min and max token values for the entry range + minVal := string(block.payload.GetToken(int(firstIndex))) + maxVal := string(block.payload.GetToken(int(lastIndex))) + + return &token.TableEntry{ + StartIndex: firstIndex, // Starting index within the block + StartTID: entryStartTID, // Starting token ID (global) + BlockIndex: blockIndex, // Reference to containing block + ValCount: lastIndex - firstIndex + 1, // Number of tokens in this entry + MinVal: minVal, // Smallest token value in range + MaxVal: maxVal, // Largest token value in range + } +} + +// BuildLIDsBlocks constructs LID blocks from Token LID sequences. +// Processes LIDs grouped by TID and creates optimally sized blocks: +// - Splits large LID sequences across multiple blocks +// - Tracks continuation status between blocks +// +// Parameters: +// - tokenLIDs: Sequence of LID arrays, one per TokenID, in TID order +// - blockCapacity: Maximum number of LIDs per block +// +// Returns: +// - iter.Seq[lidsSealBlock]: Sequence of sealed LID blocks +func (bb *blocksBuilder) BuildLIDsBlocks(tokenLIDs iter.Seq[[]uint32], blockCapacity int) iter.Seq[lidsSealBlock] { + return func(yield func(lidsSealBlock) bool) { + if blockCapacity <= 0 { + bb.lastErr = errors.New("sealing: LID block size must be > 0") + return + } + var ( + currentTID uint32 // Current TID being processed + currentBlock lidsSealBlock // Current block under construction + isEndOfToken bool // Flag for end of current token's LIDs + isContinued bool // Flag for block continuation + ) + + // Initialize first block + currentBlock.ext.minTID = 1 + currentBlock.payload = lids.Block{ + LIDs: make([]uint32, 0, blockCapacity), // Pre-allocate with capacity + Offsets: []uint32{0}, // Start with initial offset + } + + // finalizeBlock prepares and yields the current block + finalizeBlock := func() bool { + if !isEndOfToken { + // Add final offset for current token if not already done + currentBlock.payload.Offsets = append(currentBlock.payload.Offsets, uint32(len(currentBlock.payload.LIDs))) + } + currentBlock.payload.IsLastLID = isEndOfToken // TODO(eguguchkin): Remove legacy field + currentBlock.ext.isContinued = isContinued // TODO(eguguchkin): Remove legacy field + isContinued = !isEndOfToken + return yield(currentBlock) + } + + // Process LIDs for each TID + for lidsBatch := range tokenLIDs { + currentTID++ + + for _, lid := range lidsBatch { + // Check if block reached capacity + if len(currentBlock.payload.LIDs) == blockCapacity { + if !finalizeBlock() { + return + } + // Initialize new block + currentBlock.ext.minTID = currentTID + currentBlock.payload.LIDs = currentBlock.payload.LIDs[:0] + currentBlock.payload.Offsets = currentBlock.payload.Offsets[:1] // Reset to initial offset + } + + isEndOfToken = false + currentBlock.ext.maxTID = currentTID + currentBlock.payload.LIDs = append(currentBlock.payload.LIDs, lid) // Add each LID to the block + } + + // Store offset and mark end of current token + currentBlock.payload.Offsets = append(currentBlock.payload.Offsets, 
uint32(len(currentBlock.payload.LIDs)))
+ isEndOfToken = true
+ }
+
+ // Yield the final block
+ finalizeBlock()
+ }
+}
+
+// createIDsSealBlocks converts sequences of IDs and positions into sealed ID blocks.
+// Transforms raw ID sequences into optimized block format for storage:
+// - Processes IDs in batches for efficiency
+// - Maintains correlation between IDs and their positions
+// - Creates separate slices for MIDs, RIDs, and positions
+//
+// Parameters:
+// - idsBatches: Sequence of ID batches with corresponding document positions
+//
+// Returns:
+// - iter.Seq[idsSealBlock]: Sequence of sealed ID blocks
+func createIDsSealBlocks(idsBatches iter.Seq2[[]seq.ID, []seq.DocPos]) iter.Seq[idsSealBlock] {
+ return func(yield func(idsSealBlock) bool) {
+ block := idsSealBlock{}
+
+ // Process each batch of IDs and positions
+ for ids, positions := range idsBatches {
+ // Reset block arrays for new batch
+ block.mids.Values = block.mids.Values[:0]
+ block.rids.Values = block.rids.Values[:0]
+ block.params.Values = block.params.Values[:0]
+
+ // Convert each ID and position to storage format
+ for i, id := range ids {
+ block.mids.Values = append(block.mids.Values, uint64(id.MID))
+ block.rids.Values = append(block.rids.Values, uint64(id.RID))
+ block.params.Values = append(block.params.Values, uint64(positions[i]))
+ }
+
+ // Yield completed block
+ if !yield(block) {
+ return
+ }
+ }
+ }
+}
+
+// createTokensSealBlocks converts raw token sequences into sealed token blocks.
+// Transforms batches of tokens into optimized storage format:
+// - Merges a set of byte slices into a contiguous slice Payload and a slice of Offsets
+// - Tracks token ID ranges for indexing [MinTID, MaxTID]
+//
+// Parameters:
+// - tokenBatches: Sequence of token batches to process
+//
+// Returns:
+// - iter.Seq2[uint32, tokensSealBlock]: Sequence of sealed token blocks with their indexes
+func createTokensSealBlocks(tokenBatches iter.Seq[[][]byte]) iter.Seq2[uint32, tokensSealBlock] {
+ return func(yield func(uint32, tokensSealBlock) bool) {
+ var (
+ idx uint32 // 1-based block index
+ currentTID uint32 // Current token ID counter
+ block tokensSealBlock // Current block under construction
+ )
+
+ // Process each batch of tokens
+ for tokens := range tokenBatches {
+ idx++
+ // Initialize new block
+ block.ext.minTID = currentTID + 1
+ block.payload.Payload = block.payload.Payload[:0]
+ block.payload.Offsets = block.payload.Offsets[:0]
+
+ // Process each token in current batch
+ for _, tokenData := range tokens {
+ currentTID++
+ // Store offset to current token
+ block.payload.Offsets = append(block.payload.Offsets, uint32(len(block.payload.Payload)))
+ // Store token length (little-endian) followed by token bytes
+ block.payload.Payload = binary.LittleEndian.AppendUint32(block.payload.Payload, uint32(len(tokenData)))
+ block.payload.Payload = append(block.payload.Payload, tokenData...)
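+ // Cumulative payload layout: [len u32|token bytes][len u32|token bytes]...,
+ // with Offsets[i] pointing at the start of the i-th length prefix.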
+ } + + block.ext.maxTID = currentTID + + // Yield completed block + if !yield(idx, block) { + return + } + } + } +} diff --git a/frac/sealed/sealing/blocks_builder_test.go b/frac/sealed/sealing/blocks_builder_test.go new file mode 100644 index 00000000..80892ca2 --- /dev/null +++ b/frac/sealed/sealing/blocks_builder_test.go @@ -0,0 +1,395 @@ +package sealing + +import ( + "iter" + "slices" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed/lids" + "github.com/ozontech/seq-db/frac/sealed/token" + "github.com/ozontech/seq-db/seq" +) + +type mockSource struct { + info common.Info + tokens [][]byte + fields []string + fieldMaxTIDs []uint32 + ids []seq.ID + pos []seq.DocPos + tokenLIDs [][]uint32 + blocksOffsets []uint64 + lastError error +} + +func (m *mockSource) Info() common.Info { return m.info } + +func (m *mockSource) Fields() iter.Seq2[string, uint32] { + return func(yield func(string, uint32) bool) { + for i := range len(m.fields) { + if !yield(m.fields[i], m.fieldMaxTIDs[i]) { + return + } + } + } +} + +func (m *mockSource) IDsBlocks(size int) iter.Seq2[[]seq.ID, []seq.DocPos] { + return func(yield func([]seq.ID, []seq.DocPos) bool) { + ids := make([]seq.ID, 0, size) + pos := make([]seq.DocPos, 0, size) + for i, id := range m.ids { + if len(ids) == size { + if !yield(ids, pos) { + return + } + ids = ids[:0] + pos = pos[:0] + } + ids = append(ids, id) + pos = append(pos, m.pos[i]) + } + yield(ids, pos) + } +} + +func (m *mockSource) TokenBlocks(size int) iter.Seq[[][]byte] { + return func(yield func([][]byte) bool) { + block := [][]byte{} + blockSize := 0 + for _, token := range m.tokens { + if blockSize >= size { + if !yield(block) { + return + } + blockSize = 0 + block = block[:0] + } + block = append(block, token) + blockSize += len(token) + 4 + } + yield(block) + } +} + +func (m *mockSource) TokenLIDs() iter.Seq[[]uint32] { + return func(yield func([]uint32) bool) { + for _, lids := range m.tokenLIDs { + if !yield(lids) { + return + } + } + } +} + +func (m *mockSource) BlocksOffsets() []uint64 { return m.blocksOffsets } +func (m *mockSource) LastError() error { return m.lastError } + +func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { + src := mockSource{ + tokens: [][]byte{ + []byte("f1v1"), // 1 + []byte("f1v2"), // 2, max TID for f1 + []byte("f2v1"), // 3 + + []byte("f2v2"), // 4 + []byte("f2v3"), // 5 + []byte("f2v4"), // 6 + + []byte("f2v5"), // 7, max TID for f2 + []byte("f3v1"), // 8 + []byte("f3v2"), // 9, max TID for f3 + + []byte("f4v1"), // 10 + []byte("f4v2"), // 11 + []byte("f4v3"), // 12, max TID for f4 + + []byte("f5v1"), // 13, max TID for f5 + []byte("f6v1"), // 14, max TID for f6 + }, + fields: []string{"f1", "f2", "f3", "f4", "f5", "f6"}, + fieldMaxTIDs: []uint32{2, 7, 9, 12, 13, 14}, + } + + // Block size in bytes. + const blockSize = 24 + + bb := blocksBuilder{} + tokenBlocks := bb.BuildTokenBlocks(src.TokenBlocks(blockSize), src.Fields()) + + // In our test case, each token is 4 bytes long. Also for each token we use uint32 to encode the length. + // So 3 tokens take up exactly 24 bytes. And we expect all token blocks to contain 3 tokens except the last one. 
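+ // With 14 tokens, that yields five blocks: four full blocks of 3 tokens and a final block of 2.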
+ expectedSizes := []int{3, 3, 3, 3, 2} + + tid := 0 + blockIndex := 0 + + allFieldsTables := []token.FieldTable{} + for block, fieldsTables := range tokenBlocks { + assert.Equal(t, expectedSizes[blockIndex], block.payload.Len()) + for i := range block.payload.Len() { + tid++ + assert.Equal(t, src.tokens[tid-1], block.payload.GetToken(i)) + } + allFieldsTables = append(allFieldsTables, fieldsTables...) + blockIndex++ + } + + actualTokenTable := token.TableBlock{FieldsTables: collapseOrderedFieldsTables(allFieldsTables)} + assert.Equal(t, tid, len(src.tokens)) + + expectedTokenTable := token.TableBlock{ + FieldsTables: []token.FieldTable{ + { + Field: "f1", + Entries: []*token.TableEntry{ + { + StartIndex: 0, + StartTID: 1, + BlockIndex: 1, + ValCount: 2, + MinVal: "f1v1", + MaxVal: "f1v2", + }, + }, + }, { + Field: "f2", + Entries: []*token.TableEntry{ + { + StartIndex: 2, + StartTID: 3, + BlockIndex: 1, + ValCount: 1, + MinVal: "f2v1", + MaxVal: "f2v1", + }, { + StartIndex: 0, + StartTID: 4, + BlockIndex: 2, + ValCount: 3, + MinVal: "f2v2", + MaxVal: "f2v4", + }, { + StartIndex: 0, + StartTID: 7, + BlockIndex: 3, + ValCount: 1, + MinVal: "f2v5", + MaxVal: "f2v5", + }, + }, + }, { + Field: "f3", + Entries: []*token.TableEntry{ + { + StartIndex: 1, + StartTID: 8, + BlockIndex: 3, + ValCount: 2, + MinVal: "f3v1", + MaxVal: "f3v2", + }, + }, + }, { + Field: "f4", + Entries: []*token.TableEntry{ + { + StartIndex: 0, + StartTID: 10, + BlockIndex: 4, + ValCount: 3, + MinVal: "f4v1", + MaxVal: "f4v3", + }, + }, + }, { + Field: "f5", + Entries: []*token.TableEntry{ + { + StartIndex: 0, + StartTID: 13, + BlockIndex: 5, + ValCount: 1, + MinVal: "f5v1", + MaxVal: "f5v1", + }, + }, + }, { + Field: "f6", + Entries: []*token.TableEntry{ + { + StartIndex: 1, + StartTID: 14, + BlockIndex: 5, + ValCount: 1, + MinVal: "f6v1", + MaxVal: "f6v1", + }, + }, + }, + }, + } + assert.Equal(t, actualTokenTable.FieldsTables, expectedTokenTable.FieldsTables) +} + +func TestBlocksBuilder_IDsBlocks(t *testing.T) { + src := mockSource{ + ids: []seq.ID{ + {MID: 8, RID: 1}, + {MID: 7, RID: 1}, + {MID: 6, RID: 1}, + + {MID: 5, RID: 1}, + {MID: 4, RID: 1}, + {MID: 3, RID: 1}, + + {MID: 2, RID: 1}, + {MID: 1, RID: 1}, + }, + pos: []seq.DocPos{ + seq.PackDocPos(1, 0), + seq.PackDocPos(1, 10), + seq.PackDocPos(2, 0), + + seq.PackDocPos(2, 10), + seq.PackDocPos(2, 20), + seq.PackDocPos(3, 0), + + seq.PackDocPos(4, 0), + seq.PackDocPos(4, 10), + }, + } + + expectedSizes := []int{3, 3, 2} + + i := 0 + ids := []seq.ID{} + pos := []seq.DocPos{} + for block := range createIDsSealBlocks(src.IDsBlocks(3)) { + assert.Equal(t, expectedSizes[i], len(block.mids.Values)) + assert.Equal(t, expectedSizes[i], len(block.rids.Values)) + assert.Equal(t, expectedSizes[i], len(block.params.Values)) + i++ + j := 0 + for _, mid := range block.mids.Values { + ids = append(ids, seq.ID{MID: seq.MID(mid), RID: seq.RID(block.rids.Values[j])}) + pos = append(pos, seq.DocPos(block.params.Values[j])) + j++ + } + } + + assert.Equal(t, src.ids, ids) + assert.Equal(t, src.pos, pos) +} + +func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { + src := mockSource{ + tokenLIDs: [][]uint32{ + { + 10, // block 1, tid 1 + 20, // block 1, tid 1 + 30, // block 1, tid 1 + + 40, // block 2, tid 1 + }, { + 11, // block 2, tid 2 + 21, // block 2, tid 2 + + 31, // block 3, tid 2 + 41, // block 3, tid 2 + }, { + 10, // block 3, tid 3 + + 11, // block 4, tid 3 + 20, // block 4, tid 3 + 21, // block 4, tid 3 + + }, { + 30, // block 5, tid 4 + 40, // block 5, tid 4 + 50, // 
block 5, tid 4
+
+ 60, // block 6, tid 4
+ },
+ },
+ }
+
+ expected := []lidsSealBlock{{
+ ext: lidsExt{
+ minTID: 1,
+ maxTID: 1,
+ isContinued: false,
+ },
+ payload: lids.Block{
+ LIDs: []uint32{10, 20, 30},
+ Offsets: []uint32{0, 3},
+ IsLastLID: false,
+ },
+ }, {
+ ext: lidsExt{
+ minTID: 1,
+ maxTID: 2,
+ isContinued: true,
+ },
+ payload: lids.Block{
+ LIDs: []uint32{40, 11, 21},
+ Offsets: []uint32{0, 1, 3},
+ IsLastLID: false,
+ },
+ }, {
+ ext: lidsExt{
+ minTID: 2,
+ maxTID: 3,
+ isContinued: true,
+ },
+ payload: lids.Block{
+ LIDs: []uint32{31, 41, 10},
+ Offsets: []uint32{0, 2, 3},
+ IsLastLID: false,
+ },
+ }, {
+ ext: lidsExt{
+ minTID: 3,
+ maxTID: 3,
+ isContinued: true,
+ },
+ payload: lids.Block{
+ LIDs: []uint32{11, 20, 21},
+ Offsets: []uint32{0, 3},
+ IsLastLID: true,
+ },
+ }, {
+ ext: lidsExt{
+ minTID: 4,
+ maxTID: 4,
+ isContinued: false,
+ },
+ payload: lids.Block{
+ LIDs: []uint32{30, 40, 50},
+ Offsets: []uint32{0, 3},
+ IsLastLID: false,
+ },
+ }, {
+ ext: lidsExt{
+ minTID: 4,
+ maxTID: 4,
+ isContinued: true,
+ },
+ payload: lids.Block{
+ LIDs: []uint32{60},
+ Offsets: []uint32{0, 1},
+ IsLastLID: true,
+ }},
+ }
+ bb := blocksBuilder{}
+ blocks := []lidsSealBlock{}
+ for block := range bb.BuildLIDsBlocks(src.TokenLIDs(), 3) {
+ block.payload.LIDs = slices.Clone(block.payload.LIDs) // copy lids
+ block.payload.Offsets = slices.Clone(block.payload.Offsets) // copy offsets
+ blocks = append(blocks, block)
+ }
+ assert.Equal(t, expected, blocks)
+}
diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go
new file mode 100644
index 00000000..b04e4a52
--- /dev/null
+++ b/frac/sealed/sealing/index.go
@@ -0,0 +1,447 @@
+package sealing
+
+import (
+ "bytes"
+ "encoding/binary"
+ "io"
+ "iter"
+ "time"
+
+ "github.com/alecthomas/units"
+
+ "github.com/ozontech/seq-db/bytespool"
+ "github.com/ozontech/seq-db/consts"
+ "github.com/ozontech/seq-db/frac/common"
+ "github.com/ozontech/seq-db/frac/sealed"
+ "github.com/ozontech/seq-db/frac/sealed/lids"
+ "github.com/ozontech/seq-db/frac/sealed/seqids"
+ "github.com/ozontech/seq-db/frac/sealed/token"
+ "github.com/ozontech/seq-db/seq"
+ "github.com/ozontech/seq-db/storage"
+ "github.com/ozontech/seq-db/util"
+ "github.com/ozontech/seq-db/zstd"
+)
+
+// IndexSealer is responsible for creating and writing the index structure for sealed fractions.
+// It organizes data into blocks, compresses them, and builds the complete index file with:
+// - Multiple data sections (info, tokens, token table, offsets, IDs, LIDs)
+// - Compression using ZSTD with configurable levels
+// - Registry for quick access to block locations
+// - PreloadedData structures for fast initialization of a sealed fraction instance
+type IndexSealer struct {
+ lastErr error // Last error encountered during processing
+ buf1 []byte // Reusable buffer for packing raw data before compression
+ buf2 []byte // Reusable buffer for compressed data
+ params common.SealParams // Configuration parameters for sealing process
+
+ // PreloadedData structures built during sealing for fast initialization of a sealed fraction
+ idsTable seqids.Table // Table mapping document IDs to blocks
+ lidsTable lids.Table // Table mapping token IDs to LID blocks
+ tokenTable token.Table // Table mapping fields to token blocks
+}
+
+// NewIndexSealer creates a new IndexSealer instance with the given parameters.
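+//
+// A minimal usage sketch, assuming an index file and a Source implementation are at hand:
+//
+// sealer := NewIndexSealer(params)
+// if err := sealer.WriteIndex(indexFile, src); err != nil {
+// return err
+// }
+// tokens := sealer.TokenTable() // tables are populated while the index is written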
+func NewIndexSealer(params common.SealParams) *IndexSealer { + return &IndexSealer{ + params: params, + } +} + +// indexBlock represents a single block of data in the index file. +// Each block can be compressed and contains metadata for efficient retrieval. +type indexBlock struct { + codec storage.Codec // Compression codec used (No compression or ZSTD) + payload []byte // The actual block data (may be compressed) + rawLen uint32 // Original uncompressed data length + ext1 uint64 // Extended metadata field 1 (block-specific usage) + ext2 uint64 // Extended metadata field 2 (block-specific usage) +} + +// Bin converts the indexBlock to its binary representation for storage. +// It creates a header with metadata and returns the header + payload. +// Parameters: +// - pos: The file position where this block will be written +// +// Returns: +// - storage.IndexBlockHeader: The block header with metadata +// - []byte: The payload data to write +func (i indexBlock) Bin(pos int64) (storage.IndexBlockHeader, []byte) { + header := storage.NewIndexBlockHeader(pos, i.ext1, i.ext2, uint32(len(i.payload)), i.rawLen, i.codec) + return header, i.payload +} + +// WriteIndex writes the complete index structure to the provided writer. +// The index file structure: +// +----------------+----------------+----------------+ +// | Prefix | Data Blocks | Registry | +// | (16 bytes) | (multiple) | (block headers)| +// +----------------+----------------+----------------+ +// +// Prefix contains: +// - 8 bytes: Position of registry start +// - 8 bytes: Size of registry +// +// Parameters: +// - ws: WriteSeeker to write the index data to +// - src: Source interface providing the data to be sealed +// +// Returns: +// - error: Any error encountered during writing +func (s *IndexSealer) WriteIndex(ws io.WriteSeeker, src Source) error { + const prefixSize = 16 // Size of prefix that will hold registry position and size + + // Skip prefix area initially - we'll write it at the end + if _, err := ws.Seek(prefixSize, io.SeekStart); err != nil { + return err + } + + // Create buffers for headers and payload writing + hw := bytes.NewBuffer(nil) // Headers writer - collects all block headers + bw := bytespool.AcquireWriterSize(ws, int(units.MiB)) // Buffered writer for payload + defer bytespool.ReleaseWriter(bw) + + // Write all index blocks and collect headers + if err := s.writeBlocks(prefixSize, bw, hw, src); err != nil { + return err + } + if err := bw.Flush(); err != nil { + return err + } + + // Calculate registry position and size + size := hw.Len() // Registry size (all headers) + pos, err := ws.Seek(0, io.SeekEnd) // Current end position = registry start + if err != nil { + return err + } + + // Write registry (all block headers) at the end of file + if _, err := bw.Write(hw.Bytes()); err != nil { + return err + } + if err := bw.Flush(); err != nil { + return err + } + + // Write prefix at beginning of file with registry metadata + prefix := make([]byte, 0, prefixSize) + prefix = binary.LittleEndian.AppendUint64(prefix, uint64(pos)) // Registry position + prefix = binary.LittleEndian.AppendUint64(prefix, uint64(size)) // Registry size + if _, err := ws.Seek(0, io.SeekStart); err != nil { + return err + } + if _, err = ws.Write(prefix); err != nil { + return err + } + + return nil +} + +// writeBlocks processes all index blocks from the source and writes them to the output. +// It simultaneously writes payload data to one writer and headers to another. 
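+// The headers collected here form the registry that WriteIndex appends after the last payload block.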
+// Parameters: +// - pos: Starting position for the first block +// - payloadWriter: Writer for block payload data +// - headersWriter: Writer for block headers (registry) +// - src: Data source +// +// Returns: +// - error: Any error encountered during processing +func (s *IndexSealer) writeBlocks(pos int, payloadWriter, headersWriter io.Writer, src Source) error { + // Process each index block from the source + for block := range s.indexBlocks(src) { + header, payload := block.Bin(int64(pos)) + // Write payload to main data section + if _, err := payloadWriter.Write(payload); err != nil { + return err + } + // Write header to registry + if _, err := headersWriter.Write(header); err != nil { + return err + } + pos += len(payload) // Advance position for next block + } + if s.lastErr != nil { + return s.lastErr + } + return nil +} + +// indexBlocks generates a sequence of index blocks from the source data. +// The blocks are organized in specific sections: +// 1. Info Section - Basic fraction metadata +// 2. Tokens Section - Token data blocks +// 3. Token Table Section - Field-to-token mapping table +// 4. Offsets Section - Document block offsets +// 5. IDs Section - Document ID blocks (MIDs, RIDs, Positions) +// 6. LIDs Section - Token ID to LID mapping blocks +// +// Returns: +// - iter.Seq[indexBlock]: Sequence of index blocks to write +func (s *IndexSealer) indexBlocks(src Source) iter.Seq[indexBlock] { + return func(yield func(indexBlock) bool) { + bb := blocksBuilder{} + blocksCounter := uint32(0) // Global block counter for indexing + statsOverall := startStats() // Overall statistics collector + + // Helper to push a block and update statistics + push := func(b indexBlock, statsSection *blocksStats) bool { + blocksCounter++ + statsOverall.takeStock(b) + statsSection.takeStock(b) + return yield(b) + } + + // Helper to write section separator (empty block) + sectionSeparator := func() bool { + blocksCounter++ + return yield(indexBlock{}) // empty block as separator + } + + // SECTION 1: Info Section + statsInfo := startStats() + info := src.Info() + if !push(s.packInfoBlock(sealed.BlockInfo{Info: info}), &statsInfo) { + return + } + + // SECTION 2: Tokens Section + statsTokens := startStats() + allFieldsTables := []token.FieldTable{} + tokensBlocks := bb.BuildTokenBlocks(src.TokenBlocks(consts.RegularBlockSize), src.Fields()) + for block, fieldsTables := range tokensBlocks { + if !push(s.packTokenBlock(block), &statsTokens) { + return + } + allFieldsTables = append(allFieldsTables, fieldsTables...) 
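+ // The collected field tables are merged into a single token table in SECTION 3 below.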
+ } + if s.lastErr = util.CollapseErrors([]error{src.LastError(), bb.LastError()}); s.lastErr != nil { + return + } + + if !sectionSeparator() { + return + } + + // SECTION 3: Token Table Section + statsTokenTable := startStats() + tokenTableBlock := token.TableBlock{FieldsTables: collapseOrderedFieldsTables(allFieldsTables)} + if !push(s.packTokenTableBlock(tokenTableBlock), &statsTokenTable) { + return + } + + if !sectionSeparator() { + return + } + + // SECTION 4: Offsets Section + statsOffsets := startStats() + offsets := sealed.BlockOffsets{ + IDsTotal: info.DocsTotal + 1, // +1 for system ID at position zero + Offsets: src.BlocksOffsets(), + } + if !push(s.packBlocksOffsetsBlock(offsets), &statsOffsets) { + return + } + + // SECTION 5: IDs Section + s.idsTable.StartBlockIndex = blocksCounter // Record starting position for IDs blocks + statsMIDs, statsRIDs, statsParams := startStats(), startStats(), startStats() + for block := range createIDsSealBlocks(src.IDsBlocks(consts.IDsPerBlock)) { + if !push(s.packMIDsBlock(block), &statsMIDs) { + return + } + if !push(s.packRIDsBlock(block), &statsRIDs) { + return + } + if !push(s.packPosBlock(block), &statsParams) { + return + } + } + if s.lastErr = src.LastError(); s.lastErr != nil { + return + } + + if !sectionSeparator() { + return + } + + // SECTION 6: LIDs Section + statsLIDs := startStats() + s.lidsTable.StartBlockIndex = blocksCounter + for block := range bb.BuildLIDsBlocks(src.TokenLIDs(), consts.LIDBlockCap) { + if !push(s.packLIDsBlock(block), &statsLIDs) { + return + } + } + if s.lastErr = util.CollapseErrors([]error{src.LastError(), bb.LastError()}); s.lastErr != nil { + return + } + + if !sectionSeparator() { + return + } + + // Log statistics for all sections + endTime := time.Now() + statsInfo.log("info", statsTokens.start) + statsTokens.log("tokens", statsTokenTable.start) + statsTokenTable.log("tokenTable", statsOffsets.start) + statsOffsets.log("offsets", statsMIDs.start) + statsMIDs.log("mids", statsLIDs.start) + statsRIDs.log("rids", statsLIDs.start) + statsParams.log("pos", statsLIDs.start) + statsLIDs.log("lids", endTime) + statsOverall.log("overall", endTime) + } +} + +// collapseOrderedFieldsTables merges field tables with identical field names +// Assumes the input array is already sorted by the Field property +func collapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable { + if len(src) == 0 { + return nil + } + current := src[0] + dst := []token.FieldTable{} + for _, ft := range src[1:] { + if current.Field == ft.Field { + current.Entries = append(current.Entries, ft.Entries...) + continue + } + dst = append(dst, current) + current = ft + } + dst = append(dst, current) + return dst +} + +// newIndexBlock creates an uncompressed index block. +func newIndexBlock(raw []byte) indexBlock { + return indexBlock{ + codec: storage.CodecNo, + rawLen: uint32(len(raw)), + payload: raw, + } +} + +// newIndexBlockZSTD creates a compressed index block using ZSTD compression. +// Falls back to uncompressed if compression doesn't provide benefits. +func (s *IndexSealer) newIndexBlockZSTD(raw []byte, level int) indexBlock { + s.buf2 = zstd.CompressLevel(raw, s.buf2[:0], level) + // Only use compression if it actually reduces size + if len(s.buf2) < len(raw) { + return indexBlock{ + codec: storage.CodecZSTD, + rawLen: uint32(len(raw)), + payload: s.buf2, + } + } + return newIndexBlock(raw) +} + +// packInfoBlock packs fraction information into an index block. 
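+// The block is stored uncompressed: the JSON-encoded Info is small, so compression is unlikely to pay off.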
+func (s *IndexSealer) packInfoBlock(block sealed.BlockInfo) indexBlock { + s.buf1 = block.Pack(s.buf1[:0]) + return newIndexBlock(s.buf1) // Info block is typically small, no compression +} + +// packTokenBlock packs token data into a compressed index block. +func (s *IndexSealer) packTokenBlock(block tokensSealBlock) indexBlock { + s.buf1 = block.payload.Pack(s.buf1[:0]) // Pack token data + b := s.newIndexBlockZSTD(s.buf1, s.params.TokenListZstdLevel) + // Store TID range in extended metadata + b.ext1 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) + return b +} + +// packTokenTableBlock packs the token table into a compressed index block. +func (s *IndexSealer) packTokenTableBlock(tokenTableBlock token.TableBlock) indexBlock { + s.tokenTable = token.TableFromBlocks([]token.TableBlock{tokenTableBlock}) // Store for PreloadedData + + // Packing block + s.buf1 = tokenTableBlock.Pack(s.buf1[:0]) + return s.newIndexBlockZSTD(s.buf1, s.params.TokenTableZstdLevel) +} + +// packBlocksOffsetsBlock packs document block offsets into a compressed index block. +func (s *IndexSealer) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlock { + // Update IDs table for PreloadedData + s.idsTable.IDsTotal = block.IDsTotal // Total number of IDs + s.idsTable.IDBlocksTotal = uint32(len(block.Offsets)) // Number of ID blocks + + // Packing block + s.buf1 = block.Pack(s.buf1[:0]) + b := s.newIndexBlockZSTD(s.buf1, s.params.DocsPositionsZstdLevel) + return b +} + +// packMIDsBlock packs MIDs into a compressed index block. +func (s *IndexSealer) packMIDsBlock(block idsSealBlock) indexBlock { + // Get the last ID in the block (smallest due to descending order) + last := len(block.mids.Values) - 1 + minID := seq.ID{ + MID: seq.MID(block.mids.Values[last]), + RID: seq.RID(block.rids.Values[last]), + } + s.idsTable.MinBlockIDs = append(s.idsTable.MinBlockIDs, minID) // Store for PreloadedData + + // Packing block + s.buf1 = block.mids.Pack(s.buf1[:0]) + b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) + // Store min MID and RID in extended metadata + b.ext1 = uint64(minID.MID) + b.ext2 = uint64(minID.RID) + return b +} + +// packRIDsBlock packs RIDs into a compressed index block. +func (s *IndexSealer) packRIDsBlock(block idsSealBlock) indexBlock { + s.buf1 = block.rids.Pack(s.buf1[:0]) + b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) + return b +} + +// packPosBlock packs document positions into a compressed index block. +func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock { + s.buf1 = block.params.Pack(s.buf1[:0]) + b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) + return b +} + +// packLIDsBlock packs Local IDs (LIDs) into a compressed index block. +// Also updates LIDs table for preloaded data access. 
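+//
+// The TID range is packed into ext2 as maxTID<<32 | minTID: e.g. a block covering
+// TIDs 5..9 is stored as 0x0000000900000005.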
+func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock { + var ext1 uint64 + if block.ext.isContinued { // todo: Legacy continuation flag + ext1 = 1 + block.ext.minTID++ // Adjust for legacy format + } + + // Update LIDs table for PreloadedData + s.lidsTable.MinTIDs = append(s.lidsTable.MinTIDs, block.ext.minTID) + s.lidsTable.MaxTIDs = append(s.lidsTable.MaxTIDs, block.ext.maxTID) + s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.ext.isContinued) + + // Packing block + s.buf1 = block.payload.Pack(s.buf1[:0]) + b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel) + b.ext1 = ext1 // Legacy continuation flag + b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range + return b +} + +// LIDsTable returns the built LIDs table for fast initialization of sealed fraction. +func (s *IndexSealer) LIDsTable() *lids.Table { + return &s.lidsTable +} + +// TokenTable returns the built token table for fast initialization of sealed fraction. +func (s *IndexSealer) TokenTable() token.Table { + return s.tokenTable +} + +// IDsTable returns the built IDs table for fast initialization of sealed fraction. +func (s *IndexSealer) IDsTable() seqids.Table { + return s.idsTable +} diff --git a/frac/sealed/sealing/sealer.go b/frac/sealed/sealing/sealer.go new file mode 100644 index 00000000..dc0714a6 --- /dev/null +++ b/frac/sealed/sealing/sealer.go @@ -0,0 +1,110 @@ +package sealing + +import ( + "errors" + "iter" + "os" + "path/filepath" + "time" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" +) + +// Source interface defines the contract for data sources that can be sealed. +// Provides access to all necessary data components for index creation. +type Source interface { + Info() *common.Info // Fraction metadata information + IDsBlocks(size int) iter.Seq2[[]seq.ID, []seq.DocPos] // Ordered sequence of document IDs and their positions, divided into blocks + TokenBlocks(size int) iter.Seq[[][]byte] // Ordered sequence of tokens divided into blocks + Fields() iter.Seq2[string, uint32] // Ordered sequence of fields with their max field's TID value + TokenLIDs() iter.Seq[[]uint32] // Sequence of Token LIDs ordered by TID and LID + BlocksOffsets() []uint64 // Offsets of DocBlock's in the doc file + LastError() error // Last error encountered during data retrieval +} + +// Seal is the main entry point for sealing a fraction. +// It performs the complete sealing process: +// 1. Creates the index file structure +// 2. Writes all index blocks with compression +// 3. Builds PreloadedData structures for fast initialization of sealed fraction +// 4. 
Handles file system operations and error recovery +// +// Parameters: +// - src: Data source providing all fraction data +// - params: Sealing parameters including compression levels +// +// Returns: +// - *sealed.PreloadedData: Preloaded data structures for initialization of sealed fraction +// - error: Any error encountered during the sealing process +func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { + start := time.Now() + info := src.Info() + + // Validate that we're not sealing an empty fraction + if info.To == 0 { + return nil, errors.New("sealing of an empty active fraction is not supported") + } + + // Create temporary index file (will be renamed on success) + indexFile, err := os.Create(info.Path + consts.IndexTmpFileSuffix) + if err != nil { + return nil, err + } + + // Create index sealer and write the index structure + indexSealer := NewIndexSealer(params) + if err := indexSealer.WriteIndex(indexFile, src); err != nil { + return nil, err + } + + // Ensure data is flushed to disk + if err := indexFile.Sync(); err != nil { + return nil, err + } + + // Get final file size for metadata + stat, err := indexFile.Stat() + if err != nil { + return nil, err + } + info.IndexOnDisk = uint64(stat.Size()) + + // Close file before renaming + if err := indexFile.Close(); err != nil { + return nil, err + } + + // Atomically rename temporary file to final name + if err := os.Rename(indexFile.Name(), info.Path+consts.IndexFileSuffix); err != nil { + return nil, err + } + + // Ensure directory metadata is synced to disk + util.MustSyncPath(filepath.Dir(info.Path)) + + // Build preloaded data structure for fast query access + preloaded := sealed.PreloadedData{ + Info: info, + TokenTable: indexSealer.TokenTable(), + BlocksData: sealed.BlocksData{ + IDsTable: indexSealer.IDsTable(), + LIDsTable: indexSealer.LIDsTable(), + BlocksOffsets: src.BlocksOffsets(), + }, + } + + // Log successful sealing operation + logger.Info( + "fraction sealed", + zap.String("fraction", filepath.Dir(info.Path)), + zap.Float64("time_spent_s", util.DurationToUnit(time.Since(start), "s")), + ) + return &preloaded, nil +} diff --git a/frac/sealed/sealing/stats.go b/frac/sealed/sealing/stats.go new file mode 100644 index 00000000..5b119d60 --- /dev/null +++ b/frac/sealed/sealing/stats.go @@ -0,0 +1,42 @@ +package sealing + +import ( + "time" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/util" +) + +type blocksStats struct { + start time.Time + len int + rawLen int + blocksCount int +} + +func startStats() blocksStats { + return blocksStats{start: time.Now()} +} + +func (s *blocksStats) takeStock(block indexBlock) { + s.blocksCount++ + s.len += len(block.payload) + s.rawLen += int(block.rawLen) +} + +func (s *blocksStats) log(name string, endTime time.Time) { + var ratio float64 + if s.len > 0 { + ratio = float64(s.rawLen) / float64(s.len) + } + logger.Info("seal block stats", + zap.String("type", name), + util.ZapUint64AsSizeStr("raw", uint64(s.rawLen)), + util.ZapUint64AsSizeStr("compressed", uint64(s.len)), + util.ZapFloat64WithPrec("ratio", ratio, 2), + zap.Uint64("blocks_count", uint64(s.blocksCount)), + util.ZapDurationWithPrec("write_duration_ms", endTime.Sub(s.start), "ms", 0), + ) +} diff --git a/frac/sealed/token/table.go b/frac/sealed/token/table.go index cfccf38d..92c6102d 100644 --- a/frac/sealed/token/table.go +++ b/frac/sealed/token/table.go @@ -9,6 +9,21 @@ import ( "github.com/ozontech/seq-db/logger" ) +// 
token.Table maps fields to token.Blocks, specifying which blocks and token ranges
+// contain the field's token sequence.
+//
+// A single token.Block may contain tokens for multiple fields; thus, multiple
+// token.TableEntry instances can reference the same block but different ranges.
+//
+// A single field may also span several token.Blocks in their entirety.
+//
+// Here's how it can be depicted:
+//
+// Field Ranges: <-------f1----------><------f2-------><------------f3------------><----------f4---------->
+// Token Blocks: [.t1.t2.t3.t4.][.t5.t6.t7.t8.][.t9....etc...][.............][.............][.............]
+// TableEntries: {-----f1------}{-f1-}{---f2--}{--f2--}{-f3--}{------f3-----}{-f3-}{----f4-}{-----f4------}
+//
+
 const (
 TableEntrySize = unsafe.Sizeof(TableEntry{}) + unsafe.Sizeof(&TableEntry{})
 FieldDataSize = unsafe.Sizeof(FieldData{}) + unsafe.Sizeof(&FieldData{})
diff --git a/frac/sealed/token/table_entry.go b/frac/sealed/token/table_entry.go
index 135c66bc..a16b9a55 100644
--- a/frac/sealed/token/table_entry.go
+++ b/frac/sealed/token/table_entry.go
@@ -1,9 +1,8 @@
 package token
 
-// TableEntry describes token.Block metadata: what TID and tokens it contains and etc.
-// One token.Block can cover multiple instances of token.TableEntry
+// TableEntry is part of token.Table and points to a fragment of token.Block
 type TableEntry struct {
- StartIndex uint32 // number of tokens in block before this TokenEntry
+ StartIndex uint32 // offset from the beginning of the block to the first token pointed to by the TableEntry
 StartTID uint32 // first TID of TableEntry
 BlockIndex uint32 // sequence number of the physical block of tokens in the file
 ValCount uint32
diff --git a/frac/sealed/token/table_loader.go b/frac/sealed/token/table_loader.go
index b9e3f28e..06ffc401 100644
--- a/frac/sealed/token/table_loader.go
+++ b/frac/sealed/token/table_loader.go
@@ -105,6 +105,7 @@ func (l *TableLoader) loadBlocks() ([]TableBlock, error) {
 return blocks, nil
 }
 
+// TableBlock represents how token.Table is stored on disk
 type TableBlock struct {
 FieldsTables []FieldTable
 }
diff --git a/frac/sealed_index.go b/frac/sealed_index.go
index ff8fae15..8c4107f6 100644
--- a/frac/sealed_index.go
+++ b/frac/sealed_index.go
@@ -7,6 +7,7 @@ import (
 
 "go.uber.org/zap"
 
+ "github.com/ozontech/seq-db/frac/common"
 "github.com/ozontech/seq-db/frac/processor"
 "github.com/ozontech/seq-db/frac/sealed/lids"
 "github.com/ozontech/seq-db/frac/sealed/seqids"
@@ -23,7 +24,7 @@ import (
 type sealedDataProvider struct {
 ctx context.Context
 
- info *Info
+ info *common.Info
 config *Config
 
 idsTable *seqids.Table
diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go
index c4ba2ed8..83a7f060 100644
--- a/frac/sealed_loader.go
+++ b/frac/sealed_loader.go
@@ -5,6 +5,8 @@ import (
 
 "go.uber.org/zap"
 
+ "github.com/ozontech/seq-db/frac/common"
+ "github.com/ozontech/seq-db/frac/sealed"
 "github.com/ozontech/seq-db/frac/sealed/lids"
 "github.com/ozontech/seq-db/frac/sealed/seqids"
 "github.com/ozontech/seq-db/logger"
@@ -19,7 +21,7 @@ type Loader struct {
 blockBuf []byte
 }
 
-func (l *Loader) Load(state *sealedState, info *Info, indexReader *storage.IndexReader) {
+func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, indexReader *storage.IndexReader) {
 t := time.Now()
 
 l.reader = indexReader
@@ -29,11 +31,11 @@ func (l *Loader) Load(state *sealedState, info *Info, indexReader *storage.Index
 
 var err error
 
- if state.idsTable,
state.BlocksOffsets, err = l.loadIDs(); err != nil { + if blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(); err != nil { logger.Fatal("load ids error", zap.Error(err)) } - if state.lidsTable, err = l.loadLIDsBlocksTable(); err != nil { + if blocksData.LIDsTable, err = l.loadLIDsBlocksTable(); err != nil { logger.Fatal("load lids error", zap.Error(err)) } @@ -76,7 +78,7 @@ func (l *Loader) loadIDs() (idsTable seqids.Table, blocksOffsets []uint64, err e return idsTable, nil, err } - b := BlockOffsets{} + b := sealed.BlockOffsets{} if err := b.Unpack(result); err != nil { return idsTable, nil, err } diff --git a/fracmanager/async_searcher_test.go b/fracmanager/async_searcher_test.go index 5ff94092..310a20fd 100644 --- a/fracmanager/async_searcher_test.go +++ b/fracmanager/async_searcher_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" "github.com/ozontech/seq-db/mappingprovider" "github.com/ozontech/seq-db/seq" @@ -16,11 +17,11 @@ import ( type fakeFrac struct { frac.Fraction - info frac.Info + info common.Info dp fakeDP } -func (f *fakeFrac) Info() *frac.Info { +func (f *fakeFrac) Info() *common.Info { return &f.info } @@ -55,7 +56,7 @@ func TestAsyncSearcherMaintain(t *testing.T) { Retention: time.Hour, } fracs := []frac.Fraction{ - &fakeFrac{info: frac.Info{Path: "1"}}, + &fakeFrac{info: common.Info{Path: "1"}}, } r.NoError(as.StartSearch(req, fracs)) diff --git a/fracmanager/config.go b/fracmanager/config.go index b484145e..6b3be80f 100644 --- a/fracmanager/config.go +++ b/fracmanager/config.go @@ -7,6 +7,7 @@ import ( "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/util" ) @@ -23,7 +24,7 @@ type Config struct { MaintenanceDelay time.Duration CacheCleanupDelay time.Duration CacheGCDelay time.Duration - SealParams frac.SealParams + SealParams common.SealParams SortCacheSize uint64 // size for docs cache for active fraction Fraction frac.Config diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index b97b0481..fdaf80de 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -70,7 +70,7 @@ type activeRef struct { } func (fm *FracManager) newActiveRef(active *frac.Active) activeRef { - f := &proxyFrac{active: active, fp: fm.fracProvider} + f := newProxyFrac(active, fm.fracProvider) return activeRef{ frac: f, ref: &fracRef{instance: f}, diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index 5e8d06e3..989c7b2d 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -11,8 +11,9 @@ import ( "github.com/stretchr/testify/require" "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/seq" - "github.com/ozontech/seq-db/tests/common" + testscommon "github.com/ozontech/seq-db/tests/common" ) // newFracManagerWithBackgroundStart only used from tests @@ -48,10 +49,10 @@ func MakeSomeFractions(t *testing.T, fm *FracManager) { } func TestCleanUp(t *testing.T) { - dataDir := common.GetTestTmpDir(t) + dataDir := testscommon.GetTestTmpDir(t) - common.RecreateDir(dataDir) - defer common.RemoveDir(dataDir) + 
testscommon.RecreateDir(dataDir) + defer testscommon.RemoveDir(dataDir) fm, err := newFracManagerWithBackgroundStart(t.Context(), &Config{ FracSize: 1000, @@ -94,9 +95,9 @@ func TestCleanUp(t *testing.T) { } func TestMatureMode(t *testing.T) { - dataDir := common.GetTestTmpDir(t) - common.RecreateDir(dataDir) - defer common.RemoveDir(dataDir) + dataDir := testscommon.GetTestTmpDir(t) + testscommon.RecreateDir(dataDir) + defer testscommon.RemoveDir(dataDir) launchAndCheck := func(checkFn func(fm *FracManager)) { fm := NewFracManager(context.Background(), &Config{ @@ -170,7 +171,7 @@ func TestOldestCT(t *testing.T) { for i := range fracCount { fm.localFracs = append(fm.localFracs, &fracRef{instance: frac.NewSealed( - "", nil, nil, nil, &frac.Info{ + "", nil, nil, nil, &common.Info{ Path: fmt.Sprintf("local-frac-%d", i), IndexOnDisk: 1, CreationTime: uint64(nowOldestLocal.UnixMilli()), @@ -193,7 +194,7 @@ func TestOldestCT(t *testing.T) { for i := range fracCount { fm.remoteFracs = append(fm.remoteFracs, frac.NewRemote( - t.Context(), "", nil, nil, nil, &frac.Info{ + t.Context(), "", nil, nil, nil, &common.Info{ Path: fmt.Sprintf("remote-frac-%d", i), IndexOnDisk: 1, CreationTime: uint64(nowOldestRemote.UnixMilli()), @@ -207,7 +208,7 @@ func TestOldestCT(t *testing.T) { for i := range fracCount { fm.localFracs = append(fm.localFracs, &fracRef{instance: frac.NewSealed( - "", nil, nil, nil, &frac.Info{ + "", nil, nil, nil, &common.Info{ Path: fmt.Sprintf("local-frac-%d", i), IndexOnDisk: 1, CreationTime: uint64(nowOldestLocal.UnixMilli()), diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index 33b081b2..fa16c342 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -7,6 +7,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" ) @@ -52,7 +54,7 @@ func (fp *fractionProvider) NewActive(name string) *frac.Active { ) } -func (fp *fractionProvider) NewSealed(name string, cachedInfo *frac.Info) *frac.Sealed { +func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info) *frac.Sealed { return frac.NewSealed( name, fp.readLimiter, @@ -63,7 +65,7 @@ func (fp *fractionProvider) NewSealed(name string, cachedInfo *frac.Info) *frac. ) } -func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *frac.PreloadedData) *frac.Sealed { +func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *sealed.PreloadedData) *frac.Sealed { return frac.NewSealedPreloaded( name, preloadedData, @@ -75,7 +77,7 @@ func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *frac. 
} func (fp *fractionProvider) NewRemote( - ctx context.Context, name string, cachedInfo *frac.Info, + ctx context.Context, name string, cachedInfo *common.Info, ) *frac.Remote { return frac.NewRemote( ctx, diff --git a/fracmanager/proxy_frac.go b/fracmanager/proxy_frac.go index c0feb70b..a6fad95d 100644 --- a/fracmanager/proxy_frac.go +++ b/fracmanager/proxy_frac.go @@ -10,6 +10,8 @@ import ( "go.uber.org/zap" "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed/sealing" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric" "github.com/ozontech/seq-db/seq" @@ -43,10 +45,20 @@ type proxyFrac struct { sealed *frac.Sealed readonly bool + name string + indexWg sync.WaitGroup sealWg sync.WaitGroup } +func newProxyFrac(active *frac.Active, fp *fractionProvider) *proxyFrac { + return &proxyFrac{ + fp: fp, + active: active, + name: active.BaseFileName, + } +} + func (f *proxyFrac) cur() frac.Fraction { f.useMu.RLock() defer f.useMu.RUnlock() @@ -65,7 +77,7 @@ func (f *proxyFrac) Contains(mid seq.MID) bool { return f.cur().Contains(mid) } -func (f *proxyFrac) Info() *frac.Info { +func (f *proxyFrac) Info() *common.Info { return f.cur().Info() } @@ -100,13 +112,13 @@ func (f *proxyFrac) Append(docs, meta []byte) error { func (f *proxyFrac) WaitWriteIdle() { start := time.Now() - logger.Info("waiting fraction to stop write...", zap.String("name", f.active.BaseFileName)) + logger.Info("waiting fraction to stop write...", zap.String("name", f.name)) f.indexWg.Wait() waitTime := util.DurationToUnit(time.Since(start), "s") - logger.Info("write is stopped", zap.String("name", f.active.BaseFileName), zap.Float64("time_wait_s", waitTime)) + logger.Info("write is stopped", zap.String("name", f.name), zap.Float64("time_wait_s", waitTime)) } -func (f *proxyFrac) Seal(params frac.SealParams) (*frac.Sealed, error) { +func (f *proxyFrac) Seal(params common.SealParams) (*frac.Sealed, error) { f.useMu.Lock() if f.isSuicidedState() { f.useMu.Unlock() @@ -123,7 +135,11 @@ func (f *proxyFrac) Seal(params frac.SealParams) (*frac.Sealed, error) { f.WaitWriteIdle() - preloaded, err := frac.Seal(active, params) + src, err := frac.NewActiveSealingSource(active, params) + if err != nil { + return nil, err + } + preloaded, err := sealing.Seal(src, params) if err != nil { return nil, err } @@ -150,14 +166,14 @@ func (f *proxyFrac) trySetSuicided() (*frac.Active, *frac.Sealed, bool) { sealed := f.sealed active := f.active - sealing := f.isSealingState() + isSealing := f.isSealingState() - if !sealing { + if !isSealing { f.sealed = nil f.active = nil } - return active, sealed, sealing + return active, sealed, isSealing } func (f *proxyFrac) Offload(ctx context.Context, u storage.Uploader) (bool, error) { @@ -179,8 +195,8 @@ func (f *proxyFrac) Offload(ctx context.Context, u storage.Uploader) (bool, erro } func (f *proxyFrac) Suicide() { - active, sealed, sealing := f.trySetSuicided() - if sealing { + active, sealed, isSealing := f.trySetSuicided() + if isSealing { f.sealWg.Wait() // we can get `sealing` == true only once here // next attempt after Wait() should be successful diff --git a/fracmanager/sealed_frac_cache.go b/fracmanager/sealed_frac_cache.go index cce1b533..f7230fbd 100644 --- a/fracmanager/sealed_frac_cache.go +++ b/fracmanager/sealed_frac_cache.go @@ -10,7 +10,7 @@ import ( "go.uber.org/zap" - "github.com/ozontech/seq-db/frac" + 
"github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/logger" ) @@ -22,7 +22,7 @@ type sealedFracCache struct { fileName string fracCacheMu sync.RWMutex - fracCache map[string]*frac.Info + fracCache map[string]*common.Info version uint64 // if we increment the counter every second it will take 31 billion years (quite enough) saveMu sync.Mutex @@ -31,7 +31,7 @@ type sealedFracCache struct { func NewSealedFracCache(filePath string) *sealedFracCache { fc := &sealedFracCache{ - fracCache: make(map[string]*frac.Info), + fracCache: make(map[string]*common.Info), fracCacheMu: sync.RWMutex{}, fullPath: filePath, fileName: filepath.Base(filePath), @@ -74,7 +74,7 @@ func (fc *sealedFracCache) LoadFromDisk(fileName string) { } // AddFraction adds a new entry to the in-memory [sealedFracCache]. -func (fc *sealedFracCache) AddFraction(name string, info *frac.Info) { +func (fc *sealedFracCache) AddFraction(name string, info *common.Info) { fc.fracCacheMu.Lock() defer fc.fracCacheMu.Unlock() @@ -94,7 +94,7 @@ func (fc *sealedFracCache) RemoveFraction(name string) { // GetFracInfo returns fraction info and a flag that indicates // whether the data is present in the map. -func (fc *sealedFracCache) GetFracInfo(name string) (*frac.Info, bool) { +func (fc *sealedFracCache) GetFracInfo(name string) (*common.Info, bool) { fc.fracCacheMu.RLock() defer fc.fracCacheMu.RUnlock() diff --git a/fracmanager/sealed_frac_cache_test.go b/fracmanager/sealed_frac_cache_test.go index 3f9fdec1..5a1dbdaf 100644 --- a/fracmanager/sealed_frac_cache_test.go +++ b/fracmanager/sealed_frac_cache_test.go @@ -13,8 +13,9 @@ import ( "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/seq" - "github.com/ozontech/seq-db/tests/common" + testscommon "github.com/ozontech/seq-db/tests/common" ) const dummyFracFixture = `{"a":{"name":"a","ver":"1.1","docs_total":1,"docs_on_disk":363,"docs_raw":450,"meta_on_disk":0,"index_on_disk":1284,"const_regular_block_size":16384,"const_ids_per_block":4096,"const_lid_block_cap":65536,"from":1666193255114,"to":1666193255114,"creation_time":1666193044479},"b":{"name":"b","ver":"1.2","docs_total":1,"docs_on_disk":363,"docs_raw":450,"meta_on_disk":0,"index_on_disk":1276,"const_regular_block_size":16384,"const_ids_per_block":4096,"const_lid_block_cap":65536,"from":1666193602304,"to":1666193602304,"creation_time":1666193598979}}` @@ -25,13 +26,13 @@ func loadFracCacheContent(dataDir string) ([]byte, error) { return content, err } -func loadFracCache(dataDir string) (map[string]*frac.Info, error) { +func loadFracCache(dataDir string) (map[string]*common.Info, error) { content, err := loadFracCacheContent(dataDir) if err != nil { return nil, err } - fracCache := make(map[string]*frac.Info) + fracCache := make(map[string]*common.Info) err = json.Unmarshal(content, &fracCache) if err != nil { return nil, err @@ -47,10 +48,10 @@ func writeToFracCache(dataDir, fname, data string) error { } func TestEmpty(t *testing.T) { - dataDir := common.GetTestTmpDir(t) + dataDir := testscommon.GetTestTmpDir(t) - common.RecreateDir(dataDir) - defer common.RemoveDir(dataDir) + testscommon.RecreateDir(dataDir) + defer testscommon.RemoveDir(dataDir) f := NewSealedFracCache(filepath.Join(dataDir, consts.FracCacheFileSuffix)) err := f.SyncWithDisk() @@ -71,10 +72,10 @@ func TestEmpty(t *testing.T) { } func TestLoadFromDisk(t *testing.T) { - dataDir := 
diff --git a/fracmanager/sealed_frac_cache_test.go b/fracmanager/sealed_frac_cache_test.go
index 3f9fdec1..5a1dbdaf 100644
--- a/fracmanager/sealed_frac_cache_test.go
+++ b/fracmanager/sealed_frac_cache_test.go
@@ -13,8 +13,9 @@ import (
 	"github.com/ozontech/seq-db/consts"
 	"github.com/ozontech/seq-db/frac"
+	"github.com/ozontech/seq-db/frac/common"
 	"github.com/ozontech/seq-db/seq"
-	"github.com/ozontech/seq-db/tests/common"
+	testscommon "github.com/ozontech/seq-db/tests/common"
 )
 
 const dummyFracFixture = `{"a":{"name":"a","ver":"1.1","docs_total":1,"docs_on_disk":363,"docs_raw":450,"meta_on_disk":0,"index_on_disk":1284,"const_regular_block_size":16384,"const_ids_per_block":4096,"const_lid_block_cap":65536,"from":1666193255114,"to":1666193255114,"creation_time":1666193044479},"b":{"name":"b","ver":"1.2","docs_total":1,"docs_on_disk":363,"docs_raw":450,"meta_on_disk":0,"index_on_disk":1276,"const_regular_block_size":16384,"const_ids_per_block":4096,"const_lid_block_cap":65536,"from":1666193602304,"to":1666193602304,"creation_time":1666193598979}}`
@@ -25,13 +26,13 @@ func loadFracCacheContent(dataDir string) ([]byte, error) {
 	return content, err
 }
 
-func loadFracCache(dataDir string) (map[string]*frac.Info, error) {
+func loadFracCache(dataDir string) (map[string]*common.Info, error) {
 	content, err := loadFracCacheContent(dataDir)
 	if err != nil {
 		return nil, err
 	}
 
-	fracCache := make(map[string]*frac.Info)
+	fracCache := make(map[string]*common.Info)
 	err = json.Unmarshal(content, &fracCache)
 	if err != nil {
 		return nil, err
@@ -47,10 +48,10 @@ func writeToFracCache(dataDir, fname, data string) error {
 }
 
 func TestEmpty(t *testing.T) {
-	dataDir := common.GetTestTmpDir(t)
+	dataDir := testscommon.GetTestTmpDir(t)
 
-	common.RecreateDir(dataDir)
-	defer common.RemoveDir(dataDir)
+	testscommon.RecreateDir(dataDir)
+	defer testscommon.RemoveDir(dataDir)
 
 	f := NewSealedFracCache(filepath.Join(dataDir, consts.FracCacheFileSuffix))
 	err := f.SyncWithDisk()
@@ -71,10 +72,10 @@ func TestEmpty(t *testing.T) {
 }
 
 func TestLoadFromDisk(t *testing.T) {
-	dataDir := common.GetTestTmpDir(t)
+	dataDir := testscommon.GetTestTmpDir(t)
 
-	common.RecreateDir(dataDir)
-	defer common.RemoveDir(dataDir)
+	testscommon.RecreateDir(dataDir)
+	defer testscommon.RemoveDir(dataDir)
 
 	err := writeToFracCache(dataDir, consts.FracCacheFileSuffix, dummyFracFixture)
 	assert.NoError(t, err)
@@ -101,9 +102,9 @@ func TestLoadFromDisk(t *testing.T) {
 }
 
 func TestRemoveFraction(t *testing.T) {
-	dataDir := common.GetTestTmpDir(t)
-	common.RecreateDir(dataDir)
-	defer common.RemoveDir(dataDir)
+	dataDir := testscommon.GetTestTmpDir(t)
+	testscommon.RecreateDir(dataDir)
+	defer testscommon.RemoveDir(dataDir)
 
 	err := writeToFracCache(dataDir, consts.FracCacheFileSuffix, dummyFracFixture)
 	assert.NoError(t, err)
@@ -121,7 +122,7 @@ func TestRemoveFraction(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, contents, []byte("{}"))
 
-	newInfo := &frac.Info{
+	newInfo := &common.Info{
 		Path:      "/data/c",
 		Ver:       "1.3",
 		DocsTotal: 0,
@@ -142,7 +143,7 @@ func TestRemoveFraction(t *testing.T) {
 	m, err := loadFracCache(dataDir)
 	assert.NoError(t, err)
 
-	expected := map[string]*frac.Info{"c": newInfo}
+	expected := map[string]*common.Info{"c": newInfo}
 	assert.Equal(t, expected, m)
 
 	f.RemoveFraction("c")
@@ -155,10 +156,10 @@ func TestRemoveFraction(t *testing.T) {
 }
 
 func TestWriteToDisk(t *testing.T) {
-	dataDir := common.GetTestTmpDir(t)
+	dataDir := testscommon.GetTestTmpDir(t)
 
-	common.RecreateDir(dataDir)
-	defer common.RemoveDir(dataDir)
+	testscommon.RecreateDir(dataDir)
+	defer testscommon.RemoveDir(dataDir)
 
 	err := writeToFracCache(dataDir, consts.FracCacheFileSuffix, dummyFracFixture)
 	assert.NoError(t, err)
@@ -166,7 +167,7 @@ func TestWriteToDisk(t *testing.T) {
 	f := NewSealedFracCache(filepath.Join(dataDir, consts.FracCacheFileSuffix))
 	f.LoadFromDisk(filepath.Join(dataDir, consts.FracCacheFileSuffix))
 
-	newInfo := &frac.Info{
+	newInfo := &common.Info{
 		Path:      "/data/c",
 		Ver:       "1.3",
 		DocsTotal: 0,
@@ -225,15 +226,15 @@ func TestWriteToDisk(t *testing.T) {
 }
 
 func TestUnusedFractionsCleanup(t *testing.T) {
-	dataDir := common.GetTestTmpDir(t)
+	dataDir := testscommon.GetTestTmpDir(t)
 
-	common.RecreateDir(dataDir)
-	defer common.RemoveDir(dataDir)
+	testscommon.RecreateDir(dataDir)
+	defer testscommon.RemoveDir(dataDir)
 
 	err := writeToFracCache(dataDir, consts.FracCacheFileSuffix, dummyFracFixture)
 	assert.NoError(t, err)
 
-	expected := map[string]*frac.Info{}
+	expected := map[string]*common.Info{}
 
 	cacheFile := filepath.Join(dataDir, consts.FracCacheFileSuffix)
 	diskFracCache := NewFracCacheFromDisk(cacheFile)
@@ -264,10 +265,10 @@ func rotateAndSeal(fm *FracManager) frac.Fraction {
 }
 
 func TestFracInfoSavedToCache(t *testing.T) {
-	dataDir := common.GetTestTmpDir(t)
+	dataDir := testscommon.GetTestTmpDir(t)
 
-	common.RecreateDir(dataDir)
-	defer common.RemoveDir(dataDir)
+	testscommon.RecreateDir(dataDir)
+	defer testscommon.RemoveDir(dataDir)
 
 	const maxSize = 10000
 
@@ -283,7 +284,7 @@ func TestFracInfoSavedToCache(t *testing.T) {
 	metaRoot := insaneJSON.Spawn()
 	defer insaneJSON.Release(metaRoot)
 
-	infos := map[string]*frac.Info{}
+	infos := map[string]*common.Info{}
 	totalSize := uint64(0)
 	cnt := 1
 	for totalSize < maxSize {
@@ -345,10 +346,10 @@ func appendGlob(files []string, dataDir, glob string) []string {
 }
 
 func TestExtraFractionsRemoved(t *testing.T) {
-	dataDir := common.GetTestTmpDir(t)
+	dataDir := testscommon.GetTestTmpDir(t)
 
-	common.RecreateDir(dataDir)
-	defer common.RemoveDir(dataDir)
+	testscommon.RecreateDir(dataDir)
+	defer testscommon.RemoveDir(dataDir)
 
 	const maxSize = 5500
 	const times = 10
@@ -365,7 +366,7 @@ func TestExtraFractionsRemoved(t *testing.T) {
 	assert.NoError(t, err)
 
 	dp := frac.NewDocProvider()
-	infos := map[string]*frac.Info{}
+	infos := map[string]*common.Info{}
 
 	for i := 1; i < times+1; i++ {
 		addDummyDoc(t, fm, dp, seq.SimpleID(i))
@@ -408,10 +409,10 @@ func TestExtraFractionsRemoved(t *testing.T) {
 }
 
 func TestMissingCacheFilesDeleted(t *testing.T) {
-	dataDir := common.GetTestTmpDir(t)
+	dataDir := testscommon.GetTestTmpDir(t)
 
-	common.RecreateDir(dataDir)
-	defer common.RemoveDir(dataDir)
+	testscommon.RecreateDir(dataDir)
+	defer testscommon.RemoveDir(dataDir)
 
 	const maxSize = 5500
 	const times = 10
diff --git a/fracmanager/sealer_test.go b/fracmanager/sealer_test.go
index 7a3dda12..7b6baaa0 100644
--- a/fracmanager/sealer_test.go
+++ b/fracmanager/sealer_test.go
@@ -18,8 +18,11 @@ import (
 	"github.com/stretchr/testify/assert"
 
 	"github.com/ozontech/seq-db/frac"
+	"github.com/ozontech/seq-db/frac/common"
+	"github.com/ozontech/seq-db/frac/sealed"
+	"github.com/ozontech/seq-db/frac/sealed/sealing"
 	"github.com/ozontech/seq-db/seq"
-	"github.com/ozontech/seq-db/tests/common"
+	testscommon "github.com/ozontech/seq-db/tests/common"
 )
 
 var (
@@ -38,7 +41,7 @@ func fillActiveFraction(active *frac.Active) error {
 	docRoot := insaneJSON.Spawn()
 	defer insaneJSON.Release(docRoot)
 
-	file, err := os.Open(filepath.Join(common.TestDataDir, "k8s.logs"))
+	file, err := os.Open(filepath.Join(testscommon.TestDataDir, "k8s.logs"))
 	if err != nil {
 		return err
 	}
@@ -81,9 +84,9 @@ func fillActiveFraction(active *frac.Active) error {
 	return nil
 }
 
-func defaultSealingParams() frac.SealParams {
+func defaultSealingParams() common.SealParams {
 	const minZstdLevel = 1
-	return frac.SealParams{
+	return common.SealParams{
 		IDsZstdLevel:       minZstdLevel,
 		LIDsZstdLevel:      minZstdLevel,
 		TokenListZstdLevel: minZstdLevel,
@@ -105,18 +108,24 @@ func Benchmark_SealingWithSort(b *testing.B) {
 func runSealingBench(b *testing.B, cfg *frac.Config) {
 	cm := NewCacheMaintainer(uint64(units.MiB)*64, uint64(units.MiB)*64, nil)
 	fp := newFractionProvider(cfg, nil, cm, 1, 1)
-	defer fp.Stop()
 
 	dataDir := filepath.Join(b.TempDir(), "BenchmarkSealing")
-	common.RecreateDir(dataDir)
+	testscommon.RecreateDir(dataDir)
 
 	active := fp.NewActive(filepath.Join(dataDir, "test"))
 	err := fillActiveFraction(active)
 	assert.NoError(b, err)
+	fp.Stop()
+
+	seal := func(active *frac.Active, params common.SealParams) (*sealed.PreloadedData, error) {
+		src, err := frac.NewActiveSealingSource(active, params)
+		assert.NoError(b, err)
+		return sealing.Seal(src, params)
+	}
 
 	params := defaultSealingParams()
 	// The first sealing will sort all the LIDs, so we take this load out of the measurement range
-	_, err = frac.Seal(active, params)
+	_, err = seal(active, params)
 	assert.NoError(b, err)
 
 	b.ReportAllocs()
@@ -136,7 +145,7 @@ func runSealingBench(b *testing.B, cfg *frac.Config) {
 	}
 
 	for b.Loop() {
-		_, err = frac.Seal(active, params)
+		_, err = seal(active, params)
 		assert.NoError(b, err)
 	}
 }
diff --git a/fracmanager/searcher_test.go b/fracmanager/searcher_test.go
index 467163da..62465d61 100644
--- a/fracmanager/searcher_test.go
+++ b/fracmanager/searcher_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/ozontech/seq-db/consts"
 	"github.com/ozontech/seq-db/frac"
+	"github.com/ozontech/seq-db/frac/common"
 	"github.com/ozontech/seq-db/frac/processor"
 	"github.com/ozontech/seq-db/parser"
 	"github.com/ozontech/seq-db/seq"
@@ -27,8 +28,8 @@ func (t *testFakeFrac) DataProvider(_ context.Context) (frac.DataProvider, func(
 	return frac.EmptyDataProvider{}, func() {}
 }
 
-func (t *testFakeFrac) Info() *frac.Info {
-	return frac.NewInfo("test", 0, 0)
+func (t *testFakeFrac) Info() *common.Info {
+	return common.NewInfo("test", 0, 0)
 }
 
 func TestFracsLimit(t *testing.T) {
diff --git a/storage/block_former.go b/storage/block_former.go
deleted file mode 100644
index 93447506..00000000
--- a/storage/block_former.go
+++ /dev/null
@@ -1,91 +0,0 @@
-// Package disk implements read write fraction routines
-package storage
-
-import (
-	"time"
-)
-
-type BlockFormer struct {
-	Buf            []byte
-	writer         *BlocksWriter
-	blockThreshold int
-
-	// stats
-	start time.Time
-	stats BlockStats
-}
-
-type FlushOptions struct {
-	ext1              uint64
-	ext2              uint64
-	zstdCompressLevel int
-}
-
-func NewDefaultFlushOptions() *FlushOptions {
-	const zstdFastestLevel = -5
-	return &FlushOptions{
-		ext1:              0,
-		ext2:              0,
-		zstdCompressLevel: zstdFastestLevel,
-	}
-}
-
-type FlushOption func(*FlushOptions)
-
-func WithExt(ext1, ext2 uint64) FlushOption {
-	return func(o *FlushOptions) {
-		o.ext1 = ext1
-		o.ext2 = ext2
-	}
-}
-
-func WithZstdCompressLevel(level int) FlushOption {
-	return func(o *FlushOptions) {
-		o.zstdCompressLevel = level
-	}
-}
-
-func NewBlockFormer(blockType string, writer *BlocksWriter, blockSize int, buf []byte) *BlockFormer {
-	return &BlockFormer{
-		Buf:            buf[:0],
-		writer:         writer,
-		blockThreshold: blockSize,
-		start:          time.Now(),
-		stats:          BlockStats{Name: blockType},
-	}
-}
-
-func (b *BlockFormer) FlushIfNeeded(options ...FlushOption) (bool, error) {
-	if len(b.Buf) > b.blockThreshold {
-		return true, b.FlushForced(options...)
-	}
-	return false, nil
-}
-
-func (b *BlockFormer) FlushForced(options ...FlushOption) error {
-	if len(b.Buf) == 0 {
-		return nil
-	}
-
-	o := NewDefaultFlushOptions()
-	for _, applyFn := range options {
-		applyFn(o)
-	}
-
-	n, err := b.writer.WriteBlock(b.stats.Name, b.Buf, true, o.zstdCompressLevel, o.ext1, o.ext2)
-	if err != nil {
-		return err
-	}
-
-	b.stats.Blocks++
-	b.stats.Raw += uint64(len(b.Buf))
-	b.stats.Comp += uint64(n)
-
-	b.Buf = b.Buf[:0]
-	return nil
-}
-
-func (b *BlockFormer) GetStats() *BlockStats {
-	b.stats.Duration = time.Since(b.start)
-	return &b.stats
-}
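Note: the deleted BlockFormer above was the accumulate-and-flush layer over BlocksWriter, configured via functional options; that role presumably moves into frac/sealed/sealing. For reference, a sketch of how a caller drove the old API (the helper and its arguments are illustrative, built only from the deleted identifiers above):

    // writeTokens appends entries to the former's buffer and flushes
    // a compressed block whenever the buffer crosses the block threshold.
    func writeTokens(w *storage.BlocksWriter, tokens [][]byte, level int, buf []byte) error {
    	former := storage.NewBlockFormer("token", w, consts.RegularBlockSize, buf)
    	for _, tok := range tokens {
    		former.Buf = append(former.Buf, tok...)
    		if _, err := former.FlushIfNeeded(storage.WithZstdCompressLevel(level)); err != nil {
    			return err
    		}
    	}
    	// force out whatever remains in the buffer
    	return former.FlushForced(storage.WithZstdCompressLevel(level))
    }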
diff --git a/storage/blocks_stats.go b/storage/blocks_stats.go
deleted file mode 100644
index 02af6987..00000000
--- a/storage/blocks_stats.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package storage
-
-import (
-	"time"
-
-	"go.uber.org/zap"
-
-	"github.com/ozontech/seq-db/logger"
-	"github.com/ozontech/seq-db/util"
-)
-
-type BlockStats struct {
-	Name     string
-	Raw      uint64
-	Comp     uint64
-	Blocks   uint64
-	Duration time.Duration
-}
-
-func (s *BlockStats) WriteLogs() {
-	ratio := float64(s.Raw) / float64(s.Comp)
-	logger.Info("seal block stats",
-		zap.String("type", s.Name),
-		util.ZapUint64AsSizeStr("raw", s.Raw),
-		util.ZapUint64AsSizeStr("compressed", s.Comp),
-		util.ZapFloat64WithPrec("ratio", ratio, 2),
-		zap.Uint64("blocks_count", s.Blocks),
-		util.ZapDurationWithPrec("write_duration_ms", s.Duration, "ms", 0),
-	)
-}
-
-type SealingStats []*BlockStats
-
-func (s SealingStats) getOverall() *BlockStats {
-	overall := &BlockStats{Name: "overall"}
-	for _, blockStats := range s {
-		overall.Raw += blockStats.Raw
-		overall.Comp += blockStats.Comp
-		overall.Blocks += blockStats.Blocks
-		overall.Duration += blockStats.Duration
-	}
-	return overall
-}
-
-func (s SealingStats) WriteLogs() {
-	for _, blockStats := range s {
-		blockStats.WriteLogs()
-	}
-	s.getOverall().WriteLogs()
-}
diff --git a/storage/blocks_writer.go b/storage/blocks_writer.go
deleted file mode 100644
index 9338f107..00000000
--- a/storage/blocks_writer.go
+++ /dev/null
@@ -1,100 +0,0 @@
-package storage
-
-import (
-	"encoding/binary"
-	"io"
-
-	"go.uber.org/zap"
-
-	"github.com/ozontech/seq-db/bytespool"
-	"github.com/ozontech/seq-db/consts"
-	"github.com/ozontech/seq-db/logger"
-	"github.com/ozontech/seq-db/zstd"
-)
-
-type BlocksWriter struct {
-	writeSeeker    io.WriteSeeker
-	curIndex       uint32
-	blocksRegistry []byte
-}
-
-func NewBlocksWriter(ws io.WriteSeeker) *BlocksWriter {
-	return &BlocksWriter{
-		writeSeeker: ws,
-	}
-}
-
-func (w *BlocksWriter) appendBlocksRegistry(entry IndexBlockHeader) {
-	w.blocksRegistry = append(w.blocksRegistry, entry...)
-	w.curIndex++
-}
-
-func (w *BlocksWriter) GetBlockIndex() uint32 {
-	return w.curIndex
-}
-
-func (w *BlocksWriter) WriteEmptyBlock() {
-	header := NewEmptyIndexBlockHeader()
-	w.appendBlocksRegistry(header)
-}
-
-func (w *BlocksWriter) WriteBlock(blockType string, data []byte, compress bool, zstdLevel int, ext1, ext2 uint64) (uint32, error) {
-	codec := CodecNo
-	finalData := data
-	if compress {
-		codec = CodecZSTD
-		compressed := bytespool.Acquire(len(data) + consts.RegularBlockSize)
-		defer bytespool.Release(compressed)
-		finalData = zstd.CompressLevel(data, compressed.B, zstdLevel)
-		if len(finalData) >= len(data) {
-			codec = CodecNo
-			finalData = data
-		}
-	}
-
-	pos, err := w.writeSeeker.Seek(0, io.SeekCurrent)
-	if err != nil {
-		return 0, err
-	}
-
-	w.appendBlocksRegistry(NewIndexBlockHeader(pos, ext1, ext2, data, finalData, codec))
-	if _, err = w.writeSeeker.Write(finalData); err != nil {
-		return 0, err
-	}
-
-	logger.Debug("write block",
-		zap.String("block_type", blockType),
-		zap.Int("raw", len(data)),
-		zap.Int("written", len(finalData)),
-		zap.Bool("compressed", compress),
-		zap.Uint8("codec", uint8(codec)),
-	)
-
-	return uint32(len(finalData)), nil
-}
-
-func (w *BlocksWriter) WriteBlocksRegistry() error {
-	pos, err := w.writeSeeker.Seek(0, io.SeekEnd)
-	if err != nil {
-		return err
-	}
-
-	size, err := w.writeSeeker.Write(w.blocksRegistry)
-	if err != nil {
-		return err
-	}
-
-	buf := make([]byte, 0, 16)
-	buf = binary.LittleEndian.AppendUint64(buf, uint64(pos))
-	buf = binary.LittleEndian.AppendUint64(buf, uint64(size))
-
-	if _, err = w.writeSeeker.Seek(0, io.SeekStart); err != nil {
-		return err
-	}
-
-	if _, err = w.writeSeeker.Write(buf); err != nil {
-		return err
-	}
-
-	return nil
-}
diff --git a/storage/index_block_header.go b/storage/index_block_header.go
index 20bce101..18296ddd 100644
--- a/storage/index_block_header.go
+++ b/storage/index_block_header.go
@@ -22,12 +22,12 @@ func NewEmptyIndexBlockHeader() IndexBlockHeader {
 	return make(IndexBlockHeader, IndexBlockHeaderSize)
 }
 
-func NewIndexBlockHeader(pos int64, ext1, ext2 uint64, origBuff, finalBuf []byte, codec Codec) IndexBlockHeader {
+func NewIndexBlockHeader(pos int64, ext1, ext2 uint64, size, rawSize uint32, codec Codec) IndexBlockHeader {
 	header := NewEmptyIndexBlockHeader()
 	header.SetExt1(ext1)
 	header.SetExt2(ext2)
-	header.SetLen(uint32(len(finalBuf)))
-	header.SetRawLen(uint32(len(origBuff)))
+	header.SetLen(size)
+	header.SetRawLen(rawSize)
 	header.SetCodec(codec)
 	header.SetPos(uint64(pos))
 	return header
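Note on the signature change just above: NewIndexBlockHeader now takes the written and raw sizes explicitly instead of both buffers, so a caller that streams or discards the buffers can still build a registry entry. A minimal call sketch (all variable values are illustrative; Codec constants are those referenced in the deleted writer above):

    // Build a registry entry for a block that was compressed before writing.
    header := storage.NewIndexBlockHeader(
    	pos,                     // byte offset of the block in the index file
    	ext1, ext2,              // block-type-specific extension fields
    	uint32(len(compressed)), // size: bytes actually written
    	uint32(len(raw)),        // rawSize: size before compression
    	storage.CodecZSTD,
    )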
diff --git a/tests/setup/env.go b/tests/setup/env.go
index 38658a76..e95543ce 100644
--- a/tests/setup/env.go
+++ b/tests/setup/env.go
@@ -22,7 +22,7 @@ import (
 	"github.com/ozontech/seq-db/buildinfo"
 	"github.com/ozontech/seq-db/consts"
-	"github.com/ozontech/seq-db/frac"
+	"github.com/ozontech/seq-db/frac/common"
 	"github.com/ozontech/seq-db/fracmanager"
 	"github.com/ozontech/seq-db/logger"
 	"github.com/ozontech/seq-db/mappingprovider"
@@ -34,7 +34,7 @@ import (
 	"github.com/ozontech/seq-db/seq"
 	seqs3 "github.com/ozontech/seq-db/storage/s3"
 	"github.com/ozontech/seq-db/storeapi"
-	"github.com/ozontech/seq-db/tests/common"
+	testscommon "github.com/ozontech/seq-db/tests/common"
 )
 
 type TestingEnvConfig struct {
@@ -91,7 +91,7 @@ func (cfg *TestingEnvConfig) GetFracManagerConfig(replicaID string) fracmanager.
 	c = fracmanager.FillConfigWithDefault(&fracmanager.Config{
 		FracSize:  256 * uint64(units.MiB),
 		TotalSize: 1 * uint64(units.GiB),
-		SealParams: frac.SealParams{
+		SealParams: common.SealParams{
 			IDsZstdLevel:       fastestZstdLevel,
 			LIDsZstdLevel:      fastestZstdLevel,
 			TokenListZstdLevel: fastestZstdLevel,
@@ -264,7 +264,7 @@ func (cfg *TestingEnvConfig) MakeStores(
 	for i := range confs {
 		k := i / replicas
 
-		common.CreateDir(confs[i].FracManager.DataDir)
+		testscommon.CreateDir(confs[i].FracManager.DataDir)
 
 		mappingProvider, err := mappingprovider.New(
 			"",
@@ -429,7 +429,7 @@ func (t *TestingEnv) IngestorFetchAddr() string {
 }
 
 func randomListener() (lis net.Listener) {
-	lis, err := net.Listen("tcp", fmt.Sprintf("%s:0", common.Localhost))
+	lis, err := net.Listen("tcp", fmt.Sprintf("%s:0", testscommon.Localhost))
 	if err != nil {
 		panic(err)
 	}
diff --git a/util/fs.go b/util/fs.go
index 71502527..a31e9c29 100644
--- a/util/fs.go
+++ b/util/fs.go
@@ -13,17 +13,8 @@ import (
 )
 
 func MustSyncPath(path string) {
-	d, err := os.Open(path)
-	if err != nil {
-		logger.Panic("cannot open file for fsync", zap.Error(err))
-	}
-	if err = d.Sync(); err != nil {
-		_ = d.Close()
-		logger.Panic("cannot flush path to storage", zap.String("path", path), zap.Error(err))
-	}
-
-	if err = d.Close(); err != nil {
-		logger.Panic("cannot close path", zap.String("path", path), zap.Error(err))
+	if err := SyncPath(path); err != nil {
+		logger.Panic("cannot sync path", zap.String("path", path), zap.Error(err))
 	}
 }
 
@@ -36,3 +27,19 @@ func MustRemoveFileByPath(path string) {
 		)
 	}
 }
+
+func SyncPath(path string) error {
+	d, err := os.Open(path)
+	if err != nil {
+		return err
+	}
+	if err := d.Sync(); err != nil {
+		_ = d.Close()
+		return err
+	}
+
+	if err := d.Close(); err != nil {
+		return err
+	}
+	return nil
+}
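Note: util/fs.go now separates the fallible sync (SyncPath) from the panicking wrapper (MustSyncPath), so callers can propagate fsync failures instead of crashing. A minimal usage sketch (the path and error wrapping are illustrative):

    // Sync a directory after creating a file in it, handling the error
    // instead of panicking as MustSyncPath would.
    if err := util.SyncPath(dataDir); err != nil {
    	return fmt.Errorf("sync %s: %w", dataDir, err)
    }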