1use std::{
5 collections::HashMap,
6 fs,
7 io::BufRead,
8 path::{Path, PathBuf},
9 time::Duration,
10};
11
12use prettytable::{Table, row};
13use prometheus_parse::Scrape;
14use serde::{Deserialize, Serialize};
15
16use crate::{
17 benchmark::{BenchmarkParameters, BenchmarkType},
18 display,
19 protocol::ProtocolMetrics,
20 settings::Settings,
21};
22
/// Identifier of a latency-histogram bucket: the bucket's upper bound
/// (the Prometheus `le` label), kept as its string representation.
type BucketId = String;

/// A single data point scraped from a node's Prometheus endpoint, holding the
/// latency histogram and the aggregate counters needed to derive throughput
/// and latency statistics.
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct Measurement {
    /// Time elapsed since the start of the benchmark when this scrape was taken.
    timestamp: Duration,
    /// Latency histogram buckets, keyed by their upper bound (`le` label).
    buckets: HashMap<BucketId, usize>,
    /// Sum of the latencies of all recorded transactions.
    sum: Duration,
    /// Total number of recorded transactions.
    count: usize,
    /// Sum of the squared latencies of all recorded transactions
    /// (used to compute the latency standard deviation).
    squared_sum: Duration,
}
40
41impl Measurement {
42 pub fn from_prometheus<M: ProtocolMetrics>(text: &str) -> Self {
44 let br = std::io::BufReader::new(text.as_bytes());
45 let parsed = Scrape::parse(br.lines()).unwrap();
46
47 let buckets: HashMap<_, _> = parsed
48 .samples
49 .iter()
50 .find(|x| x.metric == M::LATENCY_BUCKETS)
51 .map(|x| match &x.value {
52 prometheus_parse::Value::Histogram(values) => values
53 .iter()
54 .map(|x| {
55 let bucket_id = x.less_than.to_string();
56 let count = x.count as usize;
57 (bucket_id, count)
58 })
59 .collect(),
60 _ => panic!("Unexpected scraped value"),
61 })
62 .unwrap_or_default();
63
64 let sum = parsed
65 .samples
66 .iter()
67 .find(|x| x.metric == M::LATENCY_SUM)
68 .map(|x| match x.value {
69 prometheus_parse::Value::Untyped(value) => Duration::from_secs_f64(value),
70 _ => panic!("Unexpected scraped value"),
71 })
72 .unwrap_or_default();
73
74 let count = parsed
75 .samples
76 .iter()
77 .find(|x| x.metric == M::TOTAL_TRANSACTIONS)
78 .map(|x| match x.value {
79 prometheus_parse::Value::Untyped(value) => value as usize,
80 _ => panic!("Unexpected scraped value"),
81 })
82 .unwrap_or_default();
83
84 let squared_sum = parsed
85 .samples
86 .iter()
87 .find(|x| x.metric == M::LATENCY_SQUARED_SUM)
88 .map(|x| match x.value {
89 prometheus_parse::Value::Counter(value) => Duration::from_secs_f64(value),
90 _ => panic!("Unexpected scraped value"),
91 })
92 .unwrap_or_default();
93
94 let timestamp = parsed
95 .samples
96 .iter()
97 .find(|x| x.metric == M::BENCHMARK_DURATION)
98 .map(|x| match x.value {
99 prometheus_parse::Value::Gauge(value) => Duration::from_secs(value as u64),
100 _ => panic!("Unexpected scraped value"),
101 })
102 .unwrap_or_default();
103
104 Self {
105 timestamp,
106 buckets,
107 sum,
108 count,
109 squared_sum,
110 }
111 }
112
113 pub fn tps(&self, duration: &Duration) -> u64 {
118 let tps = self.count.checked_div(duration.as_secs() as usize);
119 tps.unwrap_or_default() as u64
120 }
121
122 pub fn average_latency(&self) -> Duration {
124 self.sum.checked_div(self.count as u32).unwrap_or_default()
125 }
126
127 pub fn stdev_latency(&self) -> Duration {
130 let first_term = if self.count == 0 {
132 0.0
133 } else {
134 self.squared_sum.as_secs_f64() / self.count as f64
135 };
136
137 let squared_avg = self.average_latency().as_secs_f64().powf(2.0);
139
140 let variance = if squared_avg > first_term {
142 0.0
143 } else {
144 first_term - squared_avg
145 };
146
147 let stdev = variance.sqrt();
149 Duration::from_secs_f64(stdev)
150 }
151
152 #[cfg(test)]
153 pub fn new_for_test() -> Self {
154 Self {
155 timestamp: Duration::from_secs(30),
156 buckets: HashMap::new(),
157 sum: Duration::from_secs(1265),
158 count: 1860,
159 squared_sum: Duration::from_secs(952),
160 }
161 }
162}
163
/// Identifier of the scraper (metrics collector) that produced a measurement.
type ScraperId = usize;

