sui_node/
metrics.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use mysten_network::metrics::MetricsCallbackProvider;
7use prometheus::{
8    HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
9    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
10    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
11};
12use sui_network::api::KNOWN_VALIDATOR_GRPC_PATHS;
13use sui_network::tonic::Code;
14
15pub struct SuiNodeMetrics {
16    pub jwk_requests: IntCounterVec,
17    pub jwk_request_errors: IntCounterVec,
18
19    pub total_jwks: IntCounterVec,
20    pub invalid_jwks: IntCounterVec,
21    pub unique_jwks: IntCounterVec,
22
23    pub current_protocol_version: IntGauge,
24    pub binary_max_protocol_version: IntGauge,
25    pub configured_max_protocol_version: IntGauge,
26}
27
28impl SuiNodeMetrics {
29    pub fn new(registry: &Registry) -> Self {
30        Self {
31            jwk_requests: register_int_counter_vec_with_registry!(
32                "jwk_requests",
33                "Total number of JWK requests",
34                &["provider"],
35                registry,
36            )
37            .unwrap(),
38            jwk_request_errors: register_int_counter_vec_with_registry!(
39                "jwk_request_errors",
40                "Total number of JWK request errors",
41                &["provider"],
42                registry,
43            )
44            .unwrap(),
45            total_jwks: register_int_counter_vec_with_registry!(
46                "total_jwks",
47                "Total number of JWKs",
48                &["provider"],
49                registry,
50            )
51            .unwrap(),
52            invalid_jwks: register_int_counter_vec_with_registry!(
53                "invalid_jwks",
54                "Total number of invalid JWKs",
55                &["provider"],
56                registry,
57            )
58            .unwrap(),
59            unique_jwks: register_int_counter_vec_with_registry!(
60                "unique_jwks",
61                "Total number of unique JWKs",
62                &["provider"],
63                registry,
64            )
65            .unwrap(),
66            current_protocol_version: register_int_gauge_with_registry!(
67                "sui_current_protocol_version",
68                "Current protocol version in this epoch",
69                registry,
70            )
71            .unwrap(),
72            binary_max_protocol_version: register_int_gauge_with_registry!(
73                "sui_binary_max_protocol_version",
74                "Max protocol version supported by this binary",
75                registry,
76            )
77            .unwrap(),
78            configured_max_protocol_version: register_int_gauge_with_registry!(
79                "sui_configured_max_protocol_version",
80                "Max protocol version configured in the node config",
81                registry,
82            )
83            .unwrap(),
84        }
85    }
86}
87
88#[derive(Clone)]
89pub struct GrpcMetrics {
90    inflight_grpc: IntGaugeVec,
91    grpc_requests: IntCounterVec,
92    grpc_request_latency: HistogramVec,
93}
94
/// Upper bounds, in seconds, of the gRPC request-latency histogram buckets.
///
/// Covers 1 ms through 90 s, listed in ascending order.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second requests.
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5,
    // Requests taking one second or longer.
    1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0,
];
98
99impl GrpcMetrics {
100    pub fn new(registry: &Registry) -> Self {
101        Self {
102            inflight_grpc: register_int_gauge_vec_with_registry!(
103                "inflight_grpc",
104                "Total in-flight GRPC requests per route",
105                &["path"],
106                registry,
107            )
108            .unwrap(),
109            grpc_requests: register_int_counter_vec_with_registry!(
110                "grpc_requests",
111                "Total GRPC requests per route",
112                &["path", "status"],
113                registry,
114            )
115            .unwrap(),
116            grpc_request_latency: register_histogram_vec_with_registry!(
117                "grpc_request_latency",
118                "Latency of GRPC requests per route",
119                &["path"],
120                LATENCY_SEC_BUCKETS.to_vec(),
121                registry,
122            )
123            .unwrap(),
124        }
125    }
126}
127
128const UNKNOWN_PATH: &str = "UNKNOWN";
129
130fn sanitize_path(path: &str) -> &str {
131    if KNOWN_VALIDATOR_GRPC_PATHS.contains(path) {
132        path
133    } else {
134        UNKNOWN_PATH
135    }
136}
137
138impl MetricsCallbackProvider for GrpcMetrics {
139    fn on_request(&self, _path: String) {}
140
141    fn on_response(&self, path: String, latency: Duration, _status: u16, grpc_status_code: Code) {
142        let path = sanitize_path(&path);
143        self.grpc_requests
144            .with_label_values(&[path, format!("{grpc_status_code:?}").as_str()])
145            .inc();
146        self.grpc_request_latency
147            .with_label_values(&[path])
148            .observe(latency.as_secs_f64());
149    }
150
151    fn on_start(&self, path: &str) {
152        let path = sanitize_path(path);
153        self.inflight_grpc.with_label_values(&[path]).inc();
154    }
155
156    fn on_drop(&self, path: &str) {
157        let path = sanitize_path(path);
158        self.inflight_grpc.with_label_values(&[path]).dec();
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use mysten_metrics::start_prometheus_server;
    use prometheus::{IntCounter, Registry};
    use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};

    /// Asks the OS for a localhost port that is currently unused.
    ///
    /// A hard-coded port collides with anything else already bound to it
    /// (another test process, a local service), which makes the endpoint test
    /// flaky. The probe listener is dropped before returning, so the port is
    /// free for the Prometheus server to bind. A small race window remains,
    /// but it is far less likely than a fixed-port clash.
    fn unused_local_port() -> u16 {
        TcpListener::bind((Ipv4Addr::LOCALHOST, 0))
            .and_then(|listener| listener.local_addr())
            .expect("failed to reserve an ephemeral localhost port")
            .port()
    }

    /// Text-exposition snippet rendered for a single unlabelled counter.
    fn counter_exposition(name: &str, help: &str, value: u64) -> String {
        format!("# HELP {name} {help}\n# TYPE {name} counter\n{name} {value}")
    }

    /// Fetches the raw `/metrics` body from the server on `port`.
    async fn get_metrics(port: u16) -> String {
        let client = reqwest::Client::new();
        let response = client
            .get(format!("http://127.0.0.1:{port}/metrics"))
            .send()
            .await
            .unwrap();
        response.text().await.unwrap()
    }

    #[tokio::test]
    pub async fn test_metrics_endpoint_with_multiple_registries_add_remove() {
        let port = unused_local_port();
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

        let registry_service = start_prometheus_server(socket);

        // Let the server (presumably started on a background task) get
        // scheduled before the first request.
        tokio::task::yield_now().await;

        // Add a few registries to the service, each with its own metric.
        let registry_1 = Registry::new_custom(Some("narwhal".to_string()), None).unwrap();
        let counter_1 = IntCounter::new("counter_1", "a sample counter 1").unwrap();
        registry_1.register(Box::new(counter_1)).unwrap();

        let registry_2 = Registry::new_custom(Some("sui".to_string()), None).unwrap();
        let counter_2 = IntCounter::new("counter_2", "a sample counter 2").unwrap();
        registry_2.register(Box::new(counter_2.clone())).unwrap();

        let registry_1_id = registry_service.add(registry_1);
        let _registry_2_id = registry_service.add(registry_2);

        let narwhal_zero = counter_exposition("narwhal_counter_1", "a sample counter 1", 0);

        let result = get_metrics(port).await;
        assert!(result.contains(&counter_exposition("sui_counter_2", "a sample counter 2", 0)));
        assert!(result.contains(&narwhal_zero));

        // Remove registry 1 and bump the metric in registry 2.
        assert!(registry_service.remove(registry_1_id));
        counter_2.inc();

        let result = get_metrics(port).await;

        // Registry 1 metrics must no longer be exported...
        assert!(!result.contains(&narwhal_zero));
        // ...while registry 2's counter reflects the increment.
        assert!(result.contains(&counter_exposition("sui_counter_2", "a sample counter 2", 1)));
    }

    #[test]
    fn test_grpc_metrics_unknown_path() {
        const REQUESTS: u64 = 1000;

        let registry = Registry::new();
        let metrics = GrpcMetrics::new(&registry);

        for i in 0..REQUESTS {
            let path = format!("/nonexistent.Service/Method{i}");
            metrics.on_start(&path);
            metrics.on_response(path.clone(), Duration::from_millis(1), 200, Code::Ok);
            metrics.on_drop(&path);
        }

        // Check cardinality before touching the vecs directly below, since
        // `with_label_values` would create a series if one were missing.
        let metric_families = registry.gather();
        for family_name in ["inflight_grpc", "grpc_requests", "grpc_request_latency"] {
            let family = metric_families
                .iter()
                .find(|mf| mf.name() == family_name)
                .unwrap_or_else(|| panic!("metric family {family_name} was not gathered"));
            assert_eq!(
                family.get_metric().len(),
                1,
                "all unknown paths should collapse into a single {family_name} time series"
            );
        }

        // Every on_start was balanced by an on_drop on the same sanitized label.
        assert_eq!(
            metrics.inflight_grpc.with_label_values(&[UNKNOWN_PATH]).get(),
            0
        );
        // Every response was counted under the sentinel path label.
        assert_eq!(
            metrics
                .grpc_requests
                .with_label_values(&[UNKNOWN_PATH, "Ok"])
                .get(),
            REQUESTS
        );
    }
}