mysten_metrics/
histogram.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::monitored_scope;
5use futures::FutureExt;
6use parking_lot::Mutex;
7use prometheus::{
8    IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
9    register_int_gauge_vec_with_registry,
10};
11use std::collections::hash_map::DefaultHasher;
12use std::collections::{HashMap, HashSet};
13use std::hash::{Hash, Hasher};
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::runtime::Handle;
17use tokio::sync::mpsc;
18use tokio::sync::mpsc::error::TrySendError;
19use tokio::time::Instant;
20use tracing::{debug, error};
21
/// A single observed value. Timer guards report elapsed milliseconds through this type.
type Point = u64;
/// Message sent over the channel from a `Histogram` handle to the collector:
/// the label set the value belongs to, and the value itself.
type HistogramMessage = (HistogramLabels, Point);
24
/// Handle for reporting data points under one fixed set of label values.
///
/// Cloning is cheap: the labels are reference counted and the channel sender is cloned.
#[derive(Clone)]
pub struct Histogram {
    /// Label values attached to every data point sent through this handle.
    labels: HistogramLabels,
    /// Sending half of the channel drained by the background collector.
    channel: mpsc::Sender<HistogramMessage>,
}
30
/// Guard returned by `Histogram::start_timer`.
///
/// When dropped, it reports the time elapsed since creation, in milliseconds,
/// to the histogram it borrows.
pub struct HistogramTimerGuard<'a> {
    /// Histogram that receives the measurement on drop.
    histogram: &'a Histogram,
    /// Moment the guard was created.
    start: Instant,
}
35
/// Family of histograms sharing one metric name and differing by label values.
///
/// Use `with_label_values` to obtain a `Histogram` handle for a concrete label set.
#[derive(Clone)]
pub struct HistogramVec {
    /// Sending half of the channel drained by the background collector;
    /// cloned into every `Histogram` handle created from this vec.
    channel: mpsc::Sender<HistogramMessage>,
}
40
/// Background task state: drains the data point channel and hands each window
/// of collected points to the reporter.
struct HistogramCollector {
    /// Reporter shared with the blocking task that computes percentiles.
    reporter: Arc<Mutex<HistogramReporter>>,
    /// Receiving half of the data point channel.
    channel: mpsc::Receiver<HistogramMessage>,
    /// Metric name; only read by the overflow error log, which is compiled
    /// out when `debug_assertions` is enabled.
    _name: String,
}
46
/// Computes percentiles for a window of data and writes them to prometheus metrics.
struct HistogramReporter {
    /// Gauge holding the percentile values; its last label is the percentile ("pct").
    gauge: IntGaugeVec,
    /// Counter accumulating the sum of all reported points (`<name>_sum`).
    sum: IntCounterVec,
    /// Counter accumulating the number of reported points (`<name>_count`).
    count: IntCounterVec,
    /// Every label set seen so far; used to zero gauges of labels that
    /// received no data in the current window.
    known_labels: HashSet<HistogramLabels>,
    /// Requested percentiles, expressed in 1/1000th (e.g. 900 means the 90th percentile).
    percentiles: Vec<usize>,
}
54
/// Shared, cheaply clonable label set.
type HistogramLabels = Arc<HistogramLabelsInner>;

