1use axum::{Router, extract::Extension, http::StatusCode, routing::get};
5use dashmap::DashMap;
6use parking_lot::Mutex;
7use prometheus::core::{AtomicI64, GenericGauge};
8use simple_server_timing_header::Timer;
9use std::future::Future;
10use std::net::SocketAddr;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::Instant;
15
16use once_cell::sync::OnceCell;
17use prometheus::{
18 Histogram, IntCounterVec, IntGaugeVec, Registry, TextEncoder, register_histogram_with_registry,
19 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
20};
21use tap::TapFallible;
22use tracing::{Span, warn};
23
24pub use scopeguard;
25use uuid::Uuid;
26
27mod guards;
28pub mod histogram;
29pub mod metered_channel;
30pub mod monitored_mpsc;
31pub mod thread_stall_monitor;
32pub use guards::*;
33
/// Identifier string for the single-writer transaction type.
// NOTE(review): presumably used as a metric label value; no use is visible in this file — confirm
// at call sites.
pub const TX_TYPE_SINGLE_WRITER_TX: &str = "single_writer";
/// Identifier string for the shared-object transaction type.
// NOTE(review): same assumption as above — confirm at call sites.
pub const TX_TYPE_SHARED_OBJ_TX: &str = "shared_object";
36
/// Ascending boundaries from 0.001 to 90: 25 ms steps up to 0.5, 50 ms steps up to 1, then a few
/// coarse values (2, 5, 10, 20, 30, 60, 90).
// NOTE(review): presumably histogram bucket upper bounds in seconds, as the name suggests.
pub const SUBSECOND_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.125, 0.15, 0.175, 0.2, 0.225, 0.25, 0.275, 0.3,
    0.325, 0.35, 0.375, 0.4, 0.425, 0.45, 0.475, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9,
    0.95, 1., 2., 5., 10., 20., 30., 60., 90.,
];

/// Ascending boundaries from 0.005 to 90 with far fewer entries than [`LATENCY_SEC_BUCKETS`].
// NOTE(review): presumably histogram bucket upper bounds in seconds, as the name suggests.
pub const COARSE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1., 2., 3., 5., 10., 20., 30., 60.,
    90.,
];

/// Ascending boundaries from 0.001 to 90 that stay fine-grained past 1 (0.1 steps up to 2, 0.5
/// steps up to 5, unit steps up to 10) before widening.
// NOTE(review): presumably histogram bucket upper bounds in seconds, as the name suggests.
pub const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.125, 0.15, 0.175, 0.2, 0.225, 0.25, 0.275, 0.3,
    0.325, 0.35, 0.375, 0.4, 0.425, 0.45, 0.475, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9,
    0.95, 1., 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2., 2.5, 3., 3.5, 4., 4.5, 5., 6., 7.,
    8., 9., 10., 15., 20., 25., 30., 60., 90.,
];

/// Ascending boundaries from 1 to 10000.
// NOTE(review): presumably histogram bucket upper bounds for item counts, as the name suggests.
pub const COUNT_BUCKETS: &[f64] = &[
    1., 2., 3., 4., 5., 7., 10., 15., 20., 25., 30., 40., 50., 75., 100., 150., 200., 500., 1000.,
    2000., 5000., 10000.,
];

