Skip to content

Commit 674ae61

Browse files
committed
remove gateway_indexer_fees source
1 parent e684e55 commit 674ae61

File tree

1 file changed

+2
-39
lines changed

1 file changed

+2
-39
lines changed

src/bin/main.rs

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,7 @@ async fn run() -> anyhow::Result<()> {
9898
.collect()
9999
};
100100

101-
let assignment = assign_partitions(
102-
&consumer,
103-
&["gateway_queries", "gateway_indexer_fees"],
104-
start_timestamp,
105-
)
106-
.await?;
101+
let assignment = assign_partitions(&consumer, &["gateway_queries"], start_timestamp).await?;
107102

108103
let (source_msg_tx, mut source_msg_rx) = mpsc::channel::<SourceMsg>(1024);
109104
let mut partition_consumers: Vec<JoinHandle<()>> = assignment
@@ -180,18 +175,6 @@ async fn handle_source_msg(
180175
tracing::info!(timestamp = print_unix_millis(t).unwrap(), "flushed");
181176
}
182177
}
183-
SourceMsg::IndexerFees {
184-
aggregation_timestamp,
185-
signer,
186-
receiver,
187-
fees_grt,
188-
} => {
189-
if aggregation_timestamp >= start_timestamp {
190-
let key = IndexerFeesKey { signer, receiver };
191-
let agg = aggregations.entry(aggregation_timestamp).or_default();
192-
*agg.indexer_fees.entry(key).or_default() += fees_grt;
193-
}
194-
}
195178
SourceMsg::ClientQuery {
196179
timestamp,
197180
aggregation_timestamp,
@@ -266,11 +249,7 @@ fn spawn_partition_consumer(
266249
let msg = SourceMsg::decode(msg, &legacy_source_offsets)?;
267250
let aggregation_timestamp = match &msg {
268251
SourceMsg::Flush { .. } => unreachable!(), // unreachable
269-
SourceMsg::IndexerFees {
270-
aggregation_timestamp,
271-
..
272-
}
273-
| SourceMsg::ClientQuery {
252+
SourceMsg::ClientQuery {
274253
aggregation_timestamp,
275254
..
276255
} => *aggregation_timestamp,
@@ -366,13 +345,6 @@ enum SourceMsg {
366345
partition_id: String,
367346
aggregation_timestamp: i64,
368347
},
369-
// TODO: remove after migration
370-
IndexerFees {
371-
aggregation_timestamp: i64,
372-
signer: Address,
373-
receiver: Address,
374-
fees_grt: f64,
375-
},
376348
ClientQuery {
377349
timestamp: i64,
378350
aggregation_timestamp: i64,
@@ -404,15 +376,6 @@ impl SourceMsg {
404376
data: decoded,
405377
})
406378
}
407-
"gateway_indexer_fees" => {
408-
let decoded = IndexerFeesProtobuf::decode(payload).context("decode protobuf")?;
409-
Ok(SourceMsg::IndexerFees {
410-
aggregation_timestamp,
411-
signer: Address::from_slice(&decoded.signer)?,
412-
receiver: Address::from_slice(&decoded.receiver)?,
413-
fees_grt: decoded.fees_grt,
414-
})
415-
}
416379
topic => anyhow::bail!("unexpected topic: {topic}"),
417380
}
418381
}

0 commit comments

Comments
 (0)