sui_rpc_benchmark/json_rpc/
runner.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::request_loader::JsonRpcRequestLine;
5use crate::config::BenchmarkConfig;
6/// This module implements the JSON RPC benchmark runner.
7/// The main function is `run_queries`, which runs the queries concurrently
8/// and records the overall and per-method stats.
9use anyhow::{Context as _, Result};
10use dashmap::DashMap;
11use phf::phf_map;
12use serde::Deserialize;
13use serde_json::Value;
14use std::{
15    collections::HashMap,
16    sync::{Arc, Mutex},
17    time::Instant,
18};
19use sui_futures::stream::TrySpawnStreamExt;
20use tokio::time::timeout;
21use tracing::{debug, info, warn};
22
23/// static map of method names to the index of their cursor parameter
24static METHOD_CURSOR_POSITIONS: phf::Map<&'static str, usize> = phf_map! {
25    // based on function headers in crates/sui-json-rpc-api/src/indexer.rs
26    "suix_getOwnedObjects" => 2,
27    "suix_queryTransactionBlocks" => 1,
28    // based on function headers in crates/sui-json-rpc-api/src/coin.rs
29    "suix_getCoins" => 2,
30    "suix_getAllCoins" => 1,
31};
32
33static METHOD_LENGTHS: phf::Map<&'static str, usize> = phf_map! {
34    // based on function headers in crates/sui-json-rpc-api/src/indexer.rs
35    "suix_getOwnedObjects" => 4,
36    "suix_queryTransactionBlocks" => 4,
37    // based on function headers in crates/sui-json-rpc-api/src/coin.rs
38    "suix_getCoins" => 4,
39    "suix_getAllCoins" => 3,
40};
41
42/// Statistics for a single JSON RPC method
43#[derive(Clone, Default)]
44pub struct PerMethodStats {
45    pub total_sent: usize,
46    pub total_errors: usize,
47    pub total_latency_ms: f64,
48}
49
50/// Aggregated statistics for all JSON RPC requests
51#[derive(Clone, Default)]
52pub struct JsonRpcStats {
53    pub total_sent: usize,
54    pub total_errors: usize,
55    pub total_latency_ms: f64,
56    pub per_method: HashMap<String, PerMethodStats>,
57}
58
59/// Tracks pagination state for active pagination requests
60/// The key is a tuple of method name and the params `Vec<Value>`, where the cursor parameter is set to `null`.
61/// The value is the cursor for the next page.
62#[derive(Default)]
63struct PaginationCursorState {
64    requests: DashMap<(String, Vec<Value>), Value>,
65}
66
67impl JsonRpcStats {
68    pub fn new() -> Self {
69        Self::default()
70    }
71
72    fn record_request(&mut self, method: &str, latency_ms: f64, is_error: bool) {
73        self.total_sent += 1;
74        self.total_latency_ms += latency_ms;
75        if is_error {
76            self.total_errors += 1;
77        }
78
79        let method_stats = self.per_method.entry(method.to_string()).or_default();
80        method_stats.total_sent += 1;
81        method_stats.total_latency_ms += latency_ms;
82        if is_error {
83            method_stats.total_errors += 1;
84        }
85    }
86}
87
88impl PaginationCursorState {
89    fn new() -> Self {
90        Self {
91            requests: DashMap::new(),
92        }
93    }
94
95    /// Returns the index of the cursor parameter for a method, if it exists;
96    /// Otherwise, it means no cursor transformation is needed for this method.
97    fn get_method_cursor_index(method: &str) -> Option<usize> {
98        METHOD_CURSOR_POSITIONS.get(method).copied()
99    }
100
101    fn get_method_key(
102        method: &str,
103        params: &[Value],
104    ) -> Result<(String, Vec<Value>), anyhow::Error> {
105        let cursor_idx = METHOD_CURSOR_POSITIONS
106            .get(method)
107            .with_context(|| format!("method {} not found in cursor positions", method))?;
108        let mut key_params = params.to_vec();
109        if let Some(param_to_modify) = key_params.get_mut(*cursor_idx) {
110            *param_to_modify = Value::Null;
111        } else {
112            let method_length = METHOD_LENGTHS
113                .get(method)
114                .with_context(|| format!("method {} not found in method lengths", method))?;
115            key_params.resize(*method_length, Value::Null);
116        }
117        Ok((method.to_string(), key_params))
118    }
119
120    fn update_params_cursor(
121        params: &mut Value,
122        cursor_idx: usize,
123        new_cursor: Option<&Value>,
124        method: &str,
125    ) -> Result<(), anyhow::Error> {
126        let params_array = params
127            .get_mut("params")
128            .and_then(|v| v.as_array_mut())
129            .with_context(|| format!("params not found or not an array for method {}", method))?;
130        // If the cursor parameter is not present, extend the array to include it.
131        if params_array.len() <= cursor_idx {
132            let method_length = METHOD_LENGTHS
133                .get(method)
134                .with_context(|| format!("method {} not found in method lengths", method))?;
135            params_array.resize(*method_length, Value::Null);
136        }
137        let param_to_modify = params_array.get_mut(cursor_idx).with_context(|| {
138            format!(
139                "Failed to access cursor parameter at index {} for method {}",
140                cursor_idx, method
141            )
142        })?;
143        *param_to_modify = match new_cursor {
144            Some(cursor) => cursor.clone(),
145            None => Value::Null,
146        };
147        Ok(())
148    }
149
150    /// Updates the stored cursor for a given method and parameters.
151    /// The new cursor value is read from the response of a successful previous request.
152    ///
153    /// # Arguments
154    /// * `key` - A tuple containing the method name and parameters
155    /// * `cursor` - The new cursor value to store, or None to remove the stored value
156    ///
157    /// # Returns
158    /// * `Option<Value>` - The stored cursor value if it exists, otherwise None
159    fn update(&self, key: (String, Vec<Value>), cursor: Option<Value>) {
160        if let Some(cursor) = cursor {
161            self.requests.insert(key, cursor);
162        } else {
163            self.requests.remove(&key);
164        }
165    }
166
167    /// Returns a stored cursor for a given method and parameters.
168    /// The cursor value is originally read from the response of a successful previous request.
169    ///
170    /// # Arguments
171    /// * `key` - A tuple containing the method name and parameters
172    ///
173    /// # Returns
174    /// * `Option<Value>` - The stored cursor value if it exists, otherwise None
175    fn get(&self, key: &(String, Vec<Value>)) -> Option<Value> {
176        self.requests.get(key).map(|entry| entry.clone())
177    }
178}
179
180pub async fn run_queries(
181    endpoint: &str,
182    requests: &[JsonRpcRequestLine],
183    config: &BenchmarkConfig,
184) -> Result<JsonRpcStats> {
185    let concurrency = config.concurrency;
186    let shared_stats = Arc::new(Mutex::new(JsonRpcStats::new()));
187    let pagination_state = Arc::new(PaginationCursorState::new());
188    let client = reqwest::Client::new();
189    let endpoint = endpoint.to_owned();
190
191    let duration = config.duration;
192    let methods_to_skip = config.json_rpc_methods_to_skip.clone();
193    info!("Skipping methods: {:?}", methods_to_skip);
194    let requests: Vec<_> = requests
195        .iter()
196        .filter(|r| !methods_to_skip.contains(&r.method))
197        // TODO: remove this hack when the SDK has removed all MatchAny & MatchAll related implementation.
198        // Skip suix_getOwnedObjects requests with MatchAny & MatchAll filters b/c it's not supported.
199        .filter(|r| {
200            !(r.method == "suix_getOwnedObjects"
201                && r.body_json
202                    .get("params")
203                    .and_then(|p| p.as_array())
204                    .and_then(|p| p.get(1))
205                    .and_then(|p| p.get("filter"))
206                    .and_then(|f| f.as_object())
207                    .map(|f| f.contains_key("MatchAny") || f.contains_key("MatchAll"))
208                    .unwrap_or(false))
209        })
210        .cloned()
211        .collect();
212    let total_requests = requests.len();
213    debug!(
214        "Starting benchmark with {} requests at concurrency {}",
215        total_requests, concurrency
216    );
217
218    let start_time = Instant::now();
219    let stats = shared_stats.clone();
220    let process_requests = async {
221        #[derive(Debug)]
222        enum BenchmarkError {
223            Other(anyhow::Error),
224        }
225
226        impl From<anyhow::Error> for BenchmarkError {
227            fn from(e: anyhow::Error) -> Self {
228                BenchmarkError::Other(e)
229            }
230        }
231
232        let result = futures::stream::iter(requests.into_iter())
233            .try_for_each_spawned(concurrency, |mut request_line| {
234                let client = client.clone();
235                let endpoint = endpoint.clone();
236                let pagination_state = pagination_state.clone();
237                let task_stats = stats.clone();
238                let params = request_line
239                    .body_json
240                    .get("params")
241                    .and_then(|v| v.as_array())
242                    .map(|a| a.to_vec())
243                    .unwrap_or_else(|| {
244                        // Some methods like rpc.discover might not have params
245                        debug!("No params found for method: {}, using empty array", request_line.method);
246                        Vec::new()
247                    });
248                async move {
249                    // Update the cursor parameter if the request uses pagination
250                    if let Some(cursor_idx) = PaginationCursorState::get_method_cursor_index(&request_line.method)
251                        && !params.is_empty() {
252                            let method_key = match PaginationCursorState::get_method_key(&request_line.method, &params) {
253                                Ok(key) => key,
254                                Err(e) => return Err(BenchmarkError::Other(e)),
255                            };
256                            if let Err(e) = PaginationCursorState::update_params_cursor(
257                                &mut request_line.body_json,
258                                cursor_idx,
259                                pagination_state.get(&method_key).as_ref(),
260                                &request_line.method,
261                            ) {
262                                return Err(BenchmarkError::Other(e));
263                            }
264                        }
265
266                    let now = Instant::now();
267                    debug!("Sending request for method: {} body: {:?}", request_line.method, request_line.body_json);
268                    let res = client
269                        .post(&endpoint)
270                        .json(&request_line.body_json)
271                        .send()
272                        .await;
273                    let elapsed_ms = now.elapsed().as_millis() as f64;
274
275                    // update pagination cursor if the request is successful.
276                    let mut is_error = true;
277                    if let Ok(resp) = res {
278                        let status = resp.status();
279                        let resp_text = match resp.text().await {
280                                    Ok(text) => text,
281                                    Err(e) => {
282                                        return Err(BenchmarkError::Other(anyhow::anyhow!(
283                                            "Failed to get response text for method {}: {}",
284                                            request_line.method, e
285                                        )));
286                                    }
287                                };
288
289                        // Debug log all the responses, but truncate long ones
290                        let response = if resp_text.len() > 1000 {
291                            format!("{}...", &resp_text[..997])
292                        } else {
293                            resp_text.to_string()
294                        };
295                        debug!(
296                            "Response for method: {}, request body: {}, status: {}, response body: {}",
297                            request_line.method, request_line.body_json, status, response
298                        );
299                        if status.is_success() {
300                            let supports_pagination = PaginationCursorState::get_method_cursor_index(&request_line.method).is_some();
301                            if supports_pagination {
302                                #[derive(Debug, Deserialize)]
303                                struct Body {
304                                    result: Result,
305                                }
306                                #[derive(Debug, Deserialize)]
307                                #[serde(rename_all = "camelCase")]
308                                struct Result {
309                                    has_next_page: bool,
310                                    next_cursor: Option<Value>,
311                                }
312
313
314                                let parse_result = serde_json::from_str::<Body>(&resp_text);
315                                debug!("Parsed response for method: {}, result: {:?}", request_line.method, parse_result);
316                                if let Ok(Body { result }) = parse_result {
317                                    let method_key = match PaginationCursorState::get_method_key(
318                                        &request_line.method,
319                                        &params,
320                                    ) {
321                                        Ok(key) => key,
322                                        Err(e) => return Err(BenchmarkError::Other(e)),
323                                    };
324                                    if result.has_next_page {
325                                        debug!("Updated pagination cursor for method: {}, has_next_page: true", 
326                                            request_line.method);
327                                        pagination_state.update(method_key, result.next_cursor);
328                                    } else {
329                                        pagination_state.update(method_key, None);
330                                    }
331                                    is_error = false;
332                                } else {
333                                    warn!(
334                                        method = request_line.method,
335                                        body = ?request_line.body_json,
336                                        error = ?parse_result.err(),
337                                        response = resp_text,
338                                        "Response received but JSON parsing failed"
339                                    );
340                                }
341                            } else {
342                                is_error = false;
343                            }
344                        } else {
345                            warn!(
346                                method = request_line.method,
347                                status = ?status,
348                                body = ?request_line.body_json,
349                                response = resp_text,
350                                "Response received but status is not success"
351                            );
352                        }
353                    } else {
354                        warn!(
355                            method = request_line.method,
356                            body = ?request_line.body_json,
357                            error = ?res,
358                            "Failed to get response"
359                        );
360                    }
361
362                    let mut stats = task_stats
363                        .lock()
364                        .expect("Thread holding stats lock panicked");
365                    stats.record_request(&request_line.method, elapsed_ms, is_error);
366                    Ok::<(), BenchmarkError>(())
367                }
368            })
369            .await;
370
371        match result {
372            Ok(()) => Ok(()),
373            Err(BenchmarkError::Other(e)) => Err(e),
374        }
375    };
376
377    if let Some(duration_val) = duration {
378        match timeout(duration_val, process_requests).await {
379            Ok(result) => result?,
380            Err(_) => debug!("Benchmark timed out after reaching duration limit"),
381        }
382    } else {
383        process_requests.await?;
384    }
385
386    let elapsed = start_time.elapsed();
387    let final_stats = shared_stats
388        .lock()
389        .expect("Thread holding stats lock panicked")
390        .clone();
391    info!(
392        "Benchmark completed in {:?}. Total requests: {}, errors: {}, avg latency: {:.2}ms",
393        elapsed,
394        final_stats.total_sent,
395        final_stats.total_errors,
396        if final_stats.total_sent > 0 {
397            final_stats.total_latency_ms / final_stats.total_sent as f64
398        } else {
399            0.0
400        }
401    );
402
403    Ok(final_stats)
404}