From 7647b8aa528e700f11fda81864c7a9eeacffd1f6 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 6 Sep 2021 16:17:42 +0200 Subject: [PATCH 1/2] Fixes @ modifier when splitting queries by time. This will replace `start` and `end` at (`@`) modifier with the actual constant values based on the original queries. Meaning subqueries will not wrongly use their own query start and end time. Fixes #4463 Signed-off-by: Cyril Tovena --- pkg/querier/queryrange/split_by_interval.go | 41 ++++++++++-- .../queryrange/split_by_interval_test.go | 63 +++++++++++++++++-- 2 files changed, 94 insertions(+), 10 deletions(-) diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index afb12445650..d3f663b07d2 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/promql/parser" ) type IntervalFn func(r Request) time.Duration @@ -40,7 +41,10 @@ type splitByInterval struct { func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) { // First we're going to build new requests, one for each day, taking care // to line up the boundaries with step. - reqs := splitQuery(r, s.interval(r)) + reqs, err := splitQuery(r, s.interval(r)) + if err != nil { + return nil, err + } s.splitByCounter.Add(float64(len(reqs))) reqResps, err := DoRequests(ctx, s.next, reqs, s.limits) @@ -60,7 +64,13 @@ func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) { return response, nil } -func splitQuery(r Request, interval time.Duration) []Request { +func splitQuery(r Request, interval time.Duration) ([]Request, error) { + // Replace @ modifier function to their respective constant values in the query. + // This way subqueries will be evaluated at the same time as the parent query. + query, err := evaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd()) + if err != nil { + return nil, err + } var reqs []Request for start := r.GetStart(); start < r.GetEnd(); start = nextIntervalBoundary(start, r.GetStep(), interval) + r.GetStep() { end := nextIntervalBoundary(start, r.GetStep(), interval) @@ -68,9 +78,32 @@ func splitQuery(r Request, interval time.Duration) []Request { end = r.GetEnd() } - reqs = append(reqs, r.WithStartEnd(start, end)) + reqs = append(reqs, r.WithQuery(query).WithStartEnd(start, end)) } - return reqs + return reqs, nil +} + +// evaluateAtModifierFunction parse the query and evaluates the `start()` and `end()` at modifier functions into actual constant timestamps. +// For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00` +// If the modifier is already a constant, it will be returned as is. +func evaluateAtModifierFunction(query string, start, end int64) (string, error) { + expr, err := parser.ParseExpr(query) + if err != nil { + return "", err + } + parser.Inspect(expr, func(n parser.Node, _ []parser.Node) error { + if selector, ok := n.(*parser.VectorSelector); ok { + switch selector.StartOrEnd { + case parser.START: + selector.Timestamp = &start + case parser.END: + selector.Timestamp = &end + } + selector.StartOrEnd = 0 + } + return nil + }) + return expr.String(), err } // Round up to the step before the next interval boundary. diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 4fff8f72c9a..828077263e8 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" @@ -135,20 +136,20 @@ func TestSplitQuery(t *testing.T) { Start: 0, End: 2 * 24 * 3600 * seconds, Step: 15 * seconds, - Query: "foo", + Query: "foo @ start()", }, expected: []Request{ &PrometheusRequest{ Start: 0, End: (24 * 3600 * seconds) - (15 * seconds), Step: 15 * seconds, - Query: "foo", + Query: "foo @ 0.000", }, &PrometheusRequest{ Start: 24 * 3600 * seconds, End: 2 * 24 * 3600 * seconds, Step: 15 * seconds, - Query: "foo", + Query: "foo @ 0.000", }, }, interval: day, @@ -236,14 +237,14 @@ func TestSplitQuery(t *testing.T) { }, } { t.Run(strconv.Itoa(i), func(t *testing.T) { - days := splitQuery(tc.input, tc.interval) + days, err := splitQuery(tc.input, tc.interval) + require.NoError(t, err) require.Equal(t, tc.expected, days) }) } } func TestSplitByDay(t *testing.T) { - mergedResponse, err := PrometheusCodec.MergeResponse(parsedResponse, parsedResponse) require.NoError(t, err) @@ -260,7 +261,6 @@ func TestSplitByDay(t *testing.T) { {query, string(mergedHTTPResponseBody), 2}, } { t.Run(strconv.Itoa(i), func(t *testing.T) { - var actualCount atomic.Int32 s := httptest.NewServer( middleware.AuthenticateUser.Wrap( @@ -298,3 +298,54 @@ func TestSplitByDay(t *testing.T) { }) } } + +func Test_evaluateAtModifier(t *testing.T) { + const ( + start, end = int64(1546300800), int64(1646300800) + ) + for _, tt := range []struct { + in, expected string + }{ + {"topk(5, rate(http_requests_total[1h] @ start()))", "topk(5, rate(http_requests_total[1h] @ 1546300.800))"}, + {"topk(5, rate(http_requests_total[1h] @ 0))", "topk(5, rate(http_requests_total[1h] @ 0.000))"}, + {"http_requests_total[1h] @ 10.001", "http_requests_total[1h] @ 10.001"}, + { + `min_over_time( + sum by(cluster) ( + rate(http_requests_total[5m] @ end()) + )[10m:] + ) + or + max_over_time( + stddev_over_time( + deriv( + rate(http_requests_total[10m] @ start()) + [5m:1m]) + [2m:]) + [10m:])`, + `min_over_time( + sum by(cluster) ( + rate(http_requests_total[5m] @ 1646300.800) + )[10m:] + ) + or + max_over_time( + stddev_over_time( + deriv( + rate(http_requests_total[10m] @ 1546300.800) + [5m:1m]) + [2m:]) + [10m:])`, + }, + } { + tt := tt + t.Run(tt.in, func(t *testing.T) { + t.Parallel() + expectedExpr, err := parser.ParseExpr(tt.expected) + require.NoError(t, err) + out, err := evaluateAtModifierFunction(tt.in, start, end) + require.NoError(t, err) + require.Equal(t, expectedExpr.String(), out) + }) + } +} From b4740fc8d83feba4f1a44b05139e222a1ee2390e Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 6 Sep 2021 16:28:49 +0200 Subject: [PATCH 2/2] Update changelog. Signed-off-by: Cyril Tovena --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd3a341aa33..40ae6487292 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ * [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression=enabled`. #4346 * [ENHANCEMENT] Update Go version to 1.16.6. #4362 * [ENHANCEMENT] Updated Prometheus to include changes from prometheus/prometheus#9083. Now whenever `/labels` API calls include matchers, blocks store is queried for `LabelNames` with matchers instead of `Series` calls which was inefficient. #4380 +* [BUGFIX] Frontend: Fixes @ modifier functions (start/end) when splitting queries by time. #4464 * [BUGFIX] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328 * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336 * [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335