sui_indexer_alt_framework/pipeline/
logging.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Instant;
5
6use tracing::{debug, info};
7
8use crate::store::CommitterWatermark;
9
10use super::Processor;
11
/// Minimum number of checkpoints the watermark must advance past the last recorded baseline
/// before the watermark update message is logged at info level. Smaller advances are logged
/// at debug level instead. Evaluates to 50.
const LOUD_WATERMARK_UPDATE_INTERVAL: i64 = 5 * 10;
15
/// Snapshot of a watermark in the form the logger works with.
///
/// It is a pair of plain integers, so it derives the common value-type traits (`Debug`,
/// `Clone`, `Copy`, `PartialEq`, `Eq`) in addition to `Default`, which makes it printable in
/// diagnostics and comparable in assertions.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) struct LoggerWatermark {
    /// Checkpoint the watermark has reached.
    checkpoint: i64,
    /// Transaction watermark, if known. When it is `None` on either side of a comparison, no
    /// TPS figure is reported.
    transaction: Option<i64>,
}
21
/// State needed to log watermark updates together with the throughput observed since the last
/// recorded baseline.
pub(crate) struct WatermarkLogger {
    /// Label emitted as the `logger` field of every log message.
    name: &'static str,
    /// Started at construction and restarted each time `prev_watermark` is (re)recorded; its
    /// elapsed time is the window over which throughput is averaged.
    timer: Instant,
    /// Baseline watermark that incoming watermarks are compared against. `None` until the first
    /// watermark has been seen.
    prev_watermark: Option<LoggerWatermark>,
}
27
28impl WatermarkLogger {
29    pub fn new(name: &'static str) -> Self {
30        Self {
31            name,
32            timer: Instant::now(),
33            prev_watermark: None,
34        }
35    }
36
37    /// Log the watermark update.
38    /// `watermark_update_latency` is the time spent to update the watermark.
39    ///
40    /// Given the new watermark, the logger will compare with the previous watermark to compute the
41    /// average TPS (transactions per second) and CPS (checkpoints per second) since the last update.
42    ///
43    /// If the watermark update is less than `LOUD_WATERMARK_UPDATE_INTERVAL` checkpoints apart,
44    /// the log message will be at debug level. Otherwise, it will be at info level.
45    pub fn log<H: Processor>(
46        &mut self,
47        watermark: impl Into<LoggerWatermark>,
48        watermark_update_latency: f64,
49    ) {
50        let watermark: LoggerWatermark = watermark.into();
51        // If we didn't set a watermark previously, set it and calculate metrics on the next update.
52        let Some(prev_watermark) = &self.prev_watermark else {
53            self.prev_watermark = Some(watermark);
54            self.timer = Instant::now();
55            return;
56        };
57
58        let logger_timer_elapsed = self.timer.elapsed().as_secs_f64();
59        let realtime_average_tps = match (prev_watermark.transaction, watermark.transaction) {
60            (Some(prev), Some(curr)) => Some((curr - prev) as f64 / logger_timer_elapsed),
61            _ => None,
62        };
63        let realtime_average_cps =
64            (watermark.checkpoint - prev_watermark.checkpoint) as f64 / logger_timer_elapsed;
65
66        if watermark.checkpoint < prev_watermark.checkpoint + LOUD_WATERMARK_UPDATE_INTERVAL {
67            debug!(
68                logger = self.name,
69                pipeline = H::NAME,
70                checkpoint = watermark.checkpoint,
71                transaction = watermark.transaction,
72                tps = realtime_average_tps,
73                cps = realtime_average_cps,
74                elapsed_ms = format!("{:.3}", watermark_update_latency * 1000.0),
75                "Updated watermark",
76            );
77            return;
78        }
79
80        info!(
81            logger = self.name,
82            pipeline = H::NAME,
83            checkpoint = watermark.checkpoint,
84            transaction = watermark.transaction,
85            tps = realtime_average_tps,
86            cps = realtime_average_cps,
87            elapsed_ms = format!("{:.3}", watermark_update_latency * 1000.0),
88            "Updated watermark",
89        );
90        self.prev_watermark = Some(watermark);
91        self.timer = Instant::now();
92    }
93}
94
95impl From<&CommitterWatermark> for LoggerWatermark {
96    fn from(watermark: &CommitterWatermark) -> Self {
97        Self {
98            checkpoint: watermark.checkpoint_hi_inclusive as i64,
99            transaction: Some(watermark.tx_hi as i64),
100        }
101    }
102}
103
104impl LoggerWatermark {
105    pub fn checkpoint(checkpoint: u64) -> Self {
106        Self {
107            checkpoint: checkpoint as i64,
108            transaction: None,
109        }
110    }
111}