sui_aws_orchestrator/
main.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{str::FromStr, time::Duration};
5
6use benchmark::{BenchmarkParametersGenerator, LoadType};
7use clap::Parser;
8use client::{ServerProviderClient, aws::AwsClient};
9use eyre::{Context, Result};
10use faults::FaultsType;
11use measurement::MeasurementsCollection;
12use orchestrator::Orchestrator;
13use protocol::narwhal::{NarwhalBenchmarkType, NarwhalProtocol};
14use settings::{CloudProvider, Settings};
15use ssh::SshConnectionManager;
16use testbed::Testbed;
17
18pub mod benchmark;
19pub mod client;
20pub mod display;
21pub mod error;
22pub mod faults;
23pub mod logs;
24pub mod measurement;
25mod monitor;
26pub mod orchestrator;
27pub mod protocol;
28pub mod settings;
29pub mod ssh;
30pub mod testbed;
31
/// NOTE: Link these types to the correct protocol, either Sui or Narwhal. The rest of this
/// file only refers to the `Protocol` and `BenchmarkType` aliases, so switching protocol is
/// done here: comment out the Narwhal aliases and uncomment the three Sui lines below.
// use protocol::sui::{SuiBenchmarkType, SuiProtocol};
// type Protocol = SuiProtocol;
// type BenchmarkType = SuiBenchmarkType;
type Protocol = NarwhalProtocol;
type BenchmarkType = NarwhalBenchmarkType;
38
// Top-level command-line options of the orchestrator, parsed with clap's derive API.
// Holds the location of the settings file and the operation (subcommand) to execute.
#[derive(Parser)]
#[command(author, version, about = "Testbed orchestrator", long_about = None)]
pub struct Opts {
    /// The path to the settings file. This file contains basic information to deploy testbeds
    /// and run benchmarks such as the url of the git repo, the commit to deploy, etc.
    // `global = true` makes the flag available to the subcommands as well.
    #[clap(
        long,
        value_name = "FILE",
        default_value = "crates/sui-aws-orchestrator/assets/settings.json",
        global = true
    )]
    settings_path: String,

    /// The type of operation to run.
    #[clap(subcommand)]
    operation: Operation,
}
56
// The operations (subcommands) supported by the orchestrator: testbed management, running
// benchmarks, and summarizing previously collected measurements.
#[derive(Parser)]
pub enum Operation {
    /// Get or modify the status of the testbed.
    Testbed {
        #[clap(subcommand)]
        action: TestbedAction,
    },

    /// Run a benchmark on the specified testbed.
    Benchmark {
        /// Percentage of shared vs owned objects; 0 means only owned objects and 100 means
        /// only shared objects.
        // NOTE(review): the raw string is parsed with `BenchmarkType::from_str` in `run`, and
        // `BenchmarkType` is currently aliased to the Narwhal benchmark type; confirm that the
        // help text above still describes the accepted values for that protocol.
        #[clap(long, default_value = "0", global = true)]
        benchmark_type: String,

        /// The committee size to deploy.
        #[clap(long, value_name = "INT")]
        committee: usize,

        /// Number of faulty nodes.
        #[clap(long, value_name = "INT", default_value = "0", global = true)]
        faults: usize,

        /// Whether the faulty nodes recover.
        #[clap(long, action, default_value = "false", global = true)]
        crash_recovery: bool,

        /// The interval to crash nodes in seconds.
        #[clap(long, value_parser = parse_duration, default_value = "60", global = true)]
        crash_interval: Duration,

        /// The minimum duration of the benchmark in seconds.
        #[clap(long, value_parser = parse_duration, default_value = "600", global = true)]
        duration: Duration,

        /// The interval between measurements collection in seconds.
        #[clap(long, value_parser = parse_duration, default_value = "15", global = true)]
        scrape_interval: Duration,

        /// Whether to skip testbed updates before running benchmarks.
        #[clap(long, action, default_value = "false", global = true)]
        skip_testbed_update: bool,

        /// Whether to skip testbed configuration before running benchmarks.
        #[clap(long, action, default_value = "false", global = true)]
        skip_testbed_configuration: bool,

        /// Whether to download and analyze the client and node log files.
        #[clap(long, action, default_value = "false", global = true)]
        log_processing: bool,

        /// The number of instances running exclusively load generators. If set to zero the
        /// orchestrator collocates one load generator with each node.
        #[clap(long, value_name = "INT", default_value = "0", global = true)]
        dedicated_clients: usize,

        /// Whether to forgo a grafana and prometheus instance and leave the testbed unmonitored.
        #[clap(long, action, default_value = "false", global = true)]
        skip_monitoring: bool,

        /// The timeout duration for ssh commands (in seconds).
        #[clap(long, action, value_parser = parse_duration, default_value = "30", global = true)]
        timeout: Duration,

        /// The number of times the orchestrator should retry an ssh command.
        #[clap(long, value_name = "INT", default_value = "5", global = true)]
        retries: usize,

        /// The load to submit to the system.
        #[clap(subcommand)]
        load_type: Load,
    },

