mysten_network/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use anemo_tower::callback::{MakeCallbackHandler, ResponseHandler};
8use prometheus::{
9    HistogramTimer, HistogramVec, IntCounterVec, IntGaugeVec, Registry,
10    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
11    register_int_gauge_vec_with_registry,
12};
13use tonic::codegen::http::header::HeaderName;
14use tonic::codegen::http::{HeaderValue, Request, Response};
15use tonic::{Code, Status};
16use tower_http::classify::GrpcFailureClass;
17use tower_http::trace::{OnFailure, OnRequest, OnResponse};
18use tracing::{Span, warn};
19
/// Upper bounds, in seconds, of the request-latency histogram buckets.
///
/// Resolution is finest in the millisecond range and coarsens up to 90 seconds.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, // sub-second
    1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0, // seconds
];
23
/// Upper bounds, in bytes, of the request/response size histogram buckets.
///
/// The boundaries are arbitrarily chosen, but the growth factor between
/// neighbouring buckets shrinks as sizes increase (x4, then x2, then x1.5,
/// then x1.3) to give better resolution at large message sizes.
const SIZE_BYTE_BUCKETS: &[f64] = &[
    2048., 8192., // *4
    16384., 32768., 65536., 131072., 262144., 524288., 1048576., // *2
    // 1572864 * 1.5 = 2359296 (this value was previously mistyped as 2359256).
    1572864., 2359296., 3538944., // *1.5
    4600627., 5980815., 7775060., 10107578., 13139851., 17081807., 22206349., 28868253., 37528729.,
    48787348., 63423553., // *1.3
];
33
34pub static GRPC_ENDPOINT_PATH_HEADER: HeaderName = HeaderName::from_static("grpc-path-req");
35
36/// The trait to be implemented when you want to be notified about
37/// a new request and related metrics around it. When a request
38/// is performed (up to the point that a response is created) the
39/// on_response method is called with the corresponding metrics
40/// details. The on_request method will be called when the request
41/// is received, but not further processing has happened at this
42/// point.
43pub trait MetricsCallbackProvider: Send + Sync + Clone + 'static {
44    /// Method will be called when a request has been received.
45    /// `path`: the endpoint uri path
46    fn on_request(&self, path: String);
47
48    /// Method to be called from the server when a request is performed.
49    /// `path`: the endpoint uri path
50    /// `latency`: the time when the request was received and when the response was created
51    /// `status`: the http status code of the response
52    /// `grpc_status_code`: the grpc status code (see <https://github.com/grpc/grpc/blob/master/doc/statuscodes.md#status-codes-and-their-use-in-grpc>)
53    fn on_response(&self, path: String, latency: Duration, status: u16, grpc_status_code: Code);
54
55    /// Called when request call is started
56    fn on_start(&self, _path: &str) {}
57
58    /// Called when request call is dropped.
59    /// It is guaranteed that for each on_start there will be corresponding on_drop
60    fn on_drop(&self, _path: &str) {}
61}
62
63#[derive(Clone, Default)]
64pub struct DefaultMetricsCallbackProvider {}
65impl MetricsCallbackProvider for DefaultMetricsCallbackProvider {
66    fn on_request(&self, _path: String) {}
67
68    fn on_response(
69        &self,
70        _path: String,
71        _latency: Duration,
72        _status: u16,
73        _grpc_status_code: Code,
74    ) {
75    }
76}
77
78#[derive(Clone)]
79pub struct MetricsHandler<M: MetricsCallbackProvider> {
80    metrics_provider: M,
81}
82
83impl<M: MetricsCallbackProvider> MetricsHandler<M> {
84    pub fn new(metrics_provider: M) -> Self {
85        Self { metrics_provider }
86    }
87}
88
89impl<B, M: MetricsCallbackProvider> OnResponse<B> for MetricsHandler<M> {
90    fn on_response(self, response: &Response<B>, latency: Duration, _span: &Span) {
91        let grpc_status = Status::from_header_map(response.headers());
92        let grpc_status_code = grpc_status.map_or(Code::Ok, |s| s.code());
93
94        let path: HeaderValue = response
95            .headers()
96            .get(&GRPC_ENDPOINT_PATH_HEADER)
97            .unwrap()
98            .clone();
99
100        self.metrics_provider.on_response(
101            path.to_str().unwrap().to_string(),
102            latency,
103            response.status().as_u16(),
104            grpc_status_code,
105        );
106    }
107}
108
109impl<B, M: MetricsCallbackProvider> OnRequest<B> for MetricsHandler<M> {
110    fn on_request(&mut self, request: &Request<B>, _span: &Span) {
111        self.metrics_provider
112            .on_request(request.uri().path().to_string());
113    }
114}
115
116impl<M: MetricsCallbackProvider> OnFailure<GrpcFailureClass> for MetricsHandler<M> {
117    fn on_failure(
118        &mut self,
119        _failure_classification: GrpcFailureClass,
120        _latency: Duration,
121        _span: &Span,
122    ) {
123        // just do nothing for now so we avoid printing unnecessary logs
124    }
125}
126
127#[derive(Clone)]
128pub struct NetworkMetrics {
129    /// Counter of requests by route
130    requests: IntCounterVec,
131    /// Request latency by route
132    request_latency: HistogramVec,
133    /// Request size by route
134    request_size: HistogramVec,
135    /// Response size by route
136    response_size: HistogramVec,
137    /// Counter of requests exceeding the "excessive" size limit
138    excessive_size_requests: IntCounterVec,
139    /// Counter of responses exceeding the "excessive" size limit
140    excessive_size_responses: IntCounterVec,
141    /// Gauge of the number of inflight requests at any given time by route
142    inflight_requests: IntGaugeVec,
143    /// Failed requests by route
144    errors: IntCounterVec,
145}
146
147impl NetworkMetrics {
148    pub fn new(node: &'static str, direction: &'static str, registry: &Registry) -> Self {
149        let requests = register_int_counter_vec_with_registry!(
150            format!("{node}_{direction}_requests"),
151            "The number of requests made on the network",
152            &["route"],
153            registry
154        )
155        .unwrap();
156
157        let request_latency = register_histogram_vec_with_registry!(
158            format!("{node}_{direction}_request_latency"),
159            "Latency of a request by route",
160            &["route"],
161            LATENCY_SEC_BUCKETS.to_vec(),
162            registry,
163        )
164        .unwrap();
165
166        let request_size = register_histogram_vec_with_registry!(
167            format!("{node}_{direction}_request_size"),
168            "Size of a request by route",
169            &["route"],
170            SIZE_BYTE_BUCKETS.to_vec(),
171            registry,
172        )
173        .unwrap();
174
175        let response_size = register_histogram_vec_with_registry!(
176            format!("{node}_{direction}_response_size"),
177            "Size of a response by route",
178            &["route"],
179            SIZE_BYTE_BUCKETS.to_vec(),
180            registry,
181        )
182        .unwrap();
183
184        let excessive_size_requests = register_int_counter_vec_with_registry!(
185            format!("{node}_{direction}_excessive_size_requests"),
186            "The number of excessively large request messages sent",
187            &["route"],
188            registry
189        )
190        .unwrap();
191
192        let excessive_size_responses = register_int_counter_vec_with_registry!(
193            format!("{node}_{direction}_excessive_size_responses"),
194            "The number of excessively large response messages seen",
195            &["route"],
196            registry
197        )
198        .unwrap();
199
200        let inflight_requests = register_int_gauge_vec_with_registry!(
201            format!("{node}_{direction}_inflight_requests"),
202            "The number of inflight network requests",
203            &["route"],
204            registry
205        )
206        .unwrap();
207
208        let errors = register_int_counter_vec_with_registry!(
209            format!("{node}_{direction}_request_errors"),
210            "Number of errors by route",
211            &["route", "status"],
212            registry,
213        )
214        .unwrap();
215
216        Self {
217            requests,
218            request_latency,
219            request_size,
220            response_size,
221            excessive_size_requests,
222            excessive_size_responses,
223            inflight_requests,
224            errors,
225        }
226    }
227}
228
229#[derive(Clone)]
230pub struct MetricsMakeCallbackHandler {
231    metrics: Arc<NetworkMetrics>,
232    /// Size in bytes above which a request or response message is considered excessively large
233    excessive_message_size: usize,
234}
235
236impl MetricsMakeCallbackHandler {
237    pub fn new(metrics: Arc<NetworkMetrics>, excessive_message_size: usize) -> Self {
238        Self {
239            metrics,
240            excessive_message_size,
241        }
242    }
243}
244
245impl MakeCallbackHandler for MetricsMakeCallbackHandler {
246    type Handler = MetricsResponseHandler;
247
248    fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
249        let route = request.route().to_owned();
250
251        self.metrics.requests.with_label_values(&[&route]).inc();
252        self.metrics
253            .inflight_requests
254            .with_label_values(&[&route])
255            .inc();
256        let body_len = request.body().len();
257        self.metrics
258            .request_size
259            .with_label_values(&[&route])
260            .observe(body_len as f64);
261        if body_len > self.excessive_message_size {
262            warn!(
263                "Saw excessively large request with size {body_len} for {route} with peer {:?}",
264                request.peer_id()
265            );
266            self.metrics
267                .excessive_size_requests
268                .with_label_values(&[&route])
269                .inc();
270        }
271
272        let timer = self
273            .metrics
274            .request_latency
275            .with_label_values(&[&route])
276            .start_timer();
277
278        MetricsResponseHandler {
279            metrics: self.metrics.clone(),
280            timer,
281            route,
282            excessive_message_size: self.excessive_message_size,
283        }
284    }
285}
286
287pub struct MetricsResponseHandler {
288    metrics: Arc<NetworkMetrics>,
289    // The timer is held on to and "observed" once dropped
290    #[allow(unused)]
291    timer: HistogramTimer,
292    route: String,
293    excessive_message_size: usize,
294}
295
296impl ResponseHandler for MetricsResponseHandler {
297    fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
298        let body_len = response.body().len();
299        self.metrics
300            .response_size
301            .with_label_values(&[&self.route])
302            .observe(body_len as f64);
303        if body_len > self.excessive_message_size {
304            warn!(
305                "Saw excessively large response with size {body_len} for {} with peer {:?}",
306                self.route,
307                response.peer_id()
308            );
309            self.metrics
310                .excessive_size_responses
311                .with_label_values(&[&self.route])
312                .inc();
313        }
314
315        if !response.status().is_success() {
316            let status = response.status().to_u16().to_string();
317            self.metrics
318                .errors
319                .with_label_values(&[&self.route, &status])
320                .inc();
321        }
322    }
323
324    fn on_error<E>(self, _error: &E) {
325        self.metrics
326            .errors
327            .with_label_values(&[&self.route, "unknown"])
328            .inc();
329    }
330}
331
332impl Drop for MetricsResponseHandler {
333    fn drop(&mut self) {
334        self.metrics
335            .inflight_requests
336            .with_label_values(&[&self.route])
337            .dec();
338    }
339}