/// Ascending boundaries from 1 to 67108864 (2^26): powers of four up to 4096, then powers of two.
// NOTE(review): presumably histogram bucket upper bounds in bytes, as the name suggests.
pub const BYTES_BUCKETS: &[f64] = &[
    1., 4., 16., 64., 256., 1024., 4096., 8192., 16384., 32768., 65536., 131072., 262144., 524288.,
    1048576., 2097152., 4194304., 8388608., 16777216., 33554432., 67108864.,
];
67
68#[derive(Debug)]
69pub struct Metrics {
70 pub tasks: IntGaugeVec,
71 pub futures: IntGaugeVec,
72 pub channel_inflight: IntGaugeVec,
73 pub channel_sent: IntGaugeVec,
74 pub channel_received: IntGaugeVec,
75 pub future_active_duration_ns: IntGaugeVec,
76 pub scope_iterations: IntGaugeVec,
77 pub scope_duration_ns: IntGaugeVec,
78 pub scope_entrance: IntGaugeVec,
79 pub thread_stall_duration_sec: Histogram,
80 pub system_invariant_violations: IntCounterVec,
81}
82
83impl Metrics {
84 fn new(registry: &Registry) -> Self {
85 Self {
86 tasks: register_int_gauge_vec_with_registry!(
87 "monitored_tasks",
88 "Number of running tasks per callsite.",
89 &["callsite"],
90 registry,
91 )
92 .unwrap(),
93 futures: register_int_gauge_vec_with_registry!(
94 "monitored_futures",
95 "Number of pending futures per callsite.",
96 &["callsite"],
97 registry,
98 )
99 .unwrap(),
100 channel_inflight: register_int_gauge_vec_with_registry!(
101 "monitored_channel_inflight",
102 "Inflight items in channels.",
103 &["name"],
104 registry,
105 )
106 .unwrap(),
107 channel_sent: register_int_gauge_vec_with_registry!(
108 "monitored_channel_sent",
109 "Sent items in channels.",
110 &["name"],
111 registry,
112 )
113 .unwrap(),
114 channel_received: register_int_gauge_vec_with_registry!(
115 "monitored_channel_received",
116 "Received items in channels.",
117 &["name"],
118 registry,
119 )
120 .unwrap(),
121 future_active_duration_ns: register_int_gauge_vec_with_registry!(
122 "monitored_future_active_duration_ns",
123 "Total duration in nanosecs where the monitored future is active (consuming CPU time)",
124 &["name"],
125 registry,
126 )
127 .unwrap(),
128 scope_entrance: register_int_gauge_vec_with_registry!(
129 "monitored_scope_entrance",
130 "Number of entrance in the scope.",
131 &["name"],
132 registry,
133 )
134 .unwrap(),
135 scope_iterations: register_int_gauge_vec_with_registry!(
136 "monitored_scope_iterations",
137 "Total number of times where the monitored scope runs",
138 &["name"],
139 registry,
140 )
141 .unwrap(),
142 scope_duration_ns: register_int_gauge_vec_with_registry!(
143 "monitored_scope_duration_ns",
144 "Total duration in nanosecs where the monitored scope is running",
145 &["name"],
146 registry,
147 )
148 .unwrap(),
149 thread_stall_duration_sec: register_histogram_with_registry!(
150 "thread_stall_duration_sec",
151 "Duration of thread stalls in seconds.",
152 registry,
153 )
154 .unwrap(),
155 system_invariant_violations: register_int_counter_vec_with_registry!(
156 "system_invariant_violations",
157 "Number of system invariant violations",
158 &["name"],
159 registry,
160 ).unwrap(),
161 }
162 }
163}
164
/// Process-wide [`Metrics`] instance; empty until `init_metrics` sets it successfully.
static METRICS: OnceCell<Metrics> = OnceCell::new();
166
167pub fn init_metrics(registry: &Registry) {
168 let _ = METRICS
169 .set(Metrics::new(registry))
170 .tap_err(|_| warn!("init_metrics registry overwritten"));
172}
173
174pub fn get_metrics() -> Option<&'static Metrics> {
175 METRICS.get()
176}
177
// Task-local slot holding the shared server-timing `Timer`. It is only populated inside a
// `SERVER_TIMING.scope(..)` call; accessors in this file use `try_with` so that code running
// outside such a scope gets an error instead of a panic.
tokio::task_local! {
    static SERVER_TIMING: Arc<Mutex<Timer>>;
}
181
182pub async fn with_new_server_timing<T>(fut: impl Future<Output = T> + Send + 'static) -> T {
186 let timer = Arc::new(Mutex::new(Timer::new()));
187
188 let mut ret = None;
189 SERVER_TIMING
190 .scope(timer, async {
191 ret = Some(fut.await);
192 })
193 .await;
194
195 ret.unwrap()
196}
197
198pub async fn with_server_timing<T>(
201 timer: Arc<Mutex<Timer>>,
202 fut: impl Future<Output = T> + Send + 'static,
203) -> T {
204 let mut ret = None;
205 SERVER_TIMING
206 .scope(timer, async {
207 ret = Some(fut.await);
208 })
209 .await;
210
211 ret.unwrap()
212}
213
214pub fn get_server_timing() -> Option<Arc<Mutex<Timer>>> {
216 SERVER_TIMING.try_with(|timer| timer.clone()).ok()
217}
218
219pub fn add_server_timing(name: &str) {
223 let res = SERVER_TIMING.try_with(|timer| {
224 timer.lock().add(name);
225 });
226
227 if res.is_err() {
228 tracing::error!("Server timing context not found");
229 }
230}
231
/// Wraps a future in an `async move` block that tracks it in a per-callsite gauge.
///
/// Forms:
/// * `monitored_future!(fut)` — tracks `fut` in the `futures` gauge, labelled `file:line`, with
///   logging disabled.
/// * `monitored_future!(metric, fut, name, LEVEL, logging_enabled)` — tracks `fut` in the gauge
///   field `metric` of `Metrics`. The label is `file:line` when `name` is empty, `file:name`
///   otherwise. `name` is spliced into `concat!`, so it has to be a literal.
///
/// The gauge is incremented when the wrapper is first polled and decremented when the wrapper
/// finishes or is dropped. When no global metrics are installed, only the logging (if enabled)
/// takes place.
#[macro_export]
macro_rules! monitored_future {
    // Recurse through `$crate::` so the macro also works when it is invoked by path and is not
    // imported by name at the call site.
    ($fut: expr) => {{ $crate::monitored_future!(futures, $fut, "", INFO, false) }};

    ($metric: ident, $fut: expr, $name: expr, $logging_level: ident, $logging_enabled: expr) => {{
        let location: &str = if $name.is_empty() {
            concat!(file!(), ':', line!())
        } else {
            concat!(file!(), ':', $name)
        };

        async move {
            let metrics = $crate::get_metrics();

            // The guard undoes the increment on every exit path, including cancellation.
            let _metrics_guard = if let Some(m) = metrics {
                m.$metric.with_label_values(&[location]).inc();
                Some($crate::scopeguard::guard(m, |_| {
                    m.$metric.with_label_values(&[location]).dec();
                }))
            } else {
                None
            };
            // Emits the "completed" event when the wrapper is dropped, finished or not.
            let _logging_guard = if $logging_enabled {
                Some($crate::scopeguard::guard((), |_| {
                    tracing::event!(
                        tracing::Level::$logging_level,
                        "Future {} completed",
                        location
                    );
                }))
            } else {
                None
            };

            if $logging_enabled {
                tracing::event!(
                    tracing::Level::$logging_level,
                    "Spawning future {}",
                    location
                );
            }

            $fut.await
        }
    }};
}
278
/// Spawns a future with `tokio::task::spawn`, carrying the caller's server-timing timer along.
///
/// If the caller runs inside a server-timing scope, the spawned task is wrapped in
/// `with_server_timing` using the same timer; otherwise the future is spawned as is.
#[macro_export]
macro_rules! forward_server_timing_and_spawn {
    ($task:expr) => {
        match $crate::get_server_timing() {
            Some(shared_timer) => tokio::task::spawn(async move {
                $crate::with_server_timing(shared_timer, $task).await
            }),
            None => tokio::task::spawn($task),
        }
    };
}
289
/// Spawns a future as a tokio task that is counted in the `tasks` gauge under its `file:line`
/// callsite, with logging disabled and the server-timing context forwarded.
#[macro_export]
macro_rules! spawn_monitored_task {
    ($task:expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $task, "", INFO, false
        ))
    };
}
298
/// Like `spawn_monitored_task!`, but with start/completion logging enabled.
///
/// Forms:
/// * `(fut)` — callsite label `file:line`, events at `INFO`.
/// * `(fut, name)` — callsite label `file:name`, events at `INFO`.
/// * `(fut, name, LEVEL)` — callsite label `file:name`, events at the given `tracing::Level`.
#[macro_export]
macro_rules! spawn_logged_monitored_task {
    ($task:expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $task, "", INFO, true
        ))
    };

    ($task:expr, $label:expr) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $task, $label, INFO, true
        ))
    };

    ($task:expr, $label:expr, $level:ident) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks, $task, $label, $level, true
        ))
    };
}
323
324pub struct MonitoredScopeGuard {
325 metrics: &'static Metrics,
326 name: &'static str,
327 timer: Instant,
328}
329
330impl Drop for MonitoredScopeGuard {
331 fn drop(&mut self) {
332 self.metrics
333 .scope_duration_ns
334 .with_label_values(&[self.name])
335 .add(self.timer.elapsed().as_nanos() as i64);
336 self.metrics
337 .scope_entrance
338 .with_label_values(&[self.name])
339 .dec();
340 }
341}
342
343pub fn monitored_scope(name: &'static str) -> Option<MonitoredScopeGuard> {
352 let metrics = get_metrics();
353 if let Some(m) = metrics {
354 m.scope_iterations.with_label_values(&[name]).inc();
355 m.scope_entrance.with_label_values(&[name]).inc();
356 Some(MonitoredScopeGuard {
357 metrics: m,
358 name,
359 timer: Instant::now(),
360 })
361 } else {
362 None
363 }
364}
365
366pub trait MonitoredFutureExt: Future + Sized {
367 fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self>;
368}
369
370impl<F: Future> MonitoredFutureExt for F {
371 fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self> {
372 MonitoredScopeFuture {
373 f: Box::pin(self),
374 active_duration_metric: get_metrics()
375 .map(|m| m.future_active_duration_ns.with_label_values(&[name])),
376 _scope: monitored_scope(name),
377 }
378 }
379}
380
381pub struct MonitoredScopeFuture<F: Sized> {
382 f: Pin<Box<F>>,
383 active_duration_metric: Option<GenericGauge<AtomicI64>>,
384 _scope: Option<MonitoredScopeGuard>,
385}
386
387impl<F: Future> Future for MonitoredScopeFuture<F> {
388 type Output = F::Output;
389
390 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
391 let active_timer = Instant::now();
392 let ret = self.f.as_mut().poll(cx);
393 if let Some(m) = &self.active_duration_metric {
394 m.add(active_timer.elapsed().as_nanos() as i64);
395 }
396 ret
397 }
398}
399
/// Future wrapper that remembers whether the wrapped future ran to completion.
///
/// The flag is what the `Drop` implementation uses to detect that the wrapper was dropped before
/// the inner future finished.
pub struct CancelMonitor<F: Sized> {
    /// Set to `true` once the inner future has returned `Poll::Ready`.
    finished: bool,
    /// The wrapped future.
    inner: Pin<Box<F>>,
}

