1use std::sync::Arc;
5use std::time::Duration;
6
7use anemo_tower::callback::{MakeCallbackHandler, ResponseHandler};
8use prometheus::{
9 HistogramTimer, HistogramVec, IntCounterVec, IntGaugeVec, Registry,
10 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
11 register_int_gauge_vec_with_registry,
12};
13use tonic::codegen::http::header::HeaderName;
14use tonic::codegen::http::{HeaderValue, Request, Response};
15use tonic::{Code, Status};
16use tower_http::classify::GrpcFailureClass;
17use tower_http::trace::{OnFailure, OnRequest, OnResponse};
18use tracing::{Span, warn};
19
/// Bucket boundaries passed to the request-latency histogram, in ascending order.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second.
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5,
    // One second up to ten seconds.
    1.0, 2.5, 5.0, 10.0,
    // Long tail.
    20.0, 30.0, 60.0, 90.0,
];
23
/// Bucket boundaries passed to the request- and response-size histograms, in ascending order.
///
/// The 1 MiB..~3.4 MiB tier grows by a factor of 1.5 per step. The middle value of that tier
/// was previously `2359256`, which broke the progression (1572864 * 1.5 = 2359296 and
/// 2359296 * 1.5 = 3538944); it is corrected to `2359296` here.
const SIZE_BYTE_BUCKETS: &[f64] = &[
    // 2 KiB and 8 KiB.
    2_048.0,
    8_192.0,
    // Powers of two from 16 KiB to 1 MiB.
    16_384.0,
    32_768.0,
    65_536.0,
    131_072.0,
    262_144.0,
    524_288.0,
    1_048_576.0,
    // x1.5 steps above 1 MiB.
    1_572_864.0,
    2_359_296.0,
    3_538_944.0,
    // Roughly x1.3 steps for the tail.
    4_600_627.0,
    5_980_815.0,
    7_775_060.0,
    10_107_578.0,
    13_139_851.0,
    17_081_807.0,
    22_206_349.0,
    28_868_253.0,
    37_528_729.0,
    48_787_348.0,
    63_423_553.0,
];
33
34pub static GRPC_ENDPOINT_PATH_HEADER: HeaderName = HeaderName::from_static("grpc-path-req");
35
36pub trait MetricsCallbackProvider: Send + Sync + Clone + 'static {
44 fn on_request(&self, path: String);
47
48 fn on_response(&self, path: String, latency: Duration, status: u16, grpc_status_code: Code);
54
55 fn on_start(&self, _path: &str) {}
57
58 fn on_drop(&self, _path: &str) {}
61}
62
63#[derive(Clone, Default)]
64pub struct DefaultMetricsCallbackProvider {}
65impl MetricsCallbackProvider for DefaultMetricsCallbackProvider {
66 fn on_request(&self, _path: String) {}
67
68 fn on_response(
69 &self,
70 _path: String,
71 _latency: Duration,
72 _status: u16,
73 _grpc_status_code: Code,
74 ) {
75 }
76}
77
78#[derive(Clone)]
79pub struct MetricsHandler<M: MetricsCallbackProvider> {
80 metrics_provider: M,
81}
82
83impl<M: MetricsCallbackProvider> MetricsHandler<M> {
84 pub fn new(metrics_provider: M) -> Self {
85 Self { metrics_provider }
86 }
87}
88
89impl<B, M: MetricsCallbackProvider> OnResponse<B> for MetricsHandler<M> {
90 fn on_response(self, response: &Response<B>, latency: Duration, _span: &Span) {
91 let grpc_status = Status::from_header_map(response.headers());
92 let grpc_status_code = grpc_status.map_or(Code::Ok, |s| s.code());
93
94 let path: HeaderValue = response
95 .headers()
96 .get(&GRPC_ENDPOINT_PATH_HEADER)
97 .unwrap()
98 .clone();
99
100 self.metrics_provider.on_response(
101 path.to_str().unwrap().to_string(),
102 latency,
103 response.status().as_u16(),
104 grpc_status_code,
105 );
106 }
107}
108
109impl<B, M: MetricsCallbackProvider> OnRequest<B> for MetricsHandler<M> {
110 fn on_request(&mut self, request: &Request<B>, _span: &Span) {
111 self.metrics_provider
112 .on_request(request.uri().path().to_string());
113 }
114}
115
116impl<M: MetricsCallbackProvider> OnFailure<GrpcFailureClass> for MetricsHandler<M> {
117 fn on_failure(
118 &mut self,
119 _failure_classification: GrpcFailureClass,
120 _latency: Duration,
121 _span: &Span,
122 ) {
123 }
125}
126
127#[derive(Clone)]
128pub struct NetworkMetrics {
129 requests: IntCounterVec,
131 request_latency: HistogramVec,
133 request_size: HistogramVec,
135 response_size: HistogramVec,
137 excessive_size_requests: IntCounterVec,
139 excessive_size_responses: IntCounterVec,
141 inflight_requests: IntGaugeVec,
143 errors: IntCounterVec,
145}
146
147impl NetworkMetrics {
148 pub fn new(node: &'static str, direction: &'static str, registry: &Registry) -> Self {
149 let requests = register_int_counter_vec_with_registry!(
150 format!("{node}_{direction}_requests"),
151 "The number of requests made on the network",
152 &["route"],
153 registry
154 )
155 .unwrap();
156
157 let request_latency = register_histogram_vec_with_registry!(
158 format!("{node}_{direction}_request_latency"),
159 "Latency of a request by route",
160 &["route"],
161 LATENCY_SEC_BUCKETS.to_vec(),
162 registry,
163 )
164 .unwrap();
165
166 let request_size = register_histogram_vec_with_registry!(
167 format!("{node}_{direction}_request_size"),
168 "Size of a request by route",
169 &["route"],
170 SIZE_BYTE_BUCKETS.to_vec(),
171 registry,
172 )
173 .unwrap();
174
175 let response_size = register_histogram_vec_with_registry!(
176 format!("{node}_{direction}_response_size"),
177 "Size of a response by route",
178 &["route"],
179 SIZE_BYTE_BUCKETS.to_vec(),
180 registry,
181 )
182 .unwrap();
183
184 let excessive_size_requests = register_int_counter_vec_with_registry!(
185 format!("{node}_{direction}_excessive_size_requests"),
186 "The number of excessively large request messages sent",
187 &["route"],
188 registry
189 )
190 .unwrap();
191
192 let excessive_size_responses = register_int_counter_vec_with_registry!(
193 format!("{node}_{direction}_excessive_size_responses"),
194 "The number of excessively large response messages seen",
195 &["route"],
196 registry
197 )
198 .unwrap();
199
200 let inflight_requests = register_int_gauge_vec_with_registry!(
201 format!("{node}_{direction}_inflight_requests"),
202 "The number of inflight network requests",
203 &["route"],
204 registry
205 )
206 .unwrap();
207
208 let errors = register_int_counter_vec_with_registry!(
209 format!("{node}_{direction}_request_errors"),
210 "Number of errors by route",
211 &["route", "status"],
212 registry,
213 )
214 .unwrap();
215
216 Self {
217 requests,
218 request_latency,
219 request_size,
220 response_size,
221 excessive_size_requests,
222 excessive_size_responses,
223 inflight_requests,
224 errors,
225 }
226 }
227}
228
229#[derive(Clone)]
230pub struct MetricsMakeCallbackHandler {
231 metrics: Arc<NetworkMetrics>,
232 excessive_message_size: usize,
234}
235
236impl MetricsMakeCallbackHandler {
237 pub fn new(metrics: Arc<NetworkMetrics>, excessive_message_size: usize) -> Self {
238 Self {
239 metrics,
240 excessive_message_size,
241 }
242 }
243}
244
245impl MakeCallbackHandler for MetricsMakeCallbackHandler {
246 type Handler = MetricsResponseHandler;
247
248 fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
249 let route = request.route().to_owned();
250
251 self.metrics.requests.with_label_values(&[&route]).inc();
252 self.metrics
253 .inflight_requests
254 .with_label_values(&[&route])
255 .inc();
256 let body_len = request.body().len();
257 self.metrics
258 .request_size
259 .with_label_values(&[&route])
260 .observe(body_len as f64);
261 if body_len > self.excessive_message_size {
262 warn!(
263 "Saw excessively large request with size {body_len} for {route} with peer {:?}",
264 request.peer_id()
265 );
266 self.metrics
267 .excessive_size_requests
268 .with_label_values(&[&route])
269 .inc();
270 }
271
272 let timer = self
273 .metrics
274 .request_latency
275 .with_label_values(&[&route])
276 .start_timer();
277
278 MetricsResponseHandler {
279 metrics: self.metrics.clone(),
280 timer,
281 route,
282 excessive_message_size: self.excessive_message_size,
283 }
284 }
285}
286
287pub struct MetricsResponseHandler {
288 metrics: Arc<NetworkMetrics>,
289 #[allow(unused)]
291 timer: HistogramTimer,
292 route: String,
293 excessive_message_size: usize,
294}
295
296impl ResponseHandler for MetricsResponseHandler {
297 fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
298 let body_len = response.body().len();
299 self.metrics
300 .response_size
301 .with_label_values(&[&self.route])
302 .observe(body_len as f64);
303 if body_len > self.excessive_message_size {
304 warn!(
305 "Saw excessively large response with size {body_len} for {} with peer {:?}",
306 self.route,
307 response.peer_id()
308 );
309 self.metrics
310 .excessive_size_responses
311 .with_label_values(&[&self.route])
312 .inc();
313 }
314
315 if !response.status().is_success() {
316 let status = response.status().to_u16().to_string();
317 self.metrics
318 .errors
319 .with_label_values(&[&self.route, &status])
320 .inc();
321 }
322 }
323
324 fn on_error<E>(self, _error: &E) {
325 self.metrics
326 .errors
327 .with_label_values(&[&self.route, "unknown"])
328 .inc();
329 }
330}
331
332impl Drop for MetricsResponseHandler {
333 fn drop(&mut self) {
334 self.metrics
335 .inflight_requests
336 .with_label_values(&[&self.route])
337 .dec();
338 }
339}