sui_aws_orchestrator/protocol/narwhal.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    fmt::{Debug, Display},
6    path::PathBuf,
7    str::FromStr,
8};
9
10use crate::{
11    benchmark::{BenchmarkParameters, BenchmarkType},
12    client::Instance,
13    settings::Settings,
14};
15use serde::{Deserialize, Serialize};
16
17use super::{ProtocolCommands, ProtocolMetrics};
18
/// Worker count handed to `benchmark-genesis` through `--num-workers`.
const NUM_WORKERS: usize = 1;
/// Port handed to `benchmark-genesis` through `--base-port`; `node_command`
/// also derives the worker transaction ports from it.
const BASE_PORT: usize = 5_000;

/// Port on which narwhal nodes expose their metrics by default.
const DEFAULT_PORT: usize = 9_184;
24
25#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
26pub struct NarwhalBenchmarkType {
27    /// The size of each transaction in bytes
28    size: usize,
29}
30
31impl Debug for NarwhalBenchmarkType {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        write!(f, "{}", self.size)
34    }
35}
36
37impl Display for NarwhalBenchmarkType {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "tx size {}b", self.size)
40    }
41}
42
43impl FromStr for NarwhalBenchmarkType {
44    type Err = std::num::ParseIntError;
45
46    fn from_str(s: &str) -> Result<Self, Self::Err> {
47        Ok(Self {
48            size: s.parse::<usize>()?.min(1000000),
49        })
50    }
51}
52
53impl BenchmarkType for NarwhalBenchmarkType {}
54
/// Generates the shell commands that set up and run narwhal validators (and
/// their in-process benchmark clients) on the benchmark instances.
pub struct NarwhalProtocol {
    /// Directory on each instance holding narwhal keys, configs and stores.
    working_dir: PathBuf,
}
59
60impl ProtocolCommands<NarwhalBenchmarkType> for NarwhalProtocol {
61    fn protocol_dependencies(&self) -> Vec<&'static str> {
62        vec![
63            // Install typical narwhal dependencies.
64            "sudo apt-get -y install curl git-all clang cmake gcc libssl-dev pkg-config libclang-dev",
65            "sudo apt-get -y install libpq-dev",
66        ]
67    }
68
69    fn db_directories(&self) -> Vec<PathBuf> {
70        let consensus_db = [&self.working_dir, &"db-*".to_string().into()]
71            .iter()
72            .collect();
73
74        let narwhal_config = [&self.working_dir].iter().collect();
75        vec![consensus_db, narwhal_config]
76    }
77
78    fn genesis_command<'a, I>(&self, instances: I) -> String
79    where
80        I: Iterator<Item = &'a Instance>,
81    {
82        let working_dir = self.working_dir.display();
83        let ips = instances
84            .map(|x| x.main_ip.to_string())
85            .collect::<Vec<_>>()
86            .join(" ");
87
88        let genesis = [
89            "cargo run --release --bin narwhal-node benchmark-genesis",
90            &format!(
91                " --working-directory {working_dir} --ips {ips} --num-workers {NUM_WORKERS} --base-port {BASE_PORT}"
92            ),
93        ]
94        .join(" ");
95
96        [
97            &format!("mkdir -p {working_dir}"),
98            "source $HOME/.cargo/env",
99            &genesis,
100        ]
101        .join(" && ")
102    }
103
104    fn monitor_command<I>(&self, _instances: I) -> Vec<(Instance, String)>
105    where
106        I: IntoIterator<Item = Instance>,
107    {
108        vec![]
109    }
110
111    fn node_command<I>(
112        &self,
113        instances: I,
114        parameters: &BenchmarkParameters<NarwhalBenchmarkType>,
115    ) -> Vec<(Instance, String)>
116    where
117        I: IntoIterator<Item = Instance>,
118    {
119        let working_dir = self.working_dir.clone();
120        let hosts: Vec<_> = instances.into_iter().collect();
121        // 2 ports used per authority so add 2 * num authorities to base port
122        let mut worker_base_port = BASE_PORT + (2 * hosts.len());
123
124        let transaction_addresses: Vec<_> = hosts
125            .iter()
126            .map(|instance| {
127                let transaction_address =
128                    format!("http://{}:{}", instance.main_ip, worker_base_port);
129                worker_base_port += 2;
130                transaction_address
131            })
132            .collect();
133
134        hosts
135            .into_iter()
136            .enumerate()
137            .map(|(i, instance)| {
138                let primary_keys: PathBuf = [&working_dir, &format!("primary-{i}-key.json").into()]
139                    .iter()
140                    .collect();
141                let primary_network_keys: PathBuf = [
142                    &working_dir,
143                    &format!("primary-{i}-network-key.json").into(),
144                ]
145                .iter()
146                .collect();
147                // todo: add logic for multiple workers
148                let worker_keys: PathBuf = [&working_dir, &format!("worker-{i}-key.json").into()]
149                    .iter()
150                    .collect();
151                let committee: PathBuf = [&working_dir, &"committee.json".to_string().into()]
152                    .iter()
153                    .collect();
154                let workers: PathBuf = [&working_dir, &"workers.json".to_string().into()]
155                    .iter()
156                    .collect();
157                let store: PathBuf = [&working_dir, &format!("db-{i}").into()].iter().collect();
158                let nw_parameters: PathBuf = [&working_dir, &"parameters.json".to_string().into()]
159                    .iter()
160                    .collect();
161
162                let run = [
163                    "sudo sysctl -w net.core.wmem_max=104857600 && ",
164                    "sudo sysctl -w net.core.rmem_max=104857600 && ",
165                    "ulimit -n 51200 && ", // required so we can scale the client
166                    "RUST_LOG=debug cargo run --release --bin narwhal-node run ",
167                    &format!(
168                        "--primary-keys {} --primary-network-keys {} ",
169                        primary_keys.display(),
170                        primary_network_keys.display()
171                    ),
172                    &format!(
173                        "--worker-keys {} --committee {} --workers {} ",
174                        worker_keys.display(),
175                        committee.display(),
176                        workers.display()
177                    ),
178                    &format!(
179                        "--store {} --parameters {} benchmark ",
180                        store.display(),
181                        nw_parameters.display()
182                    ),
183                    &format!(
184                        "--worker-id 0 --addr {} --size {} --rate {} --nodes {}",
185                        transaction_addresses[i],
186                        parameters.benchmark_type.size,
187                        parameters.load / parameters.nodes,
188                        transaction_addresses.join(","),
189                    ),
190                ]
191                .join(" ");
192                let command = ["source $HOME/.cargo/env", &run].join(" && ");
193
194                (instance, command)
195            })
196            .collect()
197    }
198
199    fn client_command<I>(
200        &self,
201        _instances: I,
202        _parameters: &BenchmarkParameters<NarwhalBenchmarkType>,
203    ) -> Vec<(Instance, String)>
204    where
205        I: IntoIterator<Item = Instance>,
206    {
207        // client is started in process with the primary/worker via node_command,
208        // so nothing to start here.
209        vec![]
210    }
211}
212
213impl NarwhalProtocol {
214    /// Make a new instance of the Narwhal protocol commands generator.
215    pub fn new(settings: &Settings) -> Self {
216        Self {
217            working_dir: [&settings.working_dir, &"narwhal_config".into()]
218                .iter()
219                .collect(),
220        }
221    }
222}
223
224impl ProtocolMetrics for NarwhalProtocol {
225    const BENCHMARK_DURATION: &'static str = "narwhal_benchmark_duration";
226    // TODO: Improve metrics used for benchmark summary.
227    // Currently the only route should be `SubmitTransaction` so this should be a
228    // good proxy for total tx
229    const TOTAL_TRANSACTIONS: &'static str = "worker_req_latency_by_route_count";
230    // Does not include the time taken for the tx to be included in the batch, only
231    // from batch creation to when the batch is fetched for execution
232    const LATENCY_BUCKETS: &'static str = "batch_execution_latency";
233    const LATENCY_SUM: &'static str = "batch_execution_latency_sum";
234    // Measuring client submit latency but this only factors in submission and
235    // not time to commit
236    const LATENCY_SQUARED_SUM: &'static str = "narwhal_client_latency_squared_s";
237
238    fn nodes_metrics_path<I>(&self, instances: I) -> Vec<(Instance, String)>
239    where
240        I: IntoIterator<Item = Instance>,
241    {
242        instances
243            .into_iter()
244            .map(|instance| {
245                let path = format!(
246                    "{}:{}{}",
247                    instance.main_ip,
248                    DEFAULT_PORT,
249                    mysten_metrics::METRICS_ROUTE
250                );
251                (instance, path)
252            })
253            .collect()
254    }
255
256    fn clients_metrics_path<I>(&self, _instances: I) -> Vec<(Instance, String)>
257    where
258        I: IntoIterator<Item = Instance>,
259    {
260        vec![]
261    }
262}