sui_rpc_benchmark/direct/
query_executor.rs1use std::time::Instant;
8
9use anyhow::{Context, Result};
10use bb8::Pool;
11use bb8_postgres::PostgresConnectionManager;
12use rand::SeedableRng;
13use rand::seq::SliceRandom;
14use sui_indexer_alt_framework::task::TrySpawnStreamExt;
15use tokio_postgres::{NoTls, types::ToSql};
16use tracing::info;
17use url::Url;
18
19use crate::config::BenchmarkConfig;
20use crate::direct::metrics::{BenchmarkResult, MetricsCollector};
21use crate::direct::query_enricher::{EnrichedBenchmarkQuery, SqlValue};
22
/// Drives a benchmark run: executes enriched queries against Postgres through a
/// connection pool and records per-query metrics.
pub struct QueryExecutor {
    /// Connection pool from which each worker task obtains its database client.
    pool: Pool<PostgresConnectionManager<NoTls>>,
    /// Queries to benchmark; each worker picks a random entry of a query's `rows`
    /// as the bound parameters for one execution.
    enriched_queries: Vec<EnrichedBenchmarkQuery>,
    /// Run settings; `concurrency` and the optional `duration` are read by `run`.
    config: BenchmarkConfig,
    /// Collector that workers record each query outcome into and that produces
    /// the final report.
    metrics: MetricsCollector,
}
29
30impl QueryExecutor {
31 pub async fn new(
32 db_url: &Url,
33 enriched_queries: Vec<EnrichedBenchmarkQuery>,
34 config: BenchmarkConfig,
35 ) -> Result<Self> {
36 let manager = PostgresConnectionManager::new_from_stringlike(db_url.as_str(), NoTls)?;
37 let pool = Pool::builder().build(manager).await?;
38
39 Ok(Self {
40 pool,
41 enriched_queries,
42 config,
43 metrics: MetricsCollector::default(),
44 })
45 }
46
47 async fn worker_task(
48 pool: Pool<PostgresConnectionManager<NoTls>>,
49 enriched_queries: Vec<EnrichedBenchmarkQuery>,
50 metrics: MetricsCollector,
51 deadline: Option<Instant>,
52 ) -> Result<()> {
53 let client = pool.get().await?;
54 let mut rng = rand::rngs::StdRng::from_entropy();
55 while deadline.is_none_or(|d| Instant::now() < d) {
56 let enriched = enriched_queries
57 .choose(&mut rng)
58 .context("No queries available")?;
59 let Some(row) = enriched.rows.choose(&mut rng) else {
60 continue;
62 };
63
64 let params: Vec<Box<dyn ToSql + Sync + Send>> = row
65 .iter()
66 .map(|val| match val {
67 SqlValue::Text(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
68 SqlValue::Int4(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
69 SqlValue::Int8(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
70 SqlValue::Float8(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
71 SqlValue::Bool(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
72 SqlValue::Int2(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
73 SqlValue::Bytea(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
74 })
75 .collect();
76 let param_refs: Vec<&(dyn ToSql + Sync)> = params
77 .iter()
78 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
79 .collect();
80
81 let query_str = enriched.query.query_template.clone();
82
83 let start = Instant::now();
84 let result = client.query(&query_str, ¶m_refs[..]).await;
85
86 metrics.record_query(enriched.query.clone(), start.elapsed(), result.is_err());
87 }
88 Ok(())
89 }
90
91 pub async fn run(&self) -> Result<BenchmarkResult> {
92 info!(
93 "Running benchmark with {} concurrent clients",
94 self.config.concurrency
95 );
96
97 let start = Instant::now();
98 let deadline = self.config.duration.map(|duration| start + duration);
99 futures::stream::iter(self.enriched_queries.clone())
100 .try_for_each_spawned(self.config.concurrency, |query| {
101 QueryExecutor::worker_task(
102 self.pool.clone(),
103 vec![query],
104 self.metrics.clone(),
105 deadline,
106 )
107 })
108 .await?;
109
110 Ok(self.metrics.generate_report())
111 }
112}