sui_rpc_api/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use axum::http;
5use std::{borrow::Cow, collections::HashSet, sync::Arc, time::Instant};
6
7use mysten_network::callback::{MakeCallbackHandler, ResponseHandler};
8use prometheus::{
9    HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
10    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
11    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
12};
13use prost::Message;
14
15#[derive(Clone)]
16pub struct RpcMetrics {
17    inflight_requests: IntGaugeVec,
18    num_requests: IntCounterVec,
19    request_latency: HistogramVec,
20}
21
/// Bucket upper bounds, in seconds, for the `rpc_request_latency` histogram.
///
/// Covers everything from 1 ms up to 90 s; slower requests land in the
/// implicit `+Inf` bucket.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second.
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5,
    // Seconds.
    1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0,
];
25
26impl RpcMetrics {
27    pub fn new(registry: &Registry) -> Self {
28        Self {
29            inflight_requests: register_int_gauge_vec_with_registry!(
30                "rpc_inflight_requests",
31                "Total in-flight RPC requests per route",
32                &["path"],
33                registry,
34            )
35            .unwrap(),
36            num_requests: register_int_counter_vec_with_registry!(
37                "rpc_requests",
38                "Total RPC requests per route and their http status",
39                &["path", "status"],
40                registry,
41            )
42            .unwrap(),
43            request_latency: register_histogram_vec_with_registry!(
44                "rpc_request_latency",
45                "Latency of RPC requests per route",
46                &["path"],
47                LATENCY_SEC_BUCKETS.to_vec(),
48                registry,
49            )
50            .unwrap(),
51        }
52    }
53}
54
/// Fully-qualified gRPC method paths (`/package.Service/Method`) that may be
/// emitted verbatim as the `path` metric label.
///
/// gRPC services are mounted under a catch-all route of the form
/// `/{ServiceName}/{*rest}`, so every path beneath a registered service prefix
/// matches a route. Using such paths directly as labels would let an
/// unauthenticated client grow the Prometheus label maps without bound (the
/// prometheus crate keeps them for the lifetime of the process) just by sending
/// requests with made-up method suffixes. Only methods listed here are used
/// verbatim, which keeps label cardinality bounded.
pub type GrpcMethodAllowlist = Arc<HashSet<String>>;
65
66/// Decode one or more encoded `FileDescriptorSet` byte slices and return the
67/// set of `/package.Service/Method` paths they declare.
68///
69/// Intended to be called once at server startup with the same bytes that are
70/// registered with `tonic_reflection`, so the metrics allowlist stays in sync
71/// with the services actually exposed over gRPC.
72pub fn grpc_method_paths_from_file_descriptor_sets(
73    encoded_sets: &[&[u8]],
74) -> Result<HashSet<String>, prost::DecodeError> {
75    let mut paths = HashSet::new();
76    for bytes in encoded_sets {
77        let fds = prost_types::FileDescriptorSet::decode(*bytes)?;
78        for file in fds.file {
79            let package = file.package.unwrap_or_default();
80            for service in file.service {
81                let Some(service_name) = service.name else {
82                    continue;
83                };
84                let qualified_service = if package.is_empty() {
85                    service_name
86                } else {
87                    format!("{}.{}", package, service_name)
88                };
89                for method in service.method {
90                    let Some(method_name) = method.name else {
91                        continue;
92                    };
93                    paths.insert(format!("/{}/{}", qualified_service, method_name));
94                }
95            }
96        }
97    }
98    Ok(paths)
99}
100
101#[derive(Clone)]
102pub struct RpcMetricsMakeCallbackHandler {
103    metrics: Arc<RpcMetrics>,
104    grpc_method_allowlist: GrpcMethodAllowlist,
105}
106
107impl RpcMetricsMakeCallbackHandler {
108    /// Construct a handler with no gRPC method allowlist. All gRPC requests
109    /// will be labelled with their matched route pattern (e.g.
110    /// `/sui.rpc.v2.LedgerService/{*rest}`) rather than the per-method path,
111    /// which is safe but loses per-method granularity.
112    pub fn new(metrics: Arc<RpcMetrics>) -> Self {
113        Self::with_grpc_method_allowlist(metrics, Arc::new(HashSet::new()))
114    }
115
116    /// Construct a handler that uses `allowlist` to decide which gRPC request
117    /// paths are safe to emit as Prometheus labels.
118    pub fn with_grpc_method_allowlist(
119        metrics: Arc<RpcMetrics>,
120        allowlist: GrpcMethodAllowlist,
121    ) -> Self {
122        Self {
123            metrics,
124            grpc_method_allowlist: allowlist,
125        }
126    }
127}
128
129impl MakeCallbackHandler for RpcMetricsMakeCallbackHandler {
130    type Handler = RpcMetricsCallbackHandler;
131
132    fn make_handler(&self, request: &http::request::Parts) -> Self::Handler {
133        let start = Instant::now();
134        let metrics = self.metrics.clone();
135
136        let matched_path = request
137            .extensions
138            .get::<axum::extract::MatchedPath>()
139            .map(|m| m.as_str());
140        let is_grpc = request
141            .headers
142            .get(&http::header::CONTENT_TYPE)
143            .is_some_and(is_grpc_content_type);
144
145        let path = compute_metric_label(
146            is_grpc,
147            request.uri.path(),
148            matched_path,
149            &self.grpc_method_allowlist,
150        );
151
152        metrics
153            .inflight_requests
154            .with_label_values(&[path.as_ref()])
155            .inc();
156
157        RpcMetricsCallbackHandler {
158            metrics,
159            path,
160            start,
161            counted_response: false,
162        }
163    }
164}
165
/// Pick the `path` Prometheus label for a request.
///
/// * A gRPC request whose URI path is in `grpc_method_allowlist` is labelled
///   with that URI path, even when no route pattern was matched.
/// * Any other request with a matched route pattern is labelled with that
///   pattern. For gRPC, this collapses unknown methods into one bounded series
///   per service. For non-gRPC traffic, the pattern is already bounded by the
///   router's registered routes.
/// * Requests without a matched route are labelled `"unknown"`.
fn compute_metric_label(
    is_grpc: bool,
    uri_path: &str,
    matched_path: Option<&str>,
    grpc_method_allowlist: &HashSet<String>,
) -> Cow<'static, str> {
    if is_grpc && grpc_method_allowlist.contains(uri_path) {
        return Cow::Owned(uri_path.to_owned());
    }

    match matched_path {
        Some(route) => Cow::Owned(route.to_owned()),
        None => Cow::Borrowed("unknown"),
    }
}
186
187fn is_grpc_content_type(content_type: &http::HeaderValue) -> bool {
188    content_type
189        .as_bytes()
190        .starts_with(tonic::metadata::GRPC_CONTENT_TYPE.as_bytes())
191}
192
193pub struct RpcMetricsCallbackHandler {
194    metrics: Arc<RpcMetrics>,
195    path: Cow<'static, str>,
196    start: Instant,
197    // Indicates if we successfully counted the response. In some cases when a request is
198    // prematurely canceled this will remain false
199    counted_response: bool,
200}
201
202impl ResponseHandler for RpcMetricsCallbackHandler {
203    fn on_response(&mut self, response: &http::response::Parts) {
204        const GRPC_STATUS: http::HeaderName = http::HeaderName::from_static("grpc-status");
205
206        let status = if response
207            .headers
208            .get(&http::header::CONTENT_TYPE)
209            .is_some_and(is_grpc_content_type)
210        {
211            let code = response
212                .headers
213                .get(&GRPC_STATUS)
214                .map(http::HeaderValue::as_bytes)
215                .map(tonic::Code::from_bytes)
216                .unwrap_or(tonic::Code::Ok);
217
218            code_as_str(code)
219        } else {
220            response.status.as_str()
221        };
222
223        self.metrics
224            .num_requests
225            .with_label_values(&[self.path.as_ref(), status])
226            .inc();
227
228        self.counted_response = true;
229    }
230
231    fn on_error<E>(&mut self, _error: &E) {
232        // Do nothing if the whole service errored
233        //
234        // in Axum this isn't possible since all services are required to have an error type of
235        // Infallible
236    }
237}
238
239impl Drop for RpcMetricsCallbackHandler {
240    fn drop(&mut self) {
241        self.metrics
242            .inflight_requests
243            .with_label_values(&[self.path.as_ref()])
244            .dec();
245
246        let latency = self.start.elapsed().as_secs_f64();
247        self.metrics
248            .request_latency
249            .with_label_values(&[self.path.as_ref()])
250            .observe(latency);
251
252        if !self.counted_response {
253            self.metrics
254                .num_requests
255                .with_label_values(&[self.path.as_ref(), "canceled"])
256                .inc();
257        }
258    }
259}
260
261fn code_as_str(code: tonic::Code) -> &'static str {
262    match code {
263        tonic::Code::Ok => "ok",
264        tonic::Code::Cancelled => "canceled",
265        tonic::Code::Unknown => "unknown",
266        tonic::Code::InvalidArgument => "invalid-argument",
267        tonic::Code::DeadlineExceeded => "deadline-exceeded",
268        tonic::Code::NotFound => "not-found",
269        tonic::Code::AlreadyExists => "already-exists",
270        tonic::Code::PermissionDenied => "permission-denied",
271        tonic::Code::ResourceExhausted => "resource-exhausted",
272        tonic::Code::FailedPrecondition => "failed-precondition",
273        tonic::Code::Aborted => "aborted",
274        tonic::Code::OutOfRange => "out-of-range",
275        tonic::Code::Unimplemented => "unimplemented",
276        tonic::Code::Internal => "internal",
277        tonic::Code::Unavailable => "unavailable",
278        tonic::Code::DataLoss => "data-loss",
279        tonic::Code::Unauthenticated => "unauthenticated",
280    }
281}
282
283#[derive(Clone)]
284pub(crate) struct SubscriptionMetrics {
285    pub inflight_subscribers: IntGauge,
286    pub last_recieved_checkpoint: IntGauge,
287}
288
289impl SubscriptionMetrics {
290    pub fn new(registry: &Registry) -> Self {
291        Self {
292            inflight_subscribers: register_int_gauge_with_registry!(
293                "subscription_inflight_subscribers",
294                "Total in-flight subscriptions",
295                registry,
296            )
297            .unwrap(),
298            last_recieved_checkpoint: register_int_gauge_with_registry!(
299                "subscription_last_recieved_checkpoint",
300                "Last recieved checkpoint by the subscription service",
301                registry,
302            )
303            .unwrap(),
304        }
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use prost_types::{
        FileDescriptorProto, FileDescriptorSet, MethodDescriptorProto, ServiceDescriptorProto,
    };

    /// Serialize `set` into protobuf wire format.
    fn encode(set: FileDescriptorSet) -> Vec<u8> {
        set.encode_to_vec()
    }

    /// Build an encoded `FileDescriptorSet` with a single file in `package`
    /// that declares each `(service, methods)` pair from `services`.
    fn fds(package: &str, services: &[(&str, &[&str])]) -> Vec<u8> {
        let service = services
            .iter()
            .map(|&(service_name, method_names)| ServiceDescriptorProto {
                name: Some(service_name.to_owned()),
                method: method_names
                    .iter()
                    .map(|&method_name| MethodDescriptorProto {
                        name: Some(method_name.to_owned()),
                        ..Default::default()
                    })
                    .collect(),
                ..Default::default()
            })
            .collect();

        let file = FileDescriptorProto {
            package: Some(package.to_owned()),
            service,
            ..Default::default()
        };

        encode(FileDescriptorSet { file: vec![file] })
    }

    /// Build an owned allowlist from string literals.
    fn allowlist_of(paths: &[&str]) -> HashSet<String> {
        paths.iter().map(|path| (*path).to_owned()).collect()
    }

    #[test]
    fn parses_method_paths_from_file_descriptor_sets() {
        let ledger = fds(
            "sui.rpc.v2",
            &[("LedgerService", &["GetCheckpoint", "GetTransaction"])],
        );
        let events = fds("sui.rpc.alpha", &[("EventService", &["Subscribe"])]);

        let parsed = grpc_method_paths_from_file_descriptor_sets(&[&ledger, &events]).unwrap();

        let expected = allowlist_of(&[
            "/sui.rpc.v2.LedgerService/GetCheckpoint",
            "/sui.rpc.v2.LedgerService/GetTransaction",
            "/sui.rpc.alpha.EventService/Subscribe",
        ]);
        assert_eq!(parsed, expected);
    }

    #[test]
    fn parser_handles_files_without_a_package() {
        let bare = fds("", &[("BareService", &["Ping"])]);
        let parsed = grpc_method_paths_from_file_descriptor_sets(&[&bare]).unwrap();
        assert!(parsed.contains("/BareService/Ping"));
    }

    #[test]
    fn known_grpc_method_uses_uri_path_label() {
        let allowlist = allowlist_of(&["/sui.rpc.v2.LedgerService/GetCheckpoint"]);

        let label = compute_metric_label(
            true,
            "/sui.rpc.v2.LedgerService/GetCheckpoint",
            Some("/sui.rpc.v2.LedgerService/{*rest}"),
            &allowlist,
        );

        assert_eq!(label, "/sui.rpc.v2.LedgerService/GetCheckpoint");
    }

    #[test]
    fn known_grpc_method_without_matched_path_uses_uri_path_label() {
        let allowlist = allowlist_of(&["/sui.rpc.v2alpha.LedgerService/ListTransactions"]);

        let label = compute_metric_label(
            true,
            "/sui.rpc.v2alpha.LedgerService/ListTransactions",
            None,
            &allowlist,
        );

        assert_eq!(label, "/sui.rpc.v2alpha.LedgerService/ListTransactions");
    }

    #[test]
    fn unknown_grpc_method_falls_back_to_route_pattern() {
        // An empty allowlist stands in for an attacker calling a made-up
        // method under a registered service. The label has to collapse onto
        // the route pattern rather than the attacker-chosen URI path, or the
        // prometheus label map could be grown without bound.
        let allowlist = allowlist_of(&[]);

        let label = compute_metric_label(
            true,
            "/sui.rpc.v2.LedgerService/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
            Some("/sui.rpc.v2.LedgerService/{*rest}"),
            &allowlist,
        );

        assert_eq!(label, "/sui.rpc.v2.LedgerService/{*rest}");
    }

    #[test]
    fn non_grpc_request_uses_matched_path() {
        let allowlist = allowlist_of(&[]);
        let label = compute_metric_label(false, "/health", Some("/health"), &allowlist);
        assert_eq!(label, "/health");
    }

    #[test]
    fn request_without_matched_path_is_labelled_unknown() {
        let allowlist = allowlist_of(&[]);
        let label = compute_metric_label(true, "/no/match", None, &allowlist);
        assert_eq!(label, "unknown");
    }

    #[test]
    fn grpc_content_type_accepts_codec_suffixes() {
        for accepted in ["application/grpc", "application/grpc+proto"] {
            assert!(
                is_grpc_content_type(&http::HeaderValue::from_static(accepted)),
                "{accepted} should be treated as gRPC"
            );
        }
        assert!(!is_grpc_content_type(&http::HeaderValue::from_static(
            "application/json"
        )));
    }
}
437}