1use std::sync::Arc;
5use std::time::Duration;
6
7use anemo_tower::callback::{MakeCallbackHandler, ResponseHandler};
8use mysten_metrics::{BYTES_BUCKETS, LATENCY_SEC_BUCKETS};
9use prometheus::{
10 HistogramTimer, HistogramVec, IntCounterVec, IntGaugeVec, Registry,
11 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
12 register_int_gauge_vec_with_registry,
13};
14use tonic::codegen::http::header::HeaderName;
15use tonic::codegen::http::{HeaderValue, Request, Response};
16use tonic::{Code, Status};
17use tower_http::classify::GrpcFailureClass;
18use tower_http::trace::{OnFailure, OnRequest, OnResponse};
19use tracing::{Span, warn};
20
21pub static GRPC_ENDPOINT_PATH_HEADER: HeaderName = HeaderName::from_static("grpc-path-req");
22
23pub trait MetricsCallbackProvider: Send + Sync + Clone + 'static {
31 fn on_request(&self, path: String);
34
35 fn on_response(&self, path: String, latency: Duration, status: u16, grpc_status_code: Code);
41
42 fn on_start(&self, _path: &str) {}
44
45 fn on_drop(&self, _path: &str) {}
48}
49
50#[derive(Clone, Default)]
51pub struct DefaultMetricsCallbackProvider {}
52impl MetricsCallbackProvider for DefaultMetricsCallbackProvider {
53 fn on_request(&self, _path: String) {}
54
55 fn on_response(
56 &self,
57 _path: String,
58 _latency: Duration,
59 _status: u16,
60 _grpc_status_code: Code,
61 ) {
62 }
63}
64
65#[derive(Clone)]
66pub struct MetricsHandler<M: MetricsCallbackProvider> {
67 metrics_provider: M,
68}
69
70impl<M: MetricsCallbackProvider> MetricsHandler<M> {
71 pub fn new(metrics_provider: M) -> Self {
72 Self { metrics_provider }
73 }
74}
75
76impl<B, M: MetricsCallbackProvider> OnResponse<B> for MetricsHandler<M> {
77 fn on_response(self, response: &Response<B>, latency: Duration, _span: &Span) {
78 let grpc_status = Status::from_header_map(response.headers());
79 let grpc_status_code = grpc_status.map_or(Code::Ok, |s| s.code());
80
81 let path: HeaderValue = response
82 .headers()
83 .get(&GRPC_ENDPOINT_PATH_HEADER)
84 .unwrap()
85 .clone();
86
87 self.metrics_provider.on_response(
88 path.to_str().unwrap().to_string(),
89 latency,
90 response.status().as_u16(),
91 grpc_status_code,
92 );
93 }
94}
95
96impl<B, M: MetricsCallbackProvider> OnRequest<B> for MetricsHandler<M> {
97 fn on_request(&mut self, request: &Request<B>, _span: &Span) {
98 self.metrics_provider
99 .on_request(request.uri().path().to_string());
100 }
101}
102
103impl<M: MetricsCallbackProvider> OnFailure<GrpcFailureClass> for MetricsHandler<M> {
104 fn on_failure(
105 &mut self,
106 _failure_classification: GrpcFailureClass,
107 _latency: Duration,
108 _span: &Span,
109 ) {
110 }
112}
113
114#[derive(Clone)]
115pub struct NetworkMetrics {
116 requests: IntCounterVec,
118 request_latency: HistogramVec,
120 request_size: HistogramVec,
122 response_size: HistogramVec,
124 excessive_size_requests: IntCounterVec,
126 excessive_size_responses: IntCounterVec,
128 inflight_requests: IntGaugeVec,
130 errors: IntCounterVec,
132}
133
134impl NetworkMetrics {
135 pub fn new(node: &'static str, direction: &'static str, registry: &Registry) -> Self {
136 let requests = register_int_counter_vec_with_registry!(
137 format!("{node}_{direction}_requests"),
138 "The number of requests made on the network",
139 &["route"],
140 registry
141 )
142 .unwrap();
143
144 let request_latency = register_histogram_vec_with_registry!(
145 format!("{node}_{direction}_request_latency"),
146 "Latency of a request by route",
147 &["route"],
148 LATENCY_SEC_BUCKETS.to_vec(),
149 registry,
150 )
151 .unwrap();
152
153 let request_size = register_histogram_vec_with_registry!(
154 format!("{node}_{direction}_request_size"),
155 "Size of a request by route",
156 &["route"],
157 BYTES_BUCKETS.to_vec(),
158 registry,
159 )
160 .unwrap();
161
162 let response_size = register_histogram_vec_with_registry!(
163 format!("{node}_{direction}_response_size"),
164 "Size of a response by route",
165 &["route"],
166 BYTES_BUCKETS.to_vec(),
167 registry,
168 )
169 .unwrap();
170
171 let excessive_size_requests = register_int_counter_vec_with_registry!(
172 format!("{node}_{direction}_excessive_size_requests"),
173 "The number of excessively large request messages sent",
174 &["route"],
175 registry
176 )
177 .unwrap();
178
179 let excessive_size_responses = register_int_counter_vec_with_registry!(
180 format!("{node}_{direction}_excessive_size_responses"),
181 "The number of excessively large response messages seen",
182 &["route"],
183 registry
184 )
185 .unwrap();
186
187 let inflight_requests = register_int_gauge_vec_with_registry!(
188 format!("{node}_{direction}_inflight_requests"),
189 "The number of inflight network requests",
190 &["route"],
191 registry
192 )
193 .unwrap();
194
195 let errors = register_int_counter_vec_with_registry!(
196 format!("{node}_{direction}_request_errors"),
197 "Number of errors by route",
198 &["route", "status"],
199 registry,
200 )
201 .unwrap();
202
203 Self {
204 requests,
205 request_latency,
206 request_size,
207 response_size,
208 excessive_size_requests,
209 excessive_size_responses,
210 inflight_requests,
211 errors,
212 }
213 }
214}
215
216#[derive(Clone)]
217pub struct MetricsMakeCallbackHandler {
218 metrics: Arc<NetworkMetrics>,
219 excessive_message_size: usize,
221}
222
223impl MetricsMakeCallbackHandler {
224 pub fn new(metrics: Arc<NetworkMetrics>, excessive_message_size: usize) -> Self {
225 Self {
226 metrics,
227 excessive_message_size,
228 }
229 }
230}
231
232impl MakeCallbackHandler for MetricsMakeCallbackHandler {
233 type Handler = MetricsResponseHandler;
234
235 fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
236 let route = request.route().to_owned();
237
238 self.metrics.requests.with_label_values(&[&route]).inc();
239 self.metrics
240 .inflight_requests
241 .with_label_values(&[&route])
242 .inc();
243 let body_len = request.body().len();
244 self.metrics
245 .request_size
246 .with_label_values(&[&route])
247 .observe(body_len as f64);
248 if body_len > self.excessive_message_size {
249 warn!(
250 "Saw excessively large request with size {body_len} for {route} with peer {:?}",
251 request.peer_id()
252 );
253 self.metrics
254 .excessive_size_requests
255 .with_label_values(&[&route])
256 .inc();
257 }
258
259 let timer = self
260 .metrics
261 .request_latency
262 .with_label_values(&[&route])
263 .start_timer();
264
265 MetricsResponseHandler {
266 metrics: self.metrics.clone(),
267 timer,
268 route,
269 excessive_message_size: self.excessive_message_size,
270 }
271 }
272}
273
274pub struct MetricsResponseHandler {
275 metrics: Arc<NetworkMetrics>,
276 #[allow(unused)]
278 timer: HistogramTimer,
279 route: String,
280 excessive_message_size: usize,
281}
282
283impl ResponseHandler for MetricsResponseHandler {
284 fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
285 let body_len = response.body().len();
286 self.metrics
287 .response_size
288 .with_label_values(&[&self.route])
289 .observe(body_len as f64);
290 if body_len > self.excessive_message_size {
291 warn!(
292 "Saw excessively large response with size {body_len} for {} with peer {:?}",
293 self.route,
294 response.peer_id()
295 );
296 self.metrics
297 .excessive_size_responses
298 .with_label_values(&[&self.route])
299 .inc();
300 }
301
302 if !response.status().is_success() {
303 let status = response.status().to_u16().to_string();
304 self.metrics
305 .errors
306 .with_label_values(&[&self.route, &status])
307 .inc();
308 }
309 }
310
311 fn on_error<E>(self, _error: &E) {
312 self.metrics
313 .errors
314 .with_label_values(&[&self.route, "unknown"])
315 .inc();
316 }
317}
318
319impl Drop for MetricsResponseHandler {
320 fn drop(&mut self) {
321 self.metrics
322 .inflight_requests
323 .with_label_values(&[&self.route])
324 .dec();
325 }
326}