sui_core/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use parking_lot::Mutex;
5use std::collections::VecDeque;
6use std::default::Default;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use tokio::time::Duration;
10use tokio::time::Instant;
11
12pub struct LatencyObserver {
13    data: Mutex<LatencyObserverInner>,
14    latency_ms: AtomicU64,
15}
16
/// Lock-protected state of a `LatencyObserver`: the retained samples and their total.
#[derive(Default)]
struct LatencyObserverInner {
    /// Retained latency samples; new samples are pushed at the back and the oldest are
    /// removed from the front.
    points: VecDeque<Duration>,
    /// Running sum of every sample currently held in `points`.
    sum: Duration,
}
22
23impl LatencyObserver {
24    pub fn new() -> Self {
25        Self {
26            data: Mutex::new(LatencyObserverInner::default()),
27            latency_ms: AtomicU64::new(u64::MAX),
28        }
29    }
30
31    pub fn report(&self, latency: Duration) {
32        const EXPECTED_SAMPLES: usize = 128;
33        let mut data = self.data.lock();
34        data.points.push_back(latency);
35        data.sum += latency;
36        if data.points.len() < EXPECTED_SAMPLES {
37            // Do not initialize average latency until there are enough samples.
38            return;
39        }
40        while data.points.len() > EXPECTED_SAMPLES {
41            let pop = data.points.pop_front().expect("data vector is not empty");
42            data.sum -= pop; // This does not underflow because of how running sum is calculated
43        }
44        let latency = data.sum.as_millis() as u64 / data.points.len() as u64;
45        self.latency_ms.store(latency, Ordering::Relaxed);
46    }
47
48    pub fn latency(&self) -> Option<Duration> {
49        let latency = self.latency_ms.load(Ordering::Relaxed);
50        if latency == u64::MAX {
51            // Not initialized yet (not enough data points)
52            None
53        } else {
54            Some(Duration::from_millis(latency))
55        }
56    }
57}
58
59impl Default for LatencyObserver {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
/// Tracks events over a rolling time window and reports how frequently they occur.
///
/// The window is split into consecutive bins, each `BIN_DURATION` long, and a fixed-size
/// buffer keeps one event counter per bin.
pub struct RateTracker {
    /// Per-bin event counters. Holds `window_duration / BIN_DURATION` entries, each
    /// covering one `BIN_DURATION` slice of the window.
    event_buffer: Vec<u64>,
    /// Length of the rolling window the rate is computed over.
    window_duration: Duration,
    /// Number of bins in `event_buffer`.
    total_bins: usize,

    /// Reference point for mapping an event time to a bin:
    /// `global_bin_index = (event_time - start_time) / BIN_DURATION`, and the slot in the
    /// buffer is `global_bin_index % total_bins`.
    start_time: Instant,

    /// Global index of the most recently updated bin, i.e. the newest end of the rolling
    /// window.
    global_bin_index: u64,
}
84
/// Width of a single bin of the rate-tracking window.
const BIN_DURATION: Duration = Duration::from_millis(100);
86
87impl RateTracker {
88    /// Create a new RateTracker to track event rate (events/seconds) in `window_duration`.
89    pub fn new(window_duration: Duration) -> Self {
90        assert!(window_duration > BIN_DURATION);
91        let total_bins = (window_duration.as_millis() / BIN_DURATION.as_millis()) as usize;
92        RateTracker {
93            event_buffer: vec![0; total_bins],
94            window_duration,
95            total_bins,
96            start_time: Instant::now(),
97            global_bin_index: 0,
98        }
99    }
100
101    /// Records an event at time `now`.
102    pub fn record_at_time(&mut self, now: Instant) {
103        self.update_window(now);
104        let current_bin_index = self.get_bin_index(now) as usize;
105        if current_bin_index + self.total_bins <= self.global_bin_index as usize {
106            // The bin associated with `now` has passed the rolling window.
107            return;
108        }
109
110        self.event_buffer[current_bin_index % self.total_bins] += 1;
111    }
112
113    /// Records an event at current time.
114    pub fn record(&mut self) {
115        self.record_at_time(Instant::now());
116    }
117
118    /// Returns the rate of events.
119    pub fn rate(&mut self) -> f64 {
120        let now = Instant::now();
121        self.update_window(now);
122        self.event_buffer.iter().sum::<u64>() as f64 / self.window_duration.as_secs_f64()
123    }
124
125    // Given a time `now`, returns the bin index since `start_time`.
126    fn get_bin_index(&self, now: Instant) -> u64 {
127        (now.duration_since(self.start_time).as_millis() / BIN_DURATION.as_millis()) as u64
128    }
129
130    // Updates the rolling window to accommodate the time of interests, `now`. That is, remove any
131    // event counts happened prior to (`now` - `window_duration`).
132    fn update_window(&mut self, now: Instant) {
133        let current_bin_index = self.get_bin_index(now);
134        if self.global_bin_index >= current_bin_index {
135            // The rolling doesn't move.
136            return;
137        }
138
139        for bin_index in (self.global_bin_index + 1)..=current_bin_index {
140            // Time has elapsed from global_bin_index to current_bin_index. Clear all the buffer
141            // counter associated with them.
142            let index_in_buffer = bin_index as usize % self.total_bins;
143            self.event_buffer[index_in_buffer] = 0;
144        }
145        self.global_bin_index = current_bin_index;
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;

    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use tokio::time::advance;

    /// Records `count` events back to back at the current (paused) time.
    fn record_n(tracker: &mut RateTracker, count: usize) {
        for _ in 0..count {
            tracker.record();
        }
    }

    // Events are counted while inside the window and dropped once the window moves past them.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_basic() {
        // Rolling window of one second.
        let mut tracker = RateTracker::new(Duration::from_secs(1));
        assert_eq!(tracker.rate(), 0.0);
        record_n(&mut tracker, 3);
        assert_eq!(tracker.rate(), 3.0);

        advance(Duration::from_millis(200)).await;
        record_n(&mut tracker, 3);
        assert_eq!(tracker.rate(), 6.0);

        // The first batch leaves the window.
        advance(Duration::from_millis(800)).await;
        assert_eq!(tracker.rate(), 3.0);

        // The second batch leaves the window.
        advance(Duration::from_millis(200)).await;
        assert_eq!(tracker.rate(), 0.0);
    }

    // Rate calculation with a variety of window durations.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_window() {
        let mut rng = StdRng::from_seed([0; 32]);
        let window_secs_samples: Vec<u64> = (0..10).map(|_| rng.gen_range(1..=60)).collect();
        for window_secs in window_secs_samples {
            let window = Duration::from_secs(window_secs);
            let mut tracker = RateTracker::new(window);
            record_n(&mut tracker, 23);
            assert_eq!(tracker.rate(), 23.0 / window_secs as f64);
            advance(window).await;
            assert_eq!(tracker.rate(), 0.0);
        }
    }

    // Rate calculation while the window keeps moving.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_rolling_window() {
        let mut tracker = RateTracker::new(Duration::from_secs(1));
        // One event every 100ms.
        for expected_rate in 1..=10 {
            tracker.record();
            assert_eq!(tracker.rate(), expected_rate as f64);
            advance(Duration::from_millis(100)).await;
        }

        // One event every 50ms.
        for step in 0..10 {
            tracker.record();
            advance(Duration::from_millis(50)).await;
            tracker.record();
            assert_eq!(tracker.rate(), 11.0 + step as f64);
            advance(Duration::from_millis(50)).await;
        }

        // With no new events the rate drains back to 0.
        for drained_steps in 1..=10 {
            assert_eq!(tracker.rate(), 20.0 - drained_steps as f64 * 2.0);
            advance(Duration::from_millis(100)).await;
        }
        assert_eq!(tracker.rate(), 0.0);
    }

    // Events dated before the tracking window must not affect the rate.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_outside_of_window() {
        let mut tracker = RateTracker::new(Duration::from_secs(1));
        advance(Duration::from_secs(60)).await;
        record_n(&mut tracker, 3);
        assert_eq!(tracker.rate(), 3.0);

        // The clock is paused, so this instant is the same for all three stale events.
        let stale = Instant::now() - Duration::from_millis(1100);
        for _ in 0..3 {
            tracker.record_at_time(stale);
        }
        assert_eq!(tracker.rate(), 3.0);
    }
}