1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use parking_lot::Mutex;
use std::collections::VecDeque;
use std::default::Default;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use tokio::time::Duration;
use tokio::time::Instant;

/// Tracks a rolling average over the most recently reported latencies.
///
/// The samples live behind a mutex, while the latest average (in milliseconds) is mirrored
/// into an atomic so that [`LatencyObserver::latency`] can be read without taking the lock.
pub struct LatencyObserver {
    data: Mutex<LatencyObserverInner>,
    // Rolling average in milliseconds; `u64::MAX` means no sample has been reported yet.
    latency_ms: AtomicU64,
}

/// Samples currently inside the rolling window, together with their running sum.
#[derive(Default)]
struct LatencyObserverInner {
    points: VecDeque<Duration>,
    sum: Duration,
}

impl LatencyObserver {
    // Value stored in `latency_ms` until the first sample is reported.
    const NO_SAMPLES: u64 = u64::MAX;

    /// Creates an observer without samples; `latency()` returns `None` until the first
    /// call to `report()`.
    pub fn new() -> Self {
        Self {
            data: Mutex::new(LatencyObserverInner::default()),
            latency_ms: AtomicU64::new(Self::NO_SAMPLES),
        }
    }

    /// Adds `latency` to the rolling window and publishes the new average.
    ///
    /// The window keeps the `MAX_SAMPLES` most recent samples; once it is full, the oldest
    /// sample is evicted for every new one. The published average is truncated to whole
    /// milliseconds.
    ///
    /// # Panics
    ///
    /// Panics if adding `latency` overflows the running `Duration` sum.
    pub fn report(&self, latency: Duration) {
        const MAX_SAMPLES: usize = 64;
        let mut data = self.data.lock();
        data.points.push_back(latency);
        data.sum += latency;
        // Evict only once the window exceeds MAX_SAMPLES, so that exactly MAX_SAMPLES
        // samples contribute to the average when the window is full.
        if data.points.len() > MAX_SAMPLES {
            let pop = data.points.pop_front().expect("data vector is not empty");
            data.sum -= pop; // This does not overflow because of how running sum is calculated
        }
        // Average in u128 to avoid silently truncating the millisecond count, then clamp
        // below the sentinel so a huge average is never mistaken for "no samples".
        let average_ms = data.sum.as_millis() / data.points.len() as u128;
        let latency = average_ms.min(u128::from(Self::NO_SAMPLES - 1)) as u64;
        self.latency_ms.store(latency, Ordering::Relaxed);
    }

    /// Returns the current rolling average, or `None` if nothing has been reported yet.
    pub fn latency(&self) -> Option<Duration> {
        let latency = self.latency_ms.load(Ordering::Relaxed);
        if latency == Self::NO_SAMPLES {
            // Not initialized yet (0 data points)
            None
        } else {
            Some(Duration::from_millis(latency))
        }
    }
}

impl Default for LatencyObserver {
    /// Equivalent to [`LatencyObserver::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// RateTracker tracks events in a rolling window, and calculates the rate of events.
/// Internally, the tracker divides the tracking window into multiple BIN_DURATION,
/// and counts events in each BIN_DURATION in a fixed sized buffer.
pub struct RateTracker {
    // Counts the number of events by bins. Each bin is BIN_DURATION long within window_duration.
    // The size of the buffer = window_duration / BIN_DURATION.
    event_buffer: Vec<u64>,
    window_duration: Duration,
    total_bins: usize,

    // We use the event time and the tracker start time to calculate the bin that an event
    // belongs to.
    // event_global_bin_index = (event_time - start_time) / BIN_DURATION.
    // event_index_in_buffer = event_global_bin_index % buffer_size.
    start_time: Instant,

    // Last updated global bin index. This tracks the end of the rolling window.
    global_bin_index: u64,
}

const BIN_DURATION: Duration = Duration::from_millis(100);

impl RateTracker {
    /// Create a new RateTracker to track event rate (events/seconds) in `window_duration`.
    ///
    /// # Panics
    ///
    /// Panics if `window_duration` is not strictly longer than `BIN_DURATION`.
    pub fn new(window_duration: Duration) -> Self {
        assert!(window_duration > BIN_DURATION);
        let total_bins = (window_duration.as_millis() / BIN_DURATION.as_millis()) as usize;
        RateTracker {
            event_buffer: vec![0; total_bins],
            window_duration,
            total_bins,
            start_time: Instant::now(),
            global_bin_index: 0,
        }
    }

    /// Records an event at time `now`.
    ///
    /// If `now` is later than anything seen so far, the rolling window first moves forward to
    /// `now`. Events whose bin has already fallen out of the rolling window are ignored.
    pub fn record_at_time(&mut self, now: Instant) {
        self.update_window(now);
        let current_bin_index = self.get_bin_index(now);
        // Compare in u64 so that the bin index is never truncated on 32-bit targets.
        if current_bin_index + self.total_bins as u64 <= self.global_bin_index {
            // The bin associated with `now` has passed the rolling window.
            return;
        }

        let index_in_buffer = self.index_in_buffer(current_bin_index);
        self.event_buffer[index_in_buffer] += 1;
    }

