use axum::{extract::Extension, http::StatusCode, routing::get, Router};
use dashmap::DashMap;
use parking_lot::Mutex;
use prometheus::core::{AtomicI64, GenericGauge};
use simple_server_timing_header::Timer;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use once_cell::sync::OnceCell;
use prometheus::{
register_histogram_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, Histogram, IntCounterVec, IntGaugeVec, Registry,
TextEncoder,
};
use tap::TapFallible;
use tracing::{warn, Span};
pub use scopeguard;
use uuid::Uuid;
mod guards;
pub mod histogram;
pub mod metered_channel;
pub mod monitored_mpsc;
pub mod thread_stall_monitor;
pub use guards::*;
/// String tag for single-writer transactions.
// NOTE(review): presumably used as a metric label value; usage is outside this file.
pub const TX_TYPE_SINGLE_WRITER_TX: &str = "single_writer";
/// String tag for shared-object transactions.
// NOTE(review): presumably used as a metric label value; usage is outside this file.
pub const TX_TYPE_SHARED_OBJ_TX: &str = "shared_object";
/// Ascending boundaries from 0.005 up to 1, named as seconds.
// NOTE(review): presumably histogram bucket bounds for sub-second latencies — confirm at call sites.
pub const SUBSECOND_LATENCY_SEC_BUCKETS: &[f64] = &[
0.005, 0.01, 0.02, 0.03, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1.,
];
/// Ascending boundaries from 0.005 up to 60, named as seconds; sparser than
/// `LATENCY_SEC_BUCKETS`.
pub const COARSE_LATENCY_SEC_BUCKETS: &[f64] = &[
0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.2, 0.3, 0.5, 0.7, 1., 2., 3., 5., 10., 20., 30., 60.,
];
/// Ascending boundaries from 0.001 up to 90, named as seconds; the densest of the
/// latency boundary sets in this file.
pub const LATENCY_SEC_BUCKETS: &[f64] = &[
0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6,
0.7, 0.8, 0.9, 1., 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2., 2.5, 3., 3.5, 4., 4.5, 5.,
6., 7., 8., 9., 10., 15., 20., 25., 30., 60., 90.,
];
/// Ascending boundaries from 2 up to 10000 for item counts.
pub const COUNT_BUCKETS: &[f64] = &[
2., 5., 10., 20., 50., 100., 200., 500., 1000., 2000., 5000., 10000.,
];
/// Ascending boundaries for byte sizes, from 1024 (1 KiB) up to 67108864 (64 MiB).
pub const BYTES_BUCKETS: &[f64] = &[
1024., 4096., 16384., 65536., 262144., 524288., 1048576., 2097152., 4194304., 8388608.,
16777216., 33554432., 67108864.,
];
/// Handles to the collectors registered by `Metrics::new`.
#[derive(Debug)]
pub struct Metrics {
/// `monitored_tasks`: number of running tasks, labelled by `callsite`.
pub tasks: IntGaugeVec,
/// `monitored_futures`: number of pending futures, labelled by `callsite`.
pub futures: IntGaugeVec,
/// `monitored_channel_inflight`: inflight items in channels, labelled by `name`.
pub channel_inflight: IntGaugeVec,
/// `monitored_channel_sent`: sent items in channels, labelled by `name`.
pub channel_sent: IntGaugeVec,
/// `monitored_channel_received`: received items in channels, labelled by `name`.
pub channel_received: IntGaugeVec,
/// `monitored_future_active_duration_ns`: accumulated nanoseconds spent inside `poll`
/// of a monitored future, labelled by `name`.
pub future_active_duration_ns: IntGaugeVec,
/// `monitored_scope_iterations`: number of times a monitored scope was entered,
/// labelled by `name`.
pub scope_iterations: IntGaugeVec,
/// `monitored_scope_duration_ns`: accumulated nanoseconds a monitored scope was alive,
/// labelled by `name`.
pub scope_duration_ns: IntGaugeVec,
/// `monitored_scope_entrance`: incremented when a monitored scope is entered and
/// decremented when its guard is dropped, labelled by `name`.
pub scope_entrance: IntGaugeVec,
/// `thread_stall_duration_sec`: duration of thread stalls in seconds.
pub thread_stall_duration_sec: Histogram,
/// `system_invariant_violations`: number of system invariant violations, labelled by `name`.
pub system_invariant_violations: IntCounterVec,
}
impl Metrics {
    /// Registers every collector of this module on `registry` and returns the handles.
    ///
    /// # Panics
    ///
    /// Panics if `registry` rejects any of the registrations, since each result is
    /// unwrapped.
    fn new(registry: &Registry) -> Self {
        // Collectors are registered one at a time, in this order; the struct is only
        // assembled once all of them succeeded.
        let tasks = register_int_gauge_vec_with_registry!(
            "monitored_tasks",
            "Number of running tasks per callsite.",
            &["callsite"],
            registry,
        )
        .unwrap();
        let futures = register_int_gauge_vec_with_registry!(
            "monitored_futures",
            "Number of pending futures per callsite.",
            &["callsite"],
            registry,
        )
        .unwrap();
        let channel_inflight = register_int_gauge_vec_with_registry!(
            "monitored_channel_inflight",
            "Inflight items in channels.",
            &["name"],
            registry,
        )
        .unwrap();
        let channel_sent = register_int_gauge_vec_with_registry!(
            "monitored_channel_sent",
            "Sent items in channels.",
            &["name"],
            registry,
        )
        .unwrap();
        let channel_received = register_int_gauge_vec_with_registry!(
            "monitored_channel_received",
            "Received items in channels.",
            &["name"],
            registry,
        )
        .unwrap();
        let future_active_duration_ns = register_int_gauge_vec_with_registry!(
            "monitored_future_active_duration_ns",
            "Total duration in nanosecs where the monitored future is active (consuming CPU time)",
            &["name"],
            registry,
        )
        .unwrap();
        let scope_entrance = register_int_gauge_vec_with_registry!(
            "monitored_scope_entrance",
            "Number of entrance in the scope.",
            &["name"],
            registry,
        )
        .unwrap();
        let scope_iterations = register_int_gauge_vec_with_registry!(
            "monitored_scope_iterations",
            "Total number of times where the monitored scope runs",
            &["name"],
            registry,
        )
        .unwrap();
        let scope_duration_ns = register_int_gauge_vec_with_registry!(
            "monitored_scope_duration_ns",
            "Total duration in nanosecs where the monitored scope is running",
            &["name"],
            registry,
        )
        .unwrap();
        let thread_stall_duration_sec = register_histogram_with_registry!(
            "thread_stall_duration_sec",
            "Duration of thread stalls in seconds.",
            registry,
        )
        .unwrap();
        let system_invariant_violations = register_int_counter_vec_with_registry!(
            "system_invariant_violations",
            "Number of system invariant violations",
            &["name"],
            registry,
        )
        .unwrap();

        Self {
            tasks,
            futures,
            channel_inflight,
            channel_sent,
            channel_received,
            future_active_duration_ns,
            scope_iterations,
            scope_duration_ns,
            scope_entrance,
            thread_stall_duration_sec,
            system_invariant_violations,
        }
    }
}
/// Process-wide metrics handle; stored at most once by `init_metrics` and read by
/// `get_metrics`.
static METRICS: OnceCell<Metrics> = OnceCell::new();
/// Installs the process-wide [`Metrics`], registering its collectors on `registry`.
///
/// Only the first call takes effect. Later calls log a warning and return without
/// touching `registry`, so repeating the call with the same registry no longer panics
/// on duplicate registration and no unused collectors are left on another registry.
///
/// # Panics
///
/// Panics if `registry` rejects a registration during the first initialization.
pub fn init_metrics(registry: &Registry) {
    // Check before building: `Metrics::new` registers collectors as a side effect.
    if METRICS.get().is_some() {
        warn!("init_metrics registry overwritten");
        return;
    }
    // `set` can still fail if another thread initialized concurrently; keep the warning.
    let _ = METRICS
        .set(Metrics::new(registry))
        .tap_err(|_| warn!("init_metrics registry overwritten"));
}
/// Returns the process-wide [`Metrics`], or `None` if `init_metrics` has not stored one yet.
pub fn get_metrics() -> Option<&'static Metrics> {
METRICS.get()
}
// Task-local server-timing recorder. It is only set while a future runs inside
// `with_new_server_timing` or `with_server_timing`; elsewhere lookups fail.
tokio::task_local! {
static SERVER_TIMING: Arc<Mutex<Timer>>;
}
/// Runs `fut` with a freshly created server-timing `Timer` installed as the task-local
/// `SERVER_TIMING`, and returns the output of `fut`.
pub async fn with_new_server_timing<T>(fut: impl Future<Output = T> + Send + 'static) -> T {
    let timer = Arc::new(Mutex::new(Timer::new()));
    // `scope` resolves to the wrapped future's output, so no side channel is needed.
    SERVER_TIMING.scope(timer, fut).await
}
/// Runs `fut` with the given `timer` installed as the task-local `SERVER_TIMING`, and
/// returns the output of `fut`.
///
/// Passing a timer obtained from [`get_server_timing`] lets another task record into
/// the same timer.
pub async fn with_server_timing<T>(
    timer: Arc<Mutex<Timer>>,
    fut: impl Future<Output = T> + Send + 'static,
) -> T {
    // `scope` resolves to the wrapped future's output, so no side channel is needed.
    SERVER_TIMING.scope(timer, fut).await
}
/// Returns a shared handle to the current task's server-timing `Timer`, or `None` when
/// no timer is installed for the current task.
pub fn get_server_timing() -> Option<Arc<Mutex<Timer>>> {
    SERVER_TIMING.try_with(Arc::clone).ok()
}
pub fn add_server_timing(name: &str) {
let res = SERVER_TIMING.try_with(|timer| {
timer.lock().add(name);
});
if res.is_err() {
tracing::error!("Server timing context not found");
}
}
/// Wraps a future so that a gauge tracks how many instances are currently alive.
///
/// * `monitored_future!(fut)` counts under the `futures` gauge, labelled with the call
///   site's `file:line`, with logging disabled.
/// * `monitored_future!(metric, fut, name, LEVEL, logging_enabled)` counts under the
///   `Metrics` field `metric`. The label is `file:line` when `name` is empty and
///   `file:name` otherwise; `name` must be a string literal because it is passed to
///   `concat!`. When `logging_enabled` is true, an event at `tracing::Level::LEVEL` is
///   emitted when the wrapper is first polled and again when it is dropped.
///
/// The gauge is incremented when the returned future is first polled and decremented
/// when it is dropped, whether it completed or not. Nothing is counted when
/// `get_metrics()` returns `None`.
#[macro_export]
macro_rules! monitored_future {
    ($fut: expr) => {{
        // Qualified with `$crate` so the recursion resolves even when the caller invokes
        // this macro by path without importing it.
        $crate::monitored_future!(futures, $fut, "", INFO, false)
    }};
    ($metric: ident, $fut: expr, $name: expr, $logging_level: ident, $logging_enabled: expr) => {{
        let location: &str = if $name.is_empty() {
            concat!(file!(), ':', line!())
        } else {
            concat!(file!(), ':', $name)
        };

        async move {
            let metrics = $crate::get_metrics();

            // Dropping the guard undoes the increment, including on cancellation.
            let _metrics_guard = if let Some(m) = metrics {
                m.$metric.with_label_values(&[location]).inc();
                Some($crate::scopeguard::guard(m, |_| {
                    m.$metric.with_label_values(&[location]).dec();
                }))
            } else {
                None
            };
            let _logging_guard = if $logging_enabled {
                Some($crate::scopeguard::guard((), |_| {
                    tracing::event!(
                        tracing::Level::$logging_level,
                        "Future {} completed",
                        location
                    );
                }))
            } else {
                None
            };

            if $logging_enabled {
                tracing::event!(
                    tracing::Level::$logging_level,
                    "Spawning future {}",
                    location
                );
            }

            $fut.await
        }
    }};
}
/// Spawns `$fut` on the Tokio runtime, carrying the caller's server-timing context along.
///
/// When the calling task has a server-timing timer installed, the spawned task runs
/// `$fut` under that same timer; otherwise `$fut` is spawned unchanged.
#[macro_export]
macro_rules! forward_server_timing_and_spawn {
    ($fut: expr) => {
        match $crate::get_server_timing() {
            Some(timing) => {
                tokio::task::spawn(async move { $crate::with_server_timing(timing, $fut).await })
            }
            None => tokio::task::spawn($fut),
        }
    };
}
/// Spawns `$fut` as a task counted under the `tasks` gauge, labelled with the call
/// site's `file:line`, with logging disabled. The caller's server-timing context, if
/// any, is forwarded to the spawned task.
#[macro_export]
macro_rules! spawn_monitored_task {
($fut: expr) => {
$crate::forward_server_timing_and_spawn!($crate::monitored_future!(
tasks, $fut, "", INFO, false
))
};
}
/// Spawns `$fut` as a task counted under the `tasks` gauge, with start and completion
/// logging enabled. The caller's server-timing context, if any, is forwarded.
///
/// * `(fut)` uses an empty name (label `file:line`) and `INFO` level.
/// * `(fut, name)` uses label `file:name` and `INFO` level.
/// * `(fut, name, LEVEL)` uses label `file:name` and the given `tracing::Level`.
#[macro_export]
macro_rules! spawn_logged_monitored_task {
    // The two short forms fill in defaults and defer to the full form below.
    ($fut: expr) => {
        $crate::spawn_logged_monitored_task!($fut, "", INFO)
    };
    ($fut: expr, $name: expr) => {
        $crate::spawn_logged_monitored_task!($fut, $name, INFO)
    };
    ($fut: expr, $name: expr, $logging_level: ident) => {
        $crate::forward_server_timing_and_spawn!($crate::monitored_future!(
            tasks,
            $fut,
            $name,
            $logging_level,
            true
        ))
    };
}
/// Guard returned by `monitored_scope`; its `Drop` impl records the scope's elapsed time
/// and decrements the scope's entrance gauge.
pub struct MonitoredScopeGuard {
// Metrics handle the guard reports to.
metrics: &'static Metrics,
// Label value identifying the scope.
name: &'static str,
// Start instant, captured when the guard was created.
timer: Instant,
}
impl Drop for MonitoredScopeGuard {
    /// Adds the nanoseconds elapsed since the guard was created to `scope_duration_ns`
    /// and decrements `scope_entrance`, both under this scope's name.
    fn drop(&mut self) {
        let labels = [self.name];

        let duration_gauge = self.metrics.scope_duration_ns.with_label_values(&labels);
        duration_gauge.add(self.timer.elapsed().as_nanos() as i64);

        let entrance_gauge = self.metrics.scope_entrance.with_label_values(&labels);
        entrance_gauge.dec();
    }
}
/// Marks entry into the scope called `name` and returns a guard that marks its exit.
///
/// On entry, `scope_iterations` and `scope_entrance` are incremented for `name`. The
/// returned guard records the elapsed time and decrements `scope_entrance` when it is
/// dropped. Returns `None`, and records nothing, when metrics are not initialized.
pub fn monitored_scope(name: &'static str) -> Option<MonitoredScopeGuard> {
    get_metrics().map(|metrics| {
        metrics.scope_iterations.with_label_values(&[name]).inc();
        metrics.scope_entrance.with_label_values(&[name]).inc();
        MonitoredScopeGuard {
            metrics,
            name,
            timer: Instant::now(),
        }
    })
}
/// Extension trait for wrapping a future in a named monitored scope.
pub trait MonitoredFutureExt: Future + Sized {
/// Wraps `self` in a [`MonitoredScopeFuture`] labelled `name`.
fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self>;
}
impl<F: Future> MonitoredFutureExt for F {
    /// Boxes `self` and wraps it so that poll time is accumulated under `name`.
    ///
    /// The monitored scope is entered here, when the wrapper is built, not at the first
    /// poll; it ends when the wrapper is dropped.
    fn in_monitored_scope(self, name: &'static str) -> MonitoredScopeFuture<Self> {
        let f = Box::pin(self);
        let metrics = get_metrics();
        let active_duration_metric =
            metrics.map(|m| m.future_active_duration_ns.with_label_values(&[name]));
        let _scope = monitored_scope(name);
        MonitoredScopeFuture {
            f,
            active_duration_metric,
            _scope,
        }
    }
}
/// Future wrapper that accumulates the time spent in each `poll` of the inner future.
pub struct MonitoredScopeFuture<F: Sized> {
// The wrapped future, boxed and pinned.
f: Pin<Box<F>>,
// Gauge receiving per-poll durations; `None` when metrics were not initialized.
active_duration_metric: Option<GenericGauge<AtomicI64>>,
// Held only for its `Drop`, which closes the monitored scope.
_scope: Option<MonitoredScopeGuard>,
}
impl<F: Future> Future for MonitoredScopeFuture<F> {
    type Output = F::Output;

