Skip to content

Commit 72cb649

Browse files
authored
chore: introduce mw to route requests between v1 and v2 engines (#19452)
1 parent 1a7b489 commit 72cb649

File tree

9 files changed

+747
-57
lines changed

9 files changed

+747
-57
lines changed

docs/sources/shared/configuration.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4849,6 +4849,15 @@ engine_v2:
48494849
# CLI flag: -querier.engine-v2.enable
48504850
[enable: <boolean> | default = false]
48514851
4852+
# Amount of time until data objects are available.
4853+
# CLI flag: -querier.engine-v2.dataobj-storage-lag
4854+
[dataobj_storage_lag: <duration> | default = 1h]
4855+
4856+
# Initial date when data objects became available. Format YYYY-MM-DD. If not
4857+
# set, assume data objects are always available no matter how far back.
4858+
# CLI flag: -querier.engine-v2.dataobj-storage-start
4859+
[dataobj_storage_start: <time> | default = 0]
4860+
48524861
# Experimental: Batch size of the next generation query engine.
48534862
# CLI flag: -querier.engine-v2.batch-size
48544863
[batch_size: <int> | default = 100]

pkg/engine/engine.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"go.opentelemetry.io/otel/codes"
1818
"go.opentelemetry.io/otel/trace"
1919

20+
dskit_flagext "github.com/grafana/dskit/flagext"
21+
2022
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
2123
"github.com/grafana/loki/v3/pkg/engine/internal/executor"
2224
"github.com/grafana/loki/v3/pkg/engine/internal/planner/logical"
@@ -68,6 +70,9 @@ type Config struct {
6870
// Enable the next generation Loki Query Engine for supported queries.
6971
Enable bool `yaml:"enable" category:"experimental"`
7072

73+
DataobjStorageLag time.Duration `yaml:"dataobj_storage_lag" category:"experimental"`
74+
DataobjStorageStart dskit_flagext.Time `yaml:"dataobj_storage_start" category:"experimental"`
75+
7176
// Batch size of the v2 execution engine.
7277
BatchSize int `yaml:"batch_size" category:"experimental"`
7378

@@ -78,11 +83,18 @@ type Config struct {
7883
RangeConfig rangeio.Config `yaml:"range_reads" category:"experimental" doc:"description=Configures how to read byte ranges from object storage when using the V2 engine."`
7984
}
8085

81-
func (opts *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
82-
f.BoolVar(&opts.Enable, prefix+"enable", false, "Experimental: Enable next generation query engine for supported queries.")
83-
f.IntVar(&opts.BatchSize, prefix+"batch-size", 100, "Experimental: Batch size of the next generation query engine.")
84-
f.IntVar(&opts.MergePrefetchCount, prefix+"merge-prefetch-count", 0, "Experimental: The number of inputs that are prefetched simultaneously by any Merge node. A value of 0 means that only the currently processed input is prefetched, 1 means that only the next input is prefetched, and so on. A negative value means that all inputs are be prefetched in parallel.")
85-
opts.RangeConfig.RegisterFlags(prefix+"range-reads.", f)
86+
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
87+
f.BoolVar(&cfg.Enable, prefix+"enable", false, "Experimental: Enable next generation query engine for supported queries.")
88+
f.IntVar(&cfg.BatchSize, prefix+"batch-size", 100, "Experimental: Batch size of the next generation query engine.")
89+
f.IntVar(&cfg.MergePrefetchCount, prefix+"merge-prefetch-count", 0, "Experimental: The number of inputs that are prefetched simultaneously by any Merge node. A value of 0 means that only the currently processed input is prefetched, 1 means that only the next input is prefetched, and so on. A negative value means that all inputs are be prefetched in parallel.")
90+
cfg.RangeConfig.RegisterFlags(prefix+"range-reads.", f)
91+
92+
f.DurationVar(&cfg.DataobjStorageLag, prefix+"dataobj-storage-lag", 1*time.Hour, "Amount of time until data objects are available.")
93+
f.Var(&cfg.DataobjStorageStart, prefix+"dataobj-storage-start", "Initial date when data objects became available. Format YYYY-MM-DD. If not set, assume data objects are always available no matter how far back.")
94+
}
95+
96+
func (cfg *Config) ValidQueryRange() (time.Time, time.Time) {
97+
return time.Time(cfg.DataobjStorageStart).UTC(), time.Now().UTC().Add(-cfg.DataobjStorageLag)
8698
}
8799

88100
// QueryEngine combines logical planning, physical planning, and execution to evaluate LogQL queries.
@@ -276,6 +288,11 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
276288
return builder.Build(stats, metadataCtx), nil
277289
}
278290

291+
func IsQuerySupported(params logql.Params) bool {
292+
_, err := logical.BuildPlan(params)
293+
return err == nil
294+
}
295+
279296
func collectResult(ctx context.Context, pipeline executor.Pipeline, builder ResultBuilder) error {
280297
for {
281298
rec, err := pipeline.Read(ctx)

pkg/loki/modules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,6 +1146,7 @@ func (t *Loki) initQueryFrontendMiddleware() (_ services.Service, err error) {
11461146
middleware, stopper, err := queryrange.NewMiddleware(
11471147
t.Cfg.QueryRange,
11481148
t.Cfg.Querier.Engine,
1149+
t.Cfg.Querier.EngineV2,
11491150
ingesterQueryOptions{t.Cfg.Querier},
11501151
util_log.Logger,
11511152
t.Overrides,

pkg/querier/http.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,8 @@ func hasDataObjectsAvailable(config Config, start, end time.Time) bool {
113113
// Data objects in object storage lag behind 20-30 minutes.
114114
// We are generous and only enable v2 engine queries that end earlier than 1DataObjStorageLag ago (default 1h),
115115
// to ensure data objects are available.
116-
if config.DataobjStorageStart != "" {
117-
startTime, _ := time.Parse("2006-01-02", config.DataobjStorageStart) // already validated
118-
return end.Before(time.Now().Add(-1*config.DataobjStorageLag.Abs())) && start.After(startTime)
119-
}
120-
// no start time; assume we always have data objects no matter how far back
121-
return end.Before(time.Now().Add(-1 * config.DataobjStorageLag.Abs()))
116+
v2Start, v2End := config.EngineV2.ValidQueryRange()
117+
return end.Before(v2End) && start.After(v2Start)
122118
}
123119

124120
// InstantQueryHandler is a http.HandlerFunc for instant queries.
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
package queryrange
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"slices"
7+
"time"
8+
9+
"github.com/go-kit/log"
10+
"github.com/go-kit/log/level"
11+
"github.com/pkg/errors"
12+
13+
"github.com/grafana/loki/v3/pkg/engine"
14+
"github.com/grafana/loki/v3/pkg/logproto"
15+
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
16+
)
17+
18+
// engineReqResp represents a request with its result channel
19+
type engineReqResp struct {
20+
lokiResult
21+
isV2Engine bool
22+
}
23+
24+
// engineRouter handles splitting queries between V1 and V2 engines
25+
type engineRouter struct {
26+
v2Start, v2End time.Time // v2 engine time range
27+
forMetricQuery bool
28+
29+
v1Next queryrangebase.Handler
30+
v2Next queryrangebase.Handler
31+
32+
merger queryrangebase.Merger
33+
34+
logger log.Logger
35+
}
36+
37+
// newEngineRouterMiddleware creates a middleware that splits and routes part of the query
38+
// to v2 engine if the query is supported by it.
39+
func newEngineRouterMiddleware(
40+
v2Start, v2End time.Time,
41+
v2EngineHandler queryrangebase.Handler,
42+
v1Chain []queryrangebase.Middleware,
43+
merger queryrangebase.Merger,
44+
metricQuery bool,
45+
logger log.Logger,
46+
) queryrangebase.Middleware {
47+
if v2EngineHandler == nil {
48+
panic("v2EngineHandler cannot be nil")
49+
}
50+
51+
return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
52+
return &engineRouter{
53+
v2Start: v2Start,
54+
v2End: v2End,
55+
v1Next: queryrangebase.MergeMiddlewares(v1Chain...).Wrap(next),
56+
v2Next: v2EngineHandler,
57+
merger: merger,
58+
logger: logger,
59+
forMetricQuery: metricQuery,
60+
}
61+
})
62+
}
63+
64+
func (e *engineRouter) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
65+
// if query is entirely before or after v2 engine range, process using next handler.
66+
// ignore any boundary overlap, splitting requests that fall on bounary would result in tiny requests.
67+
if !r.GetEnd().After(e.v2Start) || !r.GetStart().Before(e.v2End) {
68+
return e.v1Next.Do(ctx, r)
69+
}
70+
71+
params, err := ParamsFromRequest(r)
72+
if err != nil {
73+
return nil, err
74+
}
75+
76+
// Unsupported queries should be entirely executed by chunks.
77+
if !engine.IsQuerySupported(params) {
78+
return e.v1Next.Do(ctx, r)
79+
}
80+
81+
inputs := e.splitOverlapping(r, e.v2Start, e.v2End)
82+
83+
// for log queries, order the splits to return early on hitting limits.
84+
var limit uint32
85+
if !e.forMetricQuery && len(inputs) > 1 {
86+
r, ok := r.(*LokiRequest)
87+
if !ok {
88+
level.Error(e.logger).Log("msg", "engine router received unexpected request type", "type", fmt.Sprintf("%T", r))
89+
return nil, errors.New("engine router: unexpected request type")
90+
}
91+
92+
limit = r.Limit
93+
94+
if r.Direction == logproto.BACKWARD {
95+
slices.SortFunc(inputs, func(a, b *engineReqResp) int {
96+
return b.req.GetStart().Compare(a.req.GetStart())
97+
})
98+
} else {
99+
slices.SortFunc(inputs, func(a, b *engineReqResp) int {
100+
return a.req.GetStart().Compare(b.req.GetStart())
101+
})
102+
}
103+
}
104+
105+
responses, err := e.process(ctx, inputs, limit)
106+
if err != nil {
107+
return nil, err
108+
}
109+
110+
// Merge responses
111+
return e.merger.MergeResponse(responses...)
112+
}
113+
114+
// splitOverlapping breaks down the request into multiple ranges based on the V2 engine time range.
115+
// It returns a max of 3 requests:
116+
// - one for the range before V2 engine
117+
// - one for the range overlapping V2 engine range
118+
// - one for the range after V2 engine
119+
func (e *engineRouter) splitOverlapping(r queryrangebase.Request, v2Start, v2End time.Time) []*engineReqResp {
120+
var (
121+
reqs []*engineReqResp
122+
123+
stepNs = r.GetStep() * int64(time.Millisecond)
124+
gap = time.Duration(stepNs)
125+
)
126+
127+
// metric query splits are separated by a gap of 1 step. This is to ensure a step is included only in a single split.
128+
if !e.forMetricQuery {
129+
gap = 0
130+
}
131+
132+
// align the ranges by step before splitting.
133+
start, end := alignStartEnd(stepNs, r.GetStart(), r.GetEnd())
134+
v2Start, v2End = alignStartEnd(stepNs, v2Start, v2End)
135+
136+
// chunk req before V2 engine range
137+
if start.Before(v2Start) {
138+
reqs = append(reqs, &engineReqResp{
139+
lokiResult: lokiResult{
140+
req: r.WithStartEnd(start, v2Start.Add(-gap)), // add gap between splits
141+
ch: make(chan *packedResp),
142+
},
143+
isV2Engine: false,
144+
})
145+
}
146+
147+
addSplitGap := false
148+
// chunk req after V2 engine range
149+
if end.After(v2End) {
150+
reqs = append(reqs, &engineReqResp{
151+
lokiResult: lokiResult{
152+
req: r.WithStartEnd(v2End, end),
153+
ch: make(chan *packedResp),
154+
},
155+
isV2Engine: false,
156+
})
157+
158+
// add gap after v2 query only if there is a chunk query after it.
159+
addSplitGap = true
160+
}
161+
162+
if start.After(v2Start) {
163+
v2Start = start
164+
}
165+
if end.Before(v2End) {
166+
v2End = end
167+
} else if addSplitGap {
168+
v2End = v2End.Add(-gap)
169+
}
170+
171+
return append(reqs, &engineReqResp{
172+
lokiResult: lokiResult{
173+
req: r.WithStartEnd(v2Start, v2End),
174+
ch: make(chan *packedResp),
175+
},
176+
isV2Engine: true,
177+
})
178+
}
179+
180+
func (e *engineRouter) handleReq(ctx context.Context, r *engineReqResp) {
181+
var resp packedResp
182+
if r.isV2Engine {
183+
resp.resp, resp.err = e.v2Next.Do(ctx, r.req)
184+
} else {
185+
resp.resp, resp.err = e.v1Next.Do(ctx, r.req)
186+
}
187+
188+
select {
189+
case <-ctx.Done():
190+
return
191+
case r.ch <- &resp:
192+
}
193+
}
194+
195+
// process executes the inputs in parallel and collects the responses.
196+
func (e *engineRouter) process(ctx context.Context, inputs []*engineReqResp, limit uint32) ([]queryrangebase.Response, error) {
197+
ctx, cancel := context.WithCancelCause(ctx)
198+
defer cancel(errors.New("engine router process cancelled"))
199+
200+
// Run all requests in parallel as we only get a max of 3 splits.
201+
for _, r := range inputs {
202+
go e.handleReq(ctx, r)
203+
}
204+
205+
var responses []queryrangebase.Response
206+
var count int64
207+
for _, x := range inputs {
208+
select {
209+
case <-ctx.Done():
210+
return nil, ctx.Err()
211+
case data := <-x.ch:
212+
if data.err != nil {
213+
return nil, data.err
214+
}
215+
216+
responses = append(responses, data.resp)
217+
if limit > 0 {
218+
// exit early if limit has been reached
219+
if r, ok := data.resp.(*LokiResponse); ok {
220+
count += r.Count()
221+
if count >= int64(limit) {
222+
return responses, nil
223+
}
224+
}
225+
}
226+
227+
}
228+
}
229+
230+
return responses, nil
231+
}
232+
233+
// alignStartEnd aligns start and end times to step boundaries.
234+
func alignStartEnd(stepNs int64, start, end time.Time) (time.Time, time.Time) {
235+
startNs := start.UnixNano()
236+
endNs := end.UnixNano()
237+
238+
startNs -= startNs % stepNs // round down
239+
if mod := endNs % stepNs; mod != 0 {
240+
endNs += stepNs - mod // round up
241+
}
242+
243+
return time.Unix(0, startNs), time.Unix(0, endNs)
244+
}

0 commit comments

Comments
 (0)