1use std::sync::Arc;
5
6use anemo_tower::callback::{MakeCallbackHandler, ResponseHandler};
7use prometheus::{
8 HistogramTimer, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
9 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
10 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
11};
12use tracing::warn;
13
14pub(crate) struct NetworkMetrics {
16 pub(crate) network_type: IntGaugeVec,
17 pub(crate) inbound: Arc<NetworkRouteMetrics>,
18 pub(crate) outbound: Arc<NetworkRouteMetrics>,
19 #[cfg_attr(msim, allow(dead_code))]
20 pub(crate) tcp_connection_metrics: Arc<TcpConnectionMetrics>,
21 pub(crate) quinn_connection_metrics: Arc<QuinnConnectionMetrics>,
22}
23
24impl NetworkMetrics {
25 pub(crate) fn new(registry: &Registry) -> Self {
26 Self {
27 network_type: register_int_gauge_vec_with_registry!(
28 "network_type",
29 "Type of the network used: anemo or tonic",
30 &["type"],
31 registry
32 )
33 .unwrap(),
34 inbound: Arc::new(NetworkRouteMetrics::new("", "inbound", registry)),
35 outbound: Arc::new(NetworkRouteMetrics::new("", "outbound", registry)),
36 tcp_connection_metrics: Arc::new(TcpConnectionMetrics::new(registry)),
37 quinn_connection_metrics: Arc::new(QuinnConnectionMetrics::new("", registry)),
38 }
39 }
40}
41
42#[cfg_attr(msim, allow(dead_code))]
43pub(crate) struct TcpConnectionMetrics {
44 pub(crate) socket_send_buffer_size: IntGauge,
46 pub(crate) socket_recv_buffer_size: IntGauge,
48 pub(crate) socket_send_buffer_max_size: IntGauge,
50 pub(crate) socket_recv_buffer_max_size: IntGauge,
52}
53
54impl TcpConnectionMetrics {
55 pub fn new(registry: &Registry) -> Self {
56 Self {
57 socket_send_buffer_size: register_int_gauge_with_registry!(
58 "tcp_socket_send_buffer_size",
59 "Send buffer size of consensus TCP socket.",
60 registry
61 )
62 .unwrap(),
63 socket_recv_buffer_size: register_int_gauge_with_registry!(
64 "tcp_socket_recv_buffer_size",
65 "Receive buffer size of consensus TCP socket.",
66 registry
67 )
68 .unwrap(),
69 socket_send_buffer_max_size: register_int_gauge_with_registry!(
70 "tcp_socket_send_buffer_max_size",
71 "Max send buffer size of TCP socket.",
72 registry
73 )
74 .unwrap(),
75 socket_recv_buffer_max_size: register_int_gauge_with_registry!(
76 "tcp_socket_recv_buffer_max_size",
77 "Max receive buffer size of TCP socket.",
78 registry
79 )
80 .unwrap(),
81 }
82 }
83}
84
85pub struct QuinnConnectionMetrics {
86 pub network_peer_connected: IntGaugeVec,
88 pub network_peers: IntGauge,
90 pub network_peer_disconnects: IntCounterVec,
92 pub socket_receive_buffer_size: IntGauge,
94 pub socket_send_buffer_size: IntGauge,
96
97 pub network_peer_rtt: IntGaugeVec,
100 pub network_peer_lost_packets: IntGaugeVec,
102 pub network_peer_lost_bytes: IntGaugeVec,
104 pub network_peer_sent_packets: IntGaugeVec,
106 pub network_peer_congestion_events: IntGaugeVec,
108 pub network_peer_congestion_window: IntGaugeVec,
110
111 pub network_peer_max_data: IntGaugeVec,
114 pub network_peer_closed_connections: IntGaugeVec,
116 pub network_peer_data_blocked: IntGaugeVec,
118
119 pub network_peer_udp_datagrams: IntGaugeVec,
122 pub network_peer_udp_bytes: IntGaugeVec,
124 pub network_peer_udp_transmits: IntGaugeVec,
126}
127
128impl QuinnConnectionMetrics {
129 pub fn new(node: &'static str, registry: &Registry) -> Self {
130 Self {
131 network_peer_connected: register_int_gauge_vec_with_registry!(
132 format!("{node}_quinn_network_peer_connected"),
133 "The connection status of a peer. 0 if not connected, 1 if connected",
134 &["peer_id", "peer_label"],
135 registry
136 )
137 .unwrap(),
138 network_peers: register_int_gauge_with_registry!(
139 format!("{node}_quinn_network_peers"),
140 "The number of connected peers.",
141 registry
142 )
143 .unwrap(),
144 network_peer_disconnects: register_int_counter_vec_with_registry!(
145 format!("{node}_quinn_network_peer_disconnects"),
146 "Number of disconnect events per peer.",
147 &["peer_id", "peer_label", "reason"],
148 registry
149 )
150 .unwrap(),
151 socket_receive_buffer_size: register_int_gauge_with_registry!(
152 format!("{node}_quinn_socket_receive_buffer_size"),
153 "Receive buffer size of Anemo socket.",
154 registry
155 )
156 .unwrap(),
157 socket_send_buffer_size: register_int_gauge_with_registry!(
158 format!("{node}_quinn_socket_send_buffer_size"),
159 "Send buffer size of Anemo socket.",
160 registry
161 )
162 .unwrap(),
163
164 network_peer_rtt: register_int_gauge_vec_with_registry!(
166 format!("{node}_quinn_network_peer_rtt"),
167 "The rtt for a peer connection in ms.",
168 &["peer_id", "peer_label"],
169 registry
170 )
171 .unwrap(),
172 network_peer_lost_packets: register_int_gauge_vec_with_registry!(
173 format!("{node}_quinn_network_peer_lost_packets"),
174 "The total number of lost packets for a peer connection.",
175 &["peer_id", "peer_label"],
176 registry
177 )
178 .unwrap(),
179 network_peer_lost_bytes: register_int_gauge_vec_with_registry!(
180 format!("{node}_quinn_network_peer_lost_bytes"),
181 "The total number of lost bytes for a peer connection.",
182 &["peer_id", "peer_label"],
183 registry
184 )
185 .unwrap(),
186 network_peer_sent_packets: register_int_gauge_vec_with_registry!(
187 format!("{node}_quinn_network_peer_sent_packets"),
188 "The total number of sent packets for a peer connection.",
189 &["peer_id", "peer_label"],
190 registry
191 )
192 .unwrap(),
193 network_peer_congestion_events: register_int_gauge_vec_with_registry!(
194 format!("{node}_quinn_network_peer_congestion_events"),
195 "The total number of congestion events for a peer connection.",
196 &["peer_id", "peer_label"],
197 registry
198 )
199 .unwrap(),
200 network_peer_congestion_window: register_int_gauge_vec_with_registry!(
201 format!("{node}_quinn_network_peer_congestion_window"),
202 "The congestion window for a peer connection.",
203 &["peer_id", "peer_label"],
204 registry
205 )
206 .unwrap(),
207
208 network_peer_closed_connections: register_int_gauge_vec_with_registry!(
210 format!("{node}_quinn_network_peer_closed_connections"),
211 "The number of closed connections for a peer connection.",
212 &["peer_id", "peer_label", "direction"],
213 registry
214 )
215 .unwrap(),
216 network_peer_max_data: register_int_gauge_vec_with_registry!(
217 format!("{node}_quinn_network_peer_max_data"),
218 "The number of max data frames for a peer connection.",
219 &["peer_id", "peer_label", "direction"],
220 registry
221 )
222 .unwrap(),
223 network_peer_data_blocked: register_int_gauge_vec_with_registry!(
224 format!("{node}_quinn_network_peer_data_blocked"),
225 "The number of data blocked frames for a peer connection.",
226 &["peer_id", "peer_label", "direction"],
227 registry
228 )
229 .unwrap(),
230
231 network_peer_udp_datagrams: register_int_gauge_vec_with_registry!(
233 format!("{node}_quinn_network_peer_udp_datagrams"),
234 "The total number datagrams observed by the UDP peer connection.",
235 &["peer_id", "peer_label", "direction"],
236 registry
237 )
238 .unwrap(),
239 network_peer_udp_bytes: register_int_gauge_vec_with_registry!(
240 format!("{node}_quinn_network_peer_udp_bytes"),
241 "The total number bytes observed by the UDP peer connection.",
242 &["peer_id", "peer_label", "direction"],
243 registry
244 )
245 .unwrap(),
246 network_peer_udp_transmits: register_int_gauge_vec_with_registry!(
247 format!("{node}_quinn_network_peer_udp_transmits"),
248 "The total number transmits observed by the UDP peer connection.",
249 &["peer_id", "peer_label", "direction"],
250 registry
251 )
252 .unwrap(),
253 }
254 }
255}
256
257#[derive(Clone)]
258pub struct NetworkRouteMetrics {
259 pub requests: IntCounterVec,
261 pub request_latency: HistogramVec,
263 pub request_size: HistogramVec,
265 pub response_size: HistogramVec,
267 pub excessive_size_requests: IntCounterVec,
269 pub excessive_size_responses: IntCounterVec,
271 pub inflight_requests: IntGaugeVec,
273 pub errors: IntCounterVec,
275}
276
/// Histogram bucket upper bounds, in seconds, for request latency (1 ms to 90 s).
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second.
    0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5,
    // One second and above.
    1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0,
];
280
/// Histogram bucket upper bounds, in bytes, for request and response body sizes.
///
/// Buckets grow geometrically, with a growth factor that shrinks as sizes grow:
/// x4 up to 8 KiB, x2 up to 1 MiB, x1.5 up to ~3.4 MiB, then x1.3 up to ~60 MiB.
const SIZE_BYTE_BUCKETS: &[f64] = &[
    // x4
    2048., 8192.,
    // x2
    16384., 32768., 65536., 131072., 262144., 524288., 1048576.,
    // x1.5 (1572864 * 1.5 = 2359296, and 2359296 * 1.5 = 3538944)
    1572864., 2359296., 3538944.,
    // x1.3
    4600627., 5980815., 7775060., 10107578., 13139851., 17081807., 22206349., 28868253.,
    37528729., 48787348., 63423553.,
];
290
291impl NetworkRouteMetrics {
292 pub fn new(node: &'static str, direction: &'static str, registry: &Registry) -> Self {
293 let requests = register_int_counter_vec_with_registry!(
294 format!("{node}_{direction}_requests"),
295 "The number of requests made on the network",
296 &["route"],
297 registry
298 )
299 .unwrap();
300
301 let request_latency = register_histogram_vec_with_registry!(
302 format!("{node}_{direction}_request_latency"),
303 "Latency of a request by route",
304 &["route"],
305 LATENCY_SEC_BUCKETS.to_vec(),
306 registry,
307 )
308 .unwrap();
309
310 let request_size = register_histogram_vec_with_registry!(
311 format!("{node}_{direction}_request_size"),
312 "Size of a request by route",
313 &["route"],
314 SIZE_BYTE_BUCKETS.to_vec(),
315 registry,
316 )
317 .unwrap();
318
319 let response_size = register_histogram_vec_with_registry!(
320 format!("{node}_{direction}_response_size"),
321 "Size of a response by route",
322 &["route"],
323 SIZE_BYTE_BUCKETS.to_vec(),
324 registry,
325 )
326 .unwrap();
327
328 let excessive_size_requests = register_int_counter_vec_with_registry!(
329 format!("{node}_{direction}_excessive_size_requests"),
330 "The number of excessively large request messages sent",
331 &["route"],
332 registry
333 )
334 .unwrap();
335
336 let excessive_size_responses = register_int_counter_vec_with_registry!(
337 format!("{node}_{direction}_excessive_size_responses"),
338 "The number of excessively large response messages seen",
339 &["route"],
340 registry
341 )
342 .unwrap();
343
344 let inflight_requests = register_int_gauge_vec_with_registry!(
345 format!("{node}_{direction}_inflight_requests"),
346 "The number of inflight network requests",
347 &["route"],
348 registry
349 )
350 .unwrap();
351
352 let errors = register_int_counter_vec_with_registry!(
353 format!("{node}_{direction}_request_errors"),
354 "Number of errors by route",
355 &["route", "status"],
356 registry,
357 )
358 .unwrap();
359
360 Self {
361 requests,
362 request_latency,
363 request_size,
364 response_size,
365 excessive_size_requests,
366 excessive_size_responses,
367 inflight_requests,
368 errors,
369 }
370 }
371}
372
373#[derive(Clone)]
374pub struct MetricsMakeCallbackHandler {
375 metrics: Arc<NetworkRouteMetrics>,
376 excessive_message_size: usize,
378}
379
380impl MetricsMakeCallbackHandler {
381 pub fn new(metrics: Arc<NetworkRouteMetrics>, excessive_message_size: usize) -> Self {
382 Self {
383 metrics,
384 excessive_message_size,
385 }
386 }
387}
388
389impl MakeCallbackHandler for MetricsMakeCallbackHandler {
390 type Handler = MetricsResponseHandler;
391
392 fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
393 let route = request.route().to_owned();
394
395 self.metrics.requests.with_label_values(&[&route]).inc();
396 self.metrics
397 .inflight_requests
398 .with_label_values(&[&route])
399 .inc();
400 let body_len = request.body().len();
401 self.metrics
402 .request_size
403 .with_label_values(&[&route])
404 .observe(body_len as f64);
405 if body_len > self.excessive_message_size {
406 warn!(
407 "Saw excessively large request with size {body_len} for {route} with peer {:?}",
408 request.peer_id()
409 );
410 self.metrics
411 .excessive_size_requests
412 .with_label_values(&[&route])
413 .inc();
414 }
415
416 let timer = self
417 .metrics
418 .request_latency
419 .with_label_values(&[&route])
420 .start_timer();
421
422 MetricsResponseHandler {
423 metrics: self.metrics.clone(),
424 timer,
425 route,
426 excessive_message_size: self.excessive_message_size,
427 }
428 }
429}
430
431pub struct MetricsResponseHandler {
432 metrics: Arc<NetworkRouteMetrics>,
433 #[allow(unused)]
435 timer: HistogramTimer,
436 route: String,
437 excessive_message_size: usize,
438}
439
440impl ResponseHandler for MetricsResponseHandler {
441 fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
442 let body_len = response.body().len();
443 self.metrics
444 .response_size
445 .with_label_values(&[&self.route])
446 .observe(body_len as f64);
447 if body_len > self.excessive_message_size {
448 warn!(
449 "Saw excessively large response with size {body_len} for {} with peer {:?}",
450 self.route,
451 response.peer_id()
452 );
453 self.metrics
454 .excessive_size_responses
455 .with_label_values(&[&self.route])
456 .inc();
457 }
458
459 if !response.status().is_success() {
460 let status = response.status().to_u16().to_string();
461 self.metrics
462 .errors
463 .with_label_values(&[&self.route, &status])
464 .inc();
465 }
466 }
467
468 fn on_error<E>(self, _error: &E) {
469 self.metrics
470 .errors
471 .with_label_values(&[&self.route, "unknown"])
472 .inc();
473 }
474}
475
476impl Drop for MetricsResponseHandler {
477 fn drop(&mut self) {
478 self.metrics
479 .inflight_requests
480 .with_label_values(&[&self.route])
481 .dec();
482 }
483}