sui_rpc_benchmark/direct/
query_executor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

/// This module executes enriched benchmark queries against the database.
/// Each query's execution is timed and recorded via MetricsCollector.
/// And the results are aggregated and reported via BenchmarkResult.
use std::time::Instant;

use anyhow::{Context, Result};
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use sui_indexer_alt_framework::task::TrySpawnStreamExt;
use tokio_postgres::{types::ToSql, NoTls};
use tracing::info;
use url::Url;

use crate::config::BenchmarkConfig;
use crate::direct::metrics::{BenchmarkResult, MetricsCollector};
use crate::direct::query_enricher::{EnrichedBenchmarkQuery, SqlValue};

pub struct QueryExecutor {
    pool: Pool<PostgresConnectionManager<NoTls>>,
    enriched_queries: Vec<EnrichedBenchmarkQuery>,
    config: BenchmarkConfig,
    metrics: MetricsCollector,
}

impl QueryExecutor {
    pub async fn new(
        db_url: &Url,
        enriched_queries: Vec<EnrichedBenchmarkQuery>,
        config: BenchmarkConfig,
    ) -> Result<Self> {
        let manager = PostgresConnectionManager::new_from_stringlike(db_url.as_str(), NoTls)?;
        let pool = Pool::builder().build(manager).await?;

        Ok(Self {
            pool,
            enriched_queries,
            config,
            metrics: MetricsCollector::default(),
        })
    }

    async fn worker_task(
        pool: Pool<PostgresConnectionManager<NoTls>>,
        enriched_queries: Vec<EnrichedBenchmarkQuery>,
        metrics: MetricsCollector,
        deadline: Option<Instant>,
    ) -> Result<()> {
        let client = pool.get().await?;
        let mut rng = rand::rngs::StdRng::from_entropy();
        while deadline.is_none_or(|d| Instant::now() < d) {
            let enriched = enriched_queries
                .choose(&mut rng)
                .context("No queries available")?;
            let Some(row) = enriched.rows.choose(&mut rng) else {
                // skip when the table is empty and thus no values to sample.
                continue;
            };

            let params: Vec<Box<dyn ToSql + Sync + Send>> = row
                .iter()
                .map(|val| match val {
                    SqlValue::Text(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
                    SqlValue::Int4(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
                    SqlValue::Int8(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
                    SqlValue::Float8(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
                    SqlValue::Bool(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
                    SqlValue::Int2(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
                    SqlValue::Bytea(v) => Box::new(v) as Box<dyn ToSql + Sync + Send>,
                })
                .collect();
            let param_refs: Vec<&(dyn ToSql + Sync)> = params
                .iter()
                .map(|p| p.as_ref() as &(dyn ToSql + Sync))
                .collect();

            let query_str = enriched.query.query_template.clone();

            let start = Instant::now();
            let result = client.query(&query_str, &param_refs[..]).await;

            metrics.record_query(enriched.query.clone(), start.elapsed(), result.is_err());
        }
        Ok(())
    }

    pub async fn run(&self) -> Result<BenchmarkResult> {
        info!(
            "Running benchmark with {} concurrent clients",
            self.config.concurrency
        );

        let start = Instant::now();
        let deadline = self.config.duration.map(|duration| start + duration);
        futures::stream::iter(self.enriched_queries.clone())
            .try_for_each_spawned(self.config.concurrency, |query| {
                QueryExecutor::worker_task(
                    self.pool.clone(),
                    vec![query],
                    self.metrics.clone(),
                    deadline,
                )
            })
            .await?;

        Ok(self.metrics.generate_report())
    }
}