sui_rpc_api/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use axum::http;
5use std::{borrow::Cow, sync::Arc, time::Instant};
6
7use mysten_network::callback::{MakeCallbackHandler, ResponseHandler};
8use prometheus::{
9    HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
10    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
11    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
12};
13
14#[derive(Clone)]
15pub struct RpcMetrics {
16    inflight_requests: IntGaugeVec,
17    num_requests: IntCounterVec,
18    request_latency: HistogramVec,
19}
20
/// Upper bounds, in seconds, of the `rpc_request_latency` histogram buckets
/// (from 1 ms up to 90 s).
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second buckets.
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5,
    // Buckets of one second and longer.
    1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0,
];
24
25impl RpcMetrics {
26    pub fn new(registry: &Registry) -> Self {
27        Self {
28            inflight_requests: register_int_gauge_vec_with_registry!(
29                "rpc_inflight_requests",
30                "Total in-flight RPC requests per route",
31                &["path"],
32                registry,
33            )
34            .unwrap(),
35            num_requests: register_int_counter_vec_with_registry!(
36                "rpc_requests",
37                "Total RPC requests per route and their http status",
38                &["path", "status"],
39                registry,
40            )
41            .unwrap(),
42            request_latency: register_histogram_vec_with_registry!(
43                "rpc_request_latency",
44                "Latency of RPC requests per route",
45                &["path"],
46                LATENCY_SEC_BUCKETS.to_vec(),
47                registry,
48            )
49            .unwrap(),
50        }
51    }
52}
53
54#[derive(Clone)]
55pub struct RpcMetricsMakeCallbackHandler {
56    metrics: Arc<RpcMetrics>,
57}
58
59impl RpcMetricsMakeCallbackHandler {
60    pub fn new(metrics: Arc<RpcMetrics>) -> Self {
61        Self { metrics }
62    }
63}
64
65impl MakeCallbackHandler for RpcMetricsMakeCallbackHandler {
66    type Handler = RpcMetricsCallbackHandler;
67
68    fn make_handler(&self, request: &http::request::Parts) -> Self::Handler {
69        let start = Instant::now();
70        let metrics = self.metrics.clone();
71
72        let path =
73            if let Some(matched_path) = request.extensions.get::<axum::extract::MatchedPath>() {
74                if request
75                    .headers
76                    .get(&http::header::CONTENT_TYPE)
77                    .is_some_and(|header| header == tonic::metadata::GRPC_CONTENT_TYPE)
78                {
79                    Cow::Owned(request.uri.path().to_owned())
80                } else {
81                    Cow::Owned(matched_path.as_str().to_owned())
82                }
83            } else {
84                Cow::Borrowed("unknown")
85            };
86
87        metrics
88            .inflight_requests
89            .with_label_values(&[path.as_ref()])
90            .inc();
91
92        RpcMetricsCallbackHandler {
93            metrics,
94            path,
95            start,
96            counted_response: false,
97        }
98    }
99}
100
101pub struct RpcMetricsCallbackHandler {
102    metrics: Arc<RpcMetrics>,
103    path: Cow<'static, str>,
104    start: Instant,
105    // Indicates if we successfully counted the response. In some cases when a request is
106    // prematurely canceled this will remain false
107    counted_response: bool,
108}
109
110impl ResponseHandler for RpcMetricsCallbackHandler {
111    fn on_response(&mut self, response: &http::response::Parts) {
112        const GRPC_STATUS: http::HeaderName = http::HeaderName::from_static("grpc-status");
113
114        let status = if response
115            .headers
116            .get(&http::header::CONTENT_TYPE)
117            .is_some_and(|content_type| {
118                content_type
119                    .as_bytes()
120                    // check if the content-type starts_with 'application/grpc' in order to
121                    // consider this as a gRPC request. A prefix comparison is done instead of a
122                    // full equality check in order to account for the various types of
123                    // content-types that are considered as gRPC traffic.
124                    .starts_with(tonic::metadata::GRPC_CONTENT_TYPE.as_bytes())
125            }) {
126            let code = response
127                .headers
128                .get(&GRPC_STATUS)
129                .map(http::HeaderValue::as_bytes)
130                .map(tonic::Code::from_bytes)
131                .unwrap_or(tonic::Code::Ok);
132
133            code_as_str(code)
134        } else {
135            response.status.as_str()
136        };
137
138        self.metrics
139            .num_requests
140            .with_label_values(&[self.path.as_ref(), status])
141            .inc();
142
143        self.counted_response = true;
144    }
145
146    fn on_error<E>(&mut self, _error: &E) {
147        // Do nothing if the whole service errored
148        //
149        // in Axum this isn't possible since all services are required to have an error type of
150        // Infallible
151    }
152}
153
154impl Drop for RpcMetricsCallbackHandler {
155    fn drop(&mut self) {
156        self.metrics
157            .inflight_requests
158            .with_label_values(&[self.path.as_ref()])
159            .dec();
160
161        let latency = self.start.elapsed().as_secs_f64();
162        self.metrics
163            .request_latency
164            .with_label_values(&[self.path.as_ref()])
165            .observe(latency);
166
167        if !self.counted_response {
168            self.metrics
169                .num_requests
170                .with_label_values(&[self.path.as_ref(), "canceled"])
171                .inc();
172        }
173    }
174}
175
176fn code_as_str(code: tonic::Code) -> &'static str {
177    match code {
178        tonic::Code::Ok => "ok",
179        tonic::Code::Cancelled => "canceled",
180        tonic::Code::Unknown => "unknown",
181        tonic::Code::InvalidArgument => "invalid-argument",
182        tonic::Code::DeadlineExceeded => "deadline-exceeded",
183        tonic::Code::NotFound => "not-found",
184        tonic::Code::AlreadyExists => "already-exists",
185        tonic::Code::PermissionDenied => "permission-denied",
186        tonic::Code::ResourceExhausted => "resource-exhausted",
187        tonic::Code::FailedPrecondition => "failed-precondition",
188        tonic::Code::Aborted => "aborted",
189        tonic::Code::OutOfRange => "out-of-range",
190        tonic::Code::Unimplemented => "unimplemented",
191        tonic::Code::Internal => "internal",
192        tonic::Code::Unavailable => "unavailable",
193        tonic::Code::DataLoss => "data-loss",
194        tonic::Code::Unauthenticated => "unauthenticated",
195    }
196}
197
198#[derive(Clone)]
199pub(crate) struct SubscriptionMetrics {
200    pub inflight_subscribers: IntGauge,
201    pub last_recieved_checkpoint: IntGauge,
202}
203
204impl SubscriptionMetrics {
205    pub fn new(registry: &Registry) -> Self {
206        Self {
207            inflight_subscribers: register_int_gauge_with_registry!(
208                "subscription_inflight_subscribers",
209                "Total in-flight subscriptions",
210                registry,
211            )
212            .unwrap(),
213            last_recieved_checkpoint: register_int_gauge_with_registry!(
214                "subscription_last_recieved_checkpoint",
215                "Last recieved checkpoint by the subscription service",
216                registry,
217            )
218            .unwrap(),
219        }
220    }
221}