sui_aws_orchestrator/
benchmark.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    fmt::{Debug, Display},
6    hash::Hash,
7    str::FromStr,
8    time::Duration,
9};
10
11use serde::{Deserialize, Serialize, de::DeserializeOwned};
12
13use crate::{faults::FaultsType, measurement::MeasurementsCollection};
14
15pub trait BenchmarkType:
16    Serialize
17    + DeserializeOwned
18    + Default
19    + Clone
20    + FromStr
21    + Display
22    + Debug
23    + PartialEq
24    + Eq
25    + Hash
26    + PartialOrd
27    + Ord
28    + FromStr
29{
30}
31
32/// The benchmark parameters for a run.
33#[derive(Serialize, Deserialize, Clone)]
34pub struct BenchmarkParameters<T> {
35    /// The type of benchmark to run.
36    pub benchmark_type: T,
37    /// The committee size.
38    pub nodes: usize,
39    /// The number of (crash-)faults.
40    pub faults: FaultsType,
41    /// The total load (tx/s) to submit to the system.
42    pub load: usize,
43    /// The duration of the benchmark.
44    pub duration: Duration,
45}
46
47impl<T: BenchmarkType> Default for BenchmarkParameters<T> {
48    fn default() -> Self {
49        Self {
50            benchmark_type: T::default(),
51            nodes: 4,
52            faults: FaultsType::default(),
53            load: 500,
54            duration: Duration::from_secs(60),
55        }
56    }
57}
58
59impl<T: BenchmarkType> Debug for BenchmarkParameters<T> {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(
62            f,
63            "{:?}-{:?}-{}-{}",
64            self.benchmark_type, self.faults, self.nodes, self.load
65        )
66    }
67}
68
69impl<T> Display for BenchmarkParameters<T> {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        write!(
72            f,
73            "{} nodes ({}) - {} tx/s",
74            self.nodes, self.faults, self.load
75        )
76    }
77}
78
79impl<T> BenchmarkParameters<T> {
80    /// Make a new benchmark parameters.
81    pub fn new(
82        benchmark_type: T,
83        nodes: usize,
84        faults: FaultsType,
85        load: usize,
86        duration: Duration,
87    ) -> Self {
88        Self {
89            benchmark_type,
90            nodes,
91            faults,
92            load,
93            duration,
94        }
95    }
96}
97
/// The load type to submit to the nodes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LoadType {
    /// A fixed list of loads; each entry is used for one benchmark run, front to back.
    Fixed(Vec<usize>),

    /// Search for the breaking point of the L-graph.
    // TODO: Doesn't work very well, use tps regression as additional signal.
    #[allow(dead_code)]
    Search {
        /// The initial load to test (also used as the baseline).
        starting_load: usize,
        /// The maximum number of iterations before converging on a breaking point.
        max_iterations: usize,
    },
}
113
114/// Generate benchmark parameters (one set of parameters per run).
115// TODO: The rusty thing to do would be to implement Iter.
116pub struct BenchmarkParametersGenerator<T> {
117    /// The type of benchmark to run.
118    benchmark_type: T,
119    /// The committee size.
120    pub nodes: usize,
121    /// The load type.
122    load_type: LoadType,
123    /// The number of faulty nodes.
124    pub faults: FaultsType,
125    /// The duration of the benchmark.
126    duration: Duration,
127    /// The load of the next benchmark run.
128    next_load: Option<usize>,
129    /// Temporary hold a lower bound of the breaking point.
130    lower_bound_result: Option<MeasurementsCollection<T>>,
131    /// Temporary hold an upper bound of the breaking point.
132    upper_bound_result: Option<MeasurementsCollection<T>>,
133    /// The current number of iterations.
134    iterations: usize,
135}
136
137impl<T: BenchmarkType> Iterator for BenchmarkParametersGenerator<T> {
138    type Item = BenchmarkParameters<T>;
139
140    /// Return the next set of benchmark parameters to run.
141    fn next(&mut self) -> Option<Self::Item> {
142        self.next_load.map(|load| {
143            BenchmarkParameters::new(
144                self.benchmark_type.clone(),
145                self.nodes,
146                self.faults.clone(),
147                load,
148                self.duration,
149            )
150        })
151    }
152}
153
154impl<T: BenchmarkType> BenchmarkParametersGenerator<T> {
155    /// The default benchmark duration.
156    const DEFAULT_DURATION: Duration = Duration::from_secs(180);
157
158    /// make a new generator.
159    pub fn new(nodes: usize, mut load_type: LoadType) -> Self {
160        let next_load = match &mut load_type {
161            LoadType::Fixed(loads) => {
162                if loads.is_empty() {
163                    None
164                } else {
165                    Some(loads.remove(0))
166                }
167            }
168            LoadType::Search { starting_load, .. } => Some(*starting_load),
169        };
170        Self {
171            benchmark_type: T::default(),
172            nodes,
173            load_type,
174            faults: FaultsType::default(),
175            duration: Self::DEFAULT_DURATION,
176            next_load,
177            lower_bound_result: None,
178            upper_bound_result: None,
179            iterations: 0,
180        }
181    }
182
183    /// Set the benchmark type.
184    pub fn with_benchmark_type(mut self, benchmark_type: T) -> Self {
185        self.benchmark_type = benchmark_type;
186        self
187    }
188
189    /// Set crash-recovery pattern and the number of faulty nodes.
190    pub fn with_faults(mut self, faults: FaultsType) -> Self {
191        self.faults = faults;
192        self
193    }
194
195    /// Set a custom benchmark duration.
196    pub fn with_custom_duration(mut self, duration: Duration) -> Self {
197        self.duration = duration;
198        self
199    }
200
201    /// Detects whether the latest benchmark parameters run the system out of capacity.
202    fn out_of_capacity(
203        last_result: &MeasurementsCollection<T>,
204        new_result: &MeasurementsCollection<T>,
205    ) -> bool {
206        // We consider the system is out of capacity if the latency increased by over 5x with
207        // respect to the latest run.
208        let threshold = last_result.aggregate_average_latency() * 5;
209        let high_latency = new_result.aggregate_average_latency() > threshold;
210
211        // Or if the throughput is less than 2/3 of the input rate.
212        let last_load = new_result.transaction_load() as u64;
213        let no_throughput_increase = new_result.aggregate_tps() < (2 * last_load / 3);
214
215        high_latency || no_throughput_increase
216    }
217
218    /// Register a new benchmark measurements collection. These results are used to determine
219    /// whether the system reached its breaking point.
220    pub fn register_result(&mut self, result: MeasurementsCollection<T>) {
221        self.next_load = match &mut self.load_type {
222            LoadType::Fixed(loads) => {
223                if loads.is_empty() {
224                    None
225                } else {
226                    Some(loads.remove(0))
227                }
228            }
229            LoadType::Search { max_iterations, .. } => {
230                // Terminate the search.
231                if self.iterations >= *max_iterations {
232                    None
233
234                // Search for the breaking point.
235                } else {
236                    self.iterations += 1;
237                    match (&mut self.lower_bound_result, &mut self.upper_bound_result) {
238                        (None, None) => {
239                            let next = result.transaction_load() * 2;
240                            self.lower_bound_result = Some(result);
241                            Some(next)
242                        }
243                        (Some(lower), None) => {
244                            if Self::out_of_capacity(lower, &result) {
245                                let next =
246                                    (lower.transaction_load() + result.transaction_load()) / 2;
247                                self.upper_bound_result = Some(result);
248                                Some(next)
249                            } else {
250                                let next = result.transaction_load() * 2;
251                                *lower = result;
252                                Some(next)
253                            }
254                        }
255                        (Some(lower), Some(upper)) => {
256                            if Self::out_of_capacity(lower, &result) {
257                                *upper = result;
258                            } else {
259                                *lower = result;
260                            }
261                            Some((lower.transaction_load() + upper.transaction_load()) / 2)
262                        }
263                        _ => panic!("Benchmark parameters generator is in an incoherent state"),
264                    }
265                }
266            }
267        };
268    }
269}
270
#[cfg(test)]
pub mod test {
    use std::{fmt::Display, str::FromStr};