/// Label values together with a hash of them that is computed once at construction.
struct HistogramLabelsInner {
    /// Label values, in the order of the label names the metric was registered with.
    labels: Vec<String>,
    /// Hash of `labels`, computed in `HistogramLabelsInner::new`.
    hash: u64,
}
61
62/// Reports the histogram to the given prometheus gauge.
63/// Unlike the histogram from prometheus crate, this histogram does not require to specify buckets
64/// It works by calculating 'true' histogram by aggregating and sorting values.
65///
66/// The values are reported into prometheus gauge with requested labels and additional dimension
67/// for the histogram percentile.
68///
69/// It worth pointing out that due to those more precise calculations, this Histogram usage
70/// is somewhat more limited comparing to original prometheus Histogram.
71///
72/// On the bright side, this histogram exports less data to Prometheus comparing to prometheus::Histogram,
73/// it exports each requested percentile into separate prometheus gauge, while original implementation creates
74/// gauge per bucket.
75/// It also exports _sum and _count aggregates same as original implementation.
76///
77/// It is ok to measure timings for things like network latencies and expensive crypto operations.
78/// However as a rule of thumb this histogram should not be used in places that can produce very high data point count.
79///
80/// As a last round of defence this histogram emits error log when too much data is flowing in and drops data points.
81///
82/// This implementation puts great deal of effort to make sure the metric does not cause any harm to the code itself:
83/// * Reporting data point is a non-blocking send to a channel
84/// * Data point collections tries to clear the channel as fast as possible
85/// * Expensive histogram calculations are done in a separate blocking tokio thread pool to avoid effects on main scheduler
86/// * If histogram data is produced too fast, the data is dropped and error! log is emitted
87impl HistogramVec {
88    pub fn new_in_registry(name: &str, desc: &str, labels: &[&str], registry: &Registry) -> Self {
89        Self::new_in_registry_with_percentiles(
90            name,
91            desc,
92            labels,
93            registry,
94            vec![500usize, 950, 990],
95        )
96    }
97
98    /// Allows to specify percentiles in 1/1000th, e.g. 90pct is specified as 900
99    pub fn new_in_registry_with_percentiles(
100        name: &str,
101        desc: &str,
102        labels: &[&str],
103        registry: &Registry,
104        percentiles: Vec<usize>,
105    ) -> Self {
106        let sum_name = format!("{}_sum", name);
107        let count_name = format!("{}_count", name);
108        let sum =
109            register_int_counter_vec_with_registry!(sum_name, desc, labels, registry).unwrap();
110        let count =
111            register_int_counter_vec_with_registry!(count_name, desc, labels, registry).unwrap();
112        let labels: Vec<_> = labels.iter().cloned().chain(["pct"]).collect();
113        let gauge = register_int_gauge_vec_with_registry!(name, desc, &labels, registry).unwrap();
114        Self::new(gauge, sum, count, percentiles, name)
115    }
116
117    // Do not expose it to public interface because we need labels to have a specific format (e.g. add last label is "pct")
118    fn new(
119        gauge: IntGaugeVec,
120        sum: IntCounterVec,
121        count: IntCounterVec,
122        percentiles: Vec<usize>,
123        name: &str,
124    ) -> Self {
125        let (sender, receiver) = mpsc::channel(1000);
126        let reporter = HistogramReporter {
127            gauge,
128            sum,
129            count,
130            percentiles,
131            known_labels: Default::default(),
132        };
133        let reporter = Arc::new(Mutex::new(reporter));
134        let collector = HistogramCollector {
135            reporter,
136            channel: receiver,
137            _name: name.to_string(),
138        };
139        Handle::current().spawn(collector.run());
140        Self { channel: sender }
141    }
142
143    pub fn with_label_values(&self, labels: &[&str]) -> Histogram {
144        let labels = labels.iter().map(ToString::to_string).collect();
145        let labels = HistogramLabelsInner::new(labels);
146        Histogram {
147            labels,
148            channel: self.channel.clone(),
149        }
150    }
151}
152
153impl HistogramLabelsInner {
154    pub fn new(labels: Vec<String>) -> HistogramLabels {
155        // Not a crypto hash
156        let mut hasher = DefaultHasher::new();
157        labels.hash(&mut hasher);
158        let hash = hasher.finish();
159        Arc::new(Self { labels, hash })
160    }
161}
162
163impl PartialEq for HistogramLabelsInner {
164    fn eq(&self, other: &Self) -> bool {
165        self.hash == other.hash
166    }
167}
168
169impl Eq for HistogramLabelsInner {}
170
171impl Hash for HistogramLabelsInner {
172    fn hash<H: Hasher>(&self, state: &mut H) {
173        self.hash.hash(state)
174    }
175}
176
177impl Histogram {
178    pub fn new_in_registry(name: &str, desc: &str, registry: &Registry) -> Self {
179        HistogramVec::new_in_registry(name, desc, &[], registry).with_label_values(&[])
180    }
181
182    pub fn observe(&self, v: Point) {
183        self.report(v)
184    }
185
186    pub fn report(&self, v: Point) {
187        match self.channel.try_send((self.labels.clone(), v)) {
188            Ok(()) => {}
189            Err(TrySendError::Closed(_)) => {
190                // can happen during runtime shutdown
191            }
192            Err(TrySendError::Full(_)) => debug!("Histogram channel is full, dropping data"),
193        }
194    }
195
196    pub fn start_timer(&self) -> HistogramTimerGuard<'_> {
197        HistogramTimerGuard {
198            histogram: self,
199            start: Instant::now(),
200        }
201    }
202}
203
204impl HistogramCollector {
205    pub async fn run(mut self) {
206        let mut deadline = Instant::now();
207        loop {
208            // We calculate deadline here instead of just using sleep inside cycle to avoid accumulating error
209            #[cfg(test)]
210            const HISTOGRAM_WINDOW_SEC: u64 = 1;
211            #[cfg(not(test))]
212            const HISTOGRAM_WINDOW_SEC: u64 = 60;
213            deadline += Duration::from_secs(HISTOGRAM_WINDOW_SEC);
214            if self.cycle(deadline).await.is_err() {
215                return;
216            }
217        }
218    }
219
220    async fn cycle(&mut self, deadline: Instant) -> Result<(), ()> {
221        let mut labeled_data: HashMap<HistogramLabels, Vec<Point>> = HashMap::new();
222        let mut count = 0usize;
223        let mut timeout = tokio::time::sleep_until(deadline).boxed();
224        const MAX_POINTS: usize = 500_000;
225        loop {
226            tokio::select! {
227                _ = &mut timeout => break,
228                point = self.channel.recv() => {
229                    count += 1;
230                    if count > MAX_POINTS {
231                        continue;
232                    }
233                    if let Some((label, point)) = point {
234                        let values = labeled_data.entry(label).or_default();
235                        values.push(point);
236                    } else {
237                        // Histogram no longer exists
238                        return Err(());
239                    }
240                },
241            }
242        }
243        if count > MAX_POINTS {
244            error!(
245                "Too many data points for histogram, dropping {} points",
246                count - MAX_POINTS
247            );
248        }
249        if Arc::strong_count(&self.reporter) != 1 {
250            #[cfg(not(debug_assertions))]
251            error!(
252                "Histogram data overflow - we receive histogram data for {} faster then can process. Some histogram data is dropped",
253                self._name
254            );
255        } else {
256            let reporter = self.reporter.clone();
257            Handle::current().spawn_blocking(move || reporter.lock().report(labeled_data));
258        }
259        Ok(())
260    }
261}
262
263impl HistogramReporter {
264    pub fn report(&mut self, labeled_data: HashMap<HistogramLabels, Vec<Point>>) {
265        let _scope = monitored_scope("HistogramReporter::report");
266        let mut reset_labels = self.known_labels.clone();
267        for (label, mut data) in labeled_data {
268            self.known_labels.insert(label.clone());
269            reset_labels.remove(&label);
270            assert!(!data.is_empty());
271            data.sort_unstable();
272            for pct1000 in self.percentiles.iter() {
273                let index = Self::pct1000_index(data.len(), *pct1000);
274                let point = *data.get(index).unwrap();
275                let pct_str = Self::format_pct1000(*pct1000);
276                let labels = Self::gauge_labels(&label, &pct_str);
277                let metric = self.gauge.with_label_values(&labels);
278                metric.set(point as i64);
279            }
280            let mut sum = 0u64;
281            let count = data.len() as u64;
282            for point in data {
283                sum += point;
284            }
285            let labels: Vec<_> = label.labels.iter().map(|s| &s[..]).collect();
286            self.sum.with_label_values(&labels).inc_by(sum);
287            self.count.with_label_values(&labels).inc_by(count);
288        }
289
290        for reset_label in reset_labels {
291            for pct1000 in self.percentiles.iter() {
292                let pct_str = Self::format_pct1000(*pct1000);
293                let labels = Self::gauge_labels(&reset_label, &pct_str);
294                let metric = self.gauge.with_label_values(&labels);
295                metric.set(0);
296            }
297        }
298    }
299
300    fn gauge_labels<'a>(label: &'a HistogramLabels, pct_str: &'a str) -> Vec<&'a str> {
301        let labels = label.labels.iter().map(|s| &s[..]).chain([pct_str]);
302        labels.collect()
303    }
304
305    /// Returns value in range [0; len)
306    fn pct1000_index(len: usize, pct1000: usize) -> usize {
307        len * pct1000 / 1000
308    }
309
310    fn format_pct1000(pct1000: usize) -> String {
311        format!("{}", (pct1000 as f64) / 10.)
312    }
313}
314
315impl Drop for HistogramTimerGuard<'_> {
316    fn drop(&mut self) {
317        self.histogram
318            .report(self.start.elapsed().as_millis() as u64);
319    }
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::proto::MetricFamily;

    /// Checks the percentile-to-index mapping, including small inputs.
    #[test]
    fn pct_index_test() {
        // (len, pct1000, expected index)
        let cases = [
            (1000, 200, 200),
            (500, 200, 100),
            (2000, 900, 1800),
            // Boundary checks
            (22, 999, 21),
            (1, 999, 0),
            (1, 100, 0),
            (1, 1, 0),
        ];
        for &(len, pct1000, expected) in &cases {
            assert_eq!(expected, HistogramReporter::pct1000_index(len, pct1000));
        }
    }

    /// Checks that percentiles are rendered without a trailing ".0".
    #[test]
    fn format_pct1000_test() {
        for &(pct1000, expected) in &[(999, "99.9"), (990, "99"), (900, "90")] {
            assert_eq!(HistogramReporter::format_pct1000(pct1000), expected);
        }
    }

    /// End-to-end check: reports points for two label values, waits for one
    /// collection window and inspects the gathered metrics.
    #[tokio::test]
    async fn histogram_test() {
        let registry = Registry::new();
        let histogram = HistogramVec::new_in_registry_with_percentiles(
            "test",
            "xx",
            &["lab"],
            &registry,
            vec![500, 900],
        );
        let a = histogram.with_label_values(&["a"]);
        let b = histogram.with_label_values(&["b"]);
        for &value in &[1, 2, 3, 4] {
            a.report(value);
        }
        for &value in &[10, 20, 30, 40] {
            b.report(value);
        }
        // The test build uses a one second window; wait long enough for it to be reported.
        tokio::time::sleep(Duration::from_millis(1500)).await;

        let families: HashMap<String, MetricFamily> = registry
            .gather()
            .into_iter()
            .map(|family| (family.get_name().to_string(), family))
            .collect();
        let hist = aggregate_gauge_by_label(families.get("test").unwrap());
        let sum = aggregate_counter_by_label(families.get("test_sum").unwrap());
        let count = aggregate_counter_by_label(families.get("test_count").unwrap());

        assert_eq!(Some(&3.), hist.get("::a::50"));
        assert_eq!(Some(&4.), hist.get("::a::90"));
        assert_eq!(Some(&30.), hist.get("::b::50"));
        assert_eq!(Some(&40.), hist.get("::b::90"));

        assert_eq!(Some(&10.), sum.get("::a"));
        assert_eq!(Some(&100.), sum.get("::b"));

        assert_eq!(Some(&4.), count.get("::a"));
        assert_eq!(Some(&4.), count.get("::b"));
    }

    /// Joins label values into a lookup key, prefixing each value with "::".
    fn label_key<'a>(values: impl Iterator<Item = &'a str>) -> String {
        values.fold(String::new(), |mut key, value| {
            key.push_str("::");
            key.push_str(value);
            key
        })
    }

    /// Maps each gauge in the family to its value, keyed by the joined label values.
    fn aggregate_gauge_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        let mut by_label = HashMap::new();
        for metric in family.get_metric() {
            let key = label_key(metric.get_label().iter().map(|pair| pair.get_value()));
            by_label.insert(key, metric.get_gauge().get_value());
        }
        by_label
    }

    /// Maps each counter in the family to its value, keyed by the joined label values.
    fn aggregate_counter_by_label(family: &MetricFamily) -> HashMap<String, f64> {
        let mut by_label = HashMap::new();
        for metric in family.get_metric() {
            let key = label_key(metric.get_label().iter().map(|pair| pair.get_value()));
            by_label.insert(key, metric.get_counter().get_value());
        }
        by_label
    }
}
421}