    /// Records an event at current time.
    pub fn record(&mut self) {
        self.record_at_time(Instant::now());
    }

    /// Returns the rate of events, i.e. the number of events currently in the rolling window
    /// divided by `window_duration` in seconds.
    ///
    /// Takes `&mut self` because the rolling window is first moved forward to the current time.
    pub fn rate(&mut self) -> f64 {
        let now = Instant::now();
        self.update_window(now);
        self.event_buffer.iter().sum::<u64>() as f64 / self.window_duration.as_secs_f64()
    }

    // Given a time `now`, returns the bin index since `start_time`.
    fn get_bin_index(&self, now: Instant) -> u64 {
        (now.duration_since(self.start_time).as_millis() / BIN_DURATION.as_millis()) as u64
    }

    // Maps a global bin index to its slot in `event_buffer`.
    fn index_in_buffer(&self, bin_index: u64) -> usize {
        // The remainder is smaller than `total_bins`, so it always fits in usize.
        (bin_index % self.total_bins as u64) as usize
    }

    // Updates the rolling window to accommodate the time of interest, `now`. That is, remove any
    // event counts that happened prior to (`now` - `window_duration`).
    fn update_window(&mut self, now: Instant) {
        let current_bin_index = self.get_bin_index(now);
        if self.global_bin_index >= current_bin_index {
            // The rolling window doesn't move.
            return;
        }

        let elapsed_bins = current_bin_index - self.global_bin_index;
        if elapsed_bins >= self.total_bins as u64 {
            // Every slot belongs to a bin that has expired. Reset the buffer in one pass
            // instead of walking each elapsed bin, which would be unbounded after a long
            // idle period.
            self.event_buffer.fill(0);
        } else {
            for bin_index in (self.global_bin_index + 1)..=current_bin_index {
                // Time has elapsed from global_bin_index to current_bin_index. Clear all the
                // buffer counters associated with them.
                let index_in_buffer = self.index_in_buffer(bin_index);
                self.event_buffer[index_in_buffer] = 0;
            }
        }
        self.global_bin_index = current_bin_index;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use rand::rngs::StdRng;
    use rand::Rng;
    use rand::SeedableRng;
    use tokio::time::advance;

    // Records `count` events at the current (paused) time.
    fn record_events(tracker: &mut RateTracker, count: usize) {
        for _ in 0..count {
            tracker.record();
        }
    }

    // Events are counted while inside the window and dropped once the window moves past them.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_basic() {
        // 1 sec rolling window.
        let one_second = Duration::from_secs(1);
        let mut tracker = RateTracker::new(one_second);
        assert_eq!(tracker.rate(), 0.0);

        record_events(&mut tracker, 3);
        assert_eq!(tracker.rate(), 3.0);

        advance(Duration::from_millis(200)).await;
        record_events(&mut tracker, 3);
        assert_eq!(tracker.rate(), 6.0);

        // The first batch leaves the window.
        advance(Duration::from_millis(800)).await;
        assert_eq!(tracker.rate(), 3.0);

        // The second batch leaves the window.
        advance(Duration::from_millis(200)).await;
        assert_eq!(tracker.rate(), 0.0);
    }

    // Tests rate calculation using different window duration.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_window() {
        let mut rng = StdRng::from_seed([0; 32]);
        let window_secs: Vec<u64> = (0..10).map(|_| rng.gen_range(1..=60)).collect();
        for window in window_secs {
            let window_duration = Duration::from_secs(window);
            let mut tracker = RateTracker::new(window_duration);
            record_events(&mut tracker, 23);
            assert_eq!(tracker.rate(), 23.0 / window as f64);

            // After a full window has elapsed, no event is left.
            advance(window_duration).await;
            assert_eq!(tracker.rate(), 0.0);
        }
    }

    // Tests rate calculation when window moves continuously.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_rolling_window() {
        let full_bin = Duration::from_millis(100);
        let half_bin = Duration::from_millis(50);
        let mut tracker = RateTracker::new(Duration::from_secs(1));

        // Generate event every 100ms.
        for step in 0..10 {
            tracker.record();
            assert_eq!(tracker.rate(), (step + 1) as f64);
            advance(full_bin).await;
        }

        // Generate event every 50ms.
        for step in 0..10 {
            tracker.record();
            advance(half_bin).await;
            tracker.record();
            assert_eq!(tracker.rate(), 11.0 + step as f64);
            advance(half_bin).await;
        }

        // Rate gradually returns to 0.
        for step in 0..10 {
            assert_eq!(tracker.rate(), 20.0 - (step as f64 + 1.0) * 2.0);
            advance(full_bin).await;
        }
        assert_eq!(tracker.rate(), 0.0);
    }

    // Tests that events happened prior to tracking window shouldn't affect the rate.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_outside_of_window() {
        let mut tracker = RateTracker::new(Duration::from_secs(1));
        advance(Duration::from_secs(60)).await;
        record_events(&mut tracker, 3);
        assert_eq!(tracker.rate(), 3.0);

        // Events stamped 1.1s in the past are already outside of the 1s window.
        for _ in 0..3 {
            tracker.record_at_time(Instant::now() - Duration::from_millis(1100));
        }
        assert_eq!(tracker.rate(), 3.0);
    }
}