impl<F: Future> CancelMonitor<F> {
    /// Wraps `inner`; the monitor starts in the "not finished" state.
    pub fn new(inner: F) -> Self {
        let inner = Box::pin(inner);
        Self {
            finished: false,
            inner,
        }
    }

    /// Returns `true` once the wrapped future has completed.
    pub fn is_finished(&self) -> bool {
        self.finished
    }
}

impl<F: Future> Future for CancelMonitor<F> {
    type Output = F::Output;

    /// Polls the wrapped future, marking the monitor as finished when it becomes ready.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let polled = self.inner.as_mut().poll(cx);
        if polled.is_ready() {
            self.finished = true;
        }
        polled
    }
}
437
438impl<F: Sized> Drop for CancelMonitor<F> {
439 fn drop(&mut self) {
440 if !self.finished {
441 Span::current().record("cancelled", true);
442 }
443 }
444}
445
446pub trait MonitorCancellation {
450 fn monitor_cancellation(self) -> CancelMonitor<Self>
451 where
452 Self: Sized + Future;
453}
454
455impl<T> MonitorCancellation for T
456where
457 T: Future,
458{
459 fn monitor_cancellation(self) -> CancelMonitor<Self> {
460 CancelMonitor::new(self)
461 }
462}
463
/// Identifier returned by `RegistryService::add` and accepted by `RegistryService::remove`.
pub type RegistryID = Uuid;
465
466#[derive(Clone, Debug)]
470pub struct RegistryService {
471 default_registry: Registry,
473 registries_by_id: Arc<DashMap<Uuid, Registry>>,
474}
475
476impl RegistryService {
477 pub fn new(default_registry: Registry) -> Self {
480 Self {
481 default_registry,
482 registries_by_id: Arc::new(DashMap::new()),
483 }
484 }
485
486 pub fn default_registry(&self) -> Registry {
489 self.default_registry.clone()
490 }
491
492 pub fn add(&self, registry: Registry) -> RegistryID {
497 let registry_id = Uuid::new_v4();
498 if self
499 .registries_by_id
500 .insert(registry_id, registry)
501 .is_some()
502 {
503 panic!("Other Registry already detected for the same id {registry_id}");
504 }
505
506 registry_id
507 }
508
509 pub fn remove(&self, registry_id: RegistryID) -> bool {
512 self.registries_by_id.remove(®istry_id).is_some()
513 }
514
515 pub fn get_all(&self) -> Vec<Registry> {
517 let mut registries: Vec<Registry> = self
518 .registries_by_id
519 .iter()
520 .map(|r| r.value().clone())
521 .collect();
522 registries.push(self.default_registry.clone());
523
524 registries
525 }
526
527 pub fn gather_all(&self) -> Vec<prometheus::proto::MetricFamily> {
529 self.get_all().iter().flat_map(|r| r.gather()).collect()
530 }
531}
532
533pub fn uptime_metric(
539 process: &str,
540 version: &'static str,
541 chain_identifier: &str,
542) -> Box<dyn prometheus::core::Collector> {
543 let opts = prometheus::opts!("uptime", "uptime of the node service in seconds")
544 .variable_label("process")
545 .variable_label("version")
546 .variable_label("chain_identifier");
547
548 let start_time = std::time::Instant::now();
549 let uptime = move || start_time.elapsed().as_secs();
550 let metric = prometheus_closure_metric::ClosureMetric::new(
551 opts,
552 prometheus_closure_metric::ValueType::Counter,
553 uptime,
554 &[process, version, chain_identifier],
555 )
556 .unwrap();
557
558 Box::new(metric)
559}
560
561pub fn bridge_uptime_metric(
570 process: &str,
571 version: &'static str,
572 sui_chain_identifier: &str,
573 eth_chain_identifier: &str,
574 client_enabled: bool,
575) -> Box<dyn prometheus::core::Collector> {
576 let opts = prometheus::opts!("uptime", "uptime of the node service in seconds")
577 .variable_label("process")
578 .variable_label("version")
579 .variable_label("sui_chain_identifier")
580 .variable_label("eth_chain_identifier")
581 .variable_label("client_enabled");
582
583 let start_time = std::time::Instant::now();
584 let uptime = move || start_time.elapsed().as_secs();
585 let metric = prometheus_closure_metric::ClosureMetric::new(
586 opts,
587 prometheus_closure_metric::ValueType::Counter,
588 uptime,
589 &[
590 process,
591 version,
592 sui_chain_identifier,
593 eth_chain_identifier,
594 if client_enabled { "true" } else { "false" },
595 ],
596 )
597 .unwrap();
598
599 Box::new(metric)
600}
601
/// HTTP path under which `start_prometheus_server` mounts the `metrics` handler.
pub const METRICS_ROUTE: &str = "/metrics";
603
604pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService {
608 let registry = Registry::new();
609
610 let registry_service = RegistryService::new(registry);
611
612 if cfg!(msim) {
613 warn!("not starting prometheus server in simulator");
616 return registry_service;
617 }
618
619 let app = Router::new()
620 .route(METRICS_ROUTE, get(metrics))
621 .layer(Extension(registry_service.clone()));
622
623 tokio::spawn(async move {
624 let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
625 axum::serve(listener, app.into_make_service())
626 .await
627 .unwrap();
628 });
629
630 registry_service
631}
632
633pub async fn metrics(
634 Extension(registry_service): Extension<RegistryService>,
635) -> (StatusCode, String) {
636 let metrics_families = registry_service.gather_all();
637 match TextEncoder.encode_to_string(&metrics_families) {
638 Ok(metrics) => (StatusCode::OK, metrics),
639 Err(error) => (
640 StatusCode::INTERNAL_SERVER_ERROR,
641 format!("unable to encode metrics: {error}"),
642 ),
643 }
644}
645
#[cfg(test)]
mod tests {
    use crate::RegistryService;
    use prometheus::IntCounter;
    use prometheus::Registry;

