Skip to content

Commit 24c3575

Browse files
committed
compact: fix after review
Signed-off-by: Giedrius Statkevičius <[email protected]>
1 parent e4e7fa2 commit 24c3575

File tree

3 files changed

+37
-10
lines changed

3 files changed

+37
-10
lines changed

docs/components/compact.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,7 @@ message AggrChunk {
152152

153153
This means that for each series we collect various aggregations with a given interval: 5m or 1h (depending on resolution). This allows us to keep precision on large duration queries, without fetching too many samples.
154154

155-
Native histogram downsampling leverages the fact that one can aggregate & reduce schema i.e. downsample native histograms. Native histograms
156-
only store 3 aggregations - counter, count, and sum. Sum and count are used to produce "an average" native histogram. Counter is a counter
157-
that is used with functions irate, rate, increase, and resets.
155+
Native histogram downsampling leverages the fact that native histograms can be aggregated and have their schema reduced, i.e. they can themselves be downsampled. For native histograms only 3 aggregations are stored: counter, count, and sum. Sum and count are used to produce an "average" native histogram. The counter aggregation is used by the functions `irate`, `rate`, `increase`, and `resets`.
158156

159157
### ⚠️ Downsampling: Note About Resolution and Retention ⚠️
160158

pkg/compact/downsample/downsample.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,26 @@ const (
4747
ResLevel2DownsampleRange = 10 * 24 * 60 * 60 * 1000 // 10 days.
4848
)
4949

50+
// cutNewChunk returns true when a new chunk needs to be cut.
51+
// Float histograms & regular histograms are the same
52+
// from our point of view (we always write float histograms)
53+
// so we only need to cut a chunk when we are going from any histogram encoding to non-histogram encoding.
54+
func cutNewChunk(curEnc, prevEnc chunkenc.Encoding) bool {
55+
isHist := func(c chunkenc.Encoding) bool {
56+
return c == chunkenc.EncFloatHistogram || c == chunkenc.EncHistogram
57+
}
58+
59+
if isHist(curEnc) && !isHist(prevEnc) {
60+
return true
61+
}
62+
63+
if !isHist(curEnc) && isHist(prevEnc) {
64+
return true
65+
}
66+
67+
return false
68+
}
69+
5070
// Downsample downsamples the given block. It writes a new block into dir and returns its ID.
5171
func Downsample(
5272
ctx context.Context,
@@ -153,7 +173,7 @@ func Downsample(
153173
var prevEnc chunkenc.Encoding = chks[0].Chunk.Encoding()
154174

155175
for _, c := range chks {
156-
if prevEnc != c.Chunk.Encoding() {
176+
if cutNewChunk(c.Chunk.Encoding(), prevEnc) {
157177
resChunks = append(resChunks, DownsampleRaw(all, resolution)...)
158178
all = all[:0]
159179
prevEnc = c.Chunk.Encoding()
@@ -209,7 +229,7 @@ func Downsample(
209229
// Downsample a block that contains aggregated chunks already.
210230
for i, c := range fixedChks {
211231
ac := c.Chunk.(*AggrChunk)
212-
if i > 0 && previousIsHistogram != isHistogram(ac) {
232+
if i > 0 && previousIsHistogram != isHistogramAggrChunk(ac) {
213233
err := downsampleAggr(
214234
aggrChunks,
215235
&all,
@@ -222,15 +242,15 @@ func Downsample(
222242
if err != nil {
223243
return id, errors.Wrapf(err, "downsample aggregate block, series: %d", postings.At())
224244
}
225-
previousIsHistogram = isHistogram(ac)
245+
previousIsHistogram = isHistogramAggrChunk(ac)
226246
aggrChunks = aggrChunks[:0]
227247
mint = c.MinTime
228248
}
229249
aggrChunks = append(aggrChunks, ac)
230250

231251
if i == 0 {
232252
mint = c.MinTime
233-
previousIsHistogram = isHistogram(ac)
253+
previousIsHistogram = isHistogramAggrChunk(ac)
234254
}
235255
maxt = c.MaxTime
236256
}
@@ -831,7 +851,7 @@ func downsampleBatch(data []sample, resolution int64, aggr sampleAggregator, add
831851
return nextT
832852
}
833853

834-
func isHistogram(c *AggrChunk) bool {
854+
func isHistogramAggrChunk(c *AggrChunk) bool {
835855
// If it is an aggregated chunk histogram chunk, the counter will be of the type histogram.
836856
cntr, err := c.Get(AggrCounter)
837857
if err != nil {
@@ -851,7 +871,7 @@ func downsampleAggr(
851871
var numSamples int
852872

853873
for _, c := range chks {
854-
if isHistogram(c) {
874+
if isHistogramAggrChunk(c) {
855875
hChks = append(hChks, c)
856876
numSamples += c.NumSamples()
857877
} else {
@@ -977,7 +997,7 @@ func expandHistogramChunkIterator(it chunkenc.Iterator, buf *[]sample) error {
977997
return it.Err()
978998
}
979999

980-
// expandHistogramChunkIterator reads all histograms from the iterator and appends them to buf.
1000+
// expandFloatHistogramChunkIterator reads all histograms from the iterator and appends them to buf.
9811001
func expandFloatHistogramChunkIterator(it chunkenc.Iterator, buf *[]sample) error {
9821002
// For safety reasons, we check for each sample that it does not go back in time.
9831003
// If it does, we skip it.

pkg/compact/downsample/downsample_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2864,3 +2864,12 @@ func assertValidChunkTime(t *testing.T, chks []chunks.Meta) {
28642864
testutil.Assert(t, chk.MaxTime >= chk.MinTime, "chunk MaxTime is not greater equal to MinTime")
28652865
}
28662866
}
2867+
2868+
func TestDownsampleNHCutNewChunk(t *testing.T) {
2869+
require.False(t, cutNewChunk(chunkenc.EncFloatHistogram, chunkenc.EncHistogram))
2870+
require.False(t, cutNewChunk(chunkenc.EncHistogram, chunkenc.EncFloatHistogram))
2871+
require.False(t, cutNewChunk(chunkenc.EncHistogram, chunkenc.EncHistogram))
2872+
require.False(t, cutNewChunk(chunkenc.EncFloatHistogram, chunkenc.EncFloatHistogram))
2873+
require.True(t, cutNewChunk(chunkenc.EncXOR, chunkenc.EncFloatHistogram))
2874+
require.True(t, cutNewChunk(chunkenc.EncXOR, chunkenc.EncHistogram))
2875+
}

0 commit comments

Comments
 (0)