Skip to content

Commit a6370c7

Browse files
hczhuhczhu-db
andauthored
Add Prometheus counters for pending write requests and series requests in Receive (#8308)
Signed-off-by: HC Zhu <[email protected]> Co-authored-by: HC Zhu (DB) <[email protected]>
1 parent 8f715b0 commit a6370c7

File tree

4 files changed

+27
-1
lines changed

4 files changed

+27
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
1313
- [#8202](https://github.com/thanos-io/thanos/pull/8202) Receive: Unhide `--tsdb.enable-native-histograms` flag
1414
- [#8225](https://github.com/thanos-io/thanos/pull/8225) tools: Extend bucket ls options.
1515
- [#8282](https://github.com/thanos-io/thanos/pull/8282) Force sync writes to meta.json in case of host crash
16+
- [#8308](https://github.com/thanos-io/thanos/pull/8308) Receive: Prometheus counters for pending write requests and series requests
1617

1718
### Added
1819

docs/components/receive.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ Please see the metric `thanos_receive_forward_delay_seconds` to see if you need
372372

373373
The following formula is used for calculating quorum:
374374

375-
```go mdox-exec="sed -n '1036,1046p' pkg/receive/handler.go"
375+
```go mdox-exec="sed -n '1046,1056p' pkg/receive/handler.go"
376376
// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
377377
func (h *Handler) writeQuorum() int {
378378
// NOTE(GiedriusS): this is here because otherwise RF=2 doesn't make sense as all writes

pkg/receive/handler.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/prometheus/prometheus/tsdb"
3838
"go.opentelemetry.io/otel/attribute"
3939
"go.opentelemetry.io/otel/trace"
40+
"go.uber.org/atomic"
4041
"google.golang.org/grpc"
4142
"google.golang.org/grpc/codes"
4243
"google.golang.org/grpc/status"
@@ -141,6 +142,9 @@ type Handler struct {
141142
writeSamplesTotal *prometheus.HistogramVec
142143
writeTimeseriesTotal *prometheus.HistogramVec
143144

145+
pendingWriteRequests prometheus.Gauge
146+
pendingWriteRequestsCounter atomic.Int32
147+
144148
Limiter *Limiter
145149
}
146150

@@ -222,6 +226,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
222226
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
223227
}, []string{"code", "tenant"},
224228
),
229+
pendingWriteRequests: promauto.With(registerer).NewGauge(
230+
prometheus.GaugeOpts{
231+
Name: "thanos_receive_pending_write_requests",
232+
Help: "The number of pending write requests.",
233+
},
234+
),
225235
}
226236

227237
h.forwardRequests.WithLabelValues(labelSuccess)
@@ -1060,6 +1070,9 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
10601070
span, ctx := tracing.StartSpan(ctx, "receive_grpc")
10611071
defer span.Finish()
10621072

1073+
h.pendingWriteRequests.Set(float64(h.pendingWriteRequestsCounter.Add(1)))
1074+
defer h.pendingWriteRequestsCounter.Add(-1)
1075+
10631076
_, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
10641077
if err != nil {
10651078
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)

pkg/store/telemetry.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/prometheus/client_golang/prometheus/promauto"
1212

1313
"github.com/thanos-io/thanos/pkg/store/storepb"
14+
15+
"go.uber.org/atomic"
1416
)
1517

1618
// seriesStatsAggregator aggregates results from fanned-out queries into a histogram given their
@@ -145,6 +147,9 @@ type instrumentedStoreServer struct {
145147
storepb.StoreServer
146148
seriesRequested prometheus.Histogram
147149
chunksRequested prometheus.Histogram
150+
151+
pendingRequests prometheus.Gauge
152+
pendingRequestsCounter atomic.Int32
148153
}
149154

150155
// NewInstrumentedStoreServer creates a new instrumentedStoreServer.
@@ -161,11 +166,18 @@ func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreSe
161166
Help: "Number of requested chunks for Series calls",
162167
Buckets: []float64{1, 100, 1000, 10000, 100000, 10000000, 100000000, 1000000000},
163168
}),
169+
pendingRequests: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
170+
Name: "thanos_store_server_pending_series_requests",
171+
Help: "Number of pending series requests",
172+
}),
164173
}
165174
}
166175

167176
func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
168177
instrumented := newInstrumentedServer(srv)
178+
s.pendingRequests.Set(float64(s.pendingRequestsCounter.Add(1)))
179+
defer s.pendingRequestsCounter.Add(-1)
180+
169181
if err := s.StoreServer.Series(req, instrumented); err != nil {
170182
return err
171183
}

0 commit comments

Comments
 (0)