    /// Polls the inner future and adds the nanoseconds that single poll took to the
    /// active-duration gauge, when one is present.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let started = Instant::now();
        let outcome = this.f.as_mut().poll(cx);
        if let Some(gauge) = this.active_duration_metric.as_ref() {
            gauge.add(started.elapsed().as_nanos() as i64);
        }
        outcome
    }
}
/// Future wrapper that tracks whether the inner future ran to completion, so that a
/// drop before completion can be recorded on the current tracing span.
pub struct CancelMonitor<F: Sized> {
// Set to true once the inner future has returned `Poll::Ready`.
finished: bool,
// The wrapped future, boxed and pinned.
inner: Pin<Box<F>>,
}
impl<F: Future> CancelMonitor<F> {
    /// Wraps `inner`, boxing and pinning it; the wrapper starts out unfinished.
    pub fn new(inner: F) -> Self {
        let inner = Box::pin(inner);
        Self {
            inner,
            finished: false,
        }
    }

    /// Returns `true` once the inner future has returned `Poll::Ready`.
    pub fn is_finished(&self) -> bool {
        self.finished
    }
}
impl<F: Future> Future for CancelMonitor<F> {
    type Output = F::Output;

    /// Forwards to the inner future and remembers when it has completed.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let polled = this.inner.as_mut().poll(cx);
        if polled.is_ready() {
            this.finished = true;
        }
        polled
    }
}
impl<F: Sized> Drop for CancelMonitor<F> {
    /// Records `cancelled = true` on the current tracing span when the wrapper is
    /// dropped before the inner future completed.
    fn drop(&mut self) {
        if self.finished {
            return;
        }
        Span::current().record("cancelled", true);
    }
}
/// Extension trait for wrapping a future in a [`CancelMonitor`].
pub trait MonitorCancellation {
/// Wraps `self` in a [`CancelMonitor`].
fn monitor_cancellation(self) -> CancelMonitor<Self>
where
Self: Sized + Future;
}
impl<T: Future> MonitorCancellation for T {
    /// Wraps this future in a [`CancelMonitor`].
    fn monitor_cancellation(self) -> CancelMonitor<Self> {
        let monitored = CancelMonitor::new(self);
        monitored
    }
}
/// Identifier handed out by `RegistryService::add` and accepted by `RegistryService::remove`.
pub type RegistryID = Uuid;
/// Holds a default registry plus any number of additional registries keyed by id.
/// Clones share the same map of additional registries through the `Arc`.
#[derive(Clone)]
pub struct RegistryService {
// The registry supplied at construction; it cannot be removed.
default_registry: Registry,
// Additional registries added at runtime, keyed by the id returned from `add`.
registries_by_id: Arc<DashMap<Uuid, Registry>>,
}
impl RegistryService {
    /// Creates a service with `default_registry` and no additional registries.
    pub fn new(default_registry: Registry) -> Self {
        Self {
            default_registry,
            registries_by_id: Arc::new(DashMap::new()),
        }
    }

