sui_rpc_benchmark/direct/
query_enricher.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! This module enriches query templates with real data from the database.
//! This enrichment ensures that when we run the benchmark:
//! - We use realistic data values that actually exist in the database.
//! - We have a pool of valid values to randomly select from during execution.
8use anyhow::Result;
9use bb8::Pool;
10use bb8_postgres::PostgresConnectionManager;
11use parking_lot::Mutex;
12use std::sync::Arc;
13use sui_indexer_alt_framework::task::TrySpawnStreamExt;
14use tokio_postgres::{NoTls, Row, types::Type};
15use tracing::warn;
16use url::Url;
17
18use crate::direct::query_template_generator::QueryTemplate;
19
/// One column value sampled from the database.
///
/// Each variant corresponds to a Postgres column type supported by the enricher, and wraps an
/// `Option` so that SQL `NULL` can be represented.
#[derive(Debug, Clone)]
pub enum SqlValue {
    /// Value of a `BOOL` column.
    Bool(Option<bool>),
    /// Value of an `INT2` column.
    Int2(Option<i16>),
    /// Value of an `INT4` column.
    Int4(Option<i32>),
    /// Value of an `INT8` column.
    Int8(Option<i64>),
    /// Value of a `FLOAT8` column.
    Float8(Option<f64>),
    /// Value of a `TEXT` or `VARCHAR` column.
    Text(Option<String>),
    /// Value of a `BYTEA` column.
    Bytea(Option<Vec<u8>>),
}
30
31#[derive(Debug, Clone)]
32pub struct EnrichedBenchmarkQuery {
33    pub query: QueryTemplate,
34    pub rows: Vec<Vec<SqlValue>>,
35    pub types: Vec<Type>,
36}
37
38pub struct QueryEnricher {
39    pool: Pool<PostgresConnectionManager<NoTls>>,
40}
41
42impl QueryEnricher {
43    pub async fn new(db_url: &Url) -> Result<Self> {
44        let manager = PostgresConnectionManager::new_from_stringlike(db_url.as_str(), NoTls)?;
45        let pool = Pool::builder().build(manager).await?;
46        Ok(Self { pool })
47    }
48
49    fn row_to_values(row: &Row) -> Vec<SqlValue> {
50        (0..row.len())
51            .map(|i| match row.columns()[i].type_() {
52                &Type::TEXT | &Type::VARCHAR => SqlValue::Text(row.get(i)),
53                &Type::INT4 => SqlValue::Int4(row.get(i)),
54                &Type::INT8 => SqlValue::Int8(row.get(i)),
55                &Type::FLOAT8 => SqlValue::Float8(row.get(i)),
56                &Type::BOOL => SqlValue::Bool(row.get(i)),
57                &Type::INT2 => SqlValue::Int2(row.get(i)),
58                &Type::BYTEA => SqlValue::Bytea(row.get(i)),
59                ty => panic!("Unsupported type: {:?}", ty),
60            })
61            .collect()
62    }
63
64    pub async fn enrich_queries(
65        &self,
66        queries: Vec<QueryTemplate>,
67    ) -> Result<Vec<EnrichedBenchmarkQuery>> {
68        let enriched_queries = std::sync::Arc::new(Mutex::new(Vec::new()));
69        let pool = self.pool.clone();
70        let enriched_queries_clone = enriched_queries.clone();
71
72        futures::stream::iter(queries)
73            .try_for_each_spawned(10, move |query| {
74                let pool = pool.clone();
75                let enriched_queries = enriched_queries_clone.clone();
76                async move {
77                    let client = pool.get().await?;
78                    let sql = format!(
79                        "SELECT {} FROM {} WHERE {} IS NOT NULL LIMIT 1000",
80                        query.needed_columns.join(", "),
81                        query.table_name,
82                        query.needed_columns[0]
83                    );
84
85                    let rows = client.query(&sql, &[]).await?;
86                    let Some(first_row) = rows.first() else {
87                        warn!(
88                            table = query.table_name,
89                            "No sample data found for query on table, table is empty."
90                        );
91                        let enriched = EnrichedBenchmarkQuery {
92                            query: query.clone(),
93                            rows: Vec::new(),
94                            types: query.needed_columns.iter().map(|_| Type::TEXT).collect(), // default type
95                        };
96                        enriched_queries.lock().push(enriched);
97                        return Ok::<(), anyhow::Error>(());
98                    };
99                    let types = first_row
100                        .columns()
101                        .iter()
102                        .map(|c| c.type_().clone())
103                        .collect();
104                    let raw_rows = rows.iter().map(Self::row_to_values).collect();
105
106                    let enriched = EnrichedBenchmarkQuery {
107                        query: query.clone(),
108                        rows: raw_rows,
109                        types,
110                    };
111                    enriched_queries.lock().push(enriched);
112                    Ok::<(), anyhow::Error>(())
113                }
114            })
115            .await?;
116
117        Ok(Arc::try_unwrap(enriched_queries)
118            .map_err(|_| anyhow::anyhow!("Failed to try_unwrap Arc"))?
119            .into_inner())
120    }
121}