    /// Print a summary of the specified measurements collection.
    Summarize {
        /// The path to the measurements collection file to summarize.
        #[clap(long, value_name = "FILE")]
        path: String,
    },
}
137
// The actions available under the `testbed` operation. Each variant is dispatched to the
// matching `Testbed` method in `run`.
#[derive(Parser)]
#[clap(rename_all = "kebab-case")]
pub enum TestbedAction {
    /// Display the testbed status.
    Status,

    /// Deploy the specified number of instances in all regions listed in the settings file.
    Deploy {
        /// Number of instances to deploy.
        #[clap(long)]
        instances: usize,

        /// The region where to deploy the instances. If this parameter is not specified, the
        /// command deploys the specified number of instances in all regions listed in the
        /// settings file.
        #[clap(long)]
        region: Option<String>,
    },

    /// Start at most the specified number of instances per region on an existing testbed.
    Start {
        /// Maximum number of instances to start per region.
        #[clap(long, default_value = "200")]
        instances: usize,
    },

    /// Stop an existing testbed (without destroying the instances).
    Stop,

    /// Destroy the testbed and terminate all instances.
    Destroy,
}
170
// The load strategy of a benchmark run, selected as a subcommand of `benchmark`. Each variant
// is converted into the corresponding `LoadType` in `run`.
#[derive(Parser)]
pub enum Load {
    /// The fixed loads (in tx/s) to submit to the nodes.
    FixedLoad {
        /// A comma-separated list of fixed loads (tx/s).
        #[clap(
            long,
            value_name = "INT",
            num_args(1..),
            value_delimiter = ','
        )]
        loads: Vec<usize>,
    },

    /// Search for the maximum load that the system can sustainably handle.
    Search {
        /// The initial load (in tx/s) to test and use as a baseline.
        #[clap(long, value_name = "INT", default_value = "250")]
        starting_load: usize,
        /// The maximum number of iterations before converging on a breaking point.
        #[clap(long, value_name = "INT", default_value = "5")]
        max_iterations: usize,
    },
}
195
/// Converts a command-line argument holding a whole number of seconds into a [`Duration`].
///
/// Used as the clap `value_parser` for every duration-valued flag of the CLI.
///
/// # Errors
///
/// Returns the [`std::num::ParseIntError`] produced by the integer parser when `arg` is not
/// a valid unsigned integer (empty, negative, fractional, non-numeric, or out of `u64` range).
fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
    arg.parse::<u64>().map(Duration::from_secs)
}
200
201#[tokio::main]
202async fn main() -> Result<()> {
203    color_eyre::install()?;
204    let opts: Opts = Opts::parse();
205
206    // Load the settings files.
207    let settings = Settings::load(&opts.settings_path).wrap_err("Failed to load settings")?;
208
209    match &settings.cloud_provider {
210        CloudProvider::Aws => {
211            // Create the client for the cloud provider.
212            let client = AwsClient::new(settings.clone()).await;
213
214            // Execute the command.
215            run(settings, client, opts).await
216        }
217    }
218}
219
220async fn run<C: ServerProviderClient>(settings: Settings, client: C, opts: Opts) -> Result<()> {
221    // Create a new testbed.
222    let mut testbed = Testbed::new(settings.clone(), client)
223        .await
224        .wrap_err("Failed to create testbed")?;
225
226    match opts.operation {
227        Operation::Testbed { action } => match action {
228            // Display the current status of the testbed.
229            TestbedAction::Status => testbed.status(),
230
231            // Deploy the specified number of instances on the testbed.
232            TestbedAction::Deploy { instances, region } => testbed
233                .deploy(instances, region)
234                .await
235                .wrap_err("Failed to deploy testbed")?,
236
237            // Start the specified number of instances on an existing testbed.
238            TestbedAction::Start { instances } => testbed
239                .start(instances)
240                .await
241                .wrap_err("Failed to start testbed")?,
242
243            // Stop an existing testbed.
244            TestbedAction::Stop => testbed.stop().await.wrap_err("Failed to stop testbed")?,
245
246            // Destroy the testbed and terminal all instances.
247            TestbedAction::Destroy => testbed
248                .destroy()
249                .await
250                .wrap_err("Failed to destroy testbed")?,
251        },
252
253        // Run benchmarks.
254        Operation::Benchmark {
255            benchmark_type,
256            committee,
257            faults,
258            crash_recovery,
259            crash_interval,
260            duration,
261            scrape_interval,
262            skip_testbed_update,
263            skip_testbed_configuration,
264            log_processing,
265            dedicated_clients,
266            skip_monitoring,
267            timeout,
268            retries,
269            load_type,
270        } => {
271            // Create a new orchestrator to instruct the testbed.
272            let username = testbed.username();
273            let private_key_file = settings.ssh_private_key_file.clone();
274            let ssh_manager = SshConnectionManager::new(username.into(), private_key_file)
275                .with_timeout(timeout)
276                .with_retries(retries);
277
278            let instances = testbed.instances();
279
280            let setup_commands = testbed
281                .setup_commands()
282                .await
283                .wrap_err("Failed to load testbed setup commands")?;
284
285            let protocol_commands = Protocol::new(&settings);
286            let benchmark_type = BenchmarkType::from_str(&benchmark_type)?;
287
288            let load = match load_type {
289                Load::FixedLoad { loads } => {
290                    let loads = if loads.is_empty() { vec![200] } else { loads };
291                    LoadType::Fixed(loads)
292                }
293                Load::Search {
294                    starting_load,
295                    max_iterations,
296                } => LoadType::Search {
297                    starting_load,
298                    max_iterations,
299                },
300            };
301
302            let fault_type = if !crash_recovery || faults == 0 {
303                FaultsType::Permanent { faults }
304            } else {
305                FaultsType::CrashRecovery {
306                    max_faults: faults,
307                    interval: crash_interval,
308                }
309            };
310
311            let generator = BenchmarkParametersGenerator::new(committee, load)
312                .with_benchmark_type(benchmark_type)
313                .with_custom_duration(duration)
314                .with_faults(fault_type);
315
316            Orchestrator::new(
317                settings,
318                instances,
319                setup_commands,
320                protocol_commands,
321                ssh_manager,
322            )
323            .with_scrape_interval(scrape_interval)
324            .with_crash_interval(crash_interval)
325            .skip_testbed_updates(skip_testbed_update)
326            .skip_testbed_configuration(skip_testbed_configuration)
327            .with_log_processing(log_processing)
328            .with_dedicated_clients(dedicated_clients)
329            .skip_monitoring(skip_monitoring)
330            .run_benchmarks(generator)
331            .await
332            .wrap_err("Failed to run benchmarks")?;
333        }
334
335        // Print a summary of the specified measurements collection.
336        Operation::Summarize { path } => {
337            MeasurementsCollection::<BenchmarkType>::load(path)?.display_summary()
338        }
339    }
340    Ok(())
341}