    /// Returns a clone of the default registry.
    pub fn default_registry(&self) -> Registry {
        self.default_registry.clone()
    }

    /// Adds `registry` under a freshly generated random id and returns that id.
    ///
    /// # Panics
    ///
    /// Panics if the generated id is already present in the map.
    pub fn add(&self, registry: Registry) -> RegistryID {
        let registry_id = Uuid::new_v4();
        if self
            .registries_by_id
            .insert(registry_id, registry)
            .is_some()
        {
            panic!("Other Registry already detected for the same id {registry_id}");
        }

        registry_id
    }

    /// Removes the registry stored under `registry_id`.
    ///
    /// Returns `true` if a registry was removed and `false` if the id was unknown.
    pub fn remove(&self, registry_id: RegistryID) -> bool {
        self.registries_by_id.remove(&registry_id).is_some()
    }

    /// Returns clones of all additional registries followed by the default registry.
    pub fn get_all(&self) -> Vec<Registry> {
        let mut registries: Vec<Registry> = self
            .registries_by_id
            .iter()
            .map(|r| r.value().clone())
            .collect();
        registries.push(self.default_registry.clone());

        registries
    }

    /// Gathers the metric families of every registry returned by [`Self::get_all`].
    pub fn gather_all(&self) -> Vec<prometheus::proto::MetricFamily> {
        self.get_all().iter().flat_map(|r| r.gather()).collect()
    }
}
/// Builds an `uptime` collector whose value is the whole seconds elapsed since this
/// function was called.
///
/// The collector carries the labels `process`, `version` and `chain_identifier`, set to
/// the corresponding arguments.
///
/// # Panics
///
/// Panics if the closure metric cannot be constructed.
pub fn uptime_metric(
    process: &str,
    version: &'static str,
    chain_identifier: &str,
) -> Box<dyn prometheus::core::Collector> {
    let mut opts = prometheus::opts!("uptime", "uptime of the node service in seconds");
    for label in ["process", "version", "chain_identifier"] {
        opts = opts.variable_label(label);
    }

    let start_time = std::time::Instant::now();
    let uptime = move || start_time.elapsed().as_secs();

    let label_values = [process, version, chain_identifier];
    let collector = prometheus_closure_metric::ClosureMetric::new(
        opts,
        prometheus_closure_metric::ValueType::Counter,
        uptime,
        &label_values,
    )
    .unwrap();

    Box::new(collector)
}
/// Builds an `uptime` collector for a bridge process whose value is the whole seconds
/// elapsed since this function was called.
///
/// The collector carries the labels `process`, `version`, `sui_chain_identifier`,
/// `eth_chain_identifier` and `client_enabled`; the last one is rendered as `"true"`
/// or `"false"`.
///
/// # Panics
///
/// Panics if the closure metric cannot be constructed.
pub fn bridge_uptime_metric(
    process: &str,
    version: &'static str,
    sui_chain_identifier: &str,
    eth_chain_identifier: &str,
    client_enabled: bool,
) -> Box<dyn prometheus::core::Collector> {
    let mut opts = prometheus::opts!("uptime", "uptime of the node service in seconds");
    for label in [
        "process",
        "version",
        "sui_chain_identifier",
        "eth_chain_identifier",
        "client_enabled",
    ] {
        opts = opts.variable_label(label);
    }

    let start_time = std::time::Instant::now();
    let uptime = move || start_time.elapsed().as_secs();

    let client_enabled_label = if client_enabled { "true" } else { "false" };
    let label_values = [
        process,
        version,
        sui_chain_identifier,
        eth_chain_identifier,
        client_enabled_label,
    ];
    let collector = prometheus_closure_metric::ClosureMetric::new(
        opts,
        prometheus_closure_metric::ValueType::Counter,
        uptime,
        &label_values,
    )
    .unwrap();

    Box::new(collector)
}
/// HTTP path on which `start_prometheus_server` serves the `metrics` handler.
pub const METRICS_ROUTE: &str = "/metrics";
/// Creates a [`RegistryService`] around a new default registry and spawns a Tokio task
/// that serves its metrics over HTTP on `addr` at [`METRICS_ROUTE`].
///
/// When built with `cfg(msim)`, no server is started and only the service is returned.
/// Inside the spawned task, a failure to bind `addr` or to serve panics that task.
pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService {
    let registry_service = RegistryService::new(Registry::new());

    if cfg!(msim) {
        // Return the service only; no listener is created in the simulator.
        warn!("not starting prometheus server in simulator");
        return registry_service;
    }

    let service_for_router = registry_service.clone();
    let app = Router::new()
        .route(METRICS_ROUTE, get(metrics))
        .layer(Extension(service_for_router));

    tokio::spawn(async move {
        let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
        let server = axum::serve(listener, app.into_make_service());
        server.await.unwrap();
    });

    registry_service
}
/// HTTP handler that gathers every registry of the service and renders the result in
/// the Prometheus text format.
///
/// Responds with `200 OK` and the encoded metrics, or with `500 Internal Server Error`
/// and a message when encoding fails.
pub async fn metrics(
    Extension(registry_service): Extension<RegistryService>,
) -> (StatusCode, String) {
    let families = registry_service.gather_all();
    TextEncoder
        .encode_to_string(&families)
        .map(|body| (StatusCode::OK, body))
        .unwrap_or_else(|error| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("unable to encode metrics: {error}"),
            )
        })
}
#[cfg(test)]
mod tests {
    use crate::RegistryService;
    use prometheus::IntCounter;
    use prometheus::Registry;

