sui_rpc_benchmark/direct/
query_executor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4/// This module executes enriched benchmark queries against the database.
5/// Each query's execution is timed and recorded via MetricsCollector.
6/// And the results are aggregated and reported via BenchmarkResult.
7use std::time::Instant;
8
9use anyhow::{Context, Result};
10use bb8::Pool;
11use bb8_postgres::PostgresConnectionManager;
12use rand::SeedableRng;
13use rand::seq::SliceRandom;
14use sui_indexer_alt_framework::task::TrySpawnStreamExt;
15use tokio_postgres::{NoTls, types::ToSql};
16use tracing::info;
17use url::Url;
18
19use crate::config::BenchmarkConfig;
20use crate::direct::metrics::{BenchmarkResult, MetricsCollector};
21use crate::direct::query_enricher::{EnrichedBenchmarkQuery, SqlValue};
22
23pub struct QueryExecutor {
24    pool: Pool<PostgresConnectionManager<NoTls>>,
25    enriched_queries: Vec<EnrichedBenchmarkQuery>,
26    config: BenchmarkConfig,
27    metrics: MetricsCollector,
28}
29
30impl QueryExecutor {
31    pub async fn new(
32        db_url: &Url,
33        enriched_queries: Vec<EnrichedBenchmarkQuery>,
34        config: BenchmarkConfig,
35    ) -> Result<Self> {
36        let manager = PostgresConnectionManager::new_from_stringlike(db_url.as_str(), NoTls)?;
37        let pool = Pool::builder().build(manager).await?;
38
39        Ok(Self {
40            pool,
41            enriched_queries,
42            config,
43            metrics: MetricsCollector::default(),
44        })
45    }
46
47    async fn worker_task(
48        pool: Pool<PostgresConnectionManager<NoTls>>,
49        enriched_queries: Vec<EnrichedBenchmarkQuery>,
50        metrics: MetricsCollector,
51        deadline: Option<Instant>,
52    ) -> Result<()> {
53        let client = pool.get().await?;
54        let mut rng = rand::rngs::StdRng::from_entropy();
55        while deadline.is_none_or(|d| Instant::now() < d) {
56            let enriched = enriched_queries
57                .choose(&mut rng)
58                .context("No queries available")?;
59            let Some(row) = enriched.rows.choose(&mut rng) else {
60                // skip when the table is empty and thus no values to sample.
61                continue;
62            };
63
64            let params: Vec<Box<dyn ToSql + Sync + Send>> = row
65                .iter()
66                .map(|val| match val {
67                    SqlValue::Text(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
68                    SqlValue::Int4(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
69                    SqlValue::Int8(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
70                    SqlValue::Float8(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
71                    SqlValue::Bool(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
72                    SqlValue::Int2(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
73                    SqlValue::Bytea(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
74                })
75                .collect();
76            let param_refs: Vec<&(dyn ToSql + Sync)> = params
77                .iter()
78                .map(|p| p.as_ref() as &(dyn ToSql + Sync))
79                .collect();
80
81            let query_str = enriched.query.query_template.clone();
82
83            let start = Instant::now();
84            let result = client.query(&query_str, &param_refs[..]).await;
85
86            metrics.record_query(enriched.query.clone(), start.elapsed(), result.is_err());
87        }
88        Ok(())
89    }
90
91    pub async fn run(&self) -> Result<BenchmarkResult> {
92        info!(
93            "Running benchmark with {} concurrent clients",
94            self.config.concurrency
95        );
96
97        let start = Instant::now();
98        let deadline = self.config.duration.map(|duration| start + duration);
99        futures::stream::iter(self.enriched_queries.clone())
100            .try_for_each_spawned(self.config.concurrency, |query| {
101                QueryExecutor::worker_task(
102                    self.pool.clone(),
103                    vec![query],
104                    self.metrics.clone(),
105                    deadline,
106                )
107            })
108            .await?;
109
110        Ok(self.metrics.generate_report())
111    }
112}