sui_rpc_loadgen/
load_test.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::error::Error;
5use std::time::{Duration, Instant};
6use tokio::sync::mpsc::Sender;
7
8use tokio::sync::mpsc;
9use tracing::error;
10
11use crate::payload::{Command, Payload, Processor, SignerInfo};
12
13struct WorkerThread<R: Processor + Send + Sync + Clone> {
14    processor: R,
15    payload: Payload,
16}
17
18impl<R: Processor + Send + Sync + Clone> WorkerThread<R> {
19    async fn run(&self) -> usize {
20        let mut successful_commands = 0;
21        match self.processor.apply(&self.payload).await {
22            Ok(()) => successful_commands += 1,
23            Err(e) => error!("Thread returns error: {e}"),
24        }
25        successful_commands
26    }
27}
28
29pub struct LoadTestConfig {
30    // TODO: support multiple commands
31    pub command: Command,
32    pub num_threads: usize,
33    /// should divide tasks across multiple threads
34    pub divide_tasks: bool,
35    pub signer_info: Option<SignerInfo>,
36    pub num_chunks_per_thread: usize,
37    pub max_repeat: usize,
38}
39
40pub(crate) struct LoadTest<R: Processor + Send + Sync + Clone> {
41    pub processor: R,
42    pub config: LoadTestConfig,
43}
44
45impl<R: Processor + Send + Sync + Clone + 'static> LoadTest<R> {
46    pub(crate) async fn run(&self) -> Result<(), Box<dyn Error>> {
47        let start_time = Instant::now();
48        let payloads = self.processor.prepare(&self.config).await?;
49        let (tx, mut rx) = mpsc::channel(payloads.len());
50
51        self.run_workers(tx, payloads).await;
52
53        // Collect the results from the worker threads
54        let mut num_successful_commands = 0;
55        while let Some(num_successful) = rx.recv().await {
56            num_successful_commands += num_successful;
57        }
58
59        let elapsed_time = start_time.elapsed();
60        // TODO(chris): clean up this logic
61        let total_commands = num_successful_commands
62            * (self.config.max_repeat + 1)
63            * self.config.num_chunks_per_thread;
64
65        println!(
66            "Total successful commands: {}, total time {:?}, commands per second {:.2}",
67            total_commands,
68            elapsed_time,
69            get_tps(total_commands, elapsed_time),
70        );
71
72        self.processor.dump_cache_to_file(&self.config);
73
74        Ok(())
75    }
76
77    async fn run_workers(&self, tx: Sender<usize>, payloads: Vec<Payload>) {
78        println!("Running with {} threads...", payloads.len());
79        for payload in payloads.iter() {
80            let tx = tx.clone();
81            let worker_thread = WorkerThread {
82                processor: self.processor.clone(),
83                payload: payload.clone(),
84            };
85            tokio::spawn(async move {
86                let num_successful_commands = worker_thread.run().await;
87                tx.send(num_successful_commands).await.unwrap();
88            });
89        }
90    }
91}
92
/// Computes throughput as `num` operations per second over `duration`.
///
/// Returns `0.0` when `duration` is zero, so callers never receive the `inf`
/// or `NaN` that dividing by zero seconds would produce.
fn get_tps(num: usize, duration: Duration) -> f64 {
    if duration.is_zero() {
        return 0.0;
    }
    // `usize as f64` only loses precision above 2^53 operations.
    num as f64 / duration.as_secs_f64()
}