1use std::sync::Arc;
5use std::time::Duration;
6
7use anemo_tower::callback::{MakeCallbackHandler, ResponseHandler};
8use mysten_metrics::{BYTES_BUCKETS, LATENCY_SEC_BUCKETS};
9use prometheus::{
10 HistogramTimer, HistogramVec, IntCounterVec, IntGaugeVec, Registry,
11 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
12 register_int_gauge_vec_with_registry,
13};
14use tonic::codegen::http::header::HeaderName;
15use tonic::codegen::http::{Request, Response};
16use tonic::{Code, Status};
17use tower_http::classify::GrpcFailureClass;
18use tower_http::trace::{OnFailure, OnRequest, OnResponse};
19use tracing::{Span, warn};
20
/// Header name (`grpc-path-req`) that `MetricsHandler` looks up on a response to
/// recover the endpoint path used as the metrics label.
// NOTE(review): whatever layer inserts this header into responses is not visible in
// this file — confirm against the server/middleware setup.
pub static GRPC_ENDPOINT_PATH_HEADER: HeaderName = HeaderName::from_static("grpc-path-req");
22
/// Callbacks that `MetricsHandler` forwards request/response events to.
///
/// Implementors must be cheaply shareable across threads (`Send + Sync + Clone + 'static`).
pub trait MetricsCallbackProvider: Send + Sync + Clone + 'static {
    /// Called by `MetricsHandler` when a request is observed; `path` is the URI path
    /// of that request.
    fn on_request(&self, path: String);

    /// Called by `MetricsHandler` when a response is observed.
    ///
    /// * `path` - value of the `GRPC_ENDPOINT_PATH_HEADER` response header, or
    ///   `"invalid"` when the header is missing or not valid text.
    /// * `latency` - latency reported by the tracing layer for this response.
    /// * `status` - HTTP status code of the response as a `u16`.
    /// * `grpc_status_code` - gRPC code parsed from the response headers, `Code::Ok`
    ///   when no gRPC status could be parsed.
    fn on_response(&self, path: String, latency: Duration, status: u16, grpc_status_code: Code);

    /// Optional hook; the default implementation does nothing.
    // NOTE(review): not invoked anywhere in this file — presumably called when
    // handling of `_path` starts; confirm against callers.
    fn on_start(&self, _path: &str) {}

    /// Optional hook; the default implementation does nothing.
    // NOTE(review): not invoked anywhere in this file — presumably called when the
    // request/response future for `_path` is dropped; confirm against callers.
    fn on_drop(&self, _path: &str) {}
}
49
/// A `MetricsCallbackProvider` that records nothing: every callback is a no-op.
#[derive(Clone, Default)]
pub struct DefaultMetricsCallbackProvider {}
impl MetricsCallbackProvider for DefaultMetricsCallbackProvider {
    /// No-op: the request path is ignored.
    fn on_request(&self, _path: String) {}