/// All measurements collected during a benchmark run, together with the
/// context (machine specs, commit, parameters) needed to interpret them.
#[derive(Serialize, Deserialize, Clone)]
pub struct MeasurementsCollection<T> {
    /// Description of the machines the benchmark ran on.
    pub machine_specs: String,
    /// Commit of the codebase under benchmark.
    pub commit: String,
    /// Parameters of the benchmark run.
    pub parameters: BenchmarkParameters<T>,
    /// Measurement time series, indexed by scraper id.
    pub scrapers: HashMap<ScraperId, Vec<Measurement>>,
}
178
179impl<T: BenchmarkType> MeasurementsCollection<T> {
180 pub fn new(settings: &Settings, parameters: BenchmarkParameters<T>) -> Self {
182 Self {
183 machine_specs: settings.specs.clone(),
184 commit: settings.repository.commit.clone(),
185 parameters,
186 scrapers: HashMap::new(),
187 }
188 }
189
190 pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
192 let data = fs::read(path)?;
193 let measurements: Self = serde_json::from_slice(data.as_slice())?;
194 Ok(measurements)
195 }
196
197 pub fn add(&mut self, scraper_id: ScraperId, measurement: Measurement) {
199 self.scrapers
200 .entry(scraper_id)
201 .or_default()
202 .push(measurement);
203 }
204
205 pub fn transaction_load(&self) -> usize {
207 self.parameters.load
208 }
209
210 pub fn benchmark_duration(&self) -> Duration {
212 self.scrapers
213 .values()
214 .filter_map(|x| x.last())
215 .map(|x| x.timestamp)
216 .max()
217 .unwrap_or_default()
218 }
219
220 pub fn aggregate_tps(&self) -> u64 {
222 let duration = self
223 .scrapers
224 .values()
225 .filter_map(|x| x.last())
226 .map(|x| x.timestamp)
227 .max()
228 .unwrap_or_default();
229 self.scrapers
230 .values()
231 .filter_map(|x| x.last())
232 .map(|x| x.tps(&duration))
233 .sum()
234 }
235
236 pub fn aggregate_average_latency(&self) -> Duration {
238 let last_data_points: Vec<_> = self.scrapers.values().filter_map(|x| x.last()).collect();
239 last_data_points
240 .iter()
241 .map(|x| x.average_latency())
242 .sum::<Duration>()
243 .checked_div(last_data_points.len() as u32)
244 .unwrap_or_default()
245 }
246
247 pub fn aggregate_stdev_latency(&self) -> Duration {
249 self.scrapers
250 .values()
251 .filter_map(|x| x.last())
252 .map(|x| x.stdev_latency())
253 .max()
254 .unwrap_or_default()
255 }
256
257 pub fn save<P: AsRef<Path>>(&self, path: P) {
259 let json = serde_json::to_string_pretty(self).expect("Cannot serialize metrics");
260 let mut file = PathBuf::from(path.as_ref());
261 file.push(format!("measurements-{:?}.json", self.parameters));
262 fs::write(file, json).unwrap();
263 }
264
265 pub fn display_summary(&self) {
267 let duration = self.benchmark_duration();
268 let total_tps = self.aggregate_tps();
269 let average_latency = self.aggregate_average_latency();
270 let stdev_latency = self.aggregate_stdev_latency();
271
272 let mut table = Table::new();
273 table.set_format(display::default_table_format());
274
275 table.set_titles(row![bH2->"Benchmark Summary"]);
276 table.add_row(row![b->"Benchmark type:", self.parameters.benchmark_type]);
277 table.add_row(row![bH2->""]);
278 table.add_row(row![b->"Nodes:", self.parameters.nodes]);
279 table.add_row(row![b->"Faults:", self.parameters.faults]);
280 table.add_row(row![b->"Load:", format!("{} tx/s", self.parameters.load)]);
281 table.add_row(row![b->"Duration:", format!("{} s", duration.as_secs())]);
282 table.add_row(row![bH2->""]);
283 table.add_row(row![b->"TPS:", format!("{total_tps} tx/s")]);
284 table.add_row(row![b->"Latency (avg):", format!("{} ms", average_latency.as_millis())]);
285 table.add_row(row![b->"Latency (stdev):", format!("{} ms", stdev_latency.as_millis())]);
286
287 display::newline();
288 table.printstd();
289 display::newline();
290 }
291}
292
#[cfg(test)]
mod test {
    use std::{collections::HashMap, time::Duration};

    use crate::{
        benchmark::test::TestBenchmarkType, protocol::test_protocol_metrics::TestProtocolMetrics,
        settings::Settings,
    };

    use super::{BenchmarkParameters, Measurement, MeasurementsCollection};

