// mysten_network/metrics.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use anemo_tower::callback::{MakeCallbackHandler, ResponseHandler};
8use mysten_metrics::{BYTES_BUCKETS, LATENCY_SEC_BUCKETS};
9use prometheus::{
10    HistogramTimer, HistogramVec, IntCounterVec, IntGaugeVec, Registry,
11    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
12    register_int_gauge_vec_with_registry,
13};
14use tonic::codegen::http::header::HeaderName;
15use tonic::codegen::http::{Request, Response};
16use tonic::{Code, Status};
17use tower_http::classify::GrpcFailureClass;
18use tower_http::trace::{OnFailure, OnRequest, OnResponse};
19use tracing::{Span, warn};
20
21pub static GRPC_ENDPOINT_PATH_HEADER: HeaderName = HeaderName::from_static("grpc-path-req");
22
23/// The trait to be implemented when you want to be notified about
24/// a new request and related metrics around it. When a request
25/// is performed (up to the point that a response is created) the
26/// on_response method is called with the corresponding metrics
27/// details. The on_request method will be called when the request
28/// is received, but not further processing has happened at this
29/// point.
30pub trait MetricsCallbackProvider: Send + Sync + Clone + 'static {
31    /// Method will be called when a request has been received.
32    /// `path`: the endpoint uri path
33    fn on_request(&self, path: String);
34
35    /// Method to be called from the server when a request is performed.
36    /// `path`: the endpoint uri path
37    /// `latency`: the time when the request was received and when the response was created
38    /// `status`: the http status code of the response
39    /// `grpc_status_code`: the grpc status code (see <https://github.com/grpc/grpc/blob/master/doc/statuscodes.md#status-codes-and-their-use-in-grpc>)
40    fn on_response(&self, path: String, latency: Duration, status: u16, grpc_status_code: Code);
41
42    /// Called when request call is started
43    fn on_start(&self, _path: &str) {}
44
45    /// Called when request call is dropped.
46    /// It is guaranteed that for each on_start there will be corresponding on_drop
47    fn on_drop(&self, _path: &str) {}
48}
49
50#[derive(Clone, Default)]
51pub struct DefaultMetricsCallbackProvider {}
52impl MetricsCallbackProvider for DefaultMetricsCallbackProvider {
53    fn on_request(&self, _path: String) {}
54
55    fn on_response(
56        &self,
57        _path: String,
58        _latency: Duration,
59        _status: u16,
60        _grpc_status_code: Code,
61    ) {
62    }
63}
64
65#[derive(Clone)]
66pub struct MetricsHandler<M: MetricsCallbackProvider> {
67    metrics_provider: M,
68}
69
70impl<M: MetricsCallbackProvider> MetricsHandler<M> {
71    pub fn new(metrics_provider: M) -> Self {
72        Self { metrics_provider }
73    }
74}
75
76impl<B, M: MetricsCallbackProvider> OnResponse<B> for MetricsHandler<M> {
77    fn on_response(self, response: &Response<B>, latency: Duration, _span: &Span) {
78        let grpc_status = Status::from_header_map(response.headers());
79        let grpc_status_code = grpc_status.map_or(Code::Ok, |s| s.code());
80
81        let path = response
82            .headers()
83            .get(&GRPC_ENDPOINT_PATH_HEADER)
84            .and_then(|v| v.to_str().ok())
85            .unwrap_or("invalid")
86            .to_string();
87
88        self.metrics_provider.on_response(
89            path,
90            latency,
91            response.status().as_u16(),
92            grpc_status_code,
93        );
94    }
95}
96
97impl<B, M: MetricsCallbackProvider> OnRequest<B> for MetricsHandler<M> {
98    fn on_request(&mut self, request: &Request<B>, _span: &Span) {
99        self.metrics_provider
100            .on_request(request.uri().path().to_string());
101    }
102}
103
104impl<M: MetricsCallbackProvider> OnFailure<GrpcFailureClass> for MetricsHandler<M> {
105    fn on_failure(
106        &mut self,
107        _failure_classification: GrpcFailureClass,
108        _latency: Duration,
109        _span: &Span,
110    ) {
111        // just do nothing for now so we avoid printing unnecessary logs
112    }
113}
114
115#[derive(Clone)]
116pub struct NetworkMetrics {
117    /// Counter of requests by route
118    requests: IntCounterVec,
119    /// Request latency by route
120    request_latency: HistogramVec,
121    /// Request size by route
122    request_size: HistogramVec,
123    /// Response size by route
124    response_size: HistogramVec,
125    /// Counter of requests exceeding the "excessive" size limit
126    excessive_size_requests: IntCounterVec,
127    /// Counter of responses exceeding the "excessive" size limit
128    excessive_size_responses: IntCounterVec,
129    /// Gauge of the number of inflight requests at any given time by route
130    inflight_requests: IntGaugeVec,
131    /// Failed requests by route
132    errors: IntCounterVec,
133}
134
135impl NetworkMetrics {
136    pub fn new(node: &'static str, direction: &'static str, registry: &Registry) -> Self {
137        let requests = register_int_counter_vec_with_registry!(
138            format!("{node}_{direction}_requests"),
139            "The number of requests made on the network",
140            &["route"],
141            registry
142        )
143        .unwrap();
144
145        let request_latency = register_histogram_vec_with_registry!(
146            format!("{node}_{direction}_request_latency"),
147            "Latency of a request by route",
148            &["route"],
149            LATENCY_SEC_BUCKETS.to_vec(),
150            registry,
151        )
152        .unwrap();
153
154        let request_size = register_histogram_vec_with_registry!(
155            format!("{node}_{direction}_request_size"),
156            "Size of a request by route",
157            &["route"],
158            BYTES_BUCKETS.to_vec(),
159            registry,
160        )
161        .unwrap();
162
163        let response_size = register_histogram_vec_with_registry!(
164            format!("{node}_{direction}_response_size"),
165            "Size of a response by route",
166            &["route"],
167            BYTES_BUCKETS.to_vec(),
168            registry,
169        )
170        .unwrap();
171
172        let excessive_size_requests = register_int_counter_vec_with_registry!(
173            format!("{node}_{direction}_excessive_size_requests"),
174            "The number of excessively large request messages sent",
175            &["route"],
176            registry
177        )
178        .unwrap();
179
180        let excessive_size_responses = register_int_counter_vec_with_registry!(
181            format!("{node}_{direction}_excessive_size_responses"),
182            "The number of excessively large response messages seen",
183            &["route"],
184            registry
185        )
186        .unwrap();
187
188        let inflight_requests = register_int_gauge_vec_with_registry!(
189            format!("{node}_{direction}_inflight_requests"),
190            "The number of inflight network requests",
191            &["route"],
192            registry
193        )
194        .unwrap();
195
196        let errors = register_int_counter_vec_with_registry!(
197            format!("{node}_{direction}_request_errors"),
198            "Number of errors by route",
199            &["route", "status"],
200            registry,
201        )
202        .unwrap();
203
204        Self {
205            requests,
206            request_latency,
207            request_size,
208            response_size,
209            excessive_size_requests,
210            excessive_size_responses,
211            inflight_requests,
212            errors,
213        }
214    }
215}
216
217#[derive(Clone)]
218pub struct MetricsMakeCallbackHandler {
219    metrics: Arc<NetworkMetrics>,
220    /// Size in bytes above which a request or response message is considered excessively large
221    excessive_message_size: usize,
222}
223
224impl MetricsMakeCallbackHandler {
225    pub fn new(metrics: Arc<NetworkMetrics>, excessive_message_size: usize) -> Self {
226        Self {
227            metrics,
228            excessive_message_size,
229        }
230    }
231}
232
233impl MakeCallbackHandler for MetricsMakeCallbackHandler {
234    type Handler = MetricsResponseHandler;
235
236    fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
237        let route = request.route().to_owned();
238
239        self.metrics.requests.with_label_values(&[&route]).inc();
240        self.metrics
241            .inflight_requests
242            .with_label_values(&[&route])
243            .inc();
244        let body_len = request.body().len();
245        self.metrics
246            .request_size
247            .with_label_values(&[&route])
248            .observe(body_len as f64);
249        if body_len > self.excessive_message_size {
250            warn!(
251                "Saw excessively large request with size {body_len} for {route} with peer {:?}",
252                request.peer_id()
253            );
254            self.metrics
255                .excessive_size_requests
256                .with_label_values(&[&route])
257                .inc();
258        }
259
260        let timer = self
261            .metrics
262            .request_latency
263            .with_label_values(&[&route])
264            .start_timer();
265
266        MetricsResponseHandler {
267            metrics: self.metrics.clone(),
268            timer,
269            route,
270            excessive_message_size: self.excessive_message_size,
271        }
272    }
273}
274
275pub struct MetricsResponseHandler {
276    metrics: Arc<NetworkMetrics>,
277    // The timer is held on to and "observed" once dropped
278    #[allow(unused)]
279    timer: HistogramTimer,
280    route: String,
281    excessive_message_size: usize,
282}
283
284impl ResponseHandler for MetricsResponseHandler {
285    fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
286        let body_len = response.body().len();
287        self.metrics
288            .response_size
289            .with_label_values(&[&self.route])
290            .observe(body_len as f64);
291        if body_len > self.excessive_message_size {
292            warn!(
293                "Saw excessively large response with size {body_len} for {} with peer {:?}",
294                self.route,
295                response.peer_id()
296            );
297            self.metrics
298                .excessive_size_responses
299                .with_label_values(&[&self.route])
300                .inc();
301        }
302
303        if !response.status().is_success() {
304            let status = response.status().to_u16().to_string();
305            self.metrics
306                .errors
307                .with_label_values(&[&self.route, &status])
308                .inc();
309        }
310    }
311
312    fn on_error<E>(self, _error: &E) {
313        self.metrics
314            .errors
315            .with_label_values(&[self.route.as_str(), "unknown"])
316            .inc();
317    }
318}
319
320impl Drop for MetricsResponseHandler {
321    fn drop(&mut self) {
322        self.metrics
323            .inflight_requests
324            .with_label_values(&[&self.route])
325            .dec();
326    }
327}