sui_aws_orchestrator/
orchestrator.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{HashMap, VecDeque},
    fs::{self},
    marker::PhantomData,
    path::PathBuf,
    time::Duration,
};

use tokio::time::{self, Instant};

use crate::monitor::Monitor;
use crate::{
    benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType},
    client::Instance,
    display, ensure,
    error::{TestbedError, TestbedResult},
    faults::CrashRecoverySchedule,
    logs::LogsAnalyzer,
    measurement::{Measurement, MeasurementsCollection},
    protocol::{ProtocolCommands, ProtocolMetrics},
    settings::Settings,
    ssh::{CommandContext, CommandStatus, SshConnectionManager},
};

/// An orchestrator to run benchmarks on a testbed.
pub struct Orchestrator<P, T> {
    /// The testbed's settings.
    settings: Settings,
    /// The state of the testbed (accurately reflecting the state of the machines).
    instances: Vec<Instance>,
    /// The type of the benchmark parameters.
    benchmark_type: PhantomData<T>,
    /// Provider-specific commands to set up each instance.
    instance_setup_commands: Vec<String>,
    /// Protocol-specific command generator used to produce the protocol configuration
    /// files, boot clients and nodes, etc.
    protocol_commands: P,
    /// The interval between measurement collections.
    scrape_interval: Duration,
    /// The interval between node crashes.
    crash_interval: Duration,
    /// Handle ssh connections to instances.
    ssh_manager: SshConnectionManager,
    /// Whether to skip testbed updates before running benchmarks.
    skip_testbed_update: bool,
    /// Whether to skip testbed configuration before running benchmarks.
    skip_testbed_configuration: bool,
    /// Whether to download and analyze the client and node log files.
    log_processing: bool,
    /// Number of instances running only load generators (not nodes). If this value is set
    /// to zero, the orchestrator runs a load generator collocated with each node.
    dedicated_clients: usize,
    /// Whether to forgo a grafana and prometheus instance and leave the testbed unmonitored.
    skip_monitoring: bool,
}

impl<P, T> Orchestrator<P, T> {
    /// The default interval between measurement collections.
    const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
    /// The default interval between node crashes.
    const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);

    /// Make a new orchestrator.
    pub fn new(
        settings: Settings,
        instances: Vec<Instance>,
        instance_setup_commands: Vec<String>,
        protocol_commands: P,
        ssh_manager: SshConnectionManager,
    ) -> Self {
        Self {
            settings,
            instances,
            benchmark_type: PhantomData,
            instance_setup_commands,
            protocol_commands,
            ssh_manager,
            scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
            crash_interval: Self::DEFAULT_CRASH_INTERVAL,
            skip_testbed_update: false,
            skip_testbed_configuration: false,
            log_processing: false,
            dedicated_clients: 0,
            skip_monitoring: false,
        }
    }

    /// Set the interval between measurement collections.
    pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
        self.scrape_interval = scrape_interval;
        self
    }

    /// Set the interval with which to crash nodes.
    pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
        self.crash_interval = crash_interval;
        self
    }

    /// Set whether to skip testbed updates before running benchmarks.
    pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
        self.skip_testbed_update = skip_testbed_update;
        self
    }

    /// Set whether to skip testbed configuration before running benchmarks.
    pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
        self.skip_testbed_configuration = skip_testbed_configuration;
        self
    }

    /// Set whether to download and analyze the client and node log files.
    pub fn with_log_processing(mut self, log_processing: bool) -> Self {
        self.log_processing = log_processing;
        self
    }

    /// Set the number of instances running exclusively load generators.
    pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
        self.dedicated_clients = dedicated_clients;
        self
    }

    /// Set whether to skip deploying the monitoring stack (Grafana and Prometheus) and
    /// leave the testbed unmonitored.
    pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
        self.skip_monitoring = skip_monitoring;
        self
    }
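
    // A minimal construction sketch (hypothetical values; assumes `settings`, `instances`,
    // `protocol_commands`, and `ssh_manager` are already built):
    //
    //     let orchestrator = Orchestrator::new(settings, instances, Vec::new(), protocol_commands, ssh_manager)
    //         .with_scrape_interval(Duration::from_secs(30))
    //         .with_dedicated_clients(4)
    //         .skip_monitoring(true);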

    /// Select on which instances of the testbed to run the benchmarks. This function
    /// returns three sets of instances: the instances on which to run the load
    /// generators, the instances on which to run the nodes, and optionally the instance
    /// hosting the monitoring stack.
    pub fn select_instances(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<(Vec<Instance>, Vec<Instance>, Option<Instance>)> {
        // Ensure there are enough active instances.
        let available_instances: Vec<_> = self.instances.iter().filter(|x| x.is_active()).collect();
        let minimum_instances = if self.skip_monitoring {
            parameters.nodes + self.dedicated_clients
        } else {
            parameters.nodes + self.dedicated_clients + 1
        };
        ensure!(
            available_instances.len() >= minimum_instances,
            TestbedError::InsufficientCapacity(minimum_instances - available_instances.len())
        );

        // Sort the instances by region.
        let mut instances_by_regions = HashMap::new();
        for instance in available_instances {
            instances_by_regions
                .entry(&instance.region)
                .or_insert_with(VecDeque::new)
                .push_back(instance);
        }

        // Select the instance to host the monitoring stack.
        let mut monitoring_instance = None;
        if !self.skip_monitoring {
            for region in &self.settings.regions {
                if let Some(regional_instances) = instances_by_regions.get_mut(region) {
                    if let Some(instance) = regional_instances.pop_front() {
                        monitoring_instance = Some(instance.clone());
                    }
                    break;
                }
            }
        }

        // Select the instances to host exclusively load generators.
        let mut client_instances = Vec::new();
        for region in self.settings.regions.iter().cycle() {
            if client_instances.len() == self.dedicated_clients {
                break;
            }
            if let Some(regional_instances) = instances_by_regions.get_mut(region)
                && let Some(instance) = regional_instances.pop_front()
            {
                client_instances.push(instance.clone());
            }
        }

        // Select the instances to host the nodes.
        let mut nodes_instances = Vec::new();
        for region in self.settings.regions.iter().cycle() {
            if nodes_instances.len() == parameters.nodes {
                break;
            }
            if let Some(regional_instances) = instances_by_regions.get_mut(region)
                && let Some(instance) = regional_instances.pop_front()
            {
                nodes_instances.push(instance.clone());
            }
        }

        // Spawn a load generator collocated with each node if there are no instances
        // dedicated to exclusively running load generators.
        if client_instances.is_empty() {
            client_instances = nodes_instances.clone();
        }

        Ok((client_instances, nodes_instances, monitoring_instance))
    }
}
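
// For example, with `parameters.nodes = 10`, `dedicated_clients = 2`, and monitoring
// enabled, `select_instances` needs at least 10 + 2 + 1 = 13 active instances; with
// fewer, it returns `TestbedError::InsufficientCapacity` carrying the shortfall.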

impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
    /// Boot one node per instance.
    async fn boot_nodes(
        &self,
        instances: Vec<Instance>,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
        // Run one node per instance.
        let targets = self
            .protocol_commands
            .node_command(instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("node".into())
            .with_log_file("~/node.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until all nodes are reachable.
        let commands = self
            .protocol_commands
            .nodes_metrics_command(instances.clone());
        self.ssh_manager.wait_for_success(commands).await;

        Ok(())
    }

    /// Install the codebase and its dependencies on the testbed.
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;
        let basic_commands = [
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            // Disable the "pending kernel upgrade" message.
            "sudo apt-get -y remove needrestart",
            // Install the following dependencies:
            // * build-essential: prevents the error [error: linker `cc` not found].
            // * libssl-dev: required to compile the orchestrator (TODO: remove this dependency).
            "sudo apt-get -y install build-essential libssl-dev",
            // Install rust (non-interactive).
            "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
            "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
            "source $HOME/.cargo/env",
            "rustup default stable",
            // Create the working directory.
            &format!("mkdir -p {working_dir}"),
            // Clone the repo.
            &format!("(git clone {url} || true)"),
        ];

        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

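        // Join every step with " && " so the installation aborts at the first failing
        // step instead of silently continuing; the result is one long shell command
        // executed over ssh on each active instance.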
        let command = [
            &basic_commands[..],
            &Monitor::dependencies()[..],
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");

        let active = self.instances.iter().filter(|x| x.is_active()).cloned();
        let context = CommandContext::default();
        self.ssh_manager.execute(active, command, context).await?;

        display::done();
        Ok(())
    }

    /// Start Prometheus and Grafana on the instance dedicated to monitoring (if any).
    pub async fn start_monitoring(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        let (clients, nodes, instance) = self.select_instances(parameters)?;
        if let Some(instance) = instance {
            display::action("Configuring monitoring instance");

            let monitor = Monitor::new(instance, clients, nodes, self.ssh_manager.clone());
            monitor.start_prometheus(&self.protocol_commands).await?;
            monitor.start_grafana().await?;

            display::done();
            display::config("Grafana address", monitor.grafana_address());
            display::newline();
        }

        Ok(())
    }

    /// Update all instances to use the version of the codebase specified in the settings file.
    pub async fn update(&self) -> TestbedResult<()> {
        display::action("Updating all instances");

        // Update all active instances. This requires compiling the codebase in release mode
        // (which may take a long time), so we run the command in the background to avoid
        // keeping many ssh connections alive for too long.
        let commit = &self.settings.repository.commit;
        let command = [
            "git fetch -f",
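            // Check out the target commit: create a dedicated branch for it, or
            // force-checkout the branch if it already exists.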
            &format!("(git checkout -b {commit} {commit} || git checkout -f {commit})"),
            "(git pull -f || true)",
            "source $HOME/.cargo/env",
            "cargo build --release",
        ]
        .join(" && ");

        let active = self.instances.iter().filter(|x| x.is_active()).cloned();

        let id = "update";
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background(id.into())
            .with_execute_from_path(repo_name.into());
        self.ssh_manager
            .execute(active.clone(), command, context)
            .await?;

        // Wait until the command has finished running.
        self.ssh_manager
            .wait_for_command(active, id, CommandStatus::Terminated)
            .await?;

        display::done();
        Ok(())
    }

    /// Configure the instances with the appropriate configuration files.
    pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Configuring instances");

        // Select instances to configure.
        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Generate the genesis configuration file and the keystore allowing access to gas objects.
        let command = self.protocol_commands.genesis_command(nodes.iter());
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new().with_execute_from_path(repo_name.into());
        let all = clients.into_iter().chain(nodes);
        self.ssh_manager.execute(all, command, context).await?;

        display::done();
        Ok(())
    }

    /// Clean up all instances and optionally delete their log files.
    pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
        display::action("Cleaning up testbed");

        // Kill all tmux servers and delete the nodes' dbs. Optionally clear logs.
        let mut command = vec!["(tmux kill-server || true)".into()];
        for path in self.protocol_commands.db_directories() {
            command.push(format!("(rm -rf {} || true)", path.display()));
        }
        if cleanup {
            command.push("(rm -rf ~/*log* || true)".into());
        }
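        // Join with " ; " rather than " && " so that every cleanup step runs even if an
        // earlier one fails (each step is already wrapped in `(... || true)`).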
        let command = command.join(" ; ");

        // Execute the deletion on all machines.
        let active = self.instances.iter().filter(|x| x.is_active()).cloned();
        let context = CommandContext::default();
        self.ssh_manager.execute(active, command, context).await?;

        display::done();
        Ok(())
    }

    /// Deploy the nodes.
    pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Deploying validators");

        // Select the instances to run.
        let (_, nodes, _) = self.select_instances(parameters)?;

        // Boot one node per instance.
        self.boot_nodes(nodes, parameters).await?;

        display::done();
        Ok(())
    }

    /// Deploy the load generators.
    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Setting up load generators");

        // Select the instances to run.
        let (clients, _, _) = self.select_instances(parameters)?;

        // Deploy the load generators.
        let targets = self
            .protocol_commands
            .client_command(clients.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("client".into())
            .with_log_file("~/client.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until all load generators are reachable.
        let commands = self.protocol_commands.clients_metrics_command(clients);
        self.ssh_manager.wait_for_success(commands).await;

        display::done();
        Ok(())
    }

    /// Run the benchmark: collect metrics from the load generators and nodes, and
    /// crash and recover nodes according to the fault schedule.
    pub async fn run(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<MeasurementsCollection<T>> {
        display::action(format!(
            "Scraping metrics (at least {}s)",
            parameters.duration.as_secs()
        ));

        // Select the instances to run.
        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Regularly scrape the clients' metrics.
        let mut metrics_commands = self.protocol_commands.clients_metrics_command(clients);

        // TODO: Remove this when narwhal client latency metrics are available.
        // We will get latency metrics directly from the narwhal nodes instead of from
        // the narwhal client.
        metrics_commands.append(&mut self.protocol_commands.nodes_metrics_command(nodes.clone()));

        let mut aggregator = MeasurementsCollection::new(&self.settings, parameters.clone());
        let mut metrics_interval = time::interval(self.scrape_interval);
        metrics_interval.tick().await; // The first tick returns immediately.

        let faults_type = parameters.faults.clone();
        let mut faults_schedule = CrashRecoverySchedule::new(faults_type, nodes.clone());
        let mut faults_interval = time::interval(self.crash_interval);
        faults_interval.tick().await; // The first tick returns immediately.

        let start = Instant::now();
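        // Drive metric scraping and fault injection concurrently. The scraping arm also
        // decides when the run ends (once `elapsed` exceeds the configured duration);
        // the faults arm only mutates the testbed.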
        loop {
            tokio::select! {
                // Scrape metrics.
                now = metrics_interval.tick() => {
                    let elapsed = now.duration_since(start).as_secs_f64().ceil() as u64;
                    display::status(format!("{elapsed}s"));

                    let stdio = self
                        .ssh_manager
                        .execute_per_instance(metrics_commands.clone(), CommandContext::default())
                        .await?;
                    for (i, (stdout, _stderr)) in stdio.iter().enumerate() {
                        let measurement = Measurement::from_prometheus::<P>(stdout);
                        aggregator.add(i, measurement);
                    }

                    if elapsed > parameters.duration.as_secs() {
                        break;
                    }
                },

                // Kill and recover nodes according to the input schedule.
                _ = faults_interval.tick() => {
                    let action = faults_schedule.update();
                    if !action.kill.is_empty() {
                        self.ssh_manager.kill(action.kill.clone(), "node").await?;
                    }
                    if !action.boot.is_empty() {
                        self.boot_nodes(action.boot.clone(), parameters).await?;
                    }
                    if !action.kill.is_empty() || !action.boot.is_empty() {
                        display::newline();
                        display::config("Testbed update", action);
                    }
                }
            }
        }

        let results_directory = &self.settings.results_dir;
        let commit = &self.settings.repository.commit;
        let path: PathBuf = [results_directory, &format!("results-{commit}").into()]
            .iter()
            .collect();
        fs::create_dir_all(&path).expect("Failed to create log directory");
        aggregator.save(path);

        display::done();
        Ok(aggregator)
    }

    /// Download the log files from the nodes and clients.
    pub async fn download_logs(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<LogsAnalyzer> {
        // Select the instances to run.
        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Create a log sub-directory for this run.
        let commit = &self.settings.repository.commit;
        let path: PathBuf = [
            &self.settings.logs_dir,
            &format!("logs-{commit}").into(),
            &format!("logs-{parameters:?}").into(),
        ]
        .iter()
        .collect();
        fs::create_dir_all(&path).expect("Failed to create log directory");

        // NOTE: Our ssh library does not seem to be able to transfer files in parallel reliably.
        let mut log_parsers = Vec::new();

        // Download the client log files.
        display::action("Downloading client logs");
        for (i, instance) in clients.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, clients.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let client_log_content = connection.download("client.log").await?;

            let client_log_file = [path.clone(), format!("client-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&client_log_file, client_log_content.as_bytes())
                .expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_client_errors(&client_log_content);
            log_parsers.push(log_parser)
        }
        display::done();

        // Download the node log files.
        display::action("Downloading node logs");
        for (i, instance) in nodes.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, nodes.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let node_log_content = connection.download("node.log").await?;

            let node_log_file = [path.clone(), format!("node-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&node_log_file, node_log_content.as_bytes()).expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_node_errors(&node_log_content);
            log_parsers.push(log_parser)
        }
        display::done();

        Ok(LogsAnalyzer::aggregate(log_parsers))
    }

    /// Run all the benchmarks specified by the benchmark generator.
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        // Clean up the testbed (in case the previous run was not completed).
        self.cleanup(true).await?;

        // Update the software on all instances.
        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

        // Run all benchmarks.
        let mut i = 1;
        let mut latest_committee_size = 0;
        while let Some(parameters) = generator.next() {
            display::header(format!("Starting benchmark {i}"));
            display::config("Benchmark type", &parameters.benchmark_type);
            display::config("Parameters", &parameters);
            display::newline();

            // Clean up the testbed (in case the previous run was not completed).
            self.cleanup(true).await?;
            // Start the instance monitoring tools.
            self.start_monitoring(&parameters).await?;

            // Configure all instances (if needed).
            if !self.skip_testbed_configuration && latest_committee_size != parameters.nodes {
                self.configure(&parameters).await?;
                latest_committee_size = parameters.nodes;
            }

            // Deploy the validators.
            self.run_nodes(&parameters).await?;

            // Deploy the load generators.
            self.run_clients(&parameters).await?;

            // Wait for the benchmark to terminate. Then save the results and print a summary.
            let aggregator = self.run(&parameters).await?;
            aggregator.display_summary();
            generator.register_result(aggregator);

            // Kill the nodes and clients (without deleting the log files).
            self.cleanup(false).await?;

            // Download the log files.
            if self.log_processing {
                let error_counter = self.download_logs(&parameters).await?;
                error_counter.print_summary();
            }

            i += 1;
        }

        display::header("Benchmark completed");
        Ok(())
    }
}
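
// A minimal driver sketch (hypothetical `generator`; assumes an orchestrator built as in
// the constructor example above, inside a tokio runtime):
//
//     orchestrator.run_benchmarks(generator).await?;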