1use axum::http;
5use std::{borrow::Cow, collections::HashSet, sync::Arc, time::Instant};
6
7use mysten_network::callback::{MakeCallbackHandler, ResponseHandler};
8use prometheus::{
9 HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
10 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
11 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
12};
13use prost::Message;
14
15#[derive(Clone)]
16pub struct RpcMetrics {
17 inflight_requests: IntGaugeVec,
18 num_requests: IntCounterVec,
19 request_latency: HistogramVec,
20}
21
22const LATENCY_SEC_BUCKETS: &[f64] = &[
23 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1., 2.5, 5., 10., 20., 30., 60., 90.,
24];
25
26impl RpcMetrics {
27 pub fn new(registry: &Registry) -> Self {
28 Self {
29 inflight_requests: register_int_gauge_vec_with_registry!(
30 "rpc_inflight_requests",
31 "Total in-flight RPC requests per route",
32 &["path"],
33 registry,
34 )
35 .unwrap(),
36 num_requests: register_int_counter_vec_with_registry!(
37 "rpc_requests",
38 "Total RPC requests per route and their http status",
39 &["path", "status"],
40 registry,
41 )
42 .unwrap(),
43 request_latency: register_histogram_vec_with_registry!(
44 "rpc_request_latency",
45 "Latency of RPC requests per route",
46 &["path"],
47 LATENCY_SEC_BUCKETS.to_vec(),
48 registry,
49 )
50 .unwrap(),
51 }
52 }
53}
54
55pub type GrpcMethodAllowlist = Arc<HashSet<String>>;
65
66pub fn grpc_method_paths_from_file_descriptor_sets(
73 encoded_sets: &[&[u8]],
74) -> Result<HashSet<String>, prost::DecodeError> {
75 let mut paths = HashSet::new();
76 for bytes in encoded_sets {
77 let fds = prost_types::FileDescriptorSet::decode(*bytes)?;
78 for file in fds.file {
79 let package = file.package.unwrap_or_default();
80 for service in file.service {
81 let Some(service_name) = service.name else {
82 continue;
83 };
84 let qualified_service = if package.is_empty() {
85 service_name
86 } else {
87 format!("{}.{}", package, service_name)
88 };
89 for method in service.method {
90 let Some(method_name) = method.name else {
91 continue;
92 };
93 paths.insert(format!("/{}/{}", qualified_service, method_name));
94 }
95 }
96 }
97 }
98 Ok(paths)
99}
100
101#[derive(Clone)]
102pub struct RpcMetricsMakeCallbackHandler {
103 metrics: Arc<RpcMetrics>,
104 grpc_method_allowlist: GrpcMethodAllowlist,
105}
106
107impl RpcMetricsMakeCallbackHandler {
108 pub fn new(metrics: Arc<RpcMetrics>) -> Self {
113 Self::with_grpc_method_allowlist(metrics, Arc::new(HashSet::new()))
114 }
115
116 pub fn with_grpc_method_allowlist(
119 metrics: Arc<RpcMetrics>,
120 allowlist: GrpcMethodAllowlist,
121 ) -> Self {
122 Self {
123 metrics,
124 grpc_method_allowlist: allowlist,
125 }
126 }
127}
128
129impl MakeCallbackHandler for RpcMetricsMakeCallbackHandler {
130 type Handler = RpcMetricsCallbackHandler;
131
132 fn make_handler(&self, request: &http::request::Parts) -> Self::Handler {
133 let start = Instant::now();
134 let metrics = self.metrics.clone();
135
136 let matched_path = request
137 .extensions
138 .get::<axum::extract::MatchedPath>()
139 .map(|m| m.as_str());
140 let is_grpc = request
141 .headers
142 .get(&http::header::CONTENT_TYPE)
143 .is_some_and(is_grpc_content_type);
144
145 let path = compute_metric_label(
146 is_grpc,
147 request.uri.path(),
148 matched_path,
149 &self.grpc_method_allowlist,
150 );
151
152 metrics
153 .inflight_requests
154 .with_label_values(&[path.as_ref()])
155 .inc();
156
157 RpcMetricsCallbackHandler {
158 metrics,
159 path,
160 start,
161 counted_response: false,
162 }
163 }
164}
165
/// Chooses the `path` label value for a request.
///
/// A gRPC request whose exact URI path is contained in `grpc_method_allowlist`
/// is labelled with that path. Every other request is labelled with the route
/// pattern that matched it, or `"unknown"` when no route matched. A URI path is
/// only used verbatim when it is allowlisted, so client-chosen paths cannot
/// introduce new label values.
fn compute_metric_label(
    is_grpc: bool,
    uri_path: &str,
    matched_path: Option<&str>,
    grpc_method_allowlist: &HashSet<String>,
) -> Cow<'static, str> {
    if is_grpc && grpc_method_allowlist.contains(uri_path) {
        return Cow::Owned(uri_path.to_owned());
    }

    matched_path.map_or(Cow::Borrowed("unknown"), |route| {
        Cow::Owned(route.to_owned())
    })
}
186
187fn is_grpc_content_type(content_type: &http::HeaderValue) -> bool {
188 content_type
189 .as_bytes()
190 .starts_with(tonic::metadata::GRPC_CONTENT_TYPE.as_bytes())
191}
192
193pub struct RpcMetricsCallbackHandler {
194 metrics: Arc<RpcMetrics>,
195 path: Cow<'static, str>,
196 start: Instant,
197 counted_response: bool,
200}
201
202impl ResponseHandler for RpcMetricsCallbackHandler {
203 fn on_response(&mut self, response: &http::response::Parts) {
204 const GRPC_STATUS: http::HeaderName = http::HeaderName::from_static("grpc-status");
205
206 let status = if response
207 .headers
208 .get(&http::header::CONTENT_TYPE)
209 .is_some_and(is_grpc_content_type)
210 {
211 let code = response
212 .headers
213 .get(&GRPC_STATUS)
214 .map(http::HeaderValue::as_bytes)
215 .map(tonic::Code::from_bytes)
216 .unwrap_or(tonic::Code::Ok);
217
218 code_as_str(code)
219 } else {
220 response.status.as_str()
221 };
222
223 self.metrics
224 .num_requests
225 .with_label_values(&[self.path.as_ref(), status])
226 .inc();
227
228 self.counted_response = true;
229 }
230
231 fn on_error<E>(&mut self, _error: &E) {
232 }
237}
238
239impl Drop for RpcMetricsCallbackHandler {
240 fn drop(&mut self) {
241 self.metrics
242 .inflight_requests
243 .with_label_values(&[self.path.as_ref()])
244 .dec();
245
246 let latency = self.start.elapsed().as_secs_f64();
247 self.metrics
248 .request_latency
249 .with_label_values(&[self.path.as_ref()])
250 .observe(latency);
251
252 if !self.counted_response {
253 self.metrics
254 .num_requests
255 .with_label_values(&[self.path.as_ref(), "canceled"])
256 .inc();
257 }
258 }
259}
260
261fn code_as_str(code: tonic::Code) -> &'static str {
262 match code {
263 tonic::Code::Ok => "ok",
264 tonic::Code::Cancelled => "canceled",
265 tonic::Code::Unknown => "unknown",
266 tonic::Code::InvalidArgument => "invalid-argument",
267 tonic::Code::DeadlineExceeded => "deadline-exceeded",
268 tonic::Code::NotFound => "not-found",
269 tonic::Code::AlreadyExists => "already-exists",
270 tonic::Code::PermissionDenied => "permission-denied",
271 tonic::Code::ResourceExhausted => "resource-exhausted",
272 tonic::Code::FailedPrecondition => "failed-precondition",
273 tonic::Code::Aborted => "aborted",
274 tonic::Code::OutOfRange => "out-of-range",
275 tonic::Code::Unimplemented => "unimplemented",
276 tonic::Code::Internal => "internal",
277 tonic::Code::Unavailable => "unavailable",
278 tonic::Code::DataLoss => "data-loss",
279 tonic::Code::Unauthenticated => "unauthenticated",
280 }
281}
282
283#[derive(Clone)]
284pub(crate) struct SubscriptionMetrics {
285 pub inflight_subscribers: IntGauge,
286 pub last_recieved_checkpoint: IntGauge,
287}
288
289impl SubscriptionMetrics {
290 pub fn new(registry: &Registry) -> Self {
291 Self {
292 inflight_subscribers: register_int_gauge_with_registry!(
293 "subscription_inflight_subscribers",
294 "Total in-flight subscriptions",
295 registry,
296 )
297 .unwrap(),
298 last_recieved_checkpoint: register_int_gauge_with_registry!(
299 "subscription_last_recieved_checkpoint",
300 "Last recieved checkpoint by the subscription service",
301 registry,
302 )
303 .unwrap(),
304 }
305 }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use prost_types::{
        FileDescriptorProto, FileDescriptorSet, MethodDescriptorProto, ServiceDescriptorProto,
    };

    /// Serializes a descriptor set into its protobuf wire format.
    fn encode(set: FileDescriptorSet) -> Vec<u8> {
        let mut buf = Vec::with_capacity(set.encoded_len());
        set.encode(&mut buf).unwrap();
        buf
    }

    /// Builds an encoded descriptor set with one file in `package` containing
    /// the given `(service, methods)` pairs.
    fn fds(package: &str, services: &[(&str, &[&str])]) -> Vec<u8> {
        encode(FileDescriptorSet {
            file: vec![FileDescriptorProto {
                package: Some(package.to_owned()),
                service: services
                    .iter()
                    .map(|(name, methods)| ServiceDescriptorProto {
                        name: Some((*name).to_owned()),
                        method: methods
                            .iter()
                            .map(|m| MethodDescriptorProto {
                                name: Some((*m).to_owned()),
                                ..Default::default()
                            })
                            .collect(),
                        ..Default::default()
                    })
                    .collect(),
                ..Default::default()
            }],
        })
    }

    #[test]
    fn parses_method_paths_from_file_descriptor_sets() {
        let v2 = fds(
            "sui.rpc.v2",
            &[("LedgerService", &["GetCheckpoint", "GetTransaction"])],
        );
        let alpha = fds("sui.rpc.alpha", &[("EventService", &["Subscribe"])]);

        let paths = grpc_method_paths_from_file_descriptor_sets(&[&v2, &alpha]).unwrap();

        assert_eq!(paths.len(), 3);
        assert!(paths.contains("/sui.rpc.v2.LedgerService/GetCheckpoint"));
        assert!(paths.contains("/sui.rpc.v2.LedgerService/GetTransaction"));
        assert!(paths.contains("/sui.rpc.alpha.EventService/Subscribe"));
    }

    #[test]
    fn parser_handles_files_without_a_package() {
        let bare = fds("", &[("BareService", &["Ping"])]);
        let paths = grpc_method_paths_from_file_descriptor_sets(&[&bare]).unwrap();
        assert!(paths.contains("/BareService/Ping"));
    }

    #[test]
    fn parser_skips_unnamed_services_and_methods() {
        let set = encode(FileDescriptorSet {
            file: vec![FileDescriptorProto {
                package: Some("pkg".to_owned()),
                service: vec![
                    ServiceDescriptorProto {
                        name: None,
                        method: vec![MethodDescriptorProto {
                            name: Some("Orphan".to_owned()),
                            ..Default::default()
                        }],
                        ..Default::default()
                    },
                    ServiceDescriptorProto {
                        name: Some("Svc".to_owned()),
                        method: vec![
                            MethodDescriptorProto {
                                name: None,
                                ..Default::default()
                            },
                            MethodDescriptorProto {
                                name: Some("Call".to_owned()),
                                ..Default::default()
                            },
                        ],
                        ..Default::default()
                    },
                ],
                ..Default::default()
            }],
        });

        let paths = grpc_method_paths_from_file_descriptor_sets(&[&set]).unwrap();

        assert_eq!(paths.len(), 1);
        assert!(paths.contains("/pkg.Svc/Call"));
    }

    #[test]
    fn parser_propagates_decode_errors() {
        // A lone varint continuation byte is not a valid protobuf message.
        let garbage: &[u8] = &[0xff];
        assert!(grpc_method_paths_from_file_descriptor_sets(&[garbage]).is_err());
    }

    #[test]
    fn known_grpc_method_uses_uri_path_label() {
        let mut allowlist = HashSet::new();
        allowlist.insert("/sui.rpc.v2.LedgerService/GetCheckpoint".to_owned());

        let label = compute_metric_label(
            true,
            "/sui.rpc.v2.LedgerService/GetCheckpoint",
            Some("/sui.rpc.v2.LedgerService/{*rest}"),
            &allowlist,
        );
        assert_eq!(label, "/sui.rpc.v2.LedgerService/GetCheckpoint");
    }

    #[test]
    fn known_grpc_method_without_matched_path_uses_uri_path_label() {
        let mut allowlist = HashSet::new();
        allowlist.insert("/sui.rpc.v2alpha.LedgerService/ListTransactions".to_owned());

        let label = compute_metric_label(
            true,
            "/sui.rpc.v2alpha.LedgerService/ListTransactions",
            None,
            &allowlist,
        );
        assert_eq!(label, "/sui.rpc.v2alpha.LedgerService/ListTransactions");
    }

    #[test]
    fn unknown_grpc_method_falls_back_to_route_pattern() {
        let allowlist = HashSet::new();
        // An arbitrary, non-allowlisted method name must not become a label.
        let label = compute_metric_label(
            true,
            "/sui.rpc.v2.LedgerService/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
            Some("/sui.rpc.v2.LedgerService/{*rest}"),
            &allowlist,
        );
        assert_eq!(label, "/sui.rpc.v2.LedgerService/{*rest}");
    }

    #[test]
    fn non_grpc_request_uses_matched_path() {
        let allowlist = HashSet::new();
        let label = compute_metric_label(false, "/health", Some("/health"), &allowlist);
        assert_eq!(label, "/health");
    }

    #[test]
    fn non_grpc_request_ignores_allowlist() {
        let mut allowlist = HashSet::new();
        allowlist.insert("/sui.rpc.v2.LedgerService/GetCheckpoint".to_owned());

        let label = compute_metric_label(
            false,
            "/sui.rpc.v2.LedgerService/GetCheckpoint",
            Some("/sui.rpc.v2.LedgerService/{*rest}"),
            &allowlist,
        );
        assert_eq!(label, "/sui.rpc.v2.LedgerService/{*rest}");
    }

    #[test]
    fn request_without_matched_path_is_labelled_unknown() {
        let allowlist = HashSet::new();
        let label = compute_metric_label(true, "/no/match", None, &allowlist);
        assert_eq!(label, "unknown");
    }

    #[test]
    fn non_grpc_request_without_matched_path_is_labelled_unknown() {
        let allowlist = HashSet::new();
        let label = compute_metric_label(false, "/no/match", None, &allowlist);
        assert_eq!(label, "unknown");
    }

    #[test]
    fn grpc_content_type_accepts_codec_suffixes() {
        assert!(is_grpc_content_type(&http::HeaderValue::from_static(
            "application/grpc"
        )));
        assert!(is_grpc_content_type(&http::HeaderValue::from_static(
            "application/grpc+proto"
        )));
        assert!(!is_grpc_content_type(&http::HeaderValue::from_static(
            "application/json"
        )));
    }

    #[test]
    fn grpc_codes_map_to_status_labels() {
        assert_eq!(code_as_str(tonic::Code::Ok), "ok");
        assert_eq!(code_as_str(tonic::Code::Cancelled), "canceled");
        assert_eq!(code_as_str(tonic::Code::Unauthenticated), "unauthenticated");
        // `grpc-status: 5` is NOT_FOUND.
        assert_eq!(code_as_str(tonic::Code::from_bytes(b"5")), "not-found");
    }
}