1use std::{str::FromStr, time::Duration};
5
6use benchmark::{BenchmarkParametersGenerator, LoadType};
7use clap::Parser;
8use client::{ServerProviderClient, aws::AwsClient};
9use eyre::{Context, Result};
10use faults::FaultsType;
11use measurement::MeasurementsCollection;
12use orchestrator::Orchestrator;
13use protocol::narwhal::{NarwhalBenchmarkType, NarwhalProtocol};
14use settings::{CloudProvider, Settings};
15use ssh::SshConnectionManager;
16use testbed::Testbed;
17
18pub mod benchmark;
19pub mod client;
20pub mod display;
21pub mod error;
22pub mod faults;
23pub mod logs;
24pub mod measurement;
25mod monitor;
26pub mod orchestrator;
27pub mod protocol;
28pub mod settings;
29pub mod ssh;
30pub mod testbed;
31
// Protocol under test. `run` builds the protocol commands handed to the orchestrator
// from this type via `Protocol::new(&settings)`.
type Protocol = NarwhalProtocol;
// Benchmark type matching `Protocol`. It is parsed from the `--benchmark-type` string
// and also parameterizes the measurements collection loaded by the summarize operation.
type BenchmarkType = NarwhalBenchmarkType;
38
39#[derive(Parser)]
40#[command(author, version, about = "Testbed orchestrator", long_about = None)]
41pub struct Opts {
42 #[clap(
45 long,
46 value_name = "FILE",
47 default_value = "crates/sui-aws-orchestrator/assets/settings.json",
48 global = true
49 )]
50 settings_path: String,
51
52 #[clap(subcommand)]
54 operation: Operation,
55}
56
57#[derive(Parser)]
58pub enum Operation {
59 Testbed {
61 #[clap(subcommand)]
62 action: TestbedAction,
63 },
64
65 Benchmark {
67 #[clap(long, default_value = "0", global = true)]
70 benchmark_type: String,
71
72 #[clap(long, value_name = "INT")]
74 committee: usize,
75
76 #[clap(long, value_name = "INT", default_value = "0", global = true)]
78 faults: usize,
79
80 #[clap(long, action, default_value = "false", global = true)]
82 crash_recovery: bool,
83
84 #[clap(long, value_parser = parse_duration, default_value = "60", global = true)]
86 crash_interval: Duration,
87
88 #[clap(long, value_parser = parse_duration, default_value = "600", global = true)]
90 duration: Duration,
91
92 #[clap(long, value_parser = parse_duration, default_value = "15", global = true)]
94 scrape_interval: Duration,
95
96 #[clap(long, action, default_value = "false", global = true)]
98 skip_testbed_update: bool,
99
100 #[clap(long, action, default_value = "false", global = true)]
102 skip_testbed_configuration: bool,
103
104 #[clap(long, action, default_value = "false", global = true)]
106 log_processing: bool,
107
108 #[clap(long, value_name = "INT", default_value = "0", global = true)]
111 dedicated_clients: usize,
112
113 #[clap(long, action, default_value = "false", global = true)]
115 skip_monitoring: bool,
116
117 #[clap(long, action, value_parser = parse_duration, default_value = "30", global = true)]
119 timeout: Duration,
120
121 #[clap(long, value_name = "INT", default_value = "5", global = true)]
123 retries: usize,
124
125 #[clap(subcommand)]
127 load_type: Load,
128 },
129
130 Summarize {
132 #[clap(long, value_name = "FILE")]
134 path: String,
135 },
136}
137
138#[derive(Parser)]
139#[clap(rename_all = "kebab-case")]
140pub enum TestbedAction {
141 Status,
143
144 Deploy {
146 #[clap(long)]
148 instances: usize,
149
150 #[clap(long)]
154 region: Option<String>,
155 },
156
157 Start {
159 #[clap(long, default_value = "200")]
161 instances: usize,
162 },
163
164 Stop,
166
167 Destroy,
169}
170
171#[derive(Parser)]
172pub enum Load {
173 FixedLoad {
175 #[clap(
177 long,
178 value_name = "INT",
179 num_args(1..),
180 value_delimiter = ','
181 )]
182 loads: Vec<usize>,
183 },
184
185 Search {
187 #[clap(long, value_name = "INT", default_value = "250")]
189 starting_load: usize,
190 #[clap(long, value_name = "INT", default_value = "5")]
192 max_iterations: usize,
193 },
194}
195
/// Parses a whole number of seconds (e.g. `"60"`) into a [`Duration`].
///
/// Used as the clap value parser for the duration-typed command-line options.
///
/// # Errors
///
/// Returns the underlying [`std::num::ParseIntError`] when `arg` is not a valid
/// unsigned integer.
fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    arg.parse::<u64>().map(Duration::from_secs)
}
200
201#[tokio::main]
202async fn main() -> Result<()> {
203 color_eyre::install()?;
204 let opts: Opts = Opts::parse();
205
206 let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;
208
209 match &settings.cloud_provider {
210 CloudProvider::Aws => {
211 let client = AwsClient::new(settings.clone()).await;
213
214 run(settings, client, opts).await
216 }
217 }
218}
219
220async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
221 let mut testbed = Testbed::new(settings.clone(), client)
223 .await
224 .wrap_err("Failed to create testbed")?;
225
226 match opts.operation {
227 Operation::Testbed { action } => match action {
228 TestbedAction::Status => testbed.status(),
230
231 TestbedAction::Deploy { instances, region } => testbed
233 .deploy(instances, region)
234 .await
235 .wrap_err("Failed to deploy testbed")?,
236
237 TestbedAction::Start { instances } => testbed
239 .start(instances)
240 .await
241 .wrap_err("Failed to start testbed")?,
242
243 TestbedAction::Stop => testbed.stop().await.wrap_err("Failed to stop testbed")?,
245
246 TestbedAction::Destroy => testbed
248 .destroy()
249 .await
250 .wrap_err("Failed to destroy testbed")?,
251 },
252
253 Operation::Benchmark {
255 benchmark_type,
256 committee,
257 faults,
258 crash_recovery,
259 crash_interval,
260 duration,
261 scrape_interval,
262 skip_testbed_update,
263 skip_testbed_configuration,
264 log_processing,
265 dedicated_clients,
266 skip_monitoring,
267 timeout,
268 retries,
269 load_type,
270 } => {
271 let username = testbed.username();
273 let private_key_file = settings.ssh_private_key_file.clone();
274 let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
275 .with_timeout(timeout)
276 .with_retries(retries);
277
278 let instances = testbed.instances();
279
280 let setup_commands = testbed
281 .setup_commands()
282 .await
283 .wrap_err("Failed to load testbed setup commands")?;
284
285 let protocol_commands = Protocol::new(&settings);
286 let benchmark_type = BenchmarkType::from_str(&benchmark_type)?;
287
288 let load = match load_type {
289 Load::FixedLoad { loads } => {
290 let loads = if loads.is_empty() { vec![200] } else { loads };
291 LoadType::Fixed(loads)
292 }
293 Load::Search {
294 starting_load,
295 max_iterations,
296 } => LoadType::Search {
297 starting_load,
298 max_iterations,
299 },
300 };
301
302 let fault_type = if !crash_recovery || faults == 0 {
303 FaultsType::Permanent { faults }
304 } else {
305 FaultsType::CrashRecovery {
306 max_faults: faults,
307 interval: crash_interval,
308 }
309 };
310
311 let generator = BenchmarkParametersGenerator::new(committee, load)
312 .with_benchmark_type(benchmark_type)
313 .with_custom_duration(duration)
314 .with_faults(fault_type);
315
316 Orchestrator::new(
317 settings,
318 instances,
319 setup_commands,
320 protocol_commands,
321 ssh_manager,
322 )
323 .with_scrape_interval(scrape_interval)
324 .with_crash_interval(crash_interval)
325 .skip_testbed_updates(skip_testbed_update)
326 .skip_testbed_configuration(skip_testbed_configuration)
327 .with_log_processing(log_processing)
328 .with_dedicated_clients(dedicated_clients)
329 .skip_monitoring(skip_monitoring)
330 .run_benchmarks(generator)
331 .await
332 .wrap_err("Failed to run benchmarks")?;
333 }
334
335 Operation::Summarize { path } => {
337 MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
338 }
339 }
340 Ok(())
341}