sui_aws_orchestrator/measurement.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::HashMap,
    fs,
    io::BufRead,
    path::{Path, PathBuf},
    time::Duration,
};

use prettytable::{Table, row};
use prometheus_parse::Scrape;
use serde::{Deserialize, Serialize};

use crate::{
    benchmark::{BenchmarkParameters, BenchmarkType},
    display,
    protocol::ProtocolMetrics,
    settings::Settings,
};

/// The identifier of Prometheus latency buckets.
type BucketId = String;

/// A snapshot measurement at a given time.
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct Measurement {
    /// Duration since the beginning of the benchmark.
    timestamp: Duration,
    /// Latency buckets.
    buckets: HashMap<BucketId, usize>,
    /// Sum of the latencies of all finalized transactions.
    sum: Duration,
    /// Total number of finalized transactions.
    count: usize,
    /// Sum of the squared latencies of all finalized transactions.
    squared_sum: Duration,
}

impl Measurement {
    /// Create a new measurement from the text exposed by Prometheus.
    pub fn from_prometheus<M: ProtocolMetrics>(text: &str) -> Self {
        let br = std::io::BufReader::new(text.as_bytes());
        let parsed = Scrape::parse(br.lines()).unwrap();

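        // Extract the latency histogram, keyed by each bucket's upper bound.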
        let buckets: HashMap<_, _> = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::LATENCY_BUCKETS)
            .map(|x| match &x.value {
                prometheus_parse::Value::Histogram(values) => values
                    .iter()
                    .map(|x| {
                        let bucket_id = x.less_than.to_string();
                        let count = x.count as usize;
                        (bucket_id, count)
                    })
                    .collect(),
                _ => panic!("Unexpected scraped value"),
            })
            .unwrap_or_default();

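        // Extract the sum of all latencies (exposed as an untyped value).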
        let sum = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::LATENCY_SUM)
            .map(|x| match x.value {
                prometheus_parse::Value::Untyped(value) => Duration::from_secs_f64(value),
                _ => panic!("Unexpected scraped value"),
            })
            .unwrap_or_default();

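        // Extract the total number of finalized transactions.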
        let count = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::TOTAL_TRANSACTIONS)
            .map(|x| match x.value {
                prometheus_parse::Value::Untyped(value) => value as usize,
                _ => panic!("Unexpected scraped value"),
            })
            .unwrap_or_default();

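        // Extract the sum of squared latencies (exposed as a counter).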
        let squared_sum = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::LATENCY_SQUARED_SUM)
            .map(|x| match x.value {
                prometheus_parse::Value::Counter(value) => Duration::from_secs_f64(value),
                _ => panic!("Unexpected scraped value"),
            })
            .unwrap_or_default();

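        // Extract the benchmark duration (exposed as a gauge).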
        let timestamp = parsed
            .samples
            .iter()
            .find(|x| x.metric == M::BENCHMARK_DURATION)
            .map(|x| match x.value {
                prometheus_parse::Value::Gauge(value) => Duration::from_secs(value as u64),
                _ => panic!("Unexpected scraped value"),
            })
            .unwrap_or_default();

        Self {
            timestamp,
            buckets,
            sum,
            count,
            squared_sum,
        }
    }

    /// Compute the TPS.
    /// NOTE: Do not use `self.timestamp` as the benchmark duration because some clients
    /// may be unable to submit transactions past the first few seconds of the benchmark.
    /// This may happen as a result of a bad control system within the nodes.
    pub fn tps(&self, duration: &Duration) -> u64 {
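        // Integer division; `checked_div` yields None (hence zero) for a zero duration.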
        let tps = self.count.checked_div(duration.as_secs() as usize);
        tps.unwrap_or_default() as u64
    }

    /// Compute the average latency.
    pub fn average_latency(&self) -> Duration {
        self.sum.checked_div(self.count as u32).unwrap_or_default()
    }

    /// Compute the standard deviation from the sum of squared latencies:
    /// `stdev = sqrt( squared_sum / count - avg^2 )`
    pub fn stdev_latency(&self) -> Duration {
        // Compute `squared_sum / count`.
        let first_term = if self.count == 0 {
            0.0
        } else {
            self.squared_sum.as_secs_f64() / self.count as f64
        };

        // Compute `avg^2`.
        let squared_avg = self.average_latency().as_secs_f64().powf(2.0);

        // Compute `squared_sum / count - avg^2`, clamping to zero if rounding
        // makes the difference negative.
        let variance = if squared_avg > first_term {
            0.0
        } else {
            first_term - squared_avg
        };

        // Compute `sqrt( squared_sum / count - avg^2 )`.
        let stdev = variance.sqrt();
        Duration::from_secs_f64(stdev)
    }

    #[cfg(test)]
    pub fn new_for_test() -> Self {
        Self {
            timestamp: Duration::from_secs(30),
            buckets: HashMap::new(),
            sum: Duration::from_secs(1265),
            count: 1860,
            squared_sum: Duration::from_secs(952),
        }
    }
}

/// The identifier of the scrapers collecting the Prometheus metrics.
type ScraperId = usize;

#[derive(Serialize, Deserialize, Clone)]
pub struct MeasurementsCollection<T> {
    /// The machine / instance type.
    pub machine_specs: String,
    /// The commit of the codebase.
    pub commit: String,
    /// The benchmark parameters of the current run.
    pub parameters: BenchmarkParameters<T>,
    /// The data collected by each scraper.
    pub scrapers: HashMap<ScraperId, Vec<Measurement>>,
}

impl<T: BenchmarkType> MeasurementsCollection<T> {
    /// Create a new (empty) collection of measurements.
    pub fn new(settings: &Settings, parameters: BenchmarkParameters<T>) -> Self {
        Self {
            machine_specs: settings.specs.clone(),
            commit: settings.repository.commit.clone(),
            parameters,
            scrapers: HashMap::new(),
        }
    }

    /// Load a collection of measurements from a JSON file.
    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
        let data = fs::read(path)?;
        let measurements: Self = serde_json::from_slice(data.as_slice())?;
        Ok(measurements)
    }

    /// Add a new measurement to the collection.
    pub fn add(&mut self, scraper_id: ScraperId, measurement: Measurement) {
        self.scrapers
            .entry(scraper_id)
            .or_default()
            .push(measurement);
    }

    /// Return the transaction (input) load of the benchmark.
    pub fn transaction_load(&self) -> usize {
        self.parameters.load
    }

    /// Aggregate the benchmark duration of multiple data points by taking the max.
    pub fn benchmark_duration(&self) -> Duration {
        self.scrapers
            .values()
            .filter_map(|x| x.last())
            .map(|x| x.timestamp)
            .max()
            .unwrap_or_default()
    }

    /// Aggregate the tps of multiple data points by taking the sum.
    pub fn aggregate_tps(&self) -> u64 {
        let duration = self.benchmark_duration();
        self.scrapers
            .values()
            .filter_map(|x| x.last())
            .map(|x| x.tps(&duration))
            .sum()
    }

    /// Aggregate the average latency of multiple data points by taking the average.
    pub fn aggregate_average_latency(&self) -> Duration {
        let last_data_points: Vec<_> = self.scrapers.values().filter_map(|x| x.last()).collect();
        last_data_points
            .iter()
            .map(|x| x.average_latency())
            .sum::<Duration>()
            .checked_div(last_data_points.len() as u32)
            .unwrap_or_default()
    }

    /// Aggregate the stdev latency of multiple data points by taking the max.
    pub fn aggregate_stdev_latency(&self) -> Duration {
        self.scrapers
            .values()
            .filter_map(|x| x.last())
            .map(|x| x.stdev_latency())
            .max()
            .unwrap_or_default()
    }

    /// Save the collection of measurements as a JSON file.
    pub fn save<P: AsRef<Path>>(&self, path: P) {
        let json = serde_json::to_string_pretty(self).expect("Cannot serialize metrics");
        let mut file = PathBuf::from(path.as_ref());
        file.push(format!("measurements-{:?}.json", self.parameters));
        fs::write(file, json).unwrap();
    }

    /// Display a summary of the measurements.
    pub fn display_summary(&self) {
        let duration = self.benchmark_duration();
        let total_tps = self.aggregate_tps();
        let average_latency = self.aggregate_average_latency();
        let stdev_latency = self.aggregate_stdev_latency();

        let mut table = Table::new();
        table.set_format(display::default_table_format());

        table.set_titles(row![bH2->"Benchmark Summary"]);
        table.add_row(row![b->"Benchmark type:", self.parameters.benchmark_type]);
        table.add_row(row![bH2->""]);
        table.add_row(row![b->"Nodes:", self.parameters.nodes]);
        table.add_row(row![b->"Faults:", self.parameters.faults]);
        table.add_row(row![b->"Load:", format!("{} tx/s", self.parameters.load)]);
        table.add_row(row![b->"Duration:", format!("{} s", duration.as_secs())]);
        table.add_row(row![bH2->""]);
        table.add_row(row![b->"TPS:", format!("{total_tps} tx/s")]);
        table.add_row(row![b->"Latency (avg):", format!("{} ms", average_latency.as_millis())]);
        table.add_row(row![b->"Latency (stdev):", format!("{} ms", stdev_latency.as_millis())]);

        display::newline();
        table.printstd();
        display::newline();
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashMap, time::Duration};

    use crate::{
        benchmark::test::TestBenchmarkType, protocol::test_protocol_metrics::TestProtocolMetrics,
        settings::Settings,
    };

    use super::{BenchmarkParameters, Measurement, MeasurementsCollection};

    #[test]
    fn average_latency() {
        let data = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(2),
            count: 100,
            squared_sum: Duration::from_secs(0),
        };

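        // avg = sum / count = 2 s / 100 = 20 ms.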
        assert_eq!(data.average_latency(), Duration::from_millis(20));
    }

    #[test]
    fn stdev_latency() {
        let data = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(50),
            count: 100,
            squared_sum: Duration::from_secs(75),
        };

        // squared_sum / count
        assert_eq!(
            data.squared_sum.checked_div(data.count as u32),
            Some(Duration::from_secs_f64(0.75))
        );
        // avg^2
        assert_eq!(data.average_latency().as_secs_f64().powf(2.0), 0.25);
        // sqrt( squared_sum / count - avg^2 )
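        // stdev = sqrt(0.75 - 0.25) = sqrt(0.5) ≈ 0.707.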
        let stdev = data.stdev_latency();
        assert_eq!((stdev.as_secs_f64() * 10.0).round(), 7.0);
    }

    #[test]
    fn prometheus_parse() {
        let report = r#"
            # HELP benchmark_duration Duration of the benchmark
            # TYPE benchmark_duration gauge
            benchmark_duration 30
            # HELP latency_s Total time in seconds to return a response
            # TYPE latency_s histogram
            latency_s_bucket{workload=transfer_object,le=0.1} 0
            latency_s_bucket{workload=transfer_object,le=0.25} 0
            latency_s_bucket{workload=transfer_object,le=0.5} 506
            latency_s_bucket{workload=transfer_object,le=0.75} 1282
            latency_s_bucket{workload=transfer_object,le=1} 1693
            latency_s_bucket{workload="transfer_object",le="1.25"} 1816
            latency_s_bucket{workload="transfer_object",le="1.5"} 1860
            latency_s_bucket{workload="transfer_object",le="1.75"} 1860
            latency_s_bucket{workload="transfer_object",le="2"} 1860
            latency_s_bucket{workload=transfer_object,le=2.5} 1860
            latency_s_bucket{workload=transfer_object,le=5} 1860
            latency_s_bucket{workload=transfer_object,le=10} 1860
            latency_s_bucket{workload=transfer_object,le=20} 1860
            latency_s_bucket{workload=transfer_object,le=30} 1860
            latency_s_bucket{workload=transfer_object,le=60} 1860
            latency_s_bucket{workload=transfer_object,le=90} 1860
            latency_s_bucket{workload=transfer_object,le=+Inf} 1860
            latency_s_sum{workload=transfer_object} 1265.287933130998
            latency_s_count{workload=transfer_object} 1860
            # HELP latency_squared_s Square of total time in seconds to return a response
            # TYPE latency_squared_s counter
            latency_squared_s{workload="transfer_object"} 952.8160642745289
        "#;

        let measurement = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;
        aggregator.add(scraper_id, measurement);

        assert_eq!(aggregator.scrapers.len(), 1);
        let data_points = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(data_points.len(), 1);

        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            ([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("1.75".into(), 1860),
                ("2".into(), 1860),
                ("2.5".into(), 1860),
                ("5".into(), 1860),
                ("10".into(), 1860),
                ("20".into(), 1860),
                ("30".into(), 1860),
                ("60".into(), 1860),
                ("90".into(), 1860),
                ("inf".into(), 1860)
            ])
            .iter()
            .cloned()
            .collect()
        );
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);
    }
}