consensus_core/network/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anemo_tower::callback::{MakeCallbackHandler, ResponseHandler};
7use prometheus::{
8    HistogramTimer, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
9    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
10    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
11};
12use tracing::warn;
13
14// Fields for network-agnostic metrics can be added here
15pub(crate) struct NetworkMetrics {
16    pub(crate) network_type: IntGaugeVec,
17    pub(crate) inbound: Arc<NetworkRouteMetrics>,
18    pub(crate) outbound: Arc<NetworkRouteMetrics>,
19    #[cfg_attr(msim, allow(dead_code))]
20    pub(crate) tcp_connection_metrics: Arc<TcpConnectionMetrics>,
21    pub(crate) quinn_connection_metrics: Arc<QuinnConnectionMetrics>,
22}
23
24impl NetworkMetrics {
25    pub(crate) fn new(registry: &Registry) -> Self {
26        Self {
27            network_type: register_int_gauge_vec_with_registry!(
28                "network_type",
29                "Type of the network used: anemo or tonic",
30                &["type"],
31                registry
32            )
33            .unwrap(),
34            inbound: Arc::new(NetworkRouteMetrics::new("", "inbound", registry)),
35            outbound: Arc::new(NetworkRouteMetrics::new("", "outbound", registry)),
36            tcp_connection_metrics: Arc::new(TcpConnectionMetrics::new(registry)),
37            quinn_connection_metrics: Arc::new(QuinnConnectionMetrics::new("", registry)),
38        }
39    }
40}
41
42#[cfg_attr(msim, allow(dead_code))]
43pub(crate) struct TcpConnectionMetrics {
44    /// Send buffer size of consensus TCP socket.
45    pub(crate) socket_send_buffer_size: IntGauge,
46    /// Receive buffer size of consensus TCP socket.
47    pub(crate) socket_recv_buffer_size: IntGauge,
48    /// Max send buffer size of TCP socket.
49    pub(crate) socket_send_buffer_max_size: IntGauge,
50    /// Max receive buffer size of TCP socket.
51    pub(crate) socket_recv_buffer_max_size: IntGauge,
52}
53
54impl TcpConnectionMetrics {
55    pub fn new(registry: &Registry) -> Self {
56        Self {
57            socket_send_buffer_size: register_int_gauge_with_registry!(
58                "tcp_socket_send_buffer_size",
59                "Send buffer size of consensus TCP socket.",
60                registry
61            )
62            .unwrap(),
63            socket_recv_buffer_size: register_int_gauge_with_registry!(
64                "tcp_socket_recv_buffer_size",
65                "Receive buffer size of consensus TCP socket.",
66                registry
67            )
68            .unwrap(),
69            socket_send_buffer_max_size: register_int_gauge_with_registry!(
70                "tcp_socket_send_buffer_max_size",
71                "Max send buffer size of TCP socket.",
72                registry
73            )
74            .unwrap(),
75            socket_recv_buffer_max_size: register_int_gauge_with_registry!(
76                "tcp_socket_recv_buffer_max_size",
77                "Max receive buffer size of TCP socket.",
78                registry
79            )
80            .unwrap(),
81        }
82    }
83}
84
85pub struct QuinnConnectionMetrics {
86    /// The connection status of known peers. 0 if not connected, 1 if connected.
87    pub network_peer_connected: IntGaugeVec,
88    /// The number of connected peers
89    pub network_peers: IntGauge,
90    /// Number of disconnect events per peer.
91    pub network_peer_disconnects: IntCounterVec,
92    /// Receive buffer size of Anemo socket.
93    pub socket_receive_buffer_size: IntGauge,
94    /// Send buffer size of Anemo socket.
95    pub socket_send_buffer_size: IntGauge,
96
97    /// PathStats
98    /// The rtt for a peer connection in ms.
99    pub network_peer_rtt: IntGaugeVec,
100    /// The total number of lost packets for a peer connection.
101    pub network_peer_lost_packets: IntGaugeVec,
102    /// The total number of lost bytes for a peer connection.
103    pub network_peer_lost_bytes: IntGaugeVec,
104    /// The total number of packets sent for a peer connection.
105    pub network_peer_sent_packets: IntGaugeVec,
106    /// The total number of congestion events for a peer connection.
107    pub network_peer_congestion_events: IntGaugeVec,
108    /// The congestion window for a peer connection.
109    pub network_peer_congestion_window: IntGaugeVec,
110
111    /// FrameStats
112    /// The number of max data frames for a peer connection.
113    pub network_peer_max_data: IntGaugeVec,
114    /// The number of closed connections frames for a peer connection.
115    pub network_peer_closed_connections: IntGaugeVec,
116    /// The number of data blocked frames for a peer connection.
117    pub network_peer_data_blocked: IntGaugeVec,
118
119    /// UDPStats
120    /// The total number datagrams observed by the UDP peer connection.
121    pub network_peer_udp_datagrams: IntGaugeVec,
122    /// The total number bytes observed by the UDP peer connection.
123    pub network_peer_udp_bytes: IntGaugeVec,
124    /// The total number transmits observed by the UDP peer connection.
125    pub network_peer_udp_transmits: IntGaugeVec,
126}
127
128impl QuinnConnectionMetrics {
129    pub fn new(node: &'static str, registry: &Registry) -> Self {
130        Self {
131            network_peer_connected: register_int_gauge_vec_with_registry!(
132                format!("{node}_quinn_network_peer_connected"),
133                "The connection status of a peer. 0 if not connected, 1 if connected",
134                &["peer_id", "peer_label"],
135                registry
136            )
137            .unwrap(),
138            network_peers: register_int_gauge_with_registry!(
139                format!("{node}_quinn_network_peers"),
140                "The number of connected peers.",
141                registry
142            )
143            .unwrap(),
144            network_peer_disconnects: register_int_counter_vec_with_registry!(
145                format!("{node}_quinn_network_peer_disconnects"),
146                "Number of disconnect events per peer.",
147                &["peer_id", "peer_label", "reason"],
148                registry
149            )
150            .unwrap(),
151            socket_receive_buffer_size: register_int_gauge_with_registry!(
152                format!("{node}_quinn_socket_receive_buffer_size"),
153                "Receive buffer size of Anemo socket.",
154                registry
155            )
156            .unwrap(),
157            socket_send_buffer_size: register_int_gauge_with_registry!(
158                format!("{node}_quinn_socket_send_buffer_size"),
159                "Send buffer size of Anemo socket.",
160                registry
161            )
162            .unwrap(),
163
164            // PathStats
165            network_peer_rtt: register_int_gauge_vec_with_registry!(
166                format!("{node}_quinn_network_peer_rtt"),
167                "The rtt for a peer connection in ms.",
168                &["peer_id", "peer_label"],
169                registry
170            )
171            .unwrap(),
172            network_peer_lost_packets: register_int_gauge_vec_with_registry!(
173                format!("{node}_quinn_network_peer_lost_packets"),
174                "The total number of lost packets for a peer connection.",
175                &["peer_id", "peer_label"],
176                registry
177            )
178            .unwrap(),
179            network_peer_lost_bytes: register_int_gauge_vec_with_registry!(
180                format!("{node}_quinn_network_peer_lost_bytes"),
181                "The total number of lost bytes for a peer connection.",
182                &["peer_id", "peer_label"],
183                registry
184            )
185            .unwrap(),
186            network_peer_sent_packets: register_int_gauge_vec_with_registry!(
187                format!("{node}_quinn_network_peer_sent_packets"),
188                "The total number of sent packets for a peer connection.",
189                &["peer_id", "peer_label"],
190                registry
191            )
192            .unwrap(),
193            network_peer_congestion_events: register_int_gauge_vec_with_registry!(
194                format!("{node}_quinn_network_peer_congestion_events"),
195                "The total number of congestion events for a peer connection.",
196                &["peer_id", "peer_label"],
197                registry
198            )
199            .unwrap(),
200            network_peer_congestion_window: register_int_gauge_vec_with_registry!(
201                format!("{node}_quinn_network_peer_congestion_window"),
202                "The congestion window for a peer connection.",
203                &["peer_id", "peer_label"],
204                registry
205            )
206            .unwrap(),
207
208            // FrameStats
209            network_peer_closed_connections: register_int_gauge_vec_with_registry!(
210                format!("{node}_quinn_network_peer_closed_connections"),
211                "The number of closed connections for a peer connection.",
212                &["peer_id", "peer_label", "direction"],
213                registry
214            )
215            .unwrap(),
216            network_peer_max_data: register_int_gauge_vec_with_registry!(
217                format!("{node}_quinn_network_peer_max_data"),
218                "The number of max data frames for a peer connection.",
219                &["peer_id", "peer_label", "direction"],
220                registry
221            )
222            .unwrap(),
223            network_peer_data_blocked: register_int_gauge_vec_with_registry!(
224                format!("{node}_quinn_network_peer_data_blocked"),
225                "The number of data blocked frames for a peer connection.",
226                &["peer_id", "peer_label", "direction"],
227                registry
228            )
229            .unwrap(),
230
231            // UDPStats
232            network_peer_udp_datagrams: register_int_gauge_vec_with_registry!(
233                format!("{node}_quinn_network_peer_udp_datagrams"),
234                "The total number datagrams observed by the UDP peer connection.",
235                &["peer_id", "peer_label", "direction"],
236                registry
237            )
238            .unwrap(),
239            network_peer_udp_bytes: register_int_gauge_vec_with_registry!(
240                format!("{node}_quinn_network_peer_udp_bytes"),
241                "The total number bytes observed by the UDP peer connection.",
242                &["peer_id", "peer_label", "direction"],
243                registry
244            )
245            .unwrap(),
246            network_peer_udp_transmits: register_int_gauge_vec_with_registry!(
247                format!("{node}_quinn_network_peer_udp_transmits"),
248                "The total number transmits observed by the UDP peer connection.",
249                &["peer_id", "peer_label", "direction"],
250                registry
251            )
252            .unwrap(),
253        }
254    }
255}
256
257#[derive(Clone)]
258pub struct NetworkRouteMetrics {
259    /// Counter of requests by route
260    pub requests: IntCounterVec,
261    /// Request latency by route
262    pub request_latency: HistogramVec,
263    /// Request size by route
264    pub request_size: HistogramVec,
265    /// Response size by route
266    pub response_size: HistogramVec,
267    /// Counter of requests exceeding the "excessive" size limit
268    pub excessive_size_requests: IntCounterVec,
269    /// Counter of responses exceeding the "excessive" size limit
270    pub excessive_size_responses: IntCounterVec,
271    /// Gauge of the number of inflight requests at any given time by route
272    pub inflight_requests: IntGaugeVec,
273    /// Failed requests by route
274    pub errors: IntCounterVec,
275}
276
/// Upper bounds, in seconds, of the request-latency histogram buckets (1 ms up to 90 s).
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second resolution.
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5,
    // One second up to a minute and a half.
    1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0,
];
280
// Arbitrarily chosen upper bounds (in bytes) for message-size histograms. The growth factor
// between consecutive bounds is noted at the end of each row. It gradually decreases to give
// better resolution at high sizes.
//
// NOTE(review): the second x1.5 bound used to read 2359256, a typo for 1572864 * 1.5 = 2359296
// (the following bound, 3538944, is exactly 2359296 * 1.5). Dashboards keyed on the old bucket
// value need updating.
const SIZE_BYTE_BUCKETS: &[f64] = &[
    2048., 8192., // *4
    16384., 32768., 65536., 131072., 262144., 524288., 1048576., // *2
    1572864., 2359296., 3538944., // *1.5
    4600627., 5980815., 7775060., 10107578., 13139851., 17081807., 22206349., 28868253., 37528729.,
    48787348., 63423553., // *1.3
];
290
291impl NetworkRouteMetrics {
292    pub fn new(node: &'static str, direction: &'static str, registry: &Registry) -> Self {
293        let requests = register_int_counter_vec_with_registry!(
294            format!("{node}_{direction}_requests"),
295            "The number of requests made on the network",
296            &["route"],
297            registry
298        )
299        .unwrap();
300
301        let request_latency = register_histogram_vec_with_registry!(
302            format!("{node}_{direction}_request_latency"),
303            "Latency of a request by route",
304            &["route"],
305            LATENCY_SEC_BUCKETS.to_vec(),
306            registry,
307        )
308        .unwrap();
309
310        let request_size = register_histogram_vec_with_registry!(
311            format!("{node}_{direction}_request_size"),
312            "Size of a request by route",
313            &["route"],
314            SIZE_BYTE_BUCKETS.to_vec(),
315            registry,
316        )
317        .unwrap();
318
319        let response_size = register_histogram_vec_with_registry!(
320            format!("{node}_{direction}_response_size"),
321            "Size of a response by route",
322            &["route"],
323            SIZE_BYTE_BUCKETS.to_vec(),
324            registry,
325        )
326        .unwrap();
327
328        let excessive_size_requests = register_int_counter_vec_with_registry!(
329            format!("{node}_{direction}_excessive_size_requests"),
330            "The number of excessively large request messages sent",
331            &["route"],
332            registry
333        )
334        .unwrap();
335
336        let excessive_size_responses = register_int_counter_vec_with_registry!(
337            format!("{node}_{direction}_excessive_size_responses"),
338            "The number of excessively large response messages seen",
339            &["route"],
340            registry
341        )
342        .unwrap();
343
344        let inflight_requests = register_int_gauge_vec_with_registry!(
345            format!("{node}_{direction}_inflight_requests"),
346            "The number of inflight network requests",
347            &["route"],
348            registry
349        )
350        .unwrap();
351
352        let errors = register_int_counter_vec_with_registry!(
353            format!("{node}_{direction}_request_errors"),
354            "Number of errors by route",
355            &["route", "status"],
356            registry,
357        )
358        .unwrap();
359
360        Self {
361            requests,
362            request_latency,
363            request_size,
364            response_size,
365            excessive_size_requests,
366            excessive_size_responses,
367            inflight_requests,
368            errors,
369        }
370    }
371}
372
373#[derive(Clone)]
374pub struct MetricsMakeCallbackHandler {
375    metrics: Arc<NetworkRouteMetrics>,
376    /// Size in bytes above which a request or response message is considered excessively large
377    excessive_message_size: usize,
378}
379
380impl MetricsMakeCallbackHandler {
381    pub fn new(metrics: Arc<NetworkRouteMetrics>, excessive_message_size: usize) -> Self {
382        Self {
383            metrics,
384            excessive_message_size,
385        }
386    }
387}
388
389impl MakeCallbackHandler for MetricsMakeCallbackHandler {
390    type Handler = MetricsResponseHandler;
391
392    fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
393        let route = request.route().to_owned();
394
395        self.metrics.requests.with_label_values(&[&route]).inc();
396        self.metrics
397            .inflight_requests
398            .with_label_values(&[&route])
399            .inc();
400        let body_len = request.body().len();
401        self.metrics
402            .request_size
403            .with_label_values(&[&route])
404            .observe(body_len as f64);
405        if body_len > self.excessive_message_size {
406            warn!(
407                "Saw excessively large request with size {body_len} for {route} with peer {:?}",
408                request.peer_id()
409            );
410            self.metrics
411                .excessive_size_requests
412                .with_label_values(&[&route])
413                .inc();
414        }
415
416        let timer = self
417            .metrics
418            .request_latency
419            .with_label_values(&[&route])
420            .start_timer();
421
422        MetricsResponseHandler {
423            metrics: self.metrics.clone(),
424            timer,
425            route,
426            excessive_message_size: self.excessive_message_size,
427        }
428    }
429}
430
431pub struct MetricsResponseHandler {
432    metrics: Arc<NetworkRouteMetrics>,
433    // The timer is held on to and "observed" once dropped
434    #[allow(unused)]
435    timer: HistogramTimer,
436    route: String,
437    excessive_message_size: usize,
438}
439
440impl ResponseHandler for MetricsResponseHandler {
441    fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
442        let body_len = response.body().len();
443        self.metrics
444            .response_size
445            .with_label_values(&[&self.route])
446            .observe(body_len as f64);
447        if body_len > self.excessive_message_size {
448            warn!(
449                "Saw excessively large response with size {body_len} for {} with peer {:?}",
450                self.route,
451                response.peer_id()
452            );
453            self.metrics
454                .excessive_size_responses
455                .with_label_values(&[&self.route])
456                .inc();
457        }
458
459        if !response.status().is_success() {
460            let status = response.status().to_u16().to_string();
461            self.metrics
462                .errors
463                .with_label_values(&[&self.route, &status])
464                .inc();
465        }
466    }
467
468    fn on_error<E>(self, _error: &E) {
469        self.metrics
470            .errors
471            .with_label_values(&[&self.route, "unknown"])
472            .inc();
473    }
474}
475
476impl Drop for MetricsResponseHandler {
477    fn drop(&mut self) {
478        self.metrics
479            .inflight_requests
480            .with_label_values(&[&self.route])
481            .dec();
482    }
483}