1use std::time::Duration;
5
6use mysten_network::metrics::MetricsCallbackProvider;
7use prometheus::{
8 HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
9 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
10 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
11};
12use sui_network::api::KNOWN_VALIDATOR_GRPC_PATHS;
13use sui_network::tonic::Code;
14
15pub struct SuiNodeMetrics {
16 pub jwk_requests: IntCounterVec,
17 pub jwk_request_errors: IntCounterVec,
18
19 pub total_jwks: IntCounterVec,
20 pub invalid_jwks: IntCounterVec,
21 pub unique_jwks: IntCounterVec,
22
23 pub current_protocol_version: IntGauge,
24 pub binary_max_protocol_version: IntGauge,
25 pub configured_max_protocol_version: IntGauge,
26}
27
28impl SuiNodeMetrics {
29 pub fn new(registry: &Registry) -> Self {
30 Self {
31 jwk_requests: register_int_counter_vec_with_registry!(
32 "jwk_requests",
33 "Total number of JWK requests",
34 &["provider"],
35 registry,
36 )
37 .unwrap(),
38 jwk_request_errors: register_int_counter_vec_with_registry!(
39 "jwk_request_errors",
40 "Total number of JWK request errors",
41 &["provider"],
42 registry,
43 )
44 .unwrap(),
45 total_jwks: register_int_counter_vec_with_registry!(
46 "total_jwks",
47 "Total number of JWKs",
48 &["provider"],
49 registry,
50 )
51 .unwrap(),
52 invalid_jwks: register_int_counter_vec_with_registry!(
53 "invalid_jwks",
54 "Total number of invalid JWKs",
55 &["provider"],
56 registry,
57 )
58 .unwrap(),
59 unique_jwks: register_int_counter_vec_with_registry!(
60 "unique_jwks",
61 "Total number of unique JWKs",
62 &["provider"],
63 registry,
64 )
65 .unwrap(),
66 current_protocol_version: register_int_gauge_with_registry!(
67 "sui_current_protocol_version",
68 "Current protocol version in this epoch",
69 registry,
70 )
71 .unwrap(),
72 binary_max_protocol_version: register_int_gauge_with_registry!(
73 "sui_binary_max_protocol_version",
74 "Max protocol version supported by this binary",
75 registry,
76 )
77 .unwrap(),
78 configured_max_protocol_version: register_int_gauge_with_registry!(
79 "sui_configured_max_protocol_version",
80 "Max protocol version configured in the node config",
81 registry,
82 )
83 .unwrap(),
84 }
85 }
86}
87
88#[derive(Clone)]
89pub struct GrpcMetrics {
90 inflight_grpc: IntGaugeVec,
91 grpc_requests: IntCounterVec,
92 grpc_request_latency: HistogramVec,
93}
94
95const LATENCY_SEC_BUCKETS: &[f64] = &[
96 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1., 2.5, 5., 10., 20., 30., 60., 90.,
97];
98
99impl GrpcMetrics {
100 pub fn new(registry: &Registry) -> Self {
101 Self {
102 inflight_grpc: register_int_gauge_vec_with_registry!(
103 "inflight_grpc",
104 "Total in-flight GRPC requests per route",
105 &["path"],
106 registry,
107 )
108 .unwrap(),
109 grpc_requests: register_int_counter_vec_with_registry!(
110 "grpc_requests",
111 "Total GRPC requests per route",
112 &["path", "status"],
113 registry,
114 )
115 .unwrap(),
116 grpc_request_latency: register_histogram_vec_with_registry!(
117 "grpc_request_latency",
118 "Latency of GRPC requests per route",
119 &["path"],
120 LATENCY_SEC_BUCKETS.to_vec(),
121 registry,
122 )
123 .unwrap(),
124 }
125 }
126}
127
128const UNKNOWN_PATH: &str = "UNKNOWN";
129
130fn sanitize_path(path: &str) -> &str {
131 if KNOWN_VALIDATOR_GRPC_PATHS.contains(path) {
132 path
133 } else {
134 UNKNOWN_PATH
135 }
136}
137
138impl MetricsCallbackProvider for GrpcMetrics {
139 fn on_request(&self, _path: String) {}
140
141 fn on_response(&self, path: String, latency: Duration, _status: u16, grpc_status_code: Code) {
142 let path = sanitize_path(&path);
143 self.grpc_requests
144 .with_label_values(&[path, format!("{grpc_status_code:?}").as_str()])
145 .inc();
146 self.grpc_request_latency
147 .with_label_values(&[path])
148 .observe(latency.as_secs_f64());
149 }
150
151 fn on_start(&self, path: &str) {
152 let path = sanitize_path(path);
153 self.inflight_grpc.with_label_values(&[path]).inc();
154 }
155
156 fn on_drop(&self, path: &str) {
157 let path = sanitize_path(path);
158 self.inflight_grpc.with_label_values(&[path]).dec();
159 }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use mysten_metrics::start_prometheus_server;
    use prometheus::{IntCounter, Registry};
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    /// Registries added to the Prometheus registry service are served on
    /// `/metrics` (with their prefix), and a removed registry stops being served.
    #[tokio::test]
    pub async fn test_metrics_endpoint_with_multiple_registries_add_remove() {
        // NOTE(review): a fixed port can collide with other processes or with
        // tests running in parallel.
        let port: u16 = 8081;
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);

        let registry_service = start_prometheus_server(socket);

        // Give the spawned server task a chance to run before we query it.
        tokio::task::yield_now().await;

        let registry_1 = Registry::new_custom(Some("narwhal".to_string()), None).unwrap();
        let counter_1 = IntCounter::new("counter_1", "a sample counter 1").unwrap();
        registry_1.register(Box::new(counter_1)).unwrap();

        let registry_2 = Registry::new_custom(Some("sui".to_string()), None).unwrap();
        let counter_2 = IntCounter::new("counter_2", "a sample counter 2").unwrap();
        registry_2.register(Box::new(counter_2.clone())).unwrap();

        let registry_1_id = registry_service.add(registry_1);
        let _registry_2_id = registry_service.add(registry_2);

        let result = get_metrics(port).await;

        assert!(result.contains(concat!(
            "# HELP sui_counter_2 a sample counter 2\n",
            "# TYPE sui_counter_2 counter\n",
            "sui_counter_2 0"
        )));

        assert!(result.contains(concat!(
            "# HELP narwhal_counter_1 a sample counter 1\n",
            "# TYPE narwhal_counter_1 counter\n",
            "narwhal_counter_1 0"
        )));

        assert!(registry_service.remove(registry_1_id));

        counter_2.inc();

        let result = get_metrics(port).await;

        // Once its registry is removed, the counter must not appear at all.
        assert!(!result.contains("narwhal_counter_1"));

        assert!(result.contains(concat!(
            "# HELP sui_counter_2 a sample counter 2\n",
            "# TYPE sui_counter_2 counter\n",
            "sui_counter_2 1"
        )));
    }

    /// Fetches the text exposition served on `127.0.0.1:{port}/metrics`.
    async fn get_metrics(port: u16) -> String {
        let client = reqwest::Client::new();
        let response = client
            .get(format!("http://127.0.0.1:{port}/metrics"))
            .send()
            .await
            .unwrap();
        response.text().await.unwrap()
    }

    /// Unknown gRPC paths must all be reported under `UNKNOWN_PATH`, so
    /// arbitrary client-supplied paths cannot inflate label cardinality.
    #[test]
    fn test_grpc_metrics_unknown_path() {
        const REQUESTS: u64 = 1000;

        let registry = Registry::new();
        let metrics = GrpcMetrics::new(&registry);

        for i in 0..REQUESTS {
            let path = format!("/nonexistent.Service/Method{i}");
            metrics.on_start(&path);
            metrics.on_response(path.clone(), Duration::from_millis(1), 200, Code::Ok);
            metrics.on_drop(&path);
        }

        let metric_families = registry.gather();
        let series_count = |name: &str| {
            metric_families
                .iter()
                .find(|mf| mf.name() == name)
                .unwrap_or_else(|| panic!("metric family {name} not found"))
                .get_metric()
                .len()
        };
        for name in ["inflight_grpc", "grpc_requests", "grpc_request_latency"] {
            assert_eq!(
                series_count(name),
                1,
                "all unknown paths should collapse into a single time series ({name})"
            );
        }

        // Every request was started and dropped, so none is left in flight.
        assert_eq!(
            metrics.inflight_grpc.with_label_values(&[UNKNOWN_PATH]).get(),
            0
        );
        // Every request was counted under the collapsed label.
        assert_eq!(
            metrics
                .grpc_requests
                .with_label_values(&[UNKNOWN_PATH, "Ok"])
                .get(),
            REQUESTS
        );
    }
}