    /// No-op: all response details are ignored.
    fn on_response(
        &self,
        _path: String,
        _latency: Duration,
        _status: u16,
        _grpc_status_code: Code,
    ) {
    }
}
64
/// Adapter that implements the `tower_http::trace` callbacks (`OnRequest`,
/// `OnResponse`, `OnFailure`) by delegating to a `MetricsCallbackProvider`.
#[derive(Clone)]
pub struct MetricsHandler<M: MetricsCallbackProvider> {
    /// Provider that receives the request/response callbacks.
    metrics_provider: M,
}
69
70impl<M: MetricsCallbackProvider> MetricsHandler<M> {
71 pub fn new(metrics_provider: M) -> Self {
72 Self { metrics_provider }
73 }
74}
75
76impl<B, M: MetricsCallbackProvider> OnResponse<B> for MetricsHandler<M> {
77 fn on_response(self, response: &Response<B>, latency: Duration, _span: &Span) {
78 let grpc_status = Status::from_header_map(response.headers());
79 let grpc_status_code = grpc_status.map_or(Code::Ok, |s| s.code());
80
81 let path = response
82 .headers()
83 .get(&GRPC_ENDPOINT_PATH_HEADER)
84 .and_then(|v| v.to_str().ok())
85 .unwrap_or("invalid")
86 .to_string();
87
88 self.metrics_provider.on_response(
89 path,
90 latency,
91 response.status().as_u16(),
92 grpc_status_code,
93 );
94 }
95}
96
97impl<B, M: MetricsCallbackProvider> OnRequest<B> for MetricsHandler<M> {
98 fn on_request(&mut self, request: &Request<B>, _span: &Span) {
99 self.metrics_provider
100 .on_request(request.uri().path().to_string());
101 }
102}
103
impl<M: MetricsCallbackProvider> OnFailure<GrpcFailureClass> for MetricsHandler<M> {
    /// Failure callback of the tracing layer. This handler records nothing here:
    /// all arguments are ignored and the metrics provider is not called.
    fn on_failure(
        &mut self,
        _failure_classification: GrpcFailureClass,
        _latency: Duration,
        _span: &Span,
    ) {
    }
}
114
/// Prometheus metrics for network traffic in one direction of one node.
///
/// All metrics are labelled by `route`; `errors` is additionally labelled by `status`.
#[derive(Clone)]
pub struct NetworkMetrics {
    /// Counter of requests made on the network, by route.
    requests: IntCounterVec,
    /// Histogram of request latency, by route.
    request_latency: HistogramVec,
    /// Histogram of request body size, by route.
    request_size: HistogramVec,
    /// Histogram of response body size, by route.
    response_size: HistogramVec,
    /// Counter of requests whose body exceeded the configured "excessive" size, by route.
    excessive_size_requests: IntCounterVec,
    /// Counter of responses whose body exceeded the configured "excessive" size, by route.
    excessive_size_responses: IntCounterVec,
    /// Gauge of requests currently in flight, by route.
    inflight_requests: IntGaugeVec,
    /// Counter of failed requests, by route and status.
    errors: IntCounterVec,
}
134
135impl NetworkMetrics {
136 pub fn new(node: &'static str, direction: &'static str, registry: &Registry) -> Self {
137 let requests = register_int_counter_vec_with_registry!(
138 format!("{node}_{direction}_requests"),
139 "The number of requests made on the network",
140 &["route"],
141 registry
142 )
143 .unwrap();
144
145 let request_latency = register_histogram_vec_with_registry!(
146 format!("{node}_{direction}_request_latency"),
147 "Latency of a request by route",
148 &["route"],
149 LATENCY_SEC_BUCKETS.to_vec(),
150 registry,
151 )
152 .unwrap();
153
154 let request_size = register_histogram_vec_with_registry!(
155 format!("{node}_{direction}_request_size"),
156 "Size of a request by route",
157 &["route"],
158 BYTES_BUCKETS.to_vec(),
159 registry,
160 )
161 .unwrap();
162
163 let response_size = register_histogram_vec_with_registry!(
164 format!("{node}_{direction}_response_size"),
165 "Size of a response by route",
166 &["route"],
167 BYTES_BUCKETS.to_vec(),
168 registry,
169 )
170 .unwrap();
171
172 let excessive_size_requests = register_int_counter_vec_with_registry!(
173 format!("{node}_{direction}_excessive_size_requests"),
174 "The number of excessively large request messages sent",
175 &["route"],
176 registry
177 )
178 .unwrap();
179
180 let excessive_size_responses = register_int_counter_vec_with_registry!(
181 format!("{node}_{direction}_excessive_size_responses"),
182 "The number of excessively large response messages seen",
183 &["route"],
184 registry
185 )
186 .unwrap();
187
188 let inflight_requests = register_int_gauge_vec_with_registry!(
189 format!("{node}_{direction}_inflight_requests"),
190 "The number of inflight network requests",
191 &["route"],
192 registry
193 )
194 .unwrap();
195
196 let errors = register_int_counter_vec_with_registry!(
197 format!("{node}_{direction}_request_errors"),
198 "Number of errors by route",
199 &["route", "status"],
200 registry,
201 )
202 .unwrap();
203
204 Self {
205 requests,
206 request_latency,
207 request_size,
208 response_size,
209 excessive_size_requests,
210 excessive_size_responses,
211 inflight_requests,
212 errors,
213 }
214 }
215}
216
/// Factory of per-request `MetricsResponseHandler`s for the anemo callback layer.
#[derive(Clone)]
pub struct MetricsMakeCallbackHandler {
    /// Shared metric set updated for every request and response.
    metrics: Arc<NetworkMetrics>,
    /// Body size (in bytes) above which a message is logged and counted as excessive.
    excessive_message_size: usize,
}
223
224impl MetricsMakeCallbackHandler {
225 pub fn new(metrics: Arc<NetworkMetrics>, excessive_message_size: usize) -> Self {
226 Self {
227 metrics,
228 excessive_message_size,
229 }
230 }
231}
232
233impl MakeCallbackHandler for MetricsMakeCallbackHandler {
234 type Handler = MetricsResponseHandler;
235
236 fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
237 let route = request.route().to_owned();
238
239 self.metrics.requests.with_label_values(&[&route]).inc();
240 self.metrics
241 .inflight_requests
242 .with_label_values(&[&route])
243 .inc();
244 let body_len = request.body().len();
245 self.metrics
246 .request_size
247 .with_label_values(&[&route])
248 .observe(body_len as f64);
249 if body_len > self.excessive_message_size {
250 warn!(
251 "Saw excessively large request with size {body_len} for {route} with peer {:?}",
252 request.peer_id()
253 );
254 self.metrics
255 .excessive_size_requests
256 .with_label_values(&[&route])
257 .inc();
258 }
259
260 let timer = self
261 .metrics
262 .request_latency
263 .with_label_values(&[&route])
264 .start_timer();
265
266 MetricsResponseHandler {
267 metrics: self.metrics.clone(),
268 timer,
269 route,
270 excessive_message_size: self.excessive_message_size,
271 }
272 }
273}
274
/// Per-request state created by `MetricsMakeCallbackHandler::make_handler`; records
/// response-side metrics and decrements the in-flight gauge when dropped.
pub struct MetricsResponseHandler {
    /// Shared metric set to update.
    metrics: Arc<NetworkMetrics>,
    // The timer is never read: it is stored only so that it lives as long as the
    // handler. Per the prometheus docs a `HistogramTimer` records its elapsed time
    // when it goes out of scope, hence the `allow(unused)`.
    #[allow(unused)]
    timer: HistogramTimer,
    /// Route of the request, used as the `route` label value.
    route: String,
    /// Body size (in bytes) above which a response is logged and counted as excessive.
    excessive_message_size: usize,
}
283
284impl ResponseHandler for MetricsResponseHandler {
285 fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
286 let body_len = response.body().len();
287 self.metrics
288 .response_size
289 .with_label_values(&[&self.route])
290 .observe(body_len as f64);
291 if body_len > self.excessive_message_size {
292 warn!(
293 "Saw excessively large response with size {body_len} for {} with peer {:?}",
294 self.route,
295 response.peer_id()
296 );
297 self.metrics
298 .excessive_size_responses
299 .with_label_values(&[&self.route])
300 .inc();
301 }
302
303 if !response.status().is_success() {
304 let status = response.status().to_u16().to_string();
305 self.metrics
306 .errors
307 .with_label_values(&[&self.route, &status])
308 .inc();
309 }
310 }
311
312 fn on_error<E>(self, _error: &E) {
313 self.metrics
314 .errors
315 .with_label_values(&[self.route.as_str(), "unknown"])
316 .inc();
317 }
318}
319
320impl Drop for MetricsResponseHandler {
321 fn drop(&mut self) {
322 self.metrics
323 .inflight_requests
324 .with_label_values(&[&self.route])
325 .dec();
326 }
327}