Skip to content

Commit 0dc5a4c

Browse files
authored
add gateway_indexer_qos_hourly (#3)
1 parent 6ca6889 commit 0dc5a4c

File tree

3 files changed

+128
-2
lines changed

3 files changed

+128
-2
lines changed

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,28 @@ aggregations into the following topics:
4747
}
4848
```
4949

50+
- `gateway_indexer_qos_hourly`
51+
52+
```protobuf
53+
syntax = "proto3";
54+
message IndexerQosHourly {
55+
message IndexerQos {
56+
/// 20 bytes (address)
57+
bytes indexer = 1;
58+
string deployment = 2;
59+
string chain = 3;
60+
uint32 success_count = 4;
61+
uint32 failure_count = 5;
62+
uint64 avg_seconds_behind = 6;
63+
uint32 avg_latency_ms = 7;
64+
double avg_fee_grt = 8;
65+
}
66+
/// start timestamp for aggregation; in unix milliseconds
67+
int64 timestamp = 1;
68+
repeated IndexerQos aggregations = 2;
69+
}
70+
```
71+
5072
## example configuration
5173

5274
```json

src/bin/main.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ use titorelli::{
1212
kafka::{assign_partitions, fetch_partition_ids, latest_messages},
1313
messages::{
1414
ClientFeesHourlyProtobuf, ClientFeesProtobuf, ClientQueryProtobuf,
15-
IndexerFeesHourlyProtobuf, IndexerFeesProtobuf,
15+
IndexerFeesHourlyProtobuf, IndexerFeesProtobuf, IndexerQosHourlyProtobuf,
16+
IndexerQosProtobuf,
1617
},
1718
print_unix_millis,
1819
};
@@ -211,6 +212,32 @@ async fn handle_source_msg(
211212
};
212213
*agg.indexer_fees.entry(key).or_default() += indexer_query.fee_grt;
213214
}
215+
216+
for indexer_query in &data.indexer_queries {
217+
let key = IndexerQosKey {
218+
indexer: Address::from_slice(&indexer_query.allocation)?,
219+
deployment: deployment_cid(&indexer_query.deployment),
220+
};
221+
let value = agg
222+
.indexer_qos
223+
.entry(key)
224+
.or_insert_with(|| IndexerQosValue {
225+
chain: indexer_query.indexed_chain.clone(),
226+
success_count: 0,
227+
failure_count: 0,
228+
total_seconds_behind: 0,
229+
total_latency_ms: 0,
230+
total_fee_grt: 0.0,
231+
});
232+
if indexer_query.result == "success" {
233+
value.success_count += 1;
234+
} else {
235+
value.failure_count += 1;
236+
}
237+
value.total_seconds_behind += indexer_query.seconds_behind as u64;
238+
value.total_latency_ms += indexer_query.response_time_ms as u64;
239+
value.total_fee_grt += indexer_query.fee_grt;
240+
}
214241
}
215242

216243
let legacy_producer = match legacy_producer {
@@ -324,6 +351,7 @@ async fn record_aggregations(
324351
let Aggregations {
325352
client_fees,
326353
indexer_fees,
354+
indexer_qos,
327355
} = aggregations;
328356

329357
let record_payload = ClientFeesHourlyProtobuf {
@@ -377,10 +405,39 @@ async fn record_aggregations(
377405
.map_err(|(err, _)| err)
378406
.context("send aggregation record")?;
379407

408+
let record_payload = IndexerQosHourlyProtobuf {
409+
timestamp,
410+
aggregations: indexer_qos
411+
.into_iter()
412+
.map(|(k, v)| {
413+
let total_queries = (v.success_count + v.failure_count) as f64;
414+
IndexerQosProtobuf {
415+
indexer: k.indexer.0.to_vec(),
416+
deployment: k.deployment,
417+
chain: v.chain,
418+
success_count: v.success_count,
419+
failure_count: v.failure_count,
420+
avg_seconds_behind: (v.total_seconds_behind as f64 / total_queries) as u64,
421+
avg_latency_ms: (v.total_latency_ms as f64 / total_queries) as u32,
422+
avg_fee_grt: (v.total_fee_grt / total_queries),
423+
}
424+
})
425+
.collect(),
426+
}
427+
.encode_to_vec();
428+
let record = rdkafka::producer::FutureRecord::to("gateway_indexer_qos_hourly")
429+
.key(&record_key)
430+
.payload(&record_payload);
431+
producer
432+
.send(record, Duration::from_secs(30))
433+
.await
434+
.map_err(|(err, _)| err)
435+
.context("send aggregation record")?;
436+
380437
Ok(())
381438
}
382439

383-
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
440+
#[derive(Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
384441
struct Address([u8; 20]);
385442

386443
impl Address {
@@ -460,6 +517,7 @@ impl SourceMsg {
460517
struct Aggregations {
461518
client_fees: BTreeMap<ClientFeesKey, ClientFeesValue>,
462519
indexer_fees: BTreeMap<IndexerFeesKey, f64>,
520+
indexer_qos: BTreeMap<IndexerQosKey, IndexerQosValue>,
463521
}
464522

465523
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
@@ -485,6 +543,22 @@ struct IndexerFeesKey {
485543
receiver: Address,
486544
}
487545

546+
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
547+
struct IndexerQosKey {
548+
indexer: Address,
549+
deployment: String,
550+
}
551+
552+
#[derive(Debug)]
553+
struct IndexerQosValue {
554+
chain: String,
555+
success_count: u32,
556+
failure_count: u32,
557+
total_seconds_behind: u64,
558+
total_latency_ms: u64,
559+
total_fee_grt: f64,
560+
}
561+
488562
pub fn legacy_messages(
489563
timestamp: i64,
490564
client_query: ClientQueryProtobuf,

src/messages.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,33 @@ pub struct IndexerFeesProtobuf {
107107
#[prost(double, tag = "3")]
108108
pub fees_grt: f64,
109109
}
110+
111+
#[derive(prost::Message)]
112+
pub struct IndexerQosHourlyProtobuf {
113+
/// start timestamp for aggregation, in unix milliseconds
114+
#[prost(int64, tag = "1")]
115+
pub timestamp: i64,
116+
#[prost(message, repeated, tag = "2")]
117+
pub aggregations: Vec<IndexerQosProtobuf>,
118+
}
119+
120+
#[derive(prost::Message)]
121+
pub struct IndexerQosProtobuf {
122+
/// 20 bytes (address)
123+
#[prost(bytes, tag = "1")]
124+
pub indexer: Vec<u8>,
125+
#[prost(string, tag = "2")]
126+
pub deployment: String,
127+
#[prost(string, tag = "3")]
128+
pub chain: String,
129+
#[prost(uint32, tag = "4")]
130+
pub success_count: u32,
131+
#[prost(uint32, tag = "5")]
132+
pub failure_count: u32,
133+
#[prost(uint64, tag = "6")]
134+
pub avg_seconds_behind: u64,
135+
#[prost(uint32, tag = "7")]
136+
pub avg_latency_ms: u32,
137+
#[prost(double, tag = "8")]
138+
pub avg_fee_grt: f64,
139+
}

0 commit comments

Comments
 (0)