use std::{
    collections::{HashMap, VecDeque},
    fs::{self},
    marker::PhantomData,
    path::PathBuf,
    time::Duration,
};

use tokio::time::{self, Instant};

use crate::monitor::Monitor;
use crate::{
    benchmark::{BenchmarkParameters, BenchmarkParametersGenerator, BenchmarkType},
    client::Instance,
    display, ensure,
    error::{TestbedError, TestbedResult},
    faults::CrashRecoverySchedule,
    logs::LogsAnalyzer,
    measurement::{Measurement, MeasurementsCollection},
    protocol::{ProtocolCommands, ProtocolMetrics},
    settings::Settings,
    ssh::{CommandContext, CommandStatus, SshConnectionManager},
};

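/// Orchestrate benchmark runs over a testbed of remote instances: select the
/// machines, install and update the software, deploy nodes and load
/// generators, scrape metrics, inject crash-recovery faults, and collect the
/// measurements.
///
/// A sketch of typical construction (the `settings`, `instances`,
/// `setup_commands`, `protocol_commands`, and `ssh_manager` values are
/// placeholders to be built from your own testbed configuration):
///
/// ```ignore
/// let orchestrator = Orchestrator::new(
///     settings,
///     instances,
///     setup_commands,
///     protocol_commands,
///     ssh_manager,
/// )
/// .with_scrape_interval(Duration::from_secs(30))
/// .skip_monitoring(true);
/// ```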
pub struct Orchestrator<P, T> {
    /// The testbed settings.
    settings: Settings,
    /// The set of instances of the testbed.
    instances: Vec<Instance>,
    /// Phantom marker for the benchmark type.
    benchmark_type: PhantomData<T>,
    /// Provider-specific commands to set up each instance.
    instance_setup_commands: Vec<String>,
    /// Protocol-specific commands to generate configuration files, boot nodes
    /// and clients, and query metrics.
    protocol_commands: P,
    /// The interval between two metrics scrapes.
    scrape_interval: Duration,
    /// The interval between two crash-recovery fault injections.
    crash_interval: Duration,
    /// Handle to execute commands on the instances over ssh.
    ssh_manager: SshConnectionManager,
    /// Whether to skip installing and updating the software on the instances.
    skip_testbed_update: bool,
    /// Whether to skip generating and distributing configuration files.
    skip_testbed_configuration: bool,
    /// Whether to download and analyze the nodes' and clients' log files.
    log_processing: bool,
    /// The number of instances dedicated to running load generators; if zero,
    /// the load generators are collocated with the nodes.
    dedicated_clients: usize,
    /// Whether to skip deploying a monitoring instance.
    skip_monitoring: bool,
}

impl<P, T> Orchestrator<P, T> {
    /// The default interval between two metrics scrapes.
    const DEFAULT_SCRAPE_INTERVAL: Duration = Duration::from_secs(15);
    /// The default interval between two crash-recovery fault injections.
    const DEFAULT_CRASH_INTERVAL: Duration = Duration::from_secs(60);

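    /// Create a new orchestrator from the testbed settings, the instances to
    /// manage, provider-specific setup commands, the protocol driver, and an
    /// ssh connection manager. Optional behaviour is configured through the
    /// builder-style `with_*` and `skip_*` methods below.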
    pub fn new(
        settings: Settings,
        instances: Vec<Instance>,
        instance_setup_commands: Vec<String>,
        protocol_commands: P,
        ssh_manager: SshConnectionManager,
    ) -> Self {
        Self {
            settings,
            instances,
            benchmark_type: PhantomData,
            instance_setup_commands,
            protocol_commands,
            ssh_manager,
            scrape_interval: Self::DEFAULT_SCRAPE_INTERVAL,
            crash_interval: Self::DEFAULT_CRASH_INTERVAL,
            skip_testbed_update: false,
            skip_testbed_configuration: false,
            log_processing: false,
            dedicated_clients: 0,
            skip_monitoring: false,
        }
    }

    /// Set the interval between two metrics scrapes.
    pub fn with_scrape_interval(mut self, scrape_interval: Duration) -> Self {
        self.scrape_interval = scrape_interval;
        self
    }

    /// Set the interval between two crash-recovery fault injections.
    pub fn with_crash_interval(mut self, crash_interval: Duration) -> Self {
        self.crash_interval = crash_interval;
        self
    }

    /// Whether to skip installing and updating the software on the instances.
    pub fn skip_testbed_updates(mut self, skip_testbed_update: bool) -> Self {
        self.skip_testbed_update = skip_testbed_update;
        self
    }

    /// Whether to skip generating and distributing configuration files.
    pub fn skip_testbed_configuration(mut self, skip_testbed_configuration: bool) -> Self {
        self.skip_testbed_configuration = skip_testbed_configuration;
        self
    }

    /// Whether to download and analyze the nodes' and clients' log files.
    pub fn with_log_processing(mut self, log_processing: bool) -> Self {
        self.log_processing = log_processing;
        self
    }

    /// Set the number of instances dedicated to running load generators.
    pub fn with_dedicated_clients(mut self, dedicated_clients: usize) -> Self {
        self.dedicated_clients = dedicated_clients;
        self
    }

    /// Whether to skip deploying a monitoring instance.
    pub fn skip_monitoring(mut self, skip_monitoring: bool) -> Self {
        self.skip_monitoring = skip_monitoring;
        self
    }

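    /// Select the instances to use for the benchmark. One instance from the
    /// first region with available machines hosts the monitoring stack
    /// (unless monitoring is skipped); the next `dedicated_clients`
    /// instances, taken round-robin across regions, exclusively host load
    /// generators; the following `parameters.nodes` instances host the nodes.
    /// If there are no dedicated clients, the load generators are collocated
    /// with the nodes. Returns the tuple
    /// `(client_instances, node_instances, monitoring_instance)`.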
    pub fn select_instances(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<(Vec<Instance>, Vec<Instance>, Option<Instance>)> {
        // Ensure there are enough active instances.
        let available_instances: Vec<_> = self.instances.iter().filter(|x| x.is_active()).collect();
        let minimum_instances = if self.skip_monitoring {
            parameters.nodes + self.dedicated_clients
        } else {
            parameters.nodes + self.dedicated_clients + 1
        };
        ensure!(
            available_instances.len() >= minimum_instances,
            TestbedError::InsufficientCapacity(minimum_instances - available_instances.len())
        );

        // Sort the instances by region.
        let mut instances_by_regions = HashMap::new();
        for instance in available_instances {
            instances_by_regions
                .entry(&instance.region)
                .or_insert_with(VecDeque::new)
                .push_back(instance);
        }

        // Select the instance hosting the monitoring stack.
        let mut monitoring_instance = None;
        if !self.skip_monitoring {
            for region in &self.settings.regions {
                if let Some(regional_instances) = instances_by_regions.get_mut(region) {
                    if let Some(instance) = regional_instances.pop_front() {
                        monitoring_instance = Some(instance.clone());
                    }
                    break;
                }
            }
        }

        // Select the instances exclusively hosting load generators.
        let mut client_instances = Vec::new();
        for region in self.settings.regions.iter().cycle() {
            if client_instances.len() == self.dedicated_clients {
                break;
            }
            if let Some(regional_instances) = instances_by_regions.get_mut(region)
                && let Some(instance) = regional_instances.pop_front()
            {
                client_instances.push(instance.clone());
            }
        }

        // Select the instances hosting the nodes.
        let mut nodes_instances = Vec::new();
        for region in self.settings.regions.iter().cycle() {
            if nodes_instances.len() == parameters.nodes {
                break;
            }
            if let Some(regional_instances) = instances_by_regions.get_mut(region)
                && let Some(instance) = regional_instances.pop_front()
            {
                nodes_instances.push(instance.clone());
            }
        }

        // If there are no dedicated clients, collocate the load generators with the nodes.
        if client_instances.is_empty() {
            client_instances = nodes_instances.clone();
        }

        Ok((client_instances, nodes_instances, monitoring_instance))
    }
}

impl<P: ProtocolCommands<T> + ProtocolMetrics, T: BenchmarkType> Orchestrator<P, T> {
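    /// Boot one node per instance and wait until every node's metrics
    /// endpoint is reachable, i.e., until the nodes are up and running.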
    async fn boot_nodes(
        &self,
        instances: Vec<Instance>,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<()> {
        // Run one node per instance, in the background, logging to ~/node.log.
        let targets = self
            .protocol_commands
            .node_command(instances.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("node".into())
            .with_log_file("~/node.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until all nodes are reachable.
        let commands = self
            .protocol_commands
            .nodes_metrics_command(instances.clone());
        self.ssh_manager.wait_for_success(commands).await;

        Ok(())
    }

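    /// Install all software dependencies on every active instance: basic
    /// system packages, Rust, the monitoring stack, provider-specific
    /// tooling, the protocol's own dependencies, and a clone of the
    /// repository. The individual commands are chained with `&&` and run in
    /// a single ssh session per instance.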
    pub async fn install(&self) -> TestbedResult<()> {
        display::action("Installing dependencies on all machines");

        let working_dir = self.settings.working_dir.display();
        let url = &self.settings.repository.url;
        let basic_commands = [
            "sudo apt-get update",
            "sudo apt-get -y upgrade",
            "sudo apt-get -y autoremove",
            // Remove needrestart to avoid interactive restart prompts during installs.
            "sudo apt-get -y remove needrestart",
            // Dependencies needed to build the codebase.
            "sudo apt-get -y install build-essential libssl-dev",
            // Install Rust.
            "curl --proto \"=https\" --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y",
            "echo \"source $HOME/.cargo/env\" | tee -a ~/.bashrc",
            "source $HOME/.cargo/env",
            "rustup default stable",
            // Create the working directory and clone the repository (ignoring
            // failures if it is already cloned).
            &format!("mkdir -p {working_dir}"),
            &format!("(git clone {url} || true)"),
        ];

        let cloud_provider_specific_dependencies: Vec<_> = self
            .instance_setup_commands
            .iter()
            .map(|x| x.as_str())
            .collect();

        let protocol_dependencies = self.protocol_commands.protocol_dependencies();

        let command = [
            &basic_commands[..],
            &Monitor::dependencies()[..],
            &cloud_provider_specific_dependencies[..],
            &protocol_dependencies[..],
        ]
        .concat()
        .join(" && ");

        let active = self.instances.iter().filter(|x| x.is_active()).cloned();
        let context = CommandContext::default();
        self.ssh_manager.execute(active, command, context).await?;

        display::done();
        Ok(())
    }

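    /// Start the monitoring stack (Prometheus and Grafana) on the dedicated
    /// monitoring instance, if any, and display the Grafana address.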
    pub async fn start_monitoring(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        let (clients, nodes, instance) = self.select_instances(parameters)?;
        if let Some(instance) = instance {
            display::action("Configuring monitoring instance");

            let monitor = Monitor::new(instance, clients, nodes, self.ssh_manager.clone());
            monitor.start_prometheus(&self.protocol_commands).await?;
            monitor.start_grafana().await?;

            display::done();
            display::config("Grafana address", monitor.grafana_address());
            display::newline();
        }

        Ok(())
    }

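    /// Update the software on every active instance: fetch and check out the
    /// configured commit, then rebuild in release mode. The build runs in the
    /// background (it can take a long time) and this function waits until it
    /// terminates on every instance.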
    pub async fn update(&self) -> TestbedResult<()> {
        display::action("Updating all instances");

        // Update all active instances. This can take a long time since the
        // instances typically recompile the codebase in release mode.
        let commit = &self.settings.repository.commit;
        let command = [
            "git fetch -f",
            &format!("(git checkout -b {commit} {commit} || git checkout -f {commit})"),
            "(git pull -f || true)",
            "source $HOME/.cargo/env",
            "cargo build --release",
        ]
        .join(" && ");

        let active = self.instances.iter().filter(|x| x.is_active()).cloned();

        let id = "update";
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background(id.into())
            .with_execute_from_path(repo_name.into());
        self.ssh_manager
            .execute(active.clone(), command, context)
            .await?;

        // Wait until the command finishes running on every instance.
        self.ssh_manager
            .wait_for_command(active, id, CommandStatus::Terminated)
            .await?;

        display::done();
        Ok(())
    }

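    /// Run the protocol's genesis command on every node and client instance
    /// to generate and distribute the configuration files.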
    pub async fn configure(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Configuring instances");

        // Select the instances to configure.
        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Run the genesis command on every node and client instance.
        let command = self.protocol_commands.genesis_command(nodes.iter());
        let repo_name = self.settings.repository_name();
        let context = CommandContext::new().with_execute_from_path(repo_name.into());
        let all = clients.into_iter().chain(nodes);
        self.ssh_manager.execute(all, command, context).await?;

        display::done();
        Ok(())
    }

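    /// Clean up the testbed: kill all tmux sessions and wipe the protocol's
    /// database directories on every active instance. If `cleanup` is true,
    /// also delete the log files.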
    pub async fn cleanup(&self, cleanup: bool) -> TestbedResult<()> {
        display::action("Cleaning up testbed");

        // Kill all tmux servers and delete the databases (and possibly the logs).
        let mut command = vec!["(tmux kill-server || true)".into()];
        for path in self.protocol_commands.db_directories() {
            command.push(format!("(rm -rf {} || true)", path.display()));
        }
        if cleanup {
            command.push("(rm -rf ~/*log* || true)".into());
        }
        let command = command.join(" ; ");

        // Execute the deletion on all active machines.
        let active = self.instances.iter().filter(|x| x.is_active()).cloned();
        let context = CommandContext::default();
        self.ssh_manager.execute(active, command, context).await?;

        display::done();
        Ok(())
    }

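    /// Deploy the nodes on the instances selected for the benchmark.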
    pub async fn run_nodes(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Deploying validators");

        // Select the instances hosting the nodes.
        let (_, nodes, _) = self.select_instances(parameters)?;

        // Boot one node per instance.
        self.boot_nodes(nodes, parameters).await?;

        display::done();
        Ok(())
    }

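    /// Deploy the load generators on the instances selected for the benchmark
    /// and wait until their metrics endpoints are reachable.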
    pub async fn run_clients(&self, parameters: &BenchmarkParameters<T>) -> TestbedResult<()> {
        display::action("Setting up load generators");

        // Select the instances hosting the load generators.
        let (clients, _, _) = self.select_instances(parameters)?;

        // Run one client per instance, in the background, logging to ~/client.log.
        let targets = self
            .protocol_commands
            .client_command(clients.clone(), parameters);

        let repo = self.settings.repository_name();
        let context = CommandContext::new()
            .run_background("client".into())
            .with_log_file("~/client.log".into())
            .with_execute_from_path(repo.into());
        self.ssh_manager
            .execute_per_instance(targets, context)
            .await?;

        // Wait until all load generators are reachable.
        let commands = self.protocol_commands.clients_metrics_command(clients);
        self.ssh_manager.wait_for_success(commands).await;

        display::done();
        Ok(())
    }

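    /// Run the benchmark: periodically scrape Prometheus metrics from the
    /// clients and nodes while applying the crash-recovery schedule, and,
    /// once the benchmark duration has elapsed, persist the collected
    /// measurements to the results directory and return them.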
    pub async fn run(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<MeasurementsCollection<T>> {
        display::action(format!(
            "Scraping metrics (at least {}s)",
            parameters.duration.as_secs()
        ));

        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Gather the commands to scrape metrics from the clients and nodes.
        let mut metrics_commands = self.protocol_commands.clients_metrics_command(clients);
        metrics_commands.append(&mut self.protocol_commands.nodes_metrics_command(nodes.clone()));

        let mut aggregator = MeasurementsCollection::new(&self.settings, parameters.clone());
        let mut metrics_interval = time::interval(self.scrape_interval);
        metrics_interval.tick().await; // The first tick returns immediately.

        let faults_type = parameters.faults.clone();
        let mut faults_schedule = CrashRecoverySchedule::new(faults_type, nodes.clone());
        let mut faults_interval = time::interval(self.crash_interval);
        faults_interval.tick().await; // The first tick returns immediately.

        let start = Instant::now();
        loop {
            tokio::select! {
                // Scrape metrics from every client and node.
                now = metrics_interval.tick() => {
                    let elapsed = now.duration_since(start).as_secs_f64().ceil() as u64;
                    display::status(format!("{elapsed}s"));

                    let stdio = self
                        .ssh_manager
                        .execute_per_instance(metrics_commands.clone(), CommandContext::default())
                        .await?;
                    for (i, (stdout, _stderr)) in stdio.iter().enumerate() {
                        let measurement = Measurement::from_prometheus::<P>(stdout);
                        aggregator.add(i, measurement);
                    }

                    if elapsed > parameters.duration.as_secs() {
                        break;
                    }
                },

                // Kill and recover nodes according to the fault schedule.
                _ = faults_interval.tick() => {
                    let action = faults_schedule.update();
                    if !action.kill.is_empty() {
                        self.ssh_manager.kill(action.kill.clone(), "node").await?;
                    }
                    if !action.boot.is_empty() {
                        self.boot_nodes(action.boot.clone(), parameters).await?;
                    }
                    if !action.kill.is_empty() || !action.boot.is_empty() {
                        display::newline();
                        display::config("Testbed update", action);
                    }
                }
            }
        }

        // Save the collected metrics to the results directory.
        let results_directory = &self.settings.results_dir;
        let commit = &self.settings.repository.commit;
        let path: PathBuf = [results_directory, &format!("results-{commit}").into()]
            .iter()
            .collect();
        fs::create_dir_all(&path).expect("Failed to create results directory");
        aggregator.save(path);

        display::done();
        Ok(aggregator)
    }

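    /// Download the clients' and nodes' log files into a run-specific
    /// directory and scan them for errors.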
    pub async fn download_logs(
        &self,
        parameters: &BenchmarkParameters<T>,
    ) -> TestbedResult<LogsAnalyzer> {
        let (clients, nodes, _) = self.select_instances(parameters)?;

        // Create a log directory dedicated to this run.
        let commit = &self.settings.repository.commit;
        let path: PathBuf = [
            &self.settings.logs_dir,
            &format!("logs-{commit}").into(),
            &format!("logs-{parameters:?}").into(),
        ]
        .iter()
        .collect();
        fs::create_dir_all(&path).expect("Failed to create log directory");

        let mut log_parsers = Vec::new();

        // Download the clients' log files.
        display::action("Downloading client logs");
        for (i, instance) in clients.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, clients.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let client_log_content = connection.download("client.log").await?;

            let client_log_file = [path.clone(), format!("client-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&client_log_file, client_log_content.as_bytes())
                .expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_client_errors(&client_log_content);
            log_parsers.push(log_parser);
        }
        display::done();

        // Download the nodes' log files.
        display::action("Downloading node logs");
        for (i, instance) in nodes.iter().enumerate() {
            display::status(format!("{}/{}", i + 1, nodes.len()));

            let connection = self.ssh_manager.connect(instance.ssh_address()).await?;
            let node_log_content = connection.download("node.log").await?;

            let node_log_file = [path.clone(), format!("node-{i}.log").into()]
                .iter()
                .collect::<PathBuf>();
            fs::write(&node_log_file, node_log_content.as_bytes()).expect("Cannot write log file");

            let mut log_parser = LogsAnalyzer::default();
            log_parser.set_node_errors(&node_log_content);
            log_parsers.push(log_parser);
        }
        display::done();

        Ok(LogsAnalyzer::aggregate(log_parsers))
    }

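    /// Run all benchmarks produced by the parameters generator: prepare the
    /// testbed, then, for each set of parameters, configure the instances,
    /// deploy the nodes and load generators, collect measurements, and clean
    /// up between runs.
    ///
    /// A sketch of a typical driver; how the generator is constructed depends
    /// on your benchmark configuration and is elided here, and
    /// `MyBenchmarkType` is a placeholder for your `BenchmarkType`
    /// implementation:
    ///
    /// ```ignore
    /// let generator: BenchmarkParametersGenerator<MyBenchmarkType> = /* ... */;
    /// orchestrator.run_benchmarks(generator).await?;
    /// ```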
    pub async fn run_benchmarks(
        &mut self,
        mut generator: BenchmarkParametersGenerator<T>,
    ) -> TestbedResult<()> {
        display::header("Preparing testbed");
        display::config("Commit", format!("'{}'", &self.settings.repository.commit));
        display::newline();

        // Clean up the testbed (in case a previous run was not completed).
        self.cleanup(true).await?;

        // Update the software on all instances.
        if !self.skip_testbed_update {
            self.install().await?;
            self.update().await?;
        }

        // Run all benchmarks.
        let mut i = 1;
        let mut latest_committee_size = 0;
        while let Some(parameters) = generator.next() {
            display::header(format!("Starting benchmark {i}"));
            display::config("Benchmark type", &parameters.benchmark_type);
            display::config("Parameters", &parameters);
            display::newline();

            // Clean up the testbed and start the monitoring tools.
            self.cleanup(true).await?;
            self.start_monitoring(&parameters).await?;

            // Configure the testbed (only if the committee size changed).
            if !self.skip_testbed_configuration && latest_committee_size != parameters.nodes {
                self.configure(&parameters).await?;
                latest_committee_size = parameters.nodes;
            }

            // Deploy the validators.
            self.run_nodes(&parameters).await?;

            // Deploy the load generators.
            self.run_clients(&parameters).await?;

            // Wait for the benchmark to terminate, then save the results and
            // kill the nodes and load generators.
            let aggregator = self.run(&parameters).await?;
            aggregator.display_summary();
            generator.register_result(aggregator);
            self.cleanup(false).await?;

            // Download and process the log files.
            if self.log_processing {
                let error_counter = self.download_logs(&parameters).await?;
                error_counter.print_summary();
            }

            i += 1;
        }

        display::header("Benchmark completed");
        Ok(())
    }
}