sui_rpc_benchmark/json_rpc/
runner.rs1use super::request_loader::JsonRpcRequestLine;
5use crate::config::BenchmarkConfig;
6use anyhow::{Context as _, Result};
10use dashmap::DashMap;
11use phf::phf_map;
12use serde::Deserialize;
13use serde_json::Value;
14use std::{
15 collections::HashMap,
16 sync::{Arc, Mutex},
17 time::Instant,
18};
19use sui_futures::stream::TrySpawnStreamExt;
20use tokio::time::timeout;
21use tracing::{debug, info, warn};
22
23static METHOD_CURSOR_POSITIONS: phf::Map<&'static str, usize> = phf_map! {
25 "suix_getOwnedObjects" => 2,
27 "suix_queryTransactionBlocks" => 1,
28 "suix_getCoins" => 2,
30 "suix_getAllCoins" => 1,
31};
32
33static METHOD_LENGTHS: phf::Map<&'static str, usize> = phf_map! {
34 "suix_getOwnedObjects" => 4,
36 "suix_queryTransactionBlocks" => 4,
37 "suix_getCoins" => 4,
39 "suix_getAllCoins" => 3,
40};
41
/// Aggregated counters for a single JSON-RPC method.
///
/// `Debug` is derived so the stats can be logged and used in assertion
/// messages.
#[derive(Clone, Debug, Default)]
pub struct PerMethodStats {
    /// Number of requests recorded for this method.
    pub total_sent: usize,
    /// Number of those requests that were recorded as errors.
    pub total_errors: usize,
    /// Sum of the recorded latencies, in milliseconds.
    pub total_latency_ms: f64,
}
49
50#[derive(Clone, Default)]
52pub struct JsonRpcStats {
53 pub total_sent: usize,
54 pub total_errors: usize,
55 pub total_latency_ms: f64,
56 pub per_method: HashMap<String, PerMethodStats>,
57}
58
59#[derive(Default)]
63struct PaginationCursorState {
64 requests: DashMap<(String, Vec<Value>), Value>,
65}
66
67impl JsonRpcStats {
68 pub fn new() -> Self {
69 Self::default()
70 }
71
72 fn record_request(&mut self, method: &str, latency_ms: f64, is_error: bool) {
73 self.total_sent += 1;
74 self.total_latency_ms += latency_ms;
75 if is_error {
76 self.total_errors += 1;
77 }
78
79 let method_stats = self.per_method.entry(method.to_string()).or_default();
80 method_stats.total_sent += 1;
81 method_stats.total_latency_ms += latency_ms;
82 if is_error {
83 method_stats.total_errors += 1;
84 }
85 }
86}
87
88impl PaginationCursorState {
89 fn new() -> Self {
90 Self {
91 requests: DashMap::new(),
92 }
93 }
94
95 fn get_method_cursor_index(method: &str) -> Option<usize> {
98 METHOD_CURSOR_POSITIONS.get(method).copied()
99 }
100
101 fn get_method_key(
102 method: &str,
103 params: &[Value],
104 ) -> Result<(String, Vec<Value>), anyhow::Error> {
105 let cursor_idx = METHOD_CURSOR_POSITIONS
106 .get(method)
107 .with_context(|| format!("method {} not found in cursor positions", method))?;
108 let mut key_params = params.to_vec();
109 if let Some(param_to_modify) = key_params.get_mut(*cursor_idx) {
110 *param_to_modify = Value::Null;
111 } else {
112 let method_length = METHOD_LENGTHS
113 .get(method)
114 .with_context(|| format!("method {} not found in method lengths", method))?;
115 key_params.resize(*method_length, Value::Null);
116 }
117 Ok((method.to_string(), key_params))
118 }
119
120 fn update_params_cursor(
121 params: &mut Value,
122 cursor_idx: usize,
123 new_cursor: Option<&Value>,
124 method: &str,
125 ) -> Result<(), anyhow::Error> {
126 let params_array = params
127 .get_mut("params")
128 .and_then(|v| v.as_array_mut())
129 .with_context(|| format!("params not found or not an array for method {}", method))?;
130 if params_array.len() <= cursor_idx {
132 let method_length = METHOD_LENGTHS
133 .get(method)
134 .with_context(|| format!("method {} not found in method lengths", method))?;
135 params_array.resize(*method_length, Value::Null);
136 }
137 let param_to_modify = params_array.get_mut(cursor_idx).with_context(|| {
138 format!(
139 "Failed to access cursor parameter at index {} for method {}",
140 cursor_idx, method
141 )
142 })?;
143 *param_to_modify = match new_cursor {
144 Some(cursor) => cursor.clone(),
145 None => Value::Null,
146 };
147 Ok(())
148 }
149
150 fn update(&self, key: (String, Vec<Value>), cursor: Option<Value>) {
160 if let Some(cursor) = cursor {
161 self.requests.insert(key, cursor);
162 } else {
163 self.requests.remove(&key);
164 }
165 }
166
167 fn get(&self, key: &(String, Vec<Value>)) -> Option<Value> {
176 self.requests.get(key).map(|entry| entry.clone())
177 }
178}
179
180pub async fn run_queries(
181 endpoint: &str,
182 requests: &[JsonRpcRequestLine],
183 config: &BenchmarkConfig,
184) -> Result<JsonRpcStats> {
185 let concurrency = config.concurrency;
186 let shared_stats = Arc::new(Mutex::new(JsonRpcStats::new()));
187 let pagination_state = Arc::new(PaginationCursorState::new());
188 let client = reqwest::Client::new();
189 let endpoint = endpoint.to_owned();
190
191 let duration = config.duration;
192 let methods_to_skip = config.json_rpc_methods_to_skip.clone();
193 info!("Skipping methods: {:?}", methods_to_skip);
194 let requests: Vec<_> = requests
195 .iter()
196 .filter(|r| !methods_to_skip.contains(&r.method))
197 .filter(|r| {
200 !(r.method == "suix_getOwnedObjects"
201 && r.body_json
202 .get("params")
203 .and_then(|p| p.as_array())
204 .and_then(|p| p.get(1))
205 .and_then(|p| p.get("filter"))
206 .and_then(|f| f.as_object())
207 .map(|f| f.contains_key("MatchAny") || f.contains_key("MatchAll"))
208 .unwrap_or(false))
209 })
210 .cloned()
211 .collect();
212 let total_requests = requests.len();
213 debug!(
214 "Starting benchmark with {} requests at concurrency {}",
215 total_requests, concurrency
216 );
217
218 let start_time = Instant::now();
219 let stats = shared_stats.clone();
220 let process_requests = async {
221 #[derive(Debug)]
222 enum BenchmarkError {
223 Other(anyhow::Error),
224 }
225
226 impl From<anyhow::Error> for BenchmarkError {
227 fn from(e: anyhow::Error) -> Self {
228 BenchmarkError::Other(e)
229 }
230 }
231
232 let result = futures::stream::iter(requests.into_iter())
233 .try_for_each_spawned(concurrency, |mut request_line| {
234 let client = client.clone();
235 let endpoint = endpoint.clone();
236 let pagination_state = pagination_state.clone();
237 let task_stats = stats.clone();
238 let params = request_line
239 .body_json
240 .get("params")
241 .and_then(|v| v.as_array())
242 .map(|a| a.to_vec())
243 .unwrap_or_else(|| {
244 debug!("No params found for method: {}, using empty array", request_line.method);
246 Vec::new()
247 });
248 async move {
249 if let Some(cursor_idx) = PaginationCursorState::get_method_cursor_index(&request_line.method)
251 && !params.is_empty() {
252 let method_key = match PaginationCursorState::get_method_key(&request_line.method, ¶ms) {
253 Ok(key) => key,
254 Err(e) => return Err(BenchmarkError::Other(e)),
255 };
256 if let Err(e) = PaginationCursorState::update_params_cursor(
257 &mut request_line.body_json,
258 cursor_idx,
259 pagination_state.get(&method_key).as_ref(),
260 &request_line.method,
261 ) {
262 return Err(BenchmarkError::Other(e));
263 }
264 }
265
266 let now = Instant::now();
267 debug!("Sending request for method: {} body: {:?}", request_line.method, request_line.body_json);
268 let res = client
269 .post(&endpoint)
270 .json(&request_line.body_json)
271 .send()
272 .await;
273 let elapsed_ms = now.elapsed().as_millis() as f64;
274
275 let mut is_error = true;
277 if let Ok(resp) = res {
278 let status = resp.status();
279 let resp_text = match resp.text().await {
280 Ok(text) => text,
281 Err(e) => {
282 return Err(BenchmarkError::Other(anyhow::anyhow!(
283 "Failed to get response text for method {}: {}",
284 request_line.method, e
285 )));
286 }
287 };
288
289 let response = if resp_text.len() > 1000 {
291 format!("{}...", &resp_text[..997])
292 } else {
293 resp_text.to_string()
294 };
295 debug!(
296 "Response for method: {}, request body: {}, status: {}, response body: {}",
297 request_line.method, request_line.body_json, status, response
298 );
299 if status.is_success() {
300 let supports_pagination = PaginationCursorState::get_method_cursor_index(&request_line.method).is_some();
301 if supports_pagination {
302 #[derive(Debug, Deserialize)]
303 struct Body {
304 result: Result,
305 }
306 #[derive(Debug, Deserialize)]
307 #[serde(rename_all = "camelCase")]
308 struct Result {
309 has_next_page: bool,
310 next_cursor: Option<Value>,
311 }
312
313
314 let parse_result = serde_json::from_str::<Body>(&resp_text);
315 debug!("Parsed response for method: {}, result: {:?}", request_line.method, parse_result);
316 if let Ok(Body { result }) = parse_result {
317 let method_key = match PaginationCursorState::get_method_key(
318 &request_line.method,
319 ¶ms,
320 ) {
321 Ok(key) => key,
322 Err(e) => return Err(BenchmarkError::Other(e)),
323 };
324 if result.has_next_page {
325 debug!("Updated pagination cursor for method: {}, has_next_page: true",
326 request_line.method);
327 pagination_state.update(method_key, result.next_cursor);
328 } else {
329 pagination_state.update(method_key, None);
330 }
331 is_error = false;
332 } else {
333 warn!(
334 method = request_line.method,
335 body = ?request_line.body_json,
336 error = ?parse_result.err(),
337 response = resp_text,
338 "Response received but JSON parsing failed"
339 );
340 }
341 } else {
342 is_error = false;
343 }
344 } else {
345 warn!(
346 method = request_line.method,
347 status = ?status,
348 body = ?request_line.body_json,
349 response = resp_text,
350 "Response received but status is not success"
351 );
352 }
353 } else {
354 warn!(
355 method = request_line.method,
356 body = ?request_line.body_json,
357 error = ?res,
358 "Failed to get response"
359 );
360 }
361
362 let mut stats = task_stats
363 .lock()
364 .expect("Thread holding stats lock panicked");
365 stats.record_request(&request_line.method, elapsed_ms, is_error);
366 Ok::<(), BenchmarkError>(())
367 }
368 })
369 .await;
370
371 match result {
372 Ok(()) => Ok(()),
373 Err(BenchmarkError::Other(e)) => Err(e),
374 }
375 };
376
377 if let Some(duration_val) = duration {
378 match timeout(duration_val, process_requests).await {
379 Ok(result) => result?,
380 Err(_) => debug!("Benchmark timed out after reaching duration limit"),
381 }
382 } else {
383 process_requests.await?;
384 }
385
386 let elapsed = start_time.elapsed();
387 let final_stats = shared_stats
388 .lock()
389 .expect("Thread holding stats lock panicked")
390 .clone();
391 info!(
392 "Benchmark completed in {:?}. Total requests: {}, errors: {}, avg latency: {:.2}ms",
393 elapsed,
394 final_stats.total_sent,
395 final_stats.total_errors,
396 if final_stats.total_sent > 0 {
397 final_stats.total_latency_ms / final_stats.total_sent as f64
398 } else {
399 0.0
400 }
401 );
402
403 Ok(final_stats)
404}