Skip to content

Commit a48a04f

Browse files
committed
feat(sealing): new sealing using interface to separate from active fraction implementation
1 parent 1e717bc commit a48a04f

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

47 files changed

+1934
-1448
lines changed

bytespool/writer.go

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -29,15 +29,18 @@ func AcquireWriterSize(out io.Writer, size int) *Writer {
2929
}
3030
}
3131

32-
func FlushReleaseWriter(w *Writer) error {
33-
err := w.Flush()
34-
if err != nil {
35-
return err
36-
}
32+
func ReleaseWriter(w *Writer) {
3733
Release(w.Buf)
3834
w.Buf = nil
3935
w.out = nil
4036
writerPool.Put(w)
37+
}
38+
39+
func FlushReleaseWriter(w *Writer) error {
40+
if err := w.Flush(); err != nil {
41+
return err
42+
}
43+
ReleaseWriter(w)
4144
return nil
4245
}
4346

cmd/distribution/main.go

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,8 @@ import (
1111

1212
"github.com/ozontech/seq-db/cache"
1313
"github.com/ozontech/seq-db/consts"
14-
"github.com/ozontech/seq-db/frac"
14+
"github.com/ozontech/seq-db/frac/common"
15+
"github.com/ozontech/seq-db/frac/sealed"
1516
"github.com/ozontech/seq-db/fracmanager"
1617
"github.com/ozontech/seq-db/logger"
1718
"github.com/ozontech/seq-db/seq"
@@ -58,7 +59,7 @@ func readBlock(reader storage.IndexReader, blockIndex uint32) ([]byte, error) {
5859
return data, nil
5960
}
6061

61-
func loadInfo(path string) *frac.Info {
62+
func loadInfo(path string) *common.Info {
6263
indexReader, f := getReader(path)
6364
result, err := readBlock(indexReader, 0)
6465
if err != nil {
@@ -69,7 +70,7 @@ func loadInfo(path string) *frac.Info {
6970
logger.Fatal("seq-db index file header corrupted", zap.String("file", path))
7071
}
7172

72-
b := frac.BlockInfo{}
73+
b := sealed.BlockInfo{}
7374
err = b.Unpack(result)
7475
if err != nil {
7576
logger.Fatal("can't unpack info bloc of index file", zap.String("file", path), zap.Error(err))
@@ -84,7 +85,7 @@ func loadInfo(path string) *frac.Info {
8485
return b.Info
8586
}
8687

87-
func buildDist(dist *seq.MIDsDistribution, path string, _ *frac.Info) {
88+
func buildDist(dist *seq.MIDsDistribution, path string, _ *common.Info) {
8889
blocksReader, _ := getReader(path)
8990

9091
// skip tokens

cmd/index_analyzer/main.go

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/alecthomas/units"
1111
"go.uber.org/zap"
1212

13-
"github.com/ozontech/seq-db/frac"
13+
"github.com/ozontech/seq-db/frac/sealed"
1414
"github.com/ozontech/seq-db/frac/sealed/lids"
1515
"github.com/ozontech/seq-db/frac/sealed/token"
1616
"github.com/ozontech/seq-db/fracmanager"
@@ -91,8 +91,11 @@ func analyzeIndex(
9191
}
9292

9393
// load info
94-
b := frac.BlockInfo{}
95-
_ = b.Unpack(readBlock())
94+
var b sealed.BlockInfo
95+
if err := b.Unpack(readBlock()); err != nil {
96+
logger.Fatal("error unpacking block info", zap.Error(err))
97+
}
98+
9699
docsCount := int(b.Info.DocsTotal)
97100

98101
// load tokens

cmd/seq-db/seq-db.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/ozontech/seq-db/config"
2323
"github.com/ozontech/seq-db/consts"
2424
"github.com/ozontech/seq-db/frac"
25+
"github.com/ozontech/seq-db/frac/common"
2526
"github.com/ozontech/seq-db/fracmanager"
2627
"github.com/ozontech/seq-db/logger"
2728
"github.com/ozontech/seq-db/mappingprovider"
@@ -258,7 +259,7 @@ func startStore(
258259
MaintenanceDelay: 0,
259260
CacheGCDelay: 0,
260261
CacheCleanupDelay: 0,
261-
SealParams: frac.SealParams{
262+
SealParams: common.SealParams{
262263
IDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
263264
LIDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
264265
TokenListZstdLevel: cfg.Compression.SealedZstdCompressionLevel,

consts/consts.go

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,10 +11,9 @@ const (
1111
// DummyMID is used in aggregations when we do not need to build time series.
1212
DummyMID = 0
1313

14-
IDsBlockSize = int(4 * units.KiB)
15-
RegularBlockSize = int(16 * units.KiB)
1614
IDsPerBlock = int(4 * units.KiB)
1715
LIDBlockCap = int(64 * units.KiB)
16+
RegularBlockSize = int(16 * units.KiB)
1817

1918
DefaultMaintenanceDelay = time.Second
2019
DefaultCacheGCDelay = 1 * time.Second

frac/active.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/ozontech/seq-db/cache"
1717
"github.com/ozontech/seq-db/config"
1818
"github.com/ozontech/seq-db/consts"
19+
"github.com/ozontech/seq-db/frac/common"
1920
"github.com/ozontech/seq-db/logger"
2021
"github.com/ozontech/seq-db/metric"
2122
"github.com/ozontech/seq-db/metric/stopwatch"
@@ -38,7 +39,7 @@ type Active struct {
3839
released bool
3940

4041
infoMu sync.RWMutex
41-
info *Info
42+
info *common.Info
4243

4344
MIDs *UInt64s
4445
RIDs *UInt64s
@@ -103,7 +104,7 @@ func NewActive(
103104
writer: NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync),
104105

105106
BaseFileName: baseFileName,
106-
info: NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())),
107+
info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())),
107108
Config: cfg,
108109
}
109110

@@ -300,7 +301,7 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider {
300301
}
301302
}
302303

303-
func (f *Active) Info() *Info {
304+
func (f *Active) Info() *common.Info {
304305
f.infoMu.RLock()
305306
defer f.infoMu.RUnlock()
306307

frac/active_docs_positions.go

Lines changed: 23 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -7,18 +7,22 @@ import (
77
)
88

99
type DocsPositions struct {
10-
mu sync.RWMutex
11-
positions map[seq.ID]seq.DocPos
10+
mu sync.RWMutex
11+
idToPos map[seq.ID]seq.DocPos
12+
lidToPos []seq.DocPos
1213
}
1314

1415
func NewSyncDocsPositions() *DocsPositions {
15-
return &DocsPositions{
16-
positions: make(map[seq.ID]seq.DocPos),
16+
dp := DocsPositions{
17+
lidToPos: make([]seq.DocPos, 0),
18+
idToPos: make(map[seq.ID]seq.DocPos),
1719
}
20+
dp.lidToPos = append(dp.lidToPos, 0) // systemID
21+
return &dp
1822
}
1923

2024
func (dp *DocsPositions) Get(id seq.ID) seq.DocPos {
21-
if val, ok := dp.positions[id]; ok {
25+
if val, ok := dp.idToPos[id]; ok {
2226
return val
2327
}
2428
return seq.DocPosNotFound
@@ -36,13 +40,22 @@ func (dp *DocsPositions) SetMultiple(ids []seq.ID, pos []seq.DocPos) []seq.ID {
3640
dp.mu.Lock()
3741
defer dp.mu.Unlock()
3842

39-
appended := make([]seq.ID, 0)
43+
appended := make([]seq.ID, 0, len(ids))
4044
for i, id := range ids {
41-
// Positions may be equal in case of nested index.
42-
if savedPos, ok := dp.positions[id]; !ok || savedPos == pos[i] {
43-
dp.positions[id] = pos[i]
44-
appended = append(appended, id)
45+
p, ok := dp.idToPos[id]
46+
47+
if ok {
48+
if p != pos[i] {
49+
// same ID but different position
50+
// this is a duplicate ID, we can't append it
51+
continue
52+
}
53+
} else {
54+
dp.idToPos[id] = pos[i]
4555
}
56+
57+
dp.lidToPos = append(dp.lidToPos, pos[i])
58+
appended = append(appended, id)
4659
}
4760
return appended
4861
}

frac/active_index.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/prometheus/client_golang/prometheus"
77
"github.com/prometheus/client_golang/prometheus/promauto"
88

9+
"github.com/ozontech/seq-db/frac/common"
910
"github.com/ozontech/seq-db/frac/processor"
1011
"github.com/ozontech/seq-db/frac/sealed/lids"
1112
"github.com/ozontech/seq-db/metric"
@@ -47,7 +48,7 @@ var (
4748
type activeDataProvider struct {
4849
ctx context.Context
4950
config *Config
50-
info *Info
51+
info *common.Info
5152

5253
mids *UInt64s
5354
rids *UInt64s

0 commit comments

Comments
 (0)