mysten_network/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use anemo_tower::callback::{MakeCallbackHandler, ResponseHandler};
8use mysten_metrics::{BYTES_BUCKETS, LATENCY_SEC_BUCKETS};
9use prometheus::{
10    HistogramTimer, HistogramVec, IntCounterVec, IntGaugeVec, Registry,
11    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
12    register_int_gauge_vec_with_registry,
13};
14use tonic::codegen::http::header::HeaderName;
15use tonic::codegen::http::{HeaderValue, Request, Response};
16use tonic::{Code, Status};
17use tower_http::classify::GrpcFailureClass;
18use tower_http::trace::{OnFailure, OnRequest, OnResponse};
19use tracing::{Span, warn};
20
/// Name of the HTTP header (`grpc-path-req`) that carries the endpoint path of a request.
///
/// [`MetricsHandler`] reads this header from the response in its `on_response` callback.
/// NOTE(review): the component that sets the header is not visible in this file — presumably
/// another layer copies the request path into the response; confirm against the server setup.
pub static GRPC_ENDPOINT_PATH_HEADER: HeaderName = HeaderName::from_static("grpc-path-req");
22
/// Callback interface for observing requests and the metrics around them.
///
/// Implement this trait when you want to be notified about each request.
/// `on_request` is called when the request is received, before any further
/// processing has happened. `on_response` is called once the request has been
/// performed (up to the point that a response is created) and carries the
/// corresponding metrics details.
pub trait MetricsCallbackProvider: Send + Sync + Clone + 'static {
    /// Called when a request has been received.
    ///
    /// * `path`: the endpoint uri path
    fn on_request(&self, path: String);

    /// Called from the server when a request has been performed.
    ///
    /// * `path`: the endpoint uri path
    /// * `latency`: the time elapsed between receiving the request and creating the response
    /// * `status`: the http status code of the response
    /// * `grpc_status_code`: the grpc status code (see <https://github.com/grpc/grpc/blob/master/doc/statuscodes.md#status-codes-and-their-use-in-grpc>)
    fn on_response(&self, path: String, latency: Duration, status: u16, grpc_status_code: Code);

    /// Called when a request call is started. The default implementation does nothing.
    fn on_start(&self, _path: &str) {}

    /// Called when a request call is dropped. The default implementation does nothing.
    ///
    /// It is guaranteed that for each `on_start` there will be a corresponding `on_drop`.
    fn on_drop(&self, _path: &str) {}
}
49
50#[derive(Clone, Default)]
51pub struct DefaultMetricsCallbackProvider {}
52impl MetricsCallbackProvider for DefaultMetricsCallbackProvider {
53    fn on_request(&self, _path: String) {}
54
55    fn on_response(
56        &self,
57        _path: String,
58        _latency: Duration,
59        _status: u16,
60        _grpc_status_code: Code,
61    ) {
62    }
63}
64
65#[derive(Clone)]
66pub struct MetricsHandler<M: MetricsCallbackProvider> {
67    metrics_provider: M,
68}
69
70impl<M: MetricsCallbackProvider> MetricsHandler<M> {
71    pub fn new(metrics_provider: M) -> Self {
72        Self { metrics_provider }
73    }
74}
75
76impl<B, M: MetricsCallbackProvider> OnResponse<B> for MetricsHandler<M> {
77    fn on_response(self, response: &Response<B>, latency: Duration, _span: &Span) {
78        let grpc_status = Status::from_header_map(response.headers());
79        let grpc_status_code = grpc_status.map_or(Code::Ok, |s| s.code());
80
81        let path: HeaderValue = response
82            .headers()
83            .get(&GRPC_ENDPOINT_PATH_HEADER)
84            .unwrap()
85            .clone();
86
87        self.metrics_provider.on_response(
88            path.to_str().unwrap().to_string(),
89            latency,
90            response.status().as_u16(),
91            grpc_status_code,
92        );
93    }
94}
95
96impl<B, M: MetricsCallbackProvider> OnRequest<B> for MetricsHandler<M> {
97    fn on_request(&mut self, request: &Request<B>, _span: &Span) {
98        self.metrics_provider
99            .on_request(request.uri().path().to_string());
100    }
101}
102
103impl<M: MetricsCallbackProvider> OnFailure<GrpcFailureClass> for MetricsHandler<M> {
104    fn on_failure(
105        &mut self,
106        _failure_classification: GrpcFailureClass,
107        _latency: Duration,
108        _span: &Span,
109    ) {
110        // just do nothing for now so we avoid printing unnecessary logs
111    }
112}
113
/// Prometheus metrics recorded for network requests.
///
/// Every metric is labelled by `route`; `errors` is additionally labelled by `status`.
#[derive(Clone)]
pub struct NetworkMetrics {
    /// Counter of requests by route
    requests: IntCounterVec,
    /// Request latency by route
    request_latency: HistogramVec,
    /// Request size by route
    request_size: HistogramVec,
    /// Response size by route
    response_size: HistogramVec,
    /// Counter of requests exceeding the "excessive" size limit, by route
    excessive_size_requests: IntCounterVec,
    /// Counter of responses exceeding the "excessive" size limit, by route
    excessive_size_responses: IntCounterVec,
    /// Gauge of the number of inflight requests at any given time by route
    inflight_requests: IntGaugeVec,
    /// Failed requests by route and status
    errors: IntCounterVec,
}
133
134impl NetworkMetrics {
135    pub fn new(node: &'static str, direction: &'static str, registry: &Registry) -> Self {
136        let requests = register_int_counter_vec_with_registry!(
137            format!("{node}_{direction}_requests"),
138            "The number of requests made on the network",
139            &["route"],
140            registry
141        )
142        .unwrap();
143
144        let request_latency = register_histogram_vec_with_registry!(
145            format!("{node}_{direction}_request_latency"),
146            "Latency of a request by route",
147            &["route"],
148            LATENCY_SEC_BUCKETS.to_vec(),
149            registry,
150        )
151        .unwrap();
152
153        let request_size = register_histogram_vec_with_registry!(
154            format!("{node}_{direction}_request_size"),
155            "Size of a request by route",
156            &["route"],
157            BYTES_BUCKETS.to_vec(),
158            registry,
159        )
160        .unwrap();
161
162        let response_size = register_histogram_vec_with_registry!(
163            format!("{node}_{direction}_response_size"),
164            "Size of a response by route",
165            &["route"],
166            BYTES_BUCKETS.to_vec(),
167            registry,
168        )
169        .unwrap();
170
171        let excessive_size_requests = register_int_counter_vec_with_registry!(
172            format!("{node}_{direction}_excessive_size_requests"),
173            "The number of excessively large request messages sent",
174            &["route"],
175            registry
176        )
177        .unwrap();
178
179        let excessive_size_responses = register_int_counter_vec_with_registry!(
180            format!("{node}_{direction}_excessive_size_responses"),
181            "The number of excessively large response messages seen",
182            &["route"],
183            registry
184        )
185        .unwrap();
186
187        let inflight_requests = register_int_gauge_vec_with_registry!(
188            format!("{node}_{direction}_inflight_requests"),
189            "The number of inflight network requests",
190            &["route"],
191            registry
192        )
193        .unwrap();
194
195        let errors = register_int_counter_vec_with_registry!(
196            format!("{node}_{direction}_request_errors"),
197            "Number of errors by route",
198            &["route", "status"],
199            registry,
200        )
201        .unwrap();
202
203        Self {
204            requests,
205            request_latency,
206            request_size,
207            response_size,
208            excessive_size_requests,
209            excessive_size_responses,
210            inflight_requests,
211            errors,
212        }
213    }
214}
215
216#[derive(Clone)]
217pub struct MetricsMakeCallbackHandler {
218    metrics: Arc<NetworkMetrics>,
219    /// Size in bytes above which a request or response message is considered excessively large
220    excessive_message_size: usize,
221}
222
223impl MetricsMakeCallbackHandler {
224    pub fn new(metrics: Arc<NetworkMetrics>, excessive_message_size: usize) -> Self {
225        Self {
226            metrics,
227            excessive_message_size,
228        }
229    }
230}
231
232impl MakeCallbackHandler for MetricsMakeCallbackHandler {
233    type Handler = MetricsResponseHandler;
234
235    fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
236        let route = request.route().to_owned();
237
238        self.metrics.requests.with_label_values(&[&route]).inc();
239        self.metrics
240            .inflight_requests
241            .with_label_values(&[&route])
242            .inc();
243        let body_len = request.body().len();
244        self.metrics
245            .request_size
246            .with_label_values(&[&route])
247            .observe(body_len as f64);
248        if body_len > self.excessive_message_size {
249            warn!(
250                "Saw excessively large request with size {body_len} for {route} with peer {:?}",
251                request.peer_id()
252            );
253            self.metrics
254                .excessive_size_requests
255                .with_label_values(&[&route])
256                .inc();
257        }
258
259        let timer = self
260            .metrics
261            .request_latency
262            .with_label_values(&[&route])
263            .start_timer();
264
265        MetricsResponseHandler {
266            metrics: self.metrics.clone(),
267            timer,
268            route,
269            excessive_message_size: self.excessive_message_size,
270        }
271    }
272}
273
/// Per-request handler created by [`MetricsMakeCallbackHandler`].
///
/// Records response-side metrics for its route and decrements the inflight gauge for that
/// route when it is dropped.
pub struct MetricsResponseHandler {
    /// Shared metrics to record into.
    metrics: Arc<NetworkMetrics>,
    // The timer is held on to and "observed" once dropped
    #[allow(unused)]
    timer: HistogramTimer,
    /// Route of the request, used as the `route` label value.
    route: String,
    /// Size in bytes above which a response body is counted and logged as excessively large.
    excessive_message_size: usize,
}
282
283impl ResponseHandler for MetricsResponseHandler {
284    fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
285        let body_len = response.body().len();
286        self.metrics
287            .response_size
288            .with_label_values(&[&self.route])
289            .observe(body_len as f64);
290        if body_len > self.excessive_message_size {
291            warn!(
292                "Saw excessively large response with size {body_len} for {} with peer {:?}",
293                self.route,
294                response.peer_id()
295            );
296            self.metrics
297                .excessive_size_responses
298                .with_label_values(&[&self.route])
299                .inc();
300        }
301
302        if !response.status().is_success() {
303            let status = response.status().to_u16().to_string();
304            self.metrics
305                .errors
306                .with_label_values(&[&self.route, &status])
307                .inc();
308        }
309    }
310
311    fn on_error<E>(self, _error: &E) {
312        self.metrics
313            .errors
314            .with_label_values(&[&self.route, "unknown"])
315            .inc();
316    }
317}
318
319impl Drop for MetricsResponseHandler {
320    fn drop(&mut self) {
321        self.metrics
322            .inflight_requests
323            .with_label_values(&[&self.route])
324            .dec();
325    }
326}