Skip to content

Commit 0b03281

Browse files
committed
Query: bump promql-engine
Bumping PromQL engine, fixing fallback fallout. Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 4ba7d59 commit 0b03281

File tree

9 files changed

+314
-234
lines changed

9 files changed

+314
-234
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
2828

2929
- [#7890](https://github.com/thanos-io/thanos/pull/7890) Query,Ruler: *breaking :warning:* deprecated `--store.sd-file` and `--store.sd-interval` to be replaced with `--endpoint.sd-config` and `--endpoint-sd-config-reload-interval`; removed legacy flags to pass endpoints `--store`, `--metadata`, `--rule`, `--exemplar`.
3030
- [#7012](https://github.com/thanos-io/thanos/pull/7012) Query: Automatically adjust `max_source_resolution` based on promql query to avoid querying data from higher resolution resulting empty results.
31+
- [#8118](https://github.com/thanos-io/thanos/pull/8118) Query: Bumped promql-engine
3132

3233
### Removed
3334

cmd/thanos/query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ func runQuery(
478478
})
479479
}
480480

481-
engineFactory := apiv1.NewQueryEngineFactory(engineOpts, remoteEngineEndpoints)
481+
engineFactory := apiv1.NewQueryFactory(engineOpts, remoteEngineEndpoints)
482482

483483
lookbackDeltaCreator := LookbackDeltaFactory(engineOpts.EngineOpts, dynamicLookbackDelta)
484484

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ require (
6969
github.com/sony/gobreaker v0.5.0
7070
github.com/stretchr/testify v1.10.0
7171
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97
72-
github.com/thanos-io/promql-engine v0.0.0-20250211181629-815830ca3e2e
72+
github.com/thanos-io/promql-engine v0.0.0-20250221084015-4230034ebb6c
7373
github.com/uber/jaeger-client-go v2.30.0+incompatible
7474
github.com/vimeo/galaxycache v0.0.0-20210323154928-b7e5d71c067a
7575
github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2258,8 +2258,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1
22582258
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
22592259
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0=
22602260
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw=
2261-
github.com/thanos-io/promql-engine v0.0.0-20250211181629-815830ca3e2e h1:jDKxQzp4JIhpbn6NFHXc8TCsRy8GkfHMZ7XNicY1mx8=
2262-
github.com/thanos-io/promql-engine v0.0.0-20250211181629-815830ca3e2e/go.mod h1:aHSV5hL94fNb7PklN9L0V10j+/RGIlzqbw7OLdNgZFs=
2261+
github.com/thanos-io/promql-engine v0.0.0-20250221084015-4230034ebb6c h1:p/XSjq43vYSorePYsxethwoNpwrJp6HD4QQ7xOH2DGI=
2262+
github.com/thanos-io/promql-engine v0.0.0-20250221084015-4230034ebb6c/go.mod h1:aHSV5hL94fNb7PklN9L0V10j+/RGIlzqbw7OLdNgZFs=
22632263
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
22642264
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
22652265
github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0=

pkg/api/query/engine.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
// Copyright 2016 The Prometheus Authors
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
// This package is a modified copy from
18+
// github.com/prometheus/prometheus/web/api/v1@2121b4628baa7d9d9406aa468712a6a332e77aff.
19+
20+
package v1
21+
22+
import (
23+
"context"
24+
"time"
25+
26+
"github.com/prometheus/prometheus/promql"
27+
"github.com/prometheus/prometheus/storage"
28+
"github.com/thanos-io/promql-engine/api"
29+
"github.com/thanos-io/promql-engine/engine"
30+
"github.com/thanos-io/promql-engine/logicalplan"
31+
)
32+
33+
type Engine interface {
34+
MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, ts time.Time) (promql.Query, error)
35+
MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error)
36+
MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error)
37+
MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error)
38+
}
39+
40+
type prometheusEngineAdapter struct {
41+
engine promql.QueryEngine
42+
}
43+
44+
func (a *prometheusEngineAdapter) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
45+
return a.engine.NewInstantQuery(ctx, q, opts, qs, ts)
46+
}
47+
48+
func (a *prometheusEngineAdapter) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
49+
return a.engine.NewRangeQuery(ctx, q, opts, qs, start, end, step)
50+
}
51+
52+
func (a *prometheusEngineAdapter) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
53+
return a.engine.NewInstantQuery(ctx, q, opts, plan.String(), ts)
54+
}
55+
56+
func (a *prometheusEngineAdapter) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, plan logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
57+
return a.engine.NewRangeQuery(ctx, q, opts, plan.String(), start, end, step)
58+
}
59+
60+
type QueryFactory struct {
61+
prometheus Engine
62+
thanos Engine
63+
}
64+
65+
func NewQueryFactory(
66+
engineOpts engine.Opts,
67+
remoteEngineEndpoints api.RemoteEndpoints,
68+
) *QueryFactory {
69+
70+
var thanosEngine Engine
71+
if remoteEngineEndpoints == nil {
72+
thanosEngine = engine.New(engineOpts)
73+
} else {
74+
thanosEngine = engine.NewDistributedEngine(engineOpts, remoteEngineEndpoints)
75+
}
76+
promEngine := promql.NewEngine(engineOpts.EngineOpts)
77+
return &QueryFactory{
78+
prometheus: &prometheusEngineAdapter{promEngine},
79+
thanos: thanosEngine,
80+
}
81+
}
82+
83+
// Always has query, sometimes already has a plan.
84+
type planOrQuery struct {
85+
query string
86+
plan logicalplan.Node
87+
}
88+
89+
func (f *QueryFactory) makeInstantQuery(
90+
ctx context.Context,
91+
e PromqlEngineType,
92+
q storage.Queryable,
93+
qry planOrQuery,
94+
opts *engine.QueryOpts,
95+
ts time.Time,
96+
) (res promql.Query, err error) {
97+
if e == PromqlEngineThanos {
98+
if qry.plan != nil {
99+
res, err = f.thanos.MakeInstantQueryFromPlan(ctx, q, opts, qry.plan, ts)
100+
} else {
101+
res, err = f.thanos.MakeInstantQuery(ctx, q, opts, qry.query, ts)
102+
}
103+
if err != nil {
104+
if engine.IsUnimplemented(err) {
105+
goto fallback
106+
}
107+
return nil, err
108+
}
109+
return res, nil
110+
}
111+
fallback:
112+
return f.prometheus.MakeInstantQuery(ctx, q, opts, qry.query, ts)
113+
}
114+
115+
func (f *QueryFactory) makeRangeQuery(
116+
ctx context.Context,
117+
e PromqlEngineType,
118+
q storage.Queryable,
119+
qry planOrQuery,
120+
opts *engine.QueryOpts,
121+
start time.Time,
122+
end time.Time,
123+
step time.Duration,
124+
) (res promql.Query, err error) {
125+
if e == PromqlEngineThanos {
126+
if qry.plan != nil {
127+
res, err = f.thanos.MakeRangeQueryFromPlan(ctx, q, opts, qry.plan, start, end, step)
128+
} else {
129+
res, err = f.thanos.MakeRangeQuery(ctx, q, opts, qry.query, start, end, step)
130+
}
131+
if err != nil {
132+
if engine.IsUnimplemented(err) {
133+
goto fallback
134+
}
135+
return nil, err
136+
}
137+
return res, nil
138+
}
139+
fallback:
140+
return f.prometheus.MakeRangeQuery(ctx, q, opts, qry.query, start, end, step)
141+
}

