sui_aws_orchestrator/protocol/
narwhal.rs1use std::{
5 fmt::{Debug, Display},
6 path::PathBuf,
7 str::FromStr,
8};
9
10use crate::{
11 benchmark::{BenchmarkParameters, BenchmarkType},
12 client::Instance,
13 settings::Settings,
14};
15use serde::{Deserialize, Serialize};
16
17use super::{ProtocolCommands, ProtocolMetrics};
18
19const NUM_WORKERS: usize = 1;
20const BASE_PORT: usize = 5000;
21
22const DEFAULT_PORT: usize = 9184;
24
25#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
26pub struct NarwhalBenchmarkType {
27 size: usize,
29}
30
31impl Debug for NarwhalBenchmarkType {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 write!(f, "{}", self.size)
34 }
35}
36
37impl Display for NarwhalBenchmarkType {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(f, "tx size {}b", self.size)
40 }
41}
42
43impl FromStr for NarwhalBenchmarkType {
44 type Err = std::num::ParseIntError;
45
46 fn from_str(s: &str) -> Result<Self, Self::Err> {
47 Ok(Self {
48 size: s.parse::<usize>()?.min(1000000),
49 })
50 }
51}
52
53impl BenchmarkType for NarwhalBenchmarkType {}
54
55pub struct NarwhalProtocol {
57 working_dir: PathBuf,
58}
59
60impl ProtocolCommands<NarwhalBenchmarkType> for NarwhalProtocol {
61 fn protocol_dependencies(&self) -> Vec<&'static str> {
62 vec![
63 "sudo apt-get -y install curl git-all clang cmake gcc libssl-dev pkg-config libclang-dev",
65 "sudo apt-get -y install libpq-dev",
66 ]
67 }
68
69 fn db_directories(&self) -> Vec<PathBuf> {
70 let consensus_db = [&self.working_dir, &"db-*".to_string().into()]
71 .iter()
72 .collect();
73
74 let narwhal_config = [&self.working_dir].iter().collect();
75 vec![consensus_db, narwhal_config]
76 }
77
78 fn genesis_command<'a, I>(&self, instances: I) -> String
79 where
80 I: Iterator<Item = &'a Instance>,
81 {
82 let working_dir = self.working_dir.display();
83 let ips = instances
84 .map(|x| x.main_ip.to_string())
85 .collect::<Vec<_>>()
86 .join(" ");
87
88 let genesis = [
89 "cargo run --release --bin narwhal-node benchmark-genesis",
90 &format!(
91 " --working-directory {working_dir} --ips {ips} --num-workers {NUM_WORKERS} --base-port {BASE_PORT}"
92 ),
93 ]
94 .join(" ");
95
96 [
97 &format!("mkdir -p {working_dir}"),
98 "source $HOME/.cargo/env",
99 &genesis,
100 ]
101 .join(" && ")
102 }
103
104 fn monitor_command<I>(&self, _instances: I) -> Vec<(Instance, String)>
105 where
106 I: IntoIterator<Item = Instance>,
107 {
108 vec![]
109 }
110
111 fn node_command<I>(
112 &self,
113 instances: I,
114 parameters: &BenchmarkParameters<NarwhalBenchmarkType>,
115 ) -> Vec<(Instance, String)>
116 where
117 I: IntoIterator<Item = Instance>,
118 {
119 let working_dir = self.working_dir.clone();
120 let hosts: Vec<_> = instances.into_iter().collect();
121 let mut worker_base_port = BASE_PORT + (2 * hosts.len());
123
124 let transaction_addresses: Vec<_> = hosts
125 .iter()
126 .map(|instance| {
127 let transaction_address =
128 format!("http://{}:{}", instance.main_ip, worker_base_port);
129 worker_base_port += 2;
130 transaction_address
131 })
132 .collect();
133
134 hosts
135 .into_iter()
136 .enumerate()
137 .map(|(i, instance)| {
138 let primary_keys: PathBuf = [&working_dir, &format!("primary-{i}-key.json").into()]
139 .iter()
140 .collect();
141 let primary_network_keys: PathBuf = [
142 &working_dir,
143 &format!("primary-{i}-network-key.json").into(),
144 ]
145 .iter()
146 .collect();
147 let worker_keys: PathBuf = [&working_dir, &format!("worker-{i}-key.json").into()]
149 .iter()
150 .collect();
151 let committee: PathBuf = [&working_dir, &"committee.json".to_string().into()]
152 .iter()
153 .collect();
154 let workers: PathBuf = [&working_dir, &"workers.json".to_string().into()]
155 .iter()
156 .collect();
157 let store: PathBuf = [&working_dir, &format!("db-{i}").into()].iter().collect();
158 let nw_parameters: PathBuf = [&working_dir, &"parameters.json".to_string().into()]
159 .iter()
160 .collect();
161
162 let run = [
163 "sudo sysctl -w net.core.wmem_max=104857600 && ",
164 "sudo sysctl -w net.core.rmem_max=104857600 && ",
165 "ulimit -n 51200 && ", "RUST_LOG=debug cargo run --release --bin narwhal-node run ",
167 &format!(
168 "--primary-keys {} --primary-network-keys {} ",
169 primary_keys.display(),
170 primary_network_keys.display()
171 ),
172 &format!(
173 "--worker-keys {} --committee {} --workers {} ",
174 worker_keys.display(),
175 committee.display(),
176 workers.display()
177 ),
178 &format!(
179 "--store {} --parameters {} benchmark ",
180 store.display(),
181 nw_parameters.display()
182 ),
183 &format!(
184 "--worker-id 0 --addr {} --size {} --rate {} --nodes {}",
185 transaction_addresses[i],
186 parameters.benchmark_type.size,
187 parameters.load / parameters.nodes,
188 transaction_addresses.join(","),
189 ),
190 ]
191 .join(" ");
192 let command = ["source $HOME/.cargo/env", &run].join(" && ");
193
194 (instance, command)
195 })
196 .collect()
197 }
198
199 fn client_command<I>(
200 &self,
201 _instances: I,
202 _parameters: &BenchmarkParameters<NarwhalBenchmarkType>,
203 ) -> Vec<(Instance, String)>
204 where
205 I: IntoIterator<Item = Instance>,
206 {
207 vec![]
210 }
211}
212
213impl NarwhalProtocol {
214 pub fn new(settings: &Settings) -> Self {
216 Self {
217 working_dir: [&settings.working_dir, &"narwhal_config".into()]
218 .iter()
219 .collect(),
220 }
221 }
222}
223
224impl ProtocolMetrics for NarwhalProtocol {
225 const BENCHMARK_DURATION: &'static str = "narwhal_benchmark_duration";
226 const TOTAL_TRANSACTIONS: &'static str = "worker_req_latency_by_route_count";
230 const LATENCY_BUCKETS: &'static str = "batch_execution_latency";
233 const LATENCY_SUM: &'static str = "batch_execution_latency_sum";
234 const LATENCY_SQUARED_SUM: &'static str = "narwhal_client_latency_squared_s";
237
238 fn nodes_metrics_path<I>(&self, instances: I) -> Vec<(Instance, String)>
239 where
240 I: IntoIterator<Item = Instance>,
241 {
242 instances
243 .into_iter()
244 .map(|instance| {
245 let path = format!(
246 "{}:{}{}",
247 instance.main_ip,
248 DEFAULT_PORT,
249 mysten_metrics::METRICS_ROUTE
250 );
251 (instance, path)
252 })
253 .collect()
254 }
255
256 fn clients_metrics_path<I>(&self, _instances: I) -> Vec<(Instance, String)>
257 where
258 I: IntoIterator<Item = Instance>,
259 {
260 vec![]
261 }
262}