1use crate::monitored_scope;
5use futures::FutureExt;
6use parking_lot::Mutex;
7use prometheus::{
8 IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
9 register_int_gauge_vec_with_registry,
10};
11use std::collections::hash_map::DefaultHasher;
12use std::collections::{HashMap, HashSet};
13use std::hash::{Hash, Hasher};
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::runtime::Handle;
17use tokio::sync::mpsc;
18use tokio::sync::mpsc::error::TrySendError;
19use tokio::time::Instant;
20use tracing::{debug, error};
21
/// A single observed value handled by the histogram.
type Point = u64;
/// Message sent over the histogram channel: the label set of the reporting
/// handle together with one observed point.
type HistogramMessage = (HistogramLabels, Point);
24
/// Handle used to report points under one fixed set of label values.
///
/// Each reported point is sent, together with `labels`, over `channel`.
#[derive(Clone)]
pub struct Histogram {
    // Label values attached to every point this handle sends.
    labels: HistogramLabels,
    // Sending half of the channel that carries points away from this handle.
    channel: mpsc::Sender<HistogramMessage>,
}
30
/// Guard produced by `Histogram::start_timer`.
///
/// When dropped, it reports the number of milliseconds elapsed since `start`
/// to the borrowed histogram.
pub struct HistogramTimerGuard<'a> {
    // Histogram that receives the elapsed time when the guard is dropped.
    histogram: &'a Histogram,
    // Moment at which the timer was started.
    start: Instant,
}
35
/// Labeled histogram family; hands out `Histogram` handles for concrete label
/// values, all of which share the same channel.
#[derive(Clone)]
pub struct HistogramVec {
    // Sender cloned into every `Histogram` created by `with_label_values`.
    channel: mpsc::Sender<HistogramMessage>,
}
40
/// Background task state: receives points from the channel, groups them per
/// label set for one time window, and hands each window to the reporter.
struct HistogramCollector {
    // Shared with the blocking task that publishes a finished window.
    reporter: Arc<Mutex<HistogramReporter>>,
    // Receiving half of the channel fed by `Histogram` handles.
    channel: mpsc::Receiver<HistogramMessage>,
    // Metric name; only used in the overflow error log message.
    _name: String,
}
46
/// Publishes aggregated window data to the prometheus metrics.
struct HistogramReporter {
    // Percentile values, one series per label set and `pct` label value.
    gauge: IntGaugeVec,
    // Running sum of all reported points per label set.
    sum: IntCounterVec,
    // Running number of reported points per label set.
    count: IntCounterVec,
    // Label sets seen so far; those without data in a window get their
    // percentile gauges reset to 0.
    known_labels: HashSet<HistogramLabels>,
    // Percentiles to publish, expressed in thousandths (500 is the median).
    percentiles: Vec<usize>,
}
54
/// Reference-counted label set, so it can be cloned into every channel message.
type HistogramLabels = Arc<HistogramLabelsInner>;
56
/// Label values plus a hash of them computed once at construction.
struct HistogramLabelsInner {
    // Label values, in the order given to `with_label_values`.
    labels: Vec<String>,
    // Hash of `labels`, cached so it is not recomputed on every map lookup.
    hash: u64,
}
61
62impl HistogramVec {
88 pub fn new_in_registry(name: &str, desc: &str, labels: &[&str], registry: &Registry) -> Self {
89 Self::new_in_registry_with_percentiles(
90 name,
91 desc,
92 labels,
93 registry,
94 vec![500usize, 950, 990],
95 )
96 }
97
98 pub fn new_in_registry_with_percentiles(
100 name: &str,
101 desc: &str,
102 labels: &[&str],
103 registry: &Registry,
104 percentiles: Vec<usize>,
105 ) -> Self {
106 let sum_name = format!("{}_sum", name);
107 let count_name = format!("{}_count", name);
108 let sum =
109 register_int_counter_vec_with_registry!(sum_name, desc, labels, registry).unwrap();
110 let count =
111 register_int_counter_vec_with_registry!(count_name, desc, labels, registry).unwrap();
112 let labels: Vec<_> = labels.iter().cloned().chain(["pct"]).collect();
113 let gauge = register_int_gauge_vec_with_registry!(name, desc, &labels, registry).unwrap();
114 Self::new(gauge, sum, count, percentiles, name)
115 }
116
117 fn new(
119 gauge: IntGaugeVec,
120 sum: IntCounterVec,
121 count: IntCounterVec,
122 percentiles: Vec<usize>,
123 name: &str,
124 ) -> Self {
125 let (sender, receiver) = mpsc::channel(1000);
126 let reporter = HistogramReporter {
127 gauge,
128 sum,
129 count,
130 percentiles,
131 known_labels: Default::default(),
132 };
133 let reporter = Arc::new(Mutex::new(reporter));
134 let collector = HistogramCollector {
135 reporter,
136 channel: receiver,
137 _name: name.to_string(),
138 };
139 Handle::current().spawn(collector.run());
140 Self { channel: sender }
141 }
142
143 pub fn with_label_values(&self, labels: &[&str]) -> Histogram {
144 let labels = labels.iter().map(ToString::to_string).collect();
145 let labels = HistogramLabelsInner::new(labels);
146 Histogram {
147 labels,
148 channel: self.channel.clone(),
149 }
150 }
151}
152
153impl HistogramLabelsInner {
154 pub fn new(labels: Vec<String>) -> HistogramLabels {
155 let mut hasher = DefaultHasher::new();
157 labels.hash(&mut hasher);
158 let hash = hasher.finish();
159 Arc::new(Self { labels, hash })
160 }
161}
162
163impl PartialEq for HistogramLabelsInner {
164 fn eq(&self, other: &Self) -> bool {
165 self.hash == other.hash
166 }
167}
168
169impl Eq for HistogramLabelsInner {}
170
171impl Hash for HistogramLabelsInner {
172 fn hash<H: Hasher>(&self, state: &mut H) {
173 self.hash.hash(state)
174 }
175}
176
177impl Histogram {
178 pub fn new_in_registry(name: &str, desc: &str, registry: &Registry) -> Self {
179 HistogramVec::new_in_registry(name, desc, &[], registry).with_label_values(&[])
180 }
181
182 pub fn observe(&self, v: Point) {
183 self.report(v)
184 }
185
186 pub fn report(&self, v: Point) {
187 match self.channel.try_send((self.labels.clone(), v)) {
188 Ok(()) => {}
189 Err(TrySendError::Closed(_)) => {
190 }
192 Err(TrySendError::Full(_)) => debug!("Histogram channel is full, dropping data"),
193 }
194 }
195
196 pub fn start_timer(&self) -> HistogramTimerGuard<'_> {
197 HistogramTimerGuard {
198 histogram: self,
199 start: Instant::now(),
200 }
201 }
202}
203
204impl HistogramCollector {
205 pub async fn run(mut self) {
206 let mut deadline = Instant::now();
207 loop {
208 #[cfg(test)]
210 const HISTOGRAM_WINDOW_SEC: u64 = 1;
211 #[cfg(not(test))]
212 const HISTOGRAM_WINDOW_SEC: u64 = 60;
213 deadline += Duration::from_secs(HISTOGRAM_WINDOW_SEC);
214 if self.cycle(deadline).await.is_err() {
215 return;
216 }
217 }
218 }
219
220 async fn cycle(&mut self, deadline: Instant) -> Result<(), ()> {
221 let mut labeled_data: HashMap<HistogramLabels, Vec<Point>> = HashMap::new();
222 let mut count = 0usize;
223 let mut timeout = tokio::time::sleep_until(deadline).boxed();
224 const MAX_POINTS: usize = 500_000;
225 loop {
226 tokio::select! {
227 _ = &mut timeout => break,
228 point = self.channel.recv() => {
229 count += 1;
230 if count > MAX_POINTS {
231 continue;
232 }
233 if let Some((label, point)) = point {
234 let values = labeled_data.entry(label).or_default();
235 values.push(point);
236 } else {
237 return Err(());
239 }
240 },
241 }
242 }
243 if count > MAX_POINTS {
244 error!(
245 "Too many data points for histogram, dropping {} points",
246 count - MAX_POINTS
247 );
248 }
249 if Arc::strong_count(&self.reporter) != 1 {
250 #[cfg(not(debug_assertions))]
251 error!(
252 "Histogram data overflow - we receive histogram data for {} faster then can process. Some histogram data is dropped",
253 self._name
254 );
255 } else {
256 let reporter = self.reporter.clone();
257 Handle::current().spawn_blocking(move || reporter.lock().report(labeled_data));
258 }
259 Ok(())
260 }
261}
262
263impl HistogramReporter {
264 pub fn report(&mut self, labeled_data: HashMap<HistogramLabels, Vec<Point>>) {
265 let _scope = monitored_scope("HistogramReporter::report");
266 let mut reset_labels = self.known_labels.clone();
267 for (label, mut data) in labeled_data {
268 self.known_labels.insert(label.clone());
269 reset_labels.remove(&label);
270 assert!(!data.is_empty());
271 data.sort_unstable();
272 for pct1000 in self.percentiles.iter() {
273 let index = Self::pct1000_index(data.len(), *pct1000);
274 let point = *data.get(index).unwrap();
275 let pct_str = Self::format_pct1000(*pct1000);
276 let labels = Self::gauge_labels(&label, &pct_str);
277 let metric = self.gauge.with_label_values(&labels);
278 metric.set(point as i64);
279 }
280 let mut sum = 0u64;
281 let count = data.len() as u64;
282 for point in data {
283 sum += point;
284 }
285 let labels: Vec<_> = label.labels.iter().map(|s| &s[..]).collect();
286 self.sum.with_label_values(&labels).inc_by(sum);
287 self.count.with_label_values(&labels).inc_by(count);
288 }
289
290 for reset_label in reset_labels {
291 for pct1000 in self.percentiles.iter() {
292 let pct_str = Self::format_pct1000(*pct1000);
293 let labels = Self::gauge_labels(&reset_label, &pct_str);
294 let metric = self.gauge.with_label_values(&labels);
295 metric.set(0);
296 }
297 }
298 }
299
300 fn gauge_labels<'a>(label: &'a HistogramLabels, pct_str: &'a str) -> Vec<&'a str> {
301 let labels = label.labels.iter().map(|s| &s[..]).chain([pct_str]);
302 labels.collect()
303 }
304
305 fn pct1000_index(len: usize, pct1000: usize) -> usize {
307 len * pct1000 / 1000
308 }
309
310 fn format_pct1000(pct1000: usize) -> String {
311 format!("{}", (pct1000 as f64) / 10.)
312 }
313}
314
315impl Drop for HistogramTimerGuard<'_> {
316 fn drop(&mut self) {
317 self.histogram
318 .report(self.start.elapsed().as_millis() as u64);
319 }
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::proto::MetricFamily;

    /// Percentile index computation for typical and single-element inputs.
    #[test]
    fn pct_index_test() {
        assert_eq!(200, HistogramReporter::pct1000_index(1000, 200));
        assert_eq!(100, HistogramReporter::pct1000_index(500, 200));
        assert_eq!(1800, HistogramReporter::pct1000_index(2000, 900));
        assert_eq!(21, HistogramReporter::pct1000_index(22, 999));
        assert_eq!(0, HistogramReporter::pct1000_index(1, 999));
        assert_eq!(0, HistogramReporter::pct1000_index(1, 100));
        assert_eq!(0, HistogramReporter::pct1000_index(1, 1));
    }

    /// Percentiles in thousandths are rendered as percentage strings.
    #[test]
    fn format_pct1000_test() {
        assert_eq!(HistogramReporter::format_pct1000(999), "99.9");
        assert_eq!(HistogramReporter::format_pct1000(990), "99");
        assert_eq!(HistogramReporter::format_pct1000(900), "90");
    }

    /// End-to-end check: report points for two label values, wait for one
    /// collection window (1 second in test builds) to be published, then
    /// verify the percentile gauges and the sum/count counters.
    #[tokio::test]
    async fn histogram_test() {
        let registry = Registry::new();
        let histogram = HistogramVec::new_in_registry_with_percentiles(
            "test",
            "xx",
            &["lab"],
            &registry,
            vec![500, 900],
        );
        let a = histogram.with_label_values(&["a"]);
        let b = histogram.with_label_values(&["b"]);
        for v in [1, 2, 3, 4] {
            a.report(v);
        }
        for v in [10, 20, 30, 40] {
            b.report(v);
        }
        tokio::time::sleep(Duration::from_millis(1500)).await;
        let gather: HashMap<_, _> = registry
            .gather()
            .into_iter()
            .map(|f| (f.get_name().to_string(), f))
            .collect();
        let hist = aggregate_gauge_by_label(gather.get("test").unwrap());
        let sum = aggregate_counter_by_label(gather.get("test_sum").unwrap());
        let count = aggregate_counter_by_label(gather.get("test_count").unwrap());

        assert_eq!(Some(3.), hist.get("::a::50").cloned());
        assert_eq!(Some(4.), hist.get("::a::90").cloned());
        assert_eq!(Some(30.), hist.get("::b::50").cloned());
        assert_eq!(Some(40.), hist.get("::b::90").cloned());

        assert_eq!(Some(10.), sum.get("::a").cloned());
        assert_eq!(Some(100.), sum.get("::b").cloned());

        assert_eq!(Some(4.), count.get("::a").cloned());
        assert_eq!(Some(4.), count.get("::b").cloned());
    }

    /// Joins label values into a lookup key, prefixing each with "::".
    fn label_key<'a>(values: impl Iterator<Item = &'a str>) -> String {
        let mut key = String::new();
        for value in values {
            key.push_str("::");
            key.push_str(value);
        }
        key
    }

    /// Maps every gauge in `family` from its label key to its value.
    fn aggregate_gauge_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        family
            .get_metric()
            .iter()
            .map(|m| {
                let key = label_key(m.get_label().iter().map(|label| label.get_value()));
                (key, m.get_gauge().get_value())
            })
            .collect()
    }

    /// Maps every counter in `family` from its label key to its value.
    fn aggregate_counter_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        family
            .get_metric()
            .iter()
            .map(|m| {
                let key = label_key(m.get_label().iter().map(|label| label.get_value()));
                (key, m.get_counter().get_value())
            })
            .collect()
    }
}