Skip to content

Commit db54372

Browse files
committed
engine: fix range query duplicate label handling
Prometheus seems to care about duplicate samples for label only during the same timestamp. Since our operators might return samples for one result in two different series we need to merge them to one result again. Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 4c3bd49 commit db54372

File tree

2 files changed

+31
-5
lines changed

2 files changed

+31
-5
lines changed

engine/engine.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"github.com/thanos-io/promql-engine/execution/model"
2828
"github.com/thanos-io/promql-engine/execution/parse"
2929
"github.com/thanos-io/promql-engine/execution/warnings"
30-
"github.com/thanos-io/promql-engine/extlabels"
3130
"github.com/thanos-io/promql-engine/logicalplan"
3231
"github.com/thanos-io/promql-engine/query"
3332
)
@@ -404,22 +403,42 @@ loop:
404403
}
405404

406405
// For range Query we expect always a Matrix value type.
406+
// Note: We have to zip together series that have the same label but
407+
// appear at different timestamps.
408+
// The engine already guarantees that we dont have a label conflict
409+
// in the same timestamp and prometheus accepts series with the same
410+
// labels that are populated at different timestamps just fine.
407411
if q.t == RangeQuery {
408412
matrix := make(promql.Matrix, 0, len(series))
409-
for _, s := range series {
413+
seenAt := make(map[uint64]int, len(series))
414+
for i, s := range series {
410415
if len(s.Floats)+len(s.Histograms) == 0 {
411416
continue
412417
}
413-
matrix = append(matrix, s)
418+
h := s.Metric.Hash()
419+
if j, ok := seenAt[h]; ok {
420+
matrix[j].Floats = append(matrix[j].Floats, s.Floats...)
421+
matrix[j].Histograms = append(matrix[j].Histograms, s.Histograms...)
422+
} else {
423+
matrix = append(matrix, s)
424+
seenAt[h] = i
425+
}
414426
}
415427
sort.Sort(matrix)
416-
if matrix.ContainsSameLabelset() {
417-
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
428+
if len(seenAt) < len(series) {
429+
// If we had collisions, we need to resort by timestamp, since we might have added
430+
// samples from the later collision pair first.
431+
for _, s := range matrix {
432+
sort.Slice(s.Floats, func(i, j int) bool { return s.Floats[i].T < s.Floats[j].T })
433+
sort.Slice(s.Histograms, func(i, j int) bool { return s.Histograms[i].T < s.Histograms[j].T })
434+
}
418435
}
419436
ret.Value = matrix
420437
return ret
421438
}
422439

440+
// We dont need to zip together results for instant queries since
441+
// they only have results for one timestamp.
423442
var result parser.Value
424443
switch q.expr.Type() {
425444
case parser.ValueTypeMatrix:

engine/engine_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,13 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
144144
(http_requests_total < 2*http_requests_total)
145145
)`,
146146
},
147+
{
148+
name: "duplicate label - top level presentation",
149+
load: `load 1m
150+
A 1 2 _ _ _ _ _ _ _ _ _
151+
B _ _ _ _ _ _ _ _ _ 1 2`,
152+
query: `exp({__name__=~"(A|B)"})`,
153+
},
147154
{
148155
name: "timestamp with multiple parenthesis",
149156
load: `load 30s

0 commit comments

Comments
 (0)