@@ -3,6 +3,7 @@ package fracmanager
3
3
import (
4
4
"bufio"
5
5
"io"
6
+ "math/rand"
6
7
"os"
7
8
"path/filepath"
8
9
"strconv"
@@ -11,112 +12,112 @@ import (
11
12
"time"
12
13
13
14
insaneJSON "github.com/ozontech/insane-json"
15
+ "github.com/pkg/profile"
14
16
"github.com/stretchr/testify/assert"
15
17
16
18
"github.com/ozontech/seq-db/consts"
17
- "github.com/ozontech/seq-db/disk"
18
19
"github.com/ozontech/seq-db/frac"
19
20
"github.com/ozontech/seq-db/seq"
20
21
"github.com/ozontech/seq-db/tests/common"
21
22
)
22
23
23
- func fillActiveFraction (active * frac.Active , wg * sync. WaitGroup ) error {
24
+ func fillActiveFraction (active * frac.Active ) error {
24
25
const muliplier = 10
25
26
26
27
docRoot := insaneJSON .Spawn ()
27
28
defer insaneJSON .Release (docRoot )
28
29
29
- dp := frac .NewDocProvider ()
30
-
31
30
file , err := os .Open (filepath .Join (common .TestDataDir , "k8s.logs" ))
32
31
if err != nil {
33
32
return err
34
33
}
35
34
defer file .Close ()
35
+
36
+ k := 0
37
+ wg := sync.WaitGroup {}
38
+ dp := frac .NewDocProvider ()
36
39
for i := 0 ; i < muliplier ; i ++ {
37
40
dp .TryReset ()
38
- _ , err := file . Seek ( 0 , io . SeekStart )
39
- if err != nil {
41
+
42
+ if _ , err := file . Seek ( 0 , io . SeekStart ); err != nil {
40
43
return err
41
44
}
45
+
42
46
scanner := bufio .NewScanner (file )
43
47
for scanner .Scan () {
48
+ k ++
44
49
doc := scanner .Bytes ()
45
- err := docRoot .DecodeBytes (doc )
46
- if err != nil {
50
+ if err := docRoot .DecodeBytes (doc ); err != nil {
47
51
return err
48
52
}
49
- tokens := seq .Tokens ("_all_:" , "service:100500" , "k8s_pod:" + strconv .Itoa (i ))
50
- dp .Append (doc , docRoot , seq .SimpleID (0 ), tokens )
53
+
54
+ id := seq .NewID (time .Now (), uint64 (rand .Int63 ()))
55
+ dp .Append (doc , docRoot , id , seq .Tokens (
56
+ "_all_:" ,
57
+ "service:service" + strconv .Itoa (rand .Intn (200 )),
58
+ "k8s_pod1:" + strconv .Itoa (k % 100000 ),
59
+ "k8s_pod2:" + strconv .Itoa (k % 1000000 ),
60
+ ))
51
61
}
52
62
docs , metas := dp .Provide ()
53
63
wg .Add (1 )
54
- if err := active .Append (docs , metas , wg ); err != nil {
64
+ if err := active .Append (docs , metas , & wg ); err != nil {
55
65
return err
56
66
}
57
67
}
58
68
69
+ wg .Wait ()
59
70
return nil
60
71
}
61
72
62
- func getCacheMaintainer () (* CacheMaintainer , func ()) {
63
- done := make (chan struct {})
64
- cm := NewCacheMaintainer (32 * consts .MB , 24 * consts .MB , nil )
65
- wg := cm .RunCleanLoop (done , time .Second , time .Second )
66
- return cm , func () {
67
- close (done )
68
- wg .Wait ()
73
+ func defaultSealingParams () frac.SealParams {
74
+ const minZstdLevel = 1
75
+ return frac.SealParams {
76
+ IDsZstdLevel : minZstdLevel ,
77
+ LIDsZstdLevel : minZstdLevel ,
78
+ TokenListZstdLevel : minZstdLevel ,
79
+ DocsPositionsZstdLevel : minZstdLevel ,
80
+ TokenTableZstdLevel : minZstdLevel ,
81
+ DocBlocksZstdLevel : minZstdLevel ,
82
+ DocBlockSize : 128 * consts .KB ,
69
83
}
70
84
}
71
85
72
- func BenchmarkSealing (b * testing.B ) {
73
- b .ResetTimer ()
74
- b .StopTimer ()
75
- b .ReportAllocs ()
86
+ func Benchmark_SealingNoSort (b * testing.B ) {
87
+ runSealingBench (b , & frac.Config {SkipSortDocs : true })
88
+ }
89
+
90
+ func Benchmark_SealingWithSort (b * testing.B ) {
91
+ runSealingBench (b , & frac.Config {})
92
+ }
76
93
77
- cm , stopFn := getCacheMaintainer ()
78
- defer stopFn ()
94
+ func runSealingBench (b * testing.B , cfg * frac.Config ) {
95
+ cm := NewCacheMaintainer (consts .MB * 64 , consts .MB * 64 , nil )
96
+ fp := newFractionProvider (cfg , cm , 1 , 1 )
97
+ defer fp .Stop ()
79
98
80
99
dataDir := filepath .Join (b .TempDir (), "BenchmarkSealing" )
81
100
common .RecreateDir (dataDir )
82
101
83
- readLimiter := disk . NewReadLimiter ( 1 , nil )
84
-
85
- activeIndexer := frac . NewActiveIndexer ( 10 , 10 )
102
+ active := fp . NewActive ( filepath . Join ( dataDir , "test" ) )
103
+ err := fillActiveFraction ( active )
104
+ assert . NoError ( b , err )
86
105
87
- activeIndexer .Start ()
88
- defer activeIndexer .Stop ()
106
+ params := defaultSealingParams ()
107
+ // The first sealing will sort all the LIDs, so we take this load out of the measurement range
108
+ _ , err = frac .Seal (active , params )
109
+ assert .NoError (b , err )
89
110
90
- const minZstdLevel = - 5
91
- defaultSealParams := frac.SealParams {
92
- IDsZstdLevel : minZstdLevel ,
93
- LIDsZstdLevel : minZstdLevel ,
94
- TokenListZstdLevel : minZstdLevel ,
95
- DocsPositionsZstdLevel : minZstdLevel ,
96
- TokenTableZstdLevel : minZstdLevel ,
97
- DocBlocksZstdLevel : minZstdLevel ,
98
- DocBlockSize : consts .MB * 4 ,
99
- }
100
- for i := 0 ; i < b .N ; i ++ {
101
- wg := sync.WaitGroup {}
102
- active := frac .NewActive (
103
- filepath .Join (dataDir , "test_" + strconv .Itoa (i )),
104
- activeIndexer ,
105
- readLimiter ,
106
- cm .CreateDocBlockCache (),
107
- cm .CreateSortDocsCache (),
108
- & frac.Config {},
109
- )
110
- err := fillActiveFraction (active , & wg )
111
- assert .NoError (b , err )
111
+ b .ReportAllocs ()
112
112
113
- wg .Wait ()
114
- active .GetAllDocuments () // emulate search-pre-sorted LIDs
113
+ defer profile .Start ( // turn on profiling (look for the file fracmanager/cpu.pprof)
114
+ profile .CPUProfile ,
115
+ profile .ProfilePath ("." ),
116
+ profile .NoShutdownHook ,
117
+ ).Stop ()
115
118
116
- b . StartTimer ()
117
- _ , err = frac .Seal (active , defaultSealParams )
119
+ for b . Loop () {
120
+ _ , err = frac .Seal (active , params )
118
121
assert .NoError (b , err )
119
-
120
- b .StopTimer ()
121
122
}
122
123
}
0 commit comments