pkg/api/query/grpc.go

Lines changed: 58 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type GRPCAPI struct {
2626
now func() time.Time
2727
replicaLabels []string
2828
queryableCreate query.QueryableCreator
29-
engineFactory *QueryEngineFactory
29+
queryFactory *QueryFactory
3030
defaultEngine querypb.EngineType
3131
lookbackDeltaCreate func(int64) time.Duration
3232
defaultMaxResolutionSeconds time.Duration
@@ -36,7 +36,7 @@ func NewGRPCAPI(
3636
now func() time.Time,
3737
replicaLabels []string,
3838
creator query.QueryableCreator,
39-
engineFactory *QueryEngineFactory,
39+
queryFactory *QueryFactory,
4040
defaultEngine querypb.EngineType,
4141
lookbackDeltaCreate func(int64) time.Duration,
4242
defaultMaxResolutionSeconds time.Duration,
@@ -45,7 +45,7 @@ func NewGRPCAPI(
4545
now: now,
4646
replicaLabels: replicaLabels,
4747
queryableCreate: creator,
48-
engineFactory: engineFactory,
48+
queryFactory: queryFactory,
4949
defaultEngine: defaultEngine,
5050
lookbackDeltaCreate: lookbackDeltaCreate,
5151
defaultMaxResolutionSeconds: defaultMaxResolutionSeconds,
@@ -97,7 +97,7 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
9797
var qry promql.Query
9898
if err := tracing.DoInSpanWithErr(ctx, "instant_query_create", func(ctx context.Context) error {
9999
var err error
100-
qry, err = g.getQueryForEngine(ctx, request, queryable, maxResolution)
100+
qry, err = g.getInstantQueryForEngine(ctx, request, queryable, maxResolution)
101101
return err
102102
}); err != nil {
103103
return err
@@ -146,42 +146,6 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
146146
return nil
147147
}
148148

149-
func (g *GRPCAPI) getQueryForEngine(ctx context.Context, request *querypb.QueryRequest, queryable storage.Queryable, maxResolution int64) (promql.Query, error) {
150-
lookbackDelta := g.lookbackDeltaCreate(maxResolution * 1000)
151-
if request.LookbackDeltaSeconds > 0 {
152-
lookbackDelta = time.Duration(request.LookbackDeltaSeconds) * time.Second
153-
}
154-
engineParam := request.Engine
155-
if engineParam == querypb.EngineType_default {
156-
engineParam = g.defaultEngine
157-
}
158-
159-
var ts time.Time
160-
if request.TimeSeconds == 0 {
161-
ts = g.now()
162-
} else {
163-
ts = time.Unix(request.TimeSeconds, 0)
164-
}
165-
opts := &engine.QueryOpts{
166-
LookbackDeltaParam: lookbackDelta,
167-
EnablePartialResponses: request.EnablePartialResponse,
168-
}
169-
switch engineParam {
170-
case querypb.EngineType_prometheus:
171-
queryEngine := g.engineFactory.GetPrometheusEngine()
172-
return queryEngine.MakeInstantQuery(ctx, queryable, opts, request.Query, ts)
173-
case querypb.EngineType_thanos:
174-
queryEngine := g.engineFactory.GetThanosEngine()
175-
plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson())
176-
if err != nil {
177-
return queryEngine.MakeInstantQuery(ctx, queryable, opts, request.Query, ts)
178-
}
179-
return queryEngine.MakeInstantQueryFromPlan(ctx, queryable, opts, plan, ts)
180-
default:
181-
return nil, status.Error(codes.InvalidArgument, "invalid engine parameter")
182-
}
183-
}
184-
185149
func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Query_QueryRangeServer) error {
186150
ctx := srv.Context()
187151
if request.TimeoutSeconds != 0 {
@@ -295,14 +259,54 @@ func extractQueryStats(qry promql.Query) *querypb.QueryStats {
295259
return stats
296260
}
297261

262+
func (g *GRPCAPI) getInstantQueryForEngine(ctx context.Context, request *querypb.QueryRequest, queryable storage.Queryable, maxResolution int64) (promql.Query, error) {
263+
lookbackDelta := g.lookbackDeltaCreate(maxResolution * 1000)
264+
if request.LookbackDeltaSeconds > 0 {
265+
lookbackDelta = time.Duration(request.LookbackDeltaSeconds) * time.Second
266+
}
267+
engineParam := request.Engine
268+
if engineParam == querypb.EngineType_default {
269+
engineParam = g.defaultEngine
270+
}
271+
272+
var ts time.Time
273+
if request.TimeSeconds == 0 {
274+
ts = g.now()
275+
} else {
276+
ts = time.Unix(request.TimeSeconds, 0)
277+
}
278+
opts := &engine.QueryOpts{
279+
LookbackDeltaParam: lookbackDelta,
280+
EnablePartialResponses: request.EnablePartialResponse,
281+
}
282+
283+
var engineType PromqlEngineType
284+
switch engineParam {
285+
case querypb.EngineType_prometheus:
286+
engineType = PromqlEnginePrometheus
287+
case querypb.EngineType_thanos:
288+
engineType = PromqlEngineThanos
289+
default:
290+
return nil, status.Error(codes.InvalidArgument, "invalid engine parameter")
291+
}
292+
293+
var qry planOrQuery
294+
if plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson()); err != nil {
295+
qry = planOrQuery{plan: plan}
296+
} else {
297+
qry = planOrQuery{query: request.Query}
298+
}
299+
return g.queryFactory.makeInstantQuery(ctx, engineType, queryable, qry, opts, ts)
300+
}
301+
298302
func (g *GRPCAPI) getRangeQueryForEngine(
299303
ctx context.Context,
300304
request *querypb.QueryRangeRequest,
301305
queryable storage.Queryable,
302306
) (promql.Query, error) {
303-
startTime := time.Unix(request.StartTimeSeconds, 0)
304-
endTime := time.Unix(request.EndTimeSeconds, 0)
305-
interval := time.Duration(request.IntervalSeconds) * time.Second
307+
start := time.Unix(request.StartTimeSeconds, 0)
308+
end := time.Unix(request.EndTimeSeconds, 0)
309+
step := time.Duration(request.IntervalSeconds) * time.Second
306310

307311
engineParam := request.Engine
308312
if engineParam == querypb.EngineType_default {
@@ -322,18 +326,21 @@ func (g *GRPCAPI) getRangeQueryForEngine(
322326
EnablePartialResponses: request.EnablePartialResponse,
323327
}
324328

329+
var engineType PromqlEngineType
325330
switch engineParam {
326331
case querypb.EngineType_prometheus:
327-
queryEngine := g.engineFactory.GetPrometheusEngine()
328-
return queryEngine.MakeRangeQuery(ctx, queryable, opts, request.Query, startTime, endTime, interval)
332+
engineType = PromqlEnginePrometheus
329333
case querypb.EngineType_thanos:
330-
thanosEngine := g.engineFactory.GetThanosEngine()
331-
plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson())
332-
if err != nil {
333-
return thanosEngine.MakeRangeQuery(ctx, queryable, opts, request.Query, startTime, endTime, interval)
334-
}
335-
return thanosEngine.MakeRangeQueryFromPlan(ctx, queryable, opts, plan, startTime, endTime, interval)
334+
engineType = PromqlEngineThanos
336335
default:
337336
return nil, status.Error(codes.InvalidArgument, "invalid engine parameter")
338337
}
338+
339+
var qry planOrQuery
340+
if plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson()); err != nil {
341+
qry = planOrQuery{plan: plan}
342+
} else {
343+
qry = planOrQuery{query: request.Query}
344+
}
345+
return g.queryFactory.makeRangeQuery(ctx, engineType, queryable, qry, opts, start, end, step)
339346
}

pkg/api/query/grpc_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ func TestGRPCQueryAPIWithQueryPlan(t *testing.T) {
3232
proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval)
3333
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute)
3434
lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute }
35-
engineFactory := &QueryEngineFactory{
35+
queryFactory := &QueryFactory{
3636
thanos: &engineStub{},
3737
}
38-
api := NewGRPCAPI(time.Now, nil, queryableCreator, engineFactory, querypb.EngineType_thanos, lookbackDeltaFunc, 0)
38+
api := NewGRPCAPI(time.Now, nil, queryableCreator, queryFactory, querypb.EngineType_thanos, lookbackDeltaFunc, 0)
3939

4040
expr, err := extpromql.ParseExpr("metric")
4141
testutil.Ok(t, err)
@@ -97,10 +97,10 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) {
9797
}
9898

9999
for _, test := range tests {
100-
engineFactory := &QueryEngineFactory{
100+
queryFactory := &QueryFactory{
101101
prometheus: test.engine,
102102
}
103-
api := NewGRPCAPI(time.Now, nil, queryableCreator, engineFactory, querypb.EngineType_prometheus, lookbackDeltaFunc, 0)
103+
api := NewGRPCAPI(time.Now, nil, queryableCreator, queryFactory, querypb.EngineType_prometheus, lookbackDeltaFunc, 0)
104104
t.Run("range_query", func(t *testing.T) {
105105
rangeRequest := &querypb.QueryRangeRequest{
106106
Query: "metric",

0 commit comments

Comments
 (0)