sui_indexer_alt_framework/pipeline/
logging.rsuse std::time::Instant;
use tracing::{debug, info};
use crate::store::CommitterWatermark;
use super::Processor;
const LOUD_WATERMARK_UPDATE_INTERVAL: i64 = 5 * 10;
#[derive(Default)]
pub(crate) struct LoggerWatermark {
checkpoint: i64,
transaction: Option<i64>,
}
pub(crate) struct WatermarkLogger {
name: &'static str,
timer: Instant,
prev_watermark: LoggerWatermark,
}
impl WatermarkLogger {
pub fn new(name: &'static str, init_watermark: impl Into<LoggerWatermark>) -> Self {
Self {
name,
timer: Instant::now(),
prev_watermark: init_watermark.into(),
}
}
pub fn log<H: Processor>(
&mut self,
watermark: impl Into<LoggerWatermark>,
watermark_update_latency: f64,
) {
let watermark: LoggerWatermark = watermark.into();
let logger_timer_elapsed = self.timer.elapsed().as_secs_f64();
let realtime_average_tps = match (self.prev_watermark.transaction, watermark.transaction) {
(Some(prev), Some(curr)) => Some((curr - prev) as f64 / logger_timer_elapsed),
_ => None,
};
let realtime_average_cps =
(watermark.checkpoint - self.prev_watermark.checkpoint) as f64 / logger_timer_elapsed;
if watermark.checkpoint < self.prev_watermark.checkpoint + LOUD_WATERMARK_UPDATE_INTERVAL {
debug!(
logger = self.name,
pipeline = H::NAME,
checkpoint = watermark.checkpoint,
transaction = watermark.transaction,
tps = realtime_average_tps,
cps = realtime_average_cps,
elapsed_ms = format!("{:.3}", watermark_update_latency * 1000.0),
"Updated watermark",
);
return;
}
info!(
logger = self.name,
pipeline = H::NAME,
checkpoint = watermark.checkpoint,
transaction = watermark.transaction,
tps = realtime_average_tps,
cps = realtime_average_cps,
elapsed_ms = format!("{:.3}", watermark_update_latency * 1000.0),
"Updated watermark",
);
self.prev_watermark = watermark;
self.timer = Instant::now();
}
}
impl From<&CommitterWatermark> for LoggerWatermark {
fn from(watermark: &CommitterWatermark) -> Self {
Self {
checkpoint: watermark.checkpoint_hi_inclusive as i64,
transaction: Some(watermark.tx_hi as i64),
}
}
}
impl LoggerWatermark {
pub fn checkpoint(checkpoint: u64) -> Self {
Self {
checkpoint: checkpoint as i64,
transaction: None,
}
}
}