// sui_indexer_alt_framework/pipeline/logging.rs
use std::time::Instant;
5
6use tracing::{debug, info};
7
8use crate::store::CommitterWatermark;
9
10use super::Processor;
11
/// Checkpoint gap that separates quiet from loud watermark logging.
///
/// `WatermarkLogger::log` emits at `info` level only once the new watermark's checkpoint is at
/// least this far past the previously recorded one; smaller gaps are emitted at `debug` level.
const LOUD_WATERMARK_UPDATE_INTERVAL: i64 = 5 * 10;
15
/// Snapshot of a watermark in the form the logger consumes.
///
/// Both positions are held as `i64`; the transaction position is optional because some
/// watermarks only carry a checkpoint.
#[derive(Default)]
pub(crate) struct LoggerWatermark {
    /// Checkpoint position of the watermark.
    checkpoint: i64,
    /// Transaction position of the watermark, or `None` when it is not tracked.
    transaction: Option<i64>,
}
21
22pub(crate) struct WatermarkLogger {
23 name: &'static str,
24 timer: Instant,
25 prev_watermark: Option<LoggerWatermark>,
26}
27
28impl WatermarkLogger {
29 pub fn new(name: &'static str) -> Self {
30 Self {
31 name,
32 timer: Instant::now(),
33 prev_watermark: None,
34 }
35 }
36
37 pub fn log<H: Processor>(
46 &mut self,
47 watermark: impl Into<LoggerWatermark>,
48 watermark_update_latency: f64,
49 ) {
50 let watermark: LoggerWatermark = watermark.into();
51 let Some(prev_watermark) = &self.prev_watermark else {
53 self.prev_watermark = Some(watermark);
54 self.timer = Instant::now();
55 return;
56 };
57
58 let logger_timer_elapsed = self.timer.elapsed().as_secs_f64();
59 let realtime_average_tps = match (prev_watermark.transaction, watermark.transaction) {
60 (Some(prev), Some(curr)) => Some((curr - prev) as f64 / logger_timer_elapsed),
61 _ => None,
62 };
63 let realtime_average_cps =
64 (watermark.checkpoint - prev_watermark.checkpoint) as f64 / logger_timer_elapsed;
65
66 if watermark.checkpoint < prev_watermark.checkpoint + LOUD_WATERMARK_UPDATE_INTERVAL {
67 debug!(
68 logger = self.name,
69 pipeline = H::NAME,
70 checkpoint = watermark.checkpoint,
71 transaction = watermark.transaction,
72 tps = realtime_average_tps,
73 cps = realtime_average_cps,
74 elapsed_ms = format!("{:.3}", watermark_update_latency * 1000.0),
75 "Updated watermark",
76 );
77 return;
78 }
79
80 info!(
81 logger = self.name,
82 pipeline = H::NAME,
83 checkpoint = watermark.checkpoint,
84 transaction = watermark.transaction,
85 tps = realtime_average_tps,
86 cps = realtime_average_cps,
87 elapsed_ms = format!("{:.3}", watermark_update_latency * 1000.0),
88 "Updated watermark",
89 );
90 self.prev_watermark = Some(watermark);
91 self.timer = Instant::now();
92 }
93}
94
95impl From<&CommitterWatermark> for LoggerWatermark {
96 fn from(watermark: &CommitterWatermark) -> Self {
97 Self {
98 checkpoint: watermark.checkpoint_hi_inclusive as i64,
99 transaction: Some(watermark.tx_hi as i64),
100 }
101 }
102}
103
104impl LoggerWatermark {
105 pub fn checkpoint(checkpoint: u64) -> Self {
106 Self {
107 checkpoint: checkpoint as i64,
108 transaction: None,
109 }
110 }
111}