1use parking_lot::Mutex;
5use std::collections::VecDeque;
6use std::default::Default;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use tokio::time::Duration;
10use tokio::time::Instant;
11
/// Tracks a rolling average of reported latencies.
///
/// Samples are fed in through `report`; the most recently published average
/// is read back through `latency` without taking the mutex.
pub struct LatencyObserver {
    /// Sample window and its running sum, guarded by a mutex.
    data: Mutex<LatencyObserverInner>,
    /// Last published average in whole milliseconds; `u64::MAX` means that
    /// no average has been published yet.
    latency_ms: AtomicU64,
}
16
/// Mutable state of a `LatencyObserver`, kept behind its mutex.
#[derive(Default)]
struct LatencyObserverInner {
    /// Retained latency samples, oldest at the front.
    points: VecDeque<Duration>,
    /// Running sum of every sample currently held in `points`.
    sum: Duration,
}
22
23impl LatencyObserver {
24 pub fn new() -> Self {
25 Self {
26 data: Mutex::new(LatencyObserverInner::default()),
27 latency_ms: AtomicU64::new(u64::MAX),
28 }
29 }
30
31 pub fn report(&self, latency: Duration) {
32 const EXPECTED_SAMPLES: usize = 128;
33 let mut data = self.data.lock();
34 data.points.push_back(latency);
35 data.sum += latency;
36 if data.points.len() < EXPECTED_SAMPLES {
37 return;
39 }
40 while data.points.len() > EXPECTED_SAMPLES {
41 let pop = data.points.pop_front().expect("data vector is not empty");
42 data.sum -= pop; }
44 let latency = data.sum.as_millis() as u64 / data.points.len() as u64;
45 self.latency_ms.store(latency, Ordering::Relaxed);
46 }
47
48 pub fn latency(&self) -> Option<Duration> {
49 let latency = self.latency_ms.load(Ordering::Relaxed);
50 if latency == u64::MAX {
51 None
53 } else {
54 Some(Duration::from_millis(latency))
55 }
56 }
57}
58
59impl Default for LatencyObserver {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
/// Sliding-window event-rate counter.
///
/// Time elapsed since `start_time` is divided into bins of `BIN_DURATION`.
/// Events are counted per bin in a ring buffer of `total_bins` slots, and
/// slots are zeroed as the window advances past them.
pub struct RateTracker {
    /// Ring buffer of per-bin event counts, indexed by `bin index % total_bins`.
    event_buffer: Vec<u64>,
    /// Length of the window; the divisor used by `rate`.
    window_duration: Duration,
    /// Number of slots in `event_buffer`
    /// (`window_duration / BIN_DURATION`, rounded down).
    total_bins: usize,

    /// Instant the tracker was created; bin indices are measured from here.
    start_time: Instant,

    /// Index of the newest bin the window has been advanced to.
    global_bin_index: u64,
}

/// Width of a single counting bin.
const BIN_DURATION: Duration = Duration::from_millis(100);

impl RateTracker {
    /// Creates a tracker covering `window_duration`, starting now.
    ///
    /// # Panics
    ///
    /// Panics if `window_duration` is not strictly longer than `BIN_DURATION`.
    pub fn new(window_duration: Duration) -> Self {
        assert!(window_duration > BIN_DURATION);
        // The assertion above guarantees at least one bin.
        let total_bins = (window_duration.as_millis() / BIN_DURATION.as_millis()) as usize;
        RateTracker {
            event_buffer: vec![0; total_bins],
            window_duration,
            total_bins,
            start_time: Instant::now(),
            global_bin_index: 0,
        }
    }

    /// Records one event that happened at `now`.
    ///
    /// The window is first advanced to `now` if `now` is newer than anything
    /// seen so far. An event whose bin has already fallen out of the window is
    /// silently dropped.
    pub fn record_at_time(&mut self, now: Instant) {
        self.update_window(now);

        // Stay in `u64` so the comparison and the modulo are not affected by
        // `usize` truncation on 32-bit targets.
        let bin_count = self.total_bins as u64;
        let bin_index = self.get_bin_index(now);
        if bin_index + bin_count <= self.global_bin_index {
            // The slot for this bin has since been reused by a newer bin.
            return;
        }

        let slot = (bin_index % bin_count) as usize;
        self.event_buffer[slot] += 1;
    }

    /// Records one event at the current instant.
    pub fn record(&mut self) {
        self.record_at_time(Instant::now());
    }

    /// Returns the number of events per second over the window, after
    /// advancing the window to the current instant.
    pub fn rate(&mut self) -> f64 {
        let now = Instant::now();
        self.update_window(now);
        self.event_buffer.iter().sum::<u64>() as f64 / self.window_duration.as_secs_f64()
    }

    /// Maps `now` to the index of the bin it falls in, counted from `start_time`.
    fn get_bin_index(&self, now: Instant) -> u64 {
        (now.duration_since(self.start_time).as_millis() / BIN_DURATION.as_millis()) as u64
    }

    /// Advances the window to the bin containing `now`, zeroing every slot
    /// that belongs to a bin entered since the last advance.
    ///
    /// Does nothing when `now` is not newer than the current bin.
    fn update_window(&mut self, now: Instant) {
        let current_bin_index = self.get_bin_index(now);
        if current_bin_index <= self.global_bin_index {
            return;
        }

        let bin_count = self.total_bins as u64;
        let elapsed_bins = current_bin_index - self.global_bin_index;
        if elapsed_bins >= bin_count {
            // The gap covers the whole ring, so every slot is stale. Clearing
            // the buffer once keeps the cost bounded by the buffer size
            // instead of by how long the tracker sat idle.
            self.event_buffer.fill(0);
        } else {
            for bin_index in (self.global_bin_index + 1)..=current_bin_index {
                let slot = (bin_index % bin_count) as usize;
                self.event_buffer[slot] = 0;
            }
        }
        self.global_bin_index = current_bin_index;
    }
}
148
#[cfg(test)]
mod tests {
    use super::*;

    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use tokio::time::advance;

    /// Records `count` events at the current (paused) instant.
    fn record_n(tracker: &mut RateTracker, count: usize) {
        for _ in 0..count {
            tracker.record();
        }
    }

    /// Events are counted while inside the window and age out once the
    /// paused clock has moved past them.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_basic() {
        let mut tracker = RateTracker::new(Duration::from_secs(1));
        assert_eq!(tracker.rate(), 0.0);
        record_n(&mut tracker, 3);
        assert_eq!(tracker.rate(), 3.0);

        advance(Duration::from_millis(200)).await;
        record_n(&mut tracker, 3);
        assert_eq!(tracker.rate(), 6.0);

        // The first batch leaves the window.
        advance(Duration::from_millis(800)).await;
        assert_eq!(tracker.rate(), 3.0);

        // The second batch leaves the window.
        advance(Duration::from_millis(200)).await;
        assert_eq!(tracker.rate(), 0.0);
    }

    /// The reported rate is scaled by the window length, for a fixed-seed set
    /// of window sizes between 1 and 60 seconds.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_window() {
        let mut rng = StdRng::from_seed([0; 32]);
        let window_lengths: Vec<u64> = (0..10).map(|_| rng.gen_range(1..=60)).collect();
        for secs in window_lengths {
            let mut tracker = RateTracker::new(Duration::from_secs(secs));
            record_n(&mut tracker, 23);
            assert_eq!(tracker.rate(), 23.0 / secs as f64);
            advance(Duration::from_secs(secs)).await;
            assert_eq!(tracker.rate(), 0.0);
        }
    }

    /// The rate follows events as they enter and leave a one-second window
    /// while the clock moves in 50ms and 100ms steps.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_rolling_window() {
        let mut tracker = RateTracker::new(Duration::from_secs(1));

        // One event per 100ms step.
        for expected in 1..=10 {
            tracker.record();
            assert_eq!(tracker.rate(), expected as f64);
            advance(Duration::from_millis(100)).await;
        }

        // Two events per 100ms step, 50ms apart.
        for step in 0..10 {
            tracker.record();
            advance(Duration::from_millis(50)).await;
            tracker.record();
            assert_eq!(tracker.rate(), 11.0 + step as f64);
            advance(Duration::from_millis(50)).await;
        }

        // No new events: the count drops by two per 100ms step.
        for drained in 1..=10 {
            assert_eq!(tracker.rate(), 20.0 - 2.0 * drained as f64);
            advance(Duration::from_millis(100)).await;
        }
        assert_eq!(tracker.rate(), 0.0);
    }

    /// Events timestamped 1100ms in the past, with a one-second window, do
    /// not change the rate.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_outside_of_window() {
        let mut tracker = RateTracker::new(Duration::from_secs(1));
        advance(Duration::from_secs(60)).await;
        record_n(&mut tracker, 3);
        assert_eq!(tracker.rate(), 3.0);

        for _ in 0..3 {
            let stale = Instant::now() - Duration::from_millis(1100);
            tracker.record_at_time(stale);
        }
        assert_eq!(tracker.rate(), 3.0);
    }
}