    /// The average latency is simply sum / count.
    #[test]
    fn average_latency() {
        let data = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(2),
            count: 100,
            squared_sum: Duration::from_secs(0),
        };

        // 2 s / 100 = 20 ms.
        assert_eq!(data.average_latency(), Duration::from_millis(20));
    }

    /// The standard deviation follows Var(X) = E[X^2] - E[X]^2.
    #[test]
    fn stdev_latency() {
        let data = Measurement {
            timestamp: Duration::from_secs(10),
            buckets: HashMap::new(),
            sum: Duration::from_secs(50),
            count: 100,
            squared_sum: Duration::from_secs(75),
        };

        // E[X^2] = squared_sum / count = 0.75.
        assert_eq!(
            data.squared_sum.checked_div(data.count as u32),
            Some(Duration::from_secs_f64(0.75))
        );
        // E[X]^2 = (sum / count)^2 = 0.25.
        assert_eq!(data.average_latency().as_secs_f64().powf(2.0), 0.25);
        // stdev = sqrt(0.75 - 0.25) ≈ 0.7 (checked to one decimal place).
        let stdev = data.stdev_latency();
        assert_eq!((stdev.as_secs_f64() * 10.0).round(), 7.0);
    }

    /// End-to-end parse of a realistic Prometheus report (with both quoted and
    /// unquoted label values) into a `Measurement` and into a collection.
    #[test]
    fn prometheus_parse() {
        let report = r#"
            # HELP benchmark_duration Duration of the benchmark
            # TYPE benchmark_duration gauge
            benchmark_duration 30
            # HELP latency_s Total time in seconds to return a response
            # TYPE latency_s histogram
            latency_s_bucket{workload=transfer_object,le=0.1} 0
            latency_s_bucket{workload=transfer_object,le=0.25} 0
            latency_s_bucket{workload=transfer_object,le=0.5} 506
            latency_s_bucket{workload=transfer_object,le=0.75} 1282
            latency_s_bucket{workload=transfer_object,le=1} 1693
            latency_s_bucket{workload="transfer_object",le="1.25"} 1816
            latency_s_bucket{workload="transfer_object",le="1.5"} 1860
            latency_s_bucket{workload="transfer_object",le="1.75"} 1860
            latency_s_bucket{workload="transfer_object",le="2"} 1860
            latency_s_bucket{workload=transfer_object,le=2.5} 1860
            latency_s_bucket{workload=transfer_object,le=5} 1860
            latency_s_bucket{workload=transfer_object,le=10} 1860
            latency_s_bucket{workload=transfer_object,le=20} 1860
            latency_s_bucket{workload=transfer_object,le=30} 1860
            latency_s_bucket{workload=transfer_object,le=60} 1860
            latency_s_bucket{workload=transfer_object,le=90} 1860
            latency_s_bucket{workload=transfer_object,le=+Inf} 1860
            latency_s_sum{workload=transfer_object} 1265.287933130998
            latency_s_count{workload=transfer_object} 1860
            # HELP latency_squared_s Square of total time in seconds to return a response
            # TYPE latency_squared_s counter
            latency_squared_s{workload="transfer_object"} 952.8160642745289
        "#;

        let measurement = Measurement::from_prometheus::<TestProtocolMetrics>(report);
        let settings = Settings::new_for_test();
        let mut aggregator = MeasurementsCollection::<TestBenchmarkType>::new(
            &settings,
            BenchmarkParameters::default(),
        );
        let scraper_id = 1;
        aggregator.add(scraper_id, measurement);

        // Exactly one scraper, holding exactly one data point.
        assert_eq!(aggregator.scrapers.len(), 1);
        let data_points = aggregator.scrapers.get(&scraper_id).unwrap();
        assert_eq!(data_points.len(), 1);

        // Every bucket of the report must be present; note `+Inf` is parsed as "inf".
        let data = &data_points[0];
        assert_eq!(
            data.buckets,
            ([
                ("0.1".into(), 0),
                ("0.25".into(), 0),
                ("0.5".into(), 506),
                ("0.75".into(), 1282),
                ("1".into(), 1693),
                ("1.25".into(), 1816),
                ("1.5".into(), 1860),
                ("1.75".into(), 1860),
                ("2".into(), 1860),
                ("2.5".into(), 1860),
                ("5".into(), 1860),
                ("10".into(), 1860),
                ("20".into(), 1860),
                ("30".into(), 1860),
                ("60".into(), 1860),
                ("90".into(), 1860),
                ("inf".into(), 1860)
            ])
            .iter()
            .cloned()
            .collect()
        );
        // Scalar metrics are extracted verbatim (truncated to whole seconds here).
        assert_eq!(data.sum.as_secs(), 1265);
        assert_eq!(data.count, 1860);
        assert_eq!(data.timestamp.as_secs(), 30);
        assert_eq!(data.squared_sum.as_secs(), 952);
    }
}