sui_rpc_loadgen/
load_test.rs1use std::error::Error;
5use std::time::{Duration, Instant};
6use tokio::sync::mpsc::Sender;
7
8use tokio::sync::mpsc;
9use tracing::error;
10
11use crate::payload::{Command, Payload, Processor, SignerInfo};
12
13struct WorkerThread<R: Processor + Send + Sync + Clone> {
14 processor: R,
15 payload: Payload,
16}
17
18impl<R: Processor + Send + Sync + Clone> WorkerThread<R> {
19 async fn run(&self) -> usize {
20 let mut successful_commands = 0;
21 match self.processor.apply(&self.payload).await {
22 Ok(()) => successful_commands += 1,
23 Err(e) => error!("Thread returns error: {e}"),
24 }
25 successful_commands
26 }
27}
28
29pub struct LoadTestConfig {
30 pub command: Command,
32 pub num_threads: usize,
33 pub divide_tasks: bool,
35 pub signer_info: Option<SignerInfo>,
36 pub num_chunks_per_thread: usize,
37 pub max_repeat: usize,
38}
39
40pub(crate) struct LoadTest<R: Processor + Send + Sync + Clone> {
41 pub processor: R,
42 pub config: LoadTestConfig,
43}
44
45impl<R: Processor + Send + Sync + Clone + 'static> LoadTest<R> {
46 pub(crate) async fn run(&self) -> Result<(), Box<dyn Error>> {
47 let start_time = Instant::now();
48 let payloads = self.processor.prepare(&self.config).await?;
49 let (tx, mut rx) = mpsc::channel(payloads.len());
50
51 self.run_workers(tx, payloads).await;
52
53 let mut num_successful_commands = 0;
55 while let Some(num_successful) = rx.recv().await {
56 num_successful_commands += num_successful;
57 }
58
59 let elapsed_time = start_time.elapsed();
60 let total_commands = num_successful_commands
62 * (self.config.max_repeat + 1)
63 * self.config.num_chunks_per_thread;
64
65 println!(
66 "Total successful commands: {}, total time {:?}, commands per second {:.2}",
67 total_commands,
68 elapsed_time,
69 get_tps(total_commands, elapsed_time),
70 );
71
72 self.processor.dump_cache_to_file(&self.config);
73
74 Ok(())
75 }
76
77 async fn run_workers(&self, tx: Sender<usize>, payloads: Vec<Payload>) {
78 println!("Running with {} threads...", payloads.len());
79 for payload in payloads.iter() {
80 let tx = tx.clone();
81 let worker_thread = WorkerThread {
82 processor: self.processor.clone(),
83 payload: payload.clone(),
84 };
85 tokio::spawn(async move {
86 let num_successful_commands = worker_thread.run().await;
87 tx.send(num_successful_commands).await.unwrap();
88 });
89 }
90 }
91}
92
93fn get_tps(num: usize, duration: Duration) -> f64 {
94 num as f64 / duration.as_secs_f64()
95}