    /// Builds a registry with the given name prefix holding a single counter.
    fn registry_with_counter(prefix: &str, counter: &str, help: &str) -> Registry {
        let registry = Registry::new_custom(Some(prefix.to_string()), None).unwrap();
        registry
            .register(Box::new(IntCounter::new(counter, help).unwrap()))
            .unwrap();
        registry
    }

    /// Gathers all families, sorts them by name and checks them against the expected
    /// `(name, help)` pairs.
    fn assert_gathered(service: &RegistryService, expected: &[(&str, &str)]) {
        let mut families = service.gather_all();
        families.sort_by(|a, b| a.get_name().cmp(b.get_name()));
        assert_eq!(families.len(), expected.len());
        for (family, (name, help)) in families.iter().zip(expected) {
            assert_eq!(family.get_name(), *name);
            assert_eq!(family.get_help(), *help);
        }
    }

    #[test]
    fn registry_service() {
        // The default registry is populated after the service has taken its clone.
        let default_registry = Registry::new_custom(Some("default".to_string()), None).unwrap();
        let registry_service = RegistryService::new(default_registry.clone());
        let default_counter = IntCounter::new("counter", "counter_desc").unwrap();
        default_counter.inc();
        default_registry
            .register(Box::new(default_counter))
            .unwrap();

        // One additional registry: default + narwhal.
        let registry_1_id = registry_service.add(registry_with_counter(
            "narwhal",
            "counter_1",
            "counter_1_desc",
        ));
        assert_gathered(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("narwhal_counter_1", "counter_1_desc"),
            ],
        );

        // Two additional registries: default + narwhal + sui.
        let _registry_2_id =
            registry_service.add(registry_with_counter("sui", "counter_2", "counter_2_desc"));
        assert_gathered(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("narwhal_counter_1", "counter_1_desc"),
                ("sui_counter_2", "counter_2_desc"),
            ],
        );

        // After removing the first additional registry only default + sui remain.
        assert!(registry_service.remove(registry_1_id));
        assert_gathered(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("sui_counter_2", "counter_2_desc"),
            ],
        );
    }
}