17
17
// You should have received a copy of the GNU Affero General Public License
18
18
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
19
20
+ use std:: net:: SocketAddr ;
21
+ use std:: sync:: Weak ;
22
+ use std:: time:: Duration ;
23
+
24
+ use chitchat:: { Chitchat , ChitchatId } ;
20
25
use once_cell:: sync:: Lazy ;
21
- use quickwit_common:: metrics:: { new_counter, IntCounter } ;
26
+ use quickwit_common:: metrics:: { new_counter, new_gauge, IntCounter , IntGauge } ;
27
+ use tokio:: sync:: Mutex ;
28
+
29
+ use crate :: member:: NodeStateExt ;
22
30
23
31
pub struct ClusterMetrics {
24
- pub gossip_recv_total : IntCounter ,
32
+ pub live_nodes : IntGauge ,
33
+ pub ready_nodes : IntGauge ,
34
+ pub zombie_nodes : IntGauge ,
35
+ pub dead_nodes : IntGauge ,
36
+ pub cluster_state_size_bytes : IntGauge ,
37
+ pub node_state_size_bytes : IntGauge ,
38
+ pub node_state_keys : IntGauge ,
39
+ pub gossip_recv_messages_total : IntCounter ,
25
40
pub gossip_recv_bytes_total : IntCounter ,
26
- pub gossip_send_total : IntCounter ,
27
- pub gossip_send_bytes_total : IntCounter ,
41
+ pub gossip_sent_messages_total : IntCounter ,
42
+ pub gossip_sent_bytes_total : IntCounter ,
28
43
}
29
44
30
45
impl Default for ClusterMetrics {
31
46
fn default ( ) -> Self {
32
47
ClusterMetrics {
33
- gossip_recv_total : new_counter (
34
- "gossip_recv_total" ,
48
+ live_nodes : new_gauge (
49
+ "live_nodes" ,
50
+ "The number of live nodes observed locally." ,
51
+ "cluster" ,
52
+ ) ,
53
+ ready_nodes : new_gauge (
54
+ "ready_nodes" ,
55
+ "The number of ready nodes observed locally." ,
56
+ "cluster" ,
57
+ ) ,
58
+ zombie_nodes : new_gauge (
59
+ "zombie_nodes" ,
60
+ "The number of zombie nodes observed locally." ,
61
+ "cluster" ,
62
+ ) ,
63
+ dead_nodes : new_gauge (
64
+ "dead_nodes" ,
65
+ "The number of dead nodes observed locally." ,
66
+ "cluster" ,
67
+ ) ,
68
+ cluster_state_size_bytes : new_gauge (
69
+ "cluster_state_size_bytes" ,
70
+ "The size of the cluster state in bytes." ,
71
+ "cluster" ,
72
+ ) ,
73
+ node_state_keys : new_gauge (
74
+ "node_state_keys" ,
75
+ "The number of keys in the node state." ,
76
+ "cluster" ,
77
+ ) ,
78
+ node_state_size_bytes : new_gauge (
79
+ "node_state_size_bytes" ,
80
+ "The size of the node state in bytes." ,
81
+ "cluster" ,
82
+ ) ,
83
+ gossip_recv_messages_total : new_counter (
84
+ "gossip_recv_messages_total" ,
35
85
"Total number of gossip messages received." ,
36
86
"cluster" ,
37
87
) ,
@@ -40,13 +90,13 @@ impl Default for ClusterMetrics {
40
90
"Total amount of gossip data received in bytes." ,
41
91
"cluster" ,
42
92
) ,
43
- gossip_send_total : new_counter (
44
- "gossip_send_total " ,
93
+ gossip_sent_messages_total : new_counter (
94
+ "gossip_sent_messages_total " ,
45
95
"Total number of gossip messages sent." ,
46
96
"cluster" ,
47
97
) ,
48
- gossip_send_bytes_total : new_counter (
49
- "gossip_send_bytes_total " ,
98
+ gossip_sent_bytes_total : new_counter (
99
+ "gossip_sent_bytes_total " ,
50
100
"Total amount of gossip data sent in bytes." ,
51
101
"cluster" ,
52
102
) ,
@@ -55,3 +105,61 @@ impl Default for ClusterMetrics {
55
105
}
56
106
57
107
/// Process-wide cluster metrics, built from `ClusterMetrics::default` on first access.
pub static CLUSTER_METRICS : Lazy < ClusterMetrics > = Lazy :: new ( ClusterMetrics :: default) ;
108
+
109
+ pub ( crate ) fn spawn_metrics_task (
110
+ weak_chitchat : Weak < Mutex < Chitchat > > ,
111
+ self_chitchat_id : ChitchatId ,
112
+ ) {
113
+ const METRICS_INTERVAL : Duration = Duration :: from_secs ( 15 ) ;
114
+
115
+ const SIZE_OF_GENERATION_ID : usize = std:: mem:: size_of :: < u64 > ( ) ;
116
+ const SIZE_OF_SOCKET_ADDR : usize = std:: mem:: size_of :: < SocketAddr > ( ) ;
117
+
118
+ let future = async move {
119
+ let mut interval = tokio:: time:: interval ( METRICS_INTERVAL ) ;
120
+
121
+ while let Some ( chitchat) = weak_chitchat. upgrade ( ) {
122
+ interval. tick ( ) . await ;
123
+
124
+ let mut num_ready_nodes = 0 ;
125
+ let mut cluster_state_size_bytes = 0 ;
126
+
127
+ let chitchat_guard = chitchat. lock ( ) . await ;
128
+
129
+ let num_live_nodes = chitchat_guard. live_nodes ( ) . count ( ) ;
130
+ let num_zombie_nodes = chitchat_guard. scheduled_for_deletion_nodes ( ) . count ( ) ;
131
+ let num_dead_nodes = chitchat_guard. dead_nodes ( ) . count ( ) ;
132
+
133
+ for ( chitchat_id, node_state) in chitchat_guard. node_states ( ) {
134
+ if node_state. is_ready ( ) {
135
+ num_ready_nodes += 1 ;
136
+ }
137
+ let chitchat_id_size_bytes =
138
+ chitchat_id. node_id . len ( ) + SIZE_OF_GENERATION_ID + SIZE_OF_SOCKET_ADDR ;
139
+ let node_state_size_bytes = node_state. size_bytes ( ) ;
140
+
141
+ cluster_state_size_bytes += chitchat_id_size_bytes + node_state_size_bytes;
142
+
143
+ if * chitchat_id == self_chitchat_id {
144
+ CLUSTER_METRICS
145
+ . node_state_keys
146
+ . set ( node_state. num_key_values ( ) as i64 ) ;
147
+ CLUSTER_METRICS
148
+ . node_state_size_bytes
149
+ . set ( node_state_size_bytes as i64 ) ;
150
+ }
151
+ }
152
+ drop ( chitchat_guard) ;
153
+
154
+ CLUSTER_METRICS . live_nodes . set ( num_live_nodes as i64 ) ;
155
+ CLUSTER_METRICS . ready_nodes . set ( num_ready_nodes as i64 ) ;
156
+ CLUSTER_METRICS . zombie_nodes . set ( num_zombie_nodes as i64 ) ;
157
+ CLUSTER_METRICS . dead_nodes . set ( num_dead_nodes as i64 ) ;
158
+
159
+ CLUSTER_METRICS
160
+ . cluster_state_size_bytes
161
+ . set ( cluster_state_size_bytes as i64 ) ;
162
+ }
163
+ } ;
164
+ tokio:: spawn ( future) ;
165
+ }
0 commit comments