1use axum::http;
5use std::{borrow::Cow, sync::Arc, time::Instant};
6
7use mysten_network::callback::{MakeCallbackHandler, ResponseHandler};
8use prometheus::{
9 HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
10 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
11 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
12};
13
14#[derive(Clone)]
15pub struct RpcMetrics {
16 inflight_requests: IntGaugeVec,
17 num_requests: IntCounterVec,
18 request_latency: HistogramVec,
19}
20
21const LATENCY_SEC_BUCKETS: &[f64] = &[
22 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1., 2.5, 5., 10., 20., 30., 60., 90.,
23];
24
25impl RpcMetrics {
26 pub fn new(registry: &Registry) -> Self {
27 Self {
28 inflight_requests: register_int_gauge_vec_with_registry!(
29 "rpc_inflight_requests",
30 "Total in-flight RPC requests per route",
31 &["path"],
32 registry,
33 )
34 .unwrap(),
35 num_requests: register_int_counter_vec_with_registry!(
36 "rpc_requests",
37 "Total RPC requests per route and their http status",
38 &["path", "status"],
39 registry,
40 )
41 .unwrap(),
42 request_latency: register_histogram_vec_with_registry!(
43 "rpc_request_latency",
44 "Latency of RPC requests per route",
45 &["path"],
46 LATENCY_SEC_BUCKETS.to_vec(),
47 registry,
48 )
49 .unwrap(),
50 }
51 }
52}
53
54#[derive(Clone)]
55pub struct RpcMetricsMakeCallbackHandler {
56 metrics: Arc<RpcMetrics>,
57}
58
59impl RpcMetricsMakeCallbackHandler {
60 pub fn new(metrics: Arc<RpcMetrics>) -> Self {
61 Self { metrics }
62 }
63}
64
65impl MakeCallbackHandler for RpcMetricsMakeCallbackHandler {
66 type Handler = RpcMetricsCallbackHandler;
67
68 fn make_handler(&self, request: &http::request::Parts) -> Self::Handler {
69 let start = Instant::now();
70 let metrics = self.metrics.clone();
71
72 let path =
73 if let Some(matched_path) = request.extensions.get::<axum::extract::MatchedPath>() {
74 if request
75 .headers
76 .get(&http::header::CONTENT_TYPE)
77 .is_some_and(|header| header == tonic::metadata::GRPC_CONTENT_TYPE)
78 {
79 Cow::Owned(request.uri.path().to_owned())
80 } else {
81 Cow::Owned(matched_path.as_str().to_owned())
82 }
83 } else {
84 Cow::Borrowed("unknown")
85 };
86
87 metrics
88 .inflight_requests
89 .with_label_values(&[path.as_ref()])
90 .inc();
91
92 RpcMetricsCallbackHandler {
93 metrics,
94 path,
95 start,
96 counted_response: false,
97 }
98 }
99}
100
101pub struct RpcMetricsCallbackHandler {
102 metrics: Arc<RpcMetrics>,
103 path: Cow<'static, str>,
104 start: Instant,
105 counted_response: bool,
108}
109
110impl ResponseHandler for RpcMetricsCallbackHandler {
111 fn on_response(&mut self, response: &http::response::Parts) {
112 const GRPC_STATUS: http::HeaderName = http::HeaderName::from_static("grpc-status");
113
114 let status = if response
115 .headers
116 .get(&http::header::CONTENT_TYPE)
117 .is_some_and(|content_type| {
118 content_type
119 .as_bytes()
120 .starts_with(tonic::metadata::GRPC_CONTENT_TYPE.as_bytes())
125 }) {
126 let code = response
127 .headers
128 .get(&GRPC_STATUS)
129 .map(http::HeaderValue::as_bytes)
130 .map(tonic::Code::from_bytes)
131 .unwrap_or(tonic::Code::Ok);
132
133 code_as_str(code)
134 } else {
135 response.status.as_str()
136 };
137
138 self.metrics
139 .num_requests
140 .with_label_values(&[self.path.as_ref(), status])
141 .inc();
142
143 self.counted_response = true;
144 }
145
146 fn on_error<E>(&mut self, _error: &E) {
147 }
152}
153
154impl Drop for RpcMetricsCallbackHandler {
155 fn drop(&mut self) {
156 self.metrics
157 .inflight_requests
158 .with_label_values(&[self.path.as_ref()])
159 .dec();
160
161 let latency = self.start.elapsed().as_secs_f64();
162 self.metrics
163 .request_latency
164 .with_label_values(&[self.path.as_ref()])
165 .observe(latency);
166
167 if !self.counted_response {
168 self.metrics
169 .num_requests
170 .with_label_values(&[self.path.as_ref(), "canceled"])
171 .inc();
172 }
173 }
174}
175
176fn code_as_str(code: tonic::Code) -> &'static str {
177 match code {
178 tonic::Code::Ok => "ok",
179 tonic::Code::Cancelled => "canceled",
180 tonic::Code::Unknown => "unknown",
181 tonic::Code::InvalidArgument => "invalid-argument",
182 tonic::Code::DeadlineExceeded => "deadline-exceeded",
183 tonic::Code::NotFound => "not-found",
184 tonic::Code::AlreadyExists => "already-exists",
185 tonic::Code::PermissionDenied => "permission-denied",
186 tonic::Code::ResourceExhausted => "resource-exhausted",
187 tonic::Code::FailedPrecondition => "failed-precondition",
188 tonic::Code::Aborted => "aborted",
189 tonic::Code::OutOfRange => "out-of-range",
190 tonic::Code::Unimplemented => "unimplemented",
191 tonic::Code::Internal => "internal",
192 tonic::Code::Unavailable => "unavailable",
193 tonic::Code::DataLoss => "data-loss",
194 tonic::Code::Unauthenticated => "unauthenticated",
195 }
196}
197
198#[derive(Clone)]
199pub(crate) struct SubscriptionMetrics {
200 pub inflight_subscribers: IntGauge,
201 pub last_recieved_checkpoint: IntGauge,
202}
203
204impl SubscriptionMetrics {
205 pub fn new(registry: &Registry) -> Self {
206 Self {
207 inflight_subscribers: register_int_gauge_with_registry!(
208 "subscription_inflight_subscribers",
209 "Total in-flight subscriptions",
210 registry,
211 )
212 .unwrap(),
213 last_recieved_checkpoint: register_int_gauge_with_registry!(
214 "subscription_last_recieved_checkpoint",
215 "Last recieved checkpoint by the subscription service",
216 registry,
217 )
218 .unwrap(),
219 }
220 }
221}