    use serde::{Deserialize, Serialize};

    use crate::{
        measurement::{Measurement, MeasurementsCollection},
        settings::Settings,
    };

    use super::{BenchmarkParametersGenerator, BenchmarkType, LoadType};

    /// Unit-struct benchmark type used as a stand-in in unit tests.
    #[derive(
        Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
    )]
    pub struct TestBenchmarkType;

    impl Display for TestBenchmarkType {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("TestBenchmarkType")
        }
    }

    impl FromStr for TestBenchmarkType {
        type Err = ();

        /// Parsing ignores its input and always succeeds.
        fn from_str(_s: &str) -> Result<Self, Self::Err> {
            Ok(TestBenchmarkType)
        }
    }

    impl BenchmarkType for TestBenchmarkType {}

    /// Build a 4-node generator in search mode with the given starting load and iteration
    /// budget.
    fn search_generator(
        starting_load: usize,
        max_iterations: usize,
    ) -> BenchmarkParametersGenerator<TestBenchmarkType> {
        let load = LoadType::Search {
            starting_load,
            max_iterations,
        };
        BenchmarkParametersGenerator::new(4, load)
    }

    #[test]
    fn set_lower_bound() {
        let settings = Settings::new_for_test();
        let mut generator = search_generator(100, 10);

        // Report a result for the first run.
        let first = generator.next().unwrap();
        generator.register_result(MeasurementsCollection::new(&settings, first));

        // The load doubles and the first result is kept as the lower bound only.
        assert_eq!(generator.next().map(|p| p.load), Some(200));
        let lower = generator.lower_bound_result.as_ref();
        assert_eq!(lower.map(|r| r.transaction_load()), Some(100));
        assert!(generator.upper_bound_result.is_none());
    }

    #[test]
    fn set_upper_bound() {
        let settings = Settings::new_for_test();
        let mut generator = search_generator(100, 10);

        // Register a first result (zero latency). This sets the lower bound.
        let first = generator.next().unwrap();
        generator.register_result(MeasurementsCollection::new(&settings, first));

        // Register a second result (with positive latency). This sets the upper bound.
        let second = generator.next().unwrap();
        let mut collection = MeasurementsCollection::new(&settings, second);
        collection
            .scrapers
            .insert(1, vec![Measurement::new_for_test()]);
        generator.register_result(collection);

        // Ensure the next load is between the upper and the lower bound.
        assert_eq!(generator.next().map(|p| p.load), Some(150));

        let lower = generator.lower_bound_result.as_ref();
        assert_eq!(lower.map(|r| r.transaction_load()), Some(100));
        let upper = generator.upper_bound_result.as_ref();
        assert_eq!(upper.map(|r| r.transaction_load()), Some(200));
    }

    #[test]
    fn max_iterations() {
        let settings = Settings::new_for_test();
        let mut generator = search_generator(100, 0);

        // With a budget of zero iterations, registering a result schedules nothing further.
        let first = generator.next().unwrap();
        generator.register_result(MeasurementsCollection::new(&settings, first));

        assert!(generator.next().is_none());
    }
}