    /// Creates a registry with the given name prefix holding a single counter.
    fn registry_with_counter(prefix: &str, counter: &str, help: &str) -> Registry {
        let registry = Registry::new_custom(Some(prefix.to_string()), None).unwrap();
        registry
            .register(Box::new(IntCounter::new(counter, help).unwrap()))
            .unwrap();
        registry
    }

    /// Gathers all metric families and returns their `(name, help)` pairs sorted by name.
    fn gathered(service: &RegistryService) -> Vec<(String, String)> {
        let mut families = service.gather_all();
        families.sort_by(|m1, m2| Ord::cmp(m1.get_name(), m2.get_name()));
        families
            .iter()
            .map(|family| {
                (
                    family.get_name().to_string(),
                    family.get_help().to_string(),
                )
            })
            .collect()
    }

    /// Converts borrowed `(name, help)` pairs into the owned form returned by `gathered`.
    fn expected(pairs: &[(&str, &str)]) -> Vec<(String, String)> {
        pairs
            .iter()
            .map(|(name, help)| (name.to_string(), help.to_string()))
            .collect()
    }

    #[test]
    fn registry_service() {
        // The default registry is populated after the service has taken its clone.
        let default_registry = Registry::new_custom(Some("default".to_string()), None).unwrap();
        let registry_service = RegistryService::new(default_registry.clone());

        let default_counter = IntCounter::new("counter", "counter_desc").unwrap();
        default_counter.inc();
        default_registry
            .register(Box::new(default_counter))
            .unwrap();

        // One additional registry: default + narwhal.
        let registry_1_id =
            registry_service.add(registry_with_counter("narwhal", "counter_1", "counter_1_desc"));
        assert_eq!(
            gathered(&registry_service),
            expected(&[
                ("default_counter", "counter_desc"),
                ("narwhal_counter_1", "counter_1_desc"),
            ])
        );

        // Two additional registries: default + narwhal + sui.
        let _registry_2_id =
            registry_service.add(registry_with_counter("sui", "counter_2", "counter_2_desc"));
        assert_eq!(
            gathered(&registry_service),
            expected(&[
                ("default_counter", "counter_desc"),
                ("narwhal_counter_1", "counter_1_desc"),
                ("sui_counter_2", "counter_2_desc"),
            ])
        );

        // After removing the first additional registry only default + sui remain.
        assert!(registry_service.remove(registry_1_id));
        assert_eq!(
            gathered(&registry_service),
            expected(&[
                ("default_counter", "counter_desc"),
                ("sui_counter_2", "counter_2_desc"),